greenplumn CJob 源码
greenplumn CJob 代码
文件路径:/src/backend/gporca/libgpopt/include/gpopt/search/CJob.h
//---------------------------------------------------------------------------
// Greenplum Database
// Copyright (C) 2011 EMC Corp.
//
// @filename:
// CJob.h
//
// @doc:
// Interface class for optimization job abstraction
//---------------------------------------------------------------------------
#ifndef GPOPT_CJob_H
#define GPOPT_CJob_H
#include "gpos/base.h"
#include "gpos/common/CList.h"
#include "gpos/common/DbgPrintMixin.h"
#include "gpos/task/ITask.h"
namespace gpopt
{
using namespace gpos;
// prototypes
class CJobQueue;
class CScheduler;
class CSchedulerContext;
//---------------------------------------------------------------------------
// @class:
// CJob
//
// @doc:
// Superclass of all optimization jobs
//
// The creation of different types of jobs happens inside CJobFactory
// class such that each job is given a unique id.
//
// Job Dependencies:
// Each job has one parent given by the member variable CJob::m_pjParent.
// Thus, the dependency graph that defines dependencies is effectively a
// tree. The root optimization is scheduled in CEngine::ScheduleMainJob()
//
// A job can have any number of dependent (child) jobs. Execution within a
// job cannot proceed as long as there is one or more dependent jobs that
// are not finished yet. Pausing a child job does not also allow the
// parent job to proceed. The number of job dependencies (children) is
// maintained by the member variable CJob::m_ulpRefs. The increment and
// decrement of number of children are atomic operations performed by
// CJob::IncRefs() and CJob::UlpDecrRefs() functions, respectively.
//
// Job Queue:
// Each job maintains a job queue CJob::m_pjq of other identical jobs that
// are created while a given job is executing. For example, when exploring
// a group, a group exploration job J1 would be executing. Concurrently,
// another group exploration job J2 (for the same group) may be triggered
// by another worker. The job J2 would be added in a pending state to the
// job queue of J1. When J1 terminates, all jobs in its queue are notified
// to pick up J1 results.
//
// Job reentrance:
// All optimization jobs are designed to be reentrant. This means that
// there is a mechanism to support pausing a given job J1, moving
// execution to another job J2, and then possibly returning to J1 to
// resume execution. The re-entry point to J1 must be the same as the
// point where J1 was paused. This mechanism is implemented using a state
// machine.
//
// Job Execution:
// Each job defines two enumerations: EState to define the different
// states during job execution and EEvent to define the different events
// that cause moving from one state to another. These two enumerations are
// used to define job state machine m_jsm, which is an object of
// CJobStateMachine class. Note that the states, events and state machines
// are job-specific. This is why each job class has its own definitions of
// the job states, events & state machine.
//
// See CJobStateMachine.h for more information about how jobs are executed
// using the state machine. Also see CScheduler.h for more information
// about how job are scheduled.
//
//---------------------------------------------------------------------------
class CJob : public DbgPrintMixin<CJob>
{
// friends
friend class CJobFactory;
friend class CJobQueue;
friend class CScheduler;
public:
// job type
enum EJobType
{
EjtTest = 0,
EjtGroupOptimization,
EjtGroupImplementation,
EjtGroupExploration,
EjtGroupExpressionOptimization,
EjtGroupExpressionImplementation,
EjtGroupExpressionExploration,
EjtTransformation,
EjtInvalid,
EjtSentinel = EjtInvalid
};
private:
#ifdef GPOS_DEBUG
// enum for job state
enum EJobState
{
EjsInit = 0,
EjsWaiting,
EjsRunning,
EjsSuspended,
EjsCompleted
};
#endif // GPOS_DEBUG
// parent job
CJob *m_pjParent{nullptr};
// assigned job queue
CJobQueue *m_pjq{nullptr};
// reference counter
ULONG_PTR m_ulpRefs{0};
// job id - set by job factory
ULONG m_id{0};
// job type
EJobType m_ejt;
// flag indicating if job is initialized
BOOL m_fInit{false};
#ifdef GPOS_DEBUG
// job state
EJobState m_ejs{EjsInit};
#endif // GPOS_DEBUG
//-------------------------------------------------------------------
// Interface for CJobFactory
//-------------------------------------------------------------------
// set type
void
SetJobType(EJobType ejt)
{
m_ejt = ejt;
}
//-------------------------------------------------------------------
// Interface for CScheduler
//-------------------------------------------------------------------
// parent accessor
CJob *
PjParent() const
{
return m_pjParent;
}
// set parent
void
SetParent(CJob *pj)
{
GPOS_ASSERT(this != pj);
m_pjParent = pj;
}
// increment reference counter
void
IncRefs()
{
m_ulpRefs++;
}
// decrement reference counter
ULONG_PTR
UlpDecrRefs()
{
GPOS_ASSERT(0 < m_ulpRefs && "Decrement counter from 0");
return m_ulpRefs--;
}
// notify parent of job completion;
// return true if parent is runnable;
BOOL FResumeParent() const;
#ifdef GPOS_DEBUG
// reference counter accessor
ULONG_PTR
UlpRefs() const
{
return m_ulpRefs;
}
// check if job type is valid
BOOL
FValidType() const
{
return (EjtTest <= m_ejt && EjtSentinel > m_ejt);
}
// get state
EJobState
Ejs() const
{
return m_ejs;
}
// set state
void
SetState(EJobState ejs)
{
m_ejs = ejs;
}
#endif // GPOS_DEBUG
protected:
// id accessor
ULONG
Id() const
{
return m_id;
}
// ctor
CJob() = default;
// dtor
virtual ~CJob()
{
GPOS_ASSERT_IMP(!ITask::Self()->HasPendingExceptions(), 0 == m_ulpRefs);
}
// reset job
virtual void Reset();
// check if job is initialized
BOOL
FInit() const
{
return m_fInit;
}
// mark job as initialized
void
SetInit()
{
GPOS_ASSERT(false == m_fInit);
m_fInit = true;
}
public:
CJob(const CJob &) = delete;
// actual job execution given a scheduling context
// returns true if job completes, false if it is suspended
virtual BOOL FExecute(CSchedulerContext *psc) = 0;
// type accessor
EJobType
Ejt() const
{
return m_ejt;
}
// job queue accessor
CJobQueue *
Pjq() const
{
return m_pjq;
}
// set job queue
void
SetJobQueue(CJobQueue *pjq)
{
GPOS_ASSERT(nullptr != pjq);
m_pjq = pjq;
}
// cleanup internal state
virtual void
Cleanup()
{
}
#ifdef GPOS_DEBUG
// print job description
virtual IOstream &OsPrint(IOstream &os) const;
// link for running job list
SLink m_linkRunning;
// link for suspended job list
SLink m_linkSuspended;
#endif // GPOS_DEBUG
// link for job queueing
SLink m_linkQueue;
}; // class CJob
} // namespace gpopt
#endif // !GPOPT_CJob_H
// EOF
相关信息
相关文章
greenplumn CGroupExpression 源码
greenplumn CJobGroupExploration 源码
greenplumn CJobGroupExpression 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦