Documentation
¶
Index ¶
- Constants
- func NewJob(name string, steps ...Step) *jobBuilder
- func NewStep(name string, handler ...interface{}) *stepBuilder
- func Register(job Job) error
- func Restart(ctx context.Context, jobId interface{}) (int64, error)
- func RestartAsync(ctx context.Context, jobId interface{}) (int64, error)
- func SetDB(sqlDb *sql.DB)
- func SetLogger(l logs.Logger)
- func SetMaxRunningJobs(size int)
- func SetMaxRunningSteps(size int)
- func SetTransactionManager(txMgr TransactionManager)
- func Start(ctx context.Context, jobName string, params string) (int64, error)
- func StartAsync(ctx context.Context, jobName string, params string) (int64, error)
- func Stop(ctx context.Context, jobId interface{}) error
- func Unregister(job Job)
- type Aggregator
- type BatchContext
- func (ctx *BatchContext) DeepCopy() *BatchContext
- func (ctx *BatchContext) Exists(key string) bool
- func (ctx *BatchContext) Get(key string, def ...interface{}) interface{}
- func (ctx *BatchContext) GetBool(key string, def ...bool) (bool, error)
- func (ctx *BatchContext) GetInt(key string, def ...int) (int, error)
- func (ctx *BatchContext) GetInt64(key string, def ...int64) (int64, error)
- func (ctx *BatchContext) GetString(key string, def ...string) (string, error)
- func (ctx *BatchContext) MarshalJSON() ([]byte, error)
- func (ctx *BatchContext) Merge(other *BatchContext)
- func (ctx *BatchContext) Put(key string, value interface{})
- func (ctx *BatchContext) Remove(key string)
- func (ctx *BatchContext) UnmarshalJSON(b []byte) error
- type BatchError
- type ChunkContext
- type ChunkListener
- type DefaultTxManager
- type FilePath
- type Future
- type Handler
- type ItemReader
- type Job
- type JobExecution
- type JobListener
- type OpenCloser
- type PartitionListener
- type Partitioner
- type PartitionerFactory
- type Processor
- type Reader
- type Step
- type StepExecution
- type StepListener
- type Task
- type TransactionManager
- type Writer
Constants ¶
const ( //ErrCodeRetry an error indicating the caller should retry ErrCodeRetry = "retry" //ErrCodeStop an error indicating the job is to be stopped ErrCodeStop = "stop" //ErrCodeConcurrency an error indicating conflict modification ErrCodeConcurrency = "concurrency" //ErrCodeDbFail an error indicating database access failed ErrCodeDbFail = "db_fail" //ErrCodeGeneral general error ErrCodeGeneral = "general" )
const ( DefaultJobPoolSize = 10 DefaultStepTaskPoolSize = 1000 )
Default sizes of the job pool and the step task pool
const ( //ItemReaderKeyList the key of keyList in StepContext ItemReaderKeyList = "gobatch.ItemReader.key.list" //ItemReaderCurrentIndex the key of current offset of step's keyList in StepContext ItemReaderCurrentIndex = "gobatch.ItemReader.current.index" //ItemReaderMaxIndex the key of max index of step's keyList in StepContext ItemReaderMaxIndex = "gobatch.ItemReader.max.index" )
const ( //DefaultChunkSize default number of record per chunk to read DefaultChunkSize = 10 //DefaultPartitions default number of partitions to construct a step DefaultPartitions = 1 //DefaultMinPartitionSize default min number of record to process in a sub step of a partitionStep DefaultMinPartitionSize = 1 //DefaultMaxPartitionSize default max number of record to process in a sub step of a partitionStep DefaultMaxPartitionSize = 2147483647 )
Variables ¶
This section is empty.
Functions ¶
func NewStep ¶
func NewStep(name string, handler ...interface{}) *stepBuilder
NewStep initializes a step builder
func RestartAsync ¶
RestartAsync restarts a job by job name or job execution id asynchronously
func SetMaxRunningJobs ¶
func SetMaxRunningJobs(size int)
SetMaxRunningJobs sets the max number of parallel jobs for GoBatch
func SetMaxRunningSteps ¶
func SetMaxRunningSteps(size int)
SetMaxRunningSteps sets the max number of parallel steps for GoBatch
func SetTransactionManager ¶
func SetTransactionManager(txMgr TransactionManager)
SetTransactionManager registers a TransactionManager instance for GoBatch
func StartAsync ¶
StartAsync starts a job by job name and params asynchronously
Types ¶
type Aggregator ¶
type Aggregator interface {
//Aggregate aggregate result from all sub step executions
Aggregate(execution *StepExecution, subExecutions []*StepExecution) BatchError
}
Aggregator merges the results of sub step executions of a chunk step
type BatchContext ¶
type BatchContext struct {
// contains filtered or unexported fields
}
BatchContext contains properties during a job or step execution
func (*BatchContext) DeepCopy ¶
func (ctx *BatchContext) DeepCopy() *BatchContext
func (*BatchContext) Exists ¶
func (ctx *BatchContext) Exists(key string) bool
func (*BatchContext) Get ¶
func (ctx *BatchContext) Get(key string, def ...interface{}) interface{}
func (*BatchContext) GetBool ¶
func (ctx *BatchContext) GetBool(key string, def ...bool) (bool, error)
func (*BatchContext) GetInt64 ¶
func (ctx *BatchContext) GetInt64(key string, def ...int64) (int64, error)
func (*BatchContext) GetString ¶
func (ctx *BatchContext) GetString(key string, def ...string) (string, error)
func (*BatchContext) MarshalJSON ¶
func (ctx *BatchContext) MarshalJSON() ([]byte, error)
func (*BatchContext) Merge ¶
func (ctx *BatchContext) Merge(other *BatchContext)
func (*BatchContext) Put ¶
func (ctx *BatchContext) Put(key string, value interface{})
func (*BatchContext) Remove ¶
func (ctx *BatchContext) Remove(key string)
func (*BatchContext) UnmarshalJSON ¶
func (ctx *BatchContext) UnmarshalJSON(b []byte) error
type BatchError ¶
type BatchError interface {
//Code code of the error
Code() string
//Message readable message of the error
Message() string
//Error error interface
Error() string
//StackTrace goroutine stack trace
StackTrace() string
}
BatchError represents an error that occurred during GoBatch execution
func NewBatchError ¶
func NewBatchError(code string, msg string, args ...interface{}) BatchError
NewBatchError creates a new BatchError instance
type ChunkContext ¶
type ChunkContext struct {
StepExecution *StepExecution
Tx interface{}
End bool
}
type ChunkListener ¶
type ChunkListener interface {
//BeforeChunk execute before start of a chunk in a chunkStep
BeforeChunk(context *ChunkContext) BatchError
//AfterChunk execute after end of a chunk in a chunkStep
AfterChunk(context *ChunkContext) BatchError
//OnError execute when an error occurred during a chunk in a chunkStep
OnError(context *ChunkContext, err BatchError)
}
ChunkListener is a listener for chunk events in a chunkStep
type DefaultTxManager ¶
type DefaultTxManager struct {
// contains filtered or unexported fields
}
DefaultTxManager default TransactionManager implementation
func (*DefaultTxManager) BeginTx ¶
func (tm *DefaultTxManager) BeginTx() (interface{}, BatchError)
BeginTx begins a transaction
func (*DefaultTxManager) Commit ¶
func (tm *DefaultTxManager) Commit(tx interface{}) BatchError
Commit commits a transaction
func (*DefaultTxManager) Rollback ¶
func (tm *DefaultTxManager) Rollback(tx interface{}) BatchError
Rollback rolls back a transaction
type Handler ¶
type Handler interface {
//Handle implement handler logic here
Handle(execution *StepExecution) BatchError
}
Handler is an interface for doing work in a simple step
type ItemReader ¶
type ItemReader interface {
//ReadKeys read all keys of some kind of data
ReadKeys() ([]interface{}, error)
//ReadItem read value by one key from ReadKeys result
ReadItem(key interface{}) (interface{}, error)
}
ItemReader is for loading a large amount of data from a datasource such as a database, and is used in a chunk step. When the step executes, it first loads all data keys by calling ReadKeys() once, then loads the full data by key, one by one, in every chunk.
type Job ¶
type Job interface {
Name() string
Start(ctx context.Context, execution *JobExecution) BatchError
Stop(ctx context.Context, execution *JobExecution) BatchError
GetSteps() []Step
}
Job is the job interface used by GoBatch
type JobExecution ¶
type JobExecution struct {
JobExecutionId int64
JobInstanceId int64
JobName string
JobParams map[string]interface{}
JobStatus status.BatchStatus
StepExecutions []*StepExecution
JobContext *BatchContext
CreateTime time.Time
StartTime time.Time
EndTime time.Time
FailError BatchError
Version int64
}
JobExecution represents context of a job execution
func (*JobExecution) AddStepExecution ¶
func (e *JobExecution) AddStepExecution(execution *StepExecution)
AddStepExecution adds a step execution to this job
type JobListener ¶
type JobListener interface {
//BeforeJob execute before job start
BeforeJob(execution *JobExecution) BatchError
//AfterJob execute after job end either normally or abnormally
AfterJob(execution *JobExecution) BatchError
}
JobListener job listener
type OpenCloser ¶
type OpenCloser interface {
//Open do initialization for Reader or Writer
Open(execution *StepExecution) BatchError
//Close do cleanups for Reader or Writer
Close(execution *StepExecution) BatchError
}
OpenCloser is used for doing initialization and cleanup for a Reader or Writer
type PartitionListener ¶
type PartitionListener interface {
//BeforePartition execute before enter into Partitioner.Partition() in a partitionStep
BeforePartition(execution *StepExecution) BatchError
//AfterPartition execute after return from Partitioner.Partition() in a partitionStep
AfterPartition(execution *StepExecution, subExecutions []*StepExecution) BatchError
//OnError execute when an error return from Partitioner.Partition() in a partitionStep
OnError(execution *StepExecution, err BatchError)
}
PartitionListener is a listener for partition events in a partitionStep
type Partitioner ¶
type Partitioner interface {
//Partition generate sub step executions from specified step execution and partitions count
Partition(execution *StepExecution, partitions uint) ([]*StepExecution, BatchError)
//GetPartitionNames generate sub step names from specified step execution and partitions count
GetPartitionNames(execution *StepExecution, partitions uint) []string
}
Partitioner splits an execution of a step into multiple sub executions.
type PartitionerFactory ¶
type PartitionerFactory interface {
GetPartitioner(minPartitionSize, maxPartitionSize uint) Partitioner
}
PartitionerFactory can create Partitioners; it is usually used by a Reader.
type Processor ¶
type Processor interface {
//Process process an item from reader and return a result item
Process(item interface{}, chunkCtx *ChunkContext) (interface{}, BatchError)
}
Processor is for processing data in a chunk step
type Reader ¶
type Reader interface {
//Read each call of Read() will return a data item, if there is no more data, a nil item will be returned.
//If there is an error, a nil item and a BatchError will be returned
Read(chunkCtx *ChunkContext) (interface{}, BatchError)
}
Reader is for loading data in a chunk step
type Step ¶
type Step interface {
Name() string
Exec(ctx context.Context, execution *StepExecution) BatchError
// contains filtered or unexported methods
}
Step step interface
type StepExecution ¶
type StepExecution struct {
StepExecutionId int64
StepName string
StepStatus status.BatchStatus
StepContext *BatchContext
StepContextId int64
StepExecutionContext *BatchContext
JobExecution *JobExecution
CreateTime time.Time
StartTime time.Time
EndTime time.Time
ReadCount int64
WriteCount int64
CommitCount int64
FilterCount int64
ReadSkipCount int64
WriteSkipCount int64
ProcessSkipCount int64
RollbackCount int64
FailError BatchError
LastUpdated time.Time
Version int64
}
StepExecution represents context of a step execution
type StepListener ¶
type StepListener interface {
//BeforeStep execute before step start
BeforeStep(execution *StepExecution) BatchError
//AfterStep execute after step end either normally or abnormally
AfterStep(execution *StepExecution) BatchError
}
StepListener is a listener for step events
type Task ¶
type Task func(execution *StepExecution) BatchError
Task is a function type for doing work in a simple step
type TransactionManager ¶
type TransactionManager interface {
BeginTx() (tx interface{}, err BatchError)
Commit(tx interface{}) BatchError
Rollback(tx interface{}) BatchError
}
TransactionManager is used by a chunk step to execute chunk processing in a transaction.
func NewTransactionManager ¶
func NewTransactionManager(db *sql.DB) TransactionManager
NewTransactionManager creates a TransactionManager instance
type Writer ¶
type Writer interface {
//Write write items generated by processor in a chunk
Write(items []interface{}, chunkCtx *ChunkContext) BatchError
}
Writer is for writing data in a chunk step

