Documentation
¶
Index ¶
- Constants
- Variables
- func DefaultBuildSelect(columns []string, table *schema.Table, lastPk, batchSize uint64) squirrel.SelectBuilder
- func GetMd5HashesSql(schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (string, []interface{}, error)
- func Int64Value(value interface{}) (int64, bool)
- func MaskedDSN(c *mysql.Config) string
- func MaxPrimaryKeys(db *sql.DB, tables []*schema.Table, logger *logrus.Entry) (map[*schema.Table]uint64, []*schema.Table, error)
- func QuotedTableName(table *schema.Table) string
- func QuotedTableNameFromString(database, table string) string
- func Uint64Value(value interface{}) (uint64, bool)
- func WaitForThrottle(t Throttler)
- func WithRetries(maxRetries int, sleep time.Duration, logger *logrus.Entry, verb string, ...) (err error)
- func WithRetriesContext(ctx context.Context, maxRetries int, sleep time.Duration, logger *logrus.Entry, ...) (err error)
- type AtomicBoolean
- type BatchWriter
- type BinlogDeleteEvent
- type BinlogInsertEvent
- type BinlogStreamer
- func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error)
- func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() error
- func (s *BinlogStreamer) FlushAndStop()
- func (s *BinlogStreamer) GetLastStreamedBinlogPosition() mysql.Position
- func (s *BinlogStreamer) Initialize() (err error)
- func (s *BinlogStreamer) IsAlmostCaughtUp() bool
- func (s *BinlogStreamer) Run()
- type BinlogUpdateEvent
- type BinlogWriter
- type ChecksumTableVerifier
- type Config
- type ControlServer
- func (this *ControlServer) HandleCutover(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleIndex(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandlePause(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleStop(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleUnpause(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleVerify(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) Initialize() (err error)
- func (this *ControlServer) Run(wg *sync.WaitGroup)
- func (this *ControlServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) Shutdown() error
- type CopyFilter
- type CountMetric
- type Cursor
- type CursorConfig
- type DMLEvent
- func NewBinlogDMLEvents(table *schema.Table, ev *replication.BinlogEvent) ([]DMLEvent, error)
- func NewBinlogDeleteEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
- func NewBinlogInsertEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
- func NewBinlogUpdateEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
- type DMLEventBase
- type DataIterator
- type DataIteratorState
- func (this *DataIteratorState) CompletedTables() map[string]bool
- func (this *DataIteratorState) EstimatedPKProcessedPerSecond() float64
- func (this *DataIteratorState) LastSuccessfulPrimaryKeys() map[string]uint64
- func (this *DataIteratorState) MarkTableAsCompleted(table string)
- func (this *DataIteratorState) TargetPrimaryKeys() map[string]uint64
- func (this *DataIteratorState) UpdateLastSuccessfulPK(table string, pk uint64)
- func (this *DataIteratorState) UpdateTargetPK(table string, pk uint64)
- type DatabaseConfig
- type ErrorHandler
- type Ferry
- func (f *Ferry) FlushBinlogAndStopStreaming()
- func (f *Ferry) Initialize() (err error)
- func (f *Ferry) Run()
- func (f *Ferry) RunStandaloneDataCopy(tables []*schema.Table) error
- func (f *Ferry) Start() error
- func (f *Ferry) WaitUntilBinlogStreamerCatchesUp()
- func (f *Ferry) WaitUntilRowCopyIsComplete()
- type GaugeMetric
- type IncompleteVerificationError
- type IterativeVerifier
- func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, ...) (map[uint64][]byte, error)
- func (v *IterativeVerifier) Initialize() error
- func (v *IterativeVerifier) Result() (VerificationResultAndStatus, error)
- func (v *IterativeVerifier) SanityCheckParameters() error
- func (v *IterativeVerifier) StartInBackground() error
- func (v *IterativeVerifier) VerifyBeforeCutover() error
- func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error)
- func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error)
- func (v *IterativeVerifier) Wait()
- type LagThrottler
- type LagThrottlerConfig
- type MetricBase
- type MetricTag
- type Metrics
- func (m *Metrics) AddConsumer()
- func (m *Metrics) Count(key string, value int64, tags []MetricTag, sampleRate float64)
- func (m *Metrics) DoneConsumer()
- func (m *Metrics) Gauge(key string, value float64, tags []MetricTag, sampleRate float64)
- func (m *Metrics) Measure(key string, tags []MetricTag, sampleRate float64, f func())
- func (m *Metrics) StopAndFlush()
- func (m *Metrics) Timer(key string, duration time.Duration, tags []MetricTag, sampleRate float64)
- type PKPositionLog
- type PanicErrorHandler
- type PauserThrottler
- type ReverifyBatch
- type ReverifyEntry
- type ReverifyStore
- type RowBatch
- type RowData
- type SqlDBWithFakeRollback
- type SqlPreparer
- type SqlPreparerAndRollbacker
- type Status
- type TLSConfig
- type TableFilter
- type TableIdentifier
- type TableSchemaCache
- type TableStatus
- type Throttler
- type ThrottlerBase
- type TimerMetric
- type VerificationResult
- type VerificationResultAndStatus
- type Verifier
- type WorkerPool
Constants ¶
const ( StateStarting = "starting" StateCopying = "copying" StateWaitingForCutover = "wait-for-cutover" StateCutover = "cutover" StateDone = "done" )
Variables ¶
var ( VersionString string = "?.?.?+??????????????+???????" WebUiBasedir string = "" )
Functions ¶
func DefaultBuildSelect ¶
func GetMd5HashesSql ¶
func Int64Value ¶
func MaxPrimaryKeys ¶
func QuotedTableName ¶
func Uint64Value ¶
func WaitForThrottle ¶
func WaitForThrottle(t Throttler)
func WithRetries ¶
Types ¶
type AtomicBoolean ¶
type AtomicBoolean int32
func (*AtomicBoolean) Get ¶
func (a *AtomicBoolean) Get() bool
func (*AtomicBoolean) Set ¶
func (a *AtomicBoolean) Set(b bool)
type BatchWriter ¶
type BatchWriter struct {
DB *sql.DB
DatabaseRewrites map[string]string
TableRewrites map[string]string
WriteRetries int
// contains filtered or unexported fields
}
func (*BatchWriter) Initialize ¶
func (w *BatchWriter) Initialize()
func (*BatchWriter) WriteRowBatch ¶
func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error
type BinlogDeleteEvent ¶
type BinlogDeleteEvent struct {
*DMLEventBase
// contains filtered or unexported fields
}
func (*BinlogDeleteEvent) AsSQLString ¶
func (e *BinlogDeleteEvent) AsSQLString(target *schema.Table) (string, error)
func (*BinlogDeleteEvent) NewValues ¶
func (e *BinlogDeleteEvent) NewValues() RowData
func (*BinlogDeleteEvent) OldValues ¶
func (e *BinlogDeleteEvent) OldValues() RowData
func (*BinlogDeleteEvent) PK ¶
func (e *BinlogDeleteEvent) PK() (uint64, error)
type BinlogInsertEvent ¶
type BinlogInsertEvent struct {
*DMLEventBase
// contains filtered or unexported fields
}
func (*BinlogInsertEvent) AsSQLString ¶
func (e *BinlogInsertEvent) AsSQLString(target *schema.Table) (string, error)
func (*BinlogInsertEvent) NewValues ¶
func (e *BinlogInsertEvent) NewValues() RowData
func (*BinlogInsertEvent) OldValues ¶
func (e *BinlogInsertEvent) OldValues() RowData
func (*BinlogInsertEvent) PK ¶
func (e *BinlogInsertEvent) PK() (uint64, error)
type BinlogStreamer ¶
type BinlogStreamer struct {
Db *sql.DB
Config *Config
ErrorHandler ErrorHandler
Filter CopyFilter
TableSchema TableSchemaCache
// contains filtered or unexported fields
}
func (*BinlogStreamer) AddEventListener ¶
func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error)
func (*BinlogStreamer) ConnectBinlogStreamerToMysql ¶
func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() error
func (*BinlogStreamer) FlushAndStop ¶
func (s *BinlogStreamer) FlushAndStop()
func (*BinlogStreamer) GetLastStreamedBinlogPosition ¶
func (s *BinlogStreamer) GetLastStreamedBinlogPosition() mysql.Position
func (*BinlogStreamer) Initialize ¶
func (s *BinlogStreamer) Initialize() (err error)
func (*BinlogStreamer) IsAlmostCaughtUp ¶
func (s *BinlogStreamer) IsAlmostCaughtUp() bool
func (*BinlogStreamer) Run ¶
func (s *BinlogStreamer) Run()
type BinlogUpdateEvent ¶
type BinlogUpdateEvent struct {
*DMLEventBase
// contains filtered or unexported fields
}
func (*BinlogUpdateEvent) AsSQLString ¶
func (e *BinlogUpdateEvent) AsSQLString(target *schema.Table) (string, error)
func (*BinlogUpdateEvent) NewValues ¶
func (e *BinlogUpdateEvent) NewValues() RowData
func (*BinlogUpdateEvent) OldValues ¶
func (e *BinlogUpdateEvent) OldValues() RowData
func (*BinlogUpdateEvent) PK ¶
func (e *BinlogUpdateEvent) PK() (uint64, error)
type BinlogWriter ¶
type BinlogWriter struct {
DB *sql.DB
DatabaseRewrites map[string]string
TableRewrites map[string]string
Throttler Throttler
BatchSize int
WriteRetries int
ErrorHandler ErrorHandler
// contains filtered or unexported fields
}
func (*BinlogWriter) BufferBinlogEvents ¶
func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error
func (*BinlogWriter) Initialize ¶
func (b *BinlogWriter) Initialize() error
func (*BinlogWriter) Run ¶
func (b *BinlogWriter) Run()
func (*BinlogWriter) Stop ¶
func (b *BinlogWriter) Stop()
type ChecksumTableVerifier ¶
type ChecksumTableVerifier struct {
Tables []*schema.Table
DatabaseRewrites map[string]string
TableRewrites map[string]string
SourceDB *sql.DB
TargetDB *sql.DB
// contains filtered or unexported fields
}
func (*ChecksumTableVerifier) Result ¶
func (v *ChecksumTableVerifier) Result() (VerificationResultAndStatus, error)
func (*ChecksumTableVerifier) StartInBackground ¶
func (v *ChecksumTableVerifier) StartInBackground() error
func (*ChecksumTableVerifier) Verify ¶
func (v *ChecksumTableVerifier) Verify() (VerificationResult, error)
func (*ChecksumTableVerifier) Wait ¶
func (v *ChecksumTableVerifier) Wait()
type Config ¶
type Config struct {
// Source database connection configuration
//
// Required
Source DatabaseConfig
// Target database connection configuration
//
// Required
Target DatabaseConfig
// Map database name on the source database (key of the map) to a
// different name on the target database (value of the associated key).
// This allows one to move data and change the database name in the
// process.
//
// Optional: defaults to empty map/no rewrites
DatabaseRewrites map[string]string
// Map the table name on the source database to a different name on
// the target database. See DatabaseRewrites.
//
// Optional: defaults to empty map/no rewrites
TableRewrites map[string]string
// The maximum number of retries for writes if the writes failed on
// the target database.
//
// Optional: defaults to 5.
DBWriteRetries int
// Filter out the databases/tables when detecting the source databases
// and tables.
//
// Required
TableFilter TableFilter
// Filter out unwanted data/events from being copied.
//
// Optional: defaults to nil/no filter.
CopyFilter CopyFilter
// The server id used by Ghostferry to connect to MySQL as a replication
// slave. This id must be unique on the MySQL server. If 0 is specified,
// a random id will be generated upon connecting to the MySQL server.
//
// Optional: defaults to an automatically generated one
MyServerId uint32
// The maximum number of binlog events to write at once. Note this is a
// maximum: if there are not a lot of binlog events, they will be written
// one at a time such that the binlog streamer lag is as low as possible. This
// batch size will only be hit if there are a lot of binlog events at the same time.
//
// Optional: defaults to 100
BinlogEventBatchSize int
// The batch size used to iterate the data during data copy. This batch size
// is always used: if this is specified to be 100, 100 rows will be copied
// per iteration.
//
// With the current implementation of Ghostferry, we need to lock the rows
// we select. This means, the larger this number is, the longer we need to
// hold this lock. On the flip side, the smaller this number is, the slower
// the copy will likely be.
//
// Optional: defaults to 200
DataIterationBatchSize uint64
// The maximum number of retries for reads if the reads fail on the source
// database.
//
// Optional: defaults to 5
DBReadRetries int
// This specifies the number of concurrent goroutines, each iterating over
// a single table.
//
// At this point in time, iteration within a single table is not
// parallelized. This may be possible to add in the future.
//
// Optional: defaults to 4
DataIterationConcurrency int
// This specifies if Ghostferry will pause before cutover or not.
//
// Optional: defaults to false
AutomaticCutover bool
// Config for the ControlServer
ServerBindAddr string
WebBasedir string
}
func (*Config) ValidateConfig ¶
type ControlServer ¶
type ControlServer struct {
F *Ferry
Verifier Verifier
Addr string
Basedir string
// contains filtered or unexported fields
}
func (*ControlServer) HandleCutover ¶
func (this *ControlServer) HandleCutover(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleIndex ¶
func (this *ControlServer) HandleIndex(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandlePause ¶
func (this *ControlServer) HandlePause(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleStop ¶
func (this *ControlServer) HandleStop(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleUnpause ¶
func (this *ControlServer) HandleUnpause(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleVerify ¶
func (this *ControlServer) HandleVerify(w http.ResponseWriter, r *http.Request)
func (*ControlServer) Initialize ¶
func (this *ControlServer) Initialize() (err error)
func (*ControlServer) Run ¶
func (this *ControlServer) Run(wg *sync.WaitGroup)
func (*ControlServer) ServeHTTP ¶
func (this *ControlServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*ControlServer) Shutdown ¶
func (this *ControlServer) Shutdown() error
type CopyFilter ¶
type CopyFilter interface {
// BuildSelect is used to set up the query used for batch data copying,
// allowing for restricting copying to a subset of data. Returning an error
// here will cause the query to be retried, until the retry limit is
// reached, at which point the ferry will be aborted. BuildSelect is passed
// the columns to be selected, table being copied, the last primary key value
// from the previous batch, and the batch size. Call DefaultBuildSelect to
// generate the default query, which may be used as a starting point.
BuildSelect([]string, *schema.Table, uint64, uint64) (sq.SelectBuilder, error)
// ApplicableEvent is used to filter events for rows that have been
// filtered in BuildSelect. ApplicableEvent should return true if the
// event is for a row that would be selected by BuildSelect, and false
// otherwise.
// Returning an error here will cause the ferry to be aborted.
ApplicableEvent(DMLEvent) (bool, error)
}
CopyFilter provides an interface for restricting the copying to a subset of data. This typically involves adding a WHERE condition in the BuildSelect function, and returning false for unwanted rows in ApplicableEvent.
type CountMetric ¶
type CountMetric struct {
MetricBase
Value int64
}
type Cursor ¶
type Cursor struct {
CursorConfig
Table *schema.Table
MaxPrimaryKey uint64
RowLock bool
// contains filtered or unexported fields
}
type CursorConfig ¶
type CursorConfig struct {
DB *sql.DB
Throttler Throttler
ColumnsToSelect []string
BuildSelect func([]string, *schema.Table, uint64, uint64) (squirrel.SelectBuilder, error)
BatchSize uint64
ReadRetries int
}
func (*CursorConfig) NewCursor ¶
func (c *CursorConfig) NewCursor(table *schema.Table, maxPk uint64) *Cursor
NewCursor returns a new Cursor with an embedded copy of this CursorConfig.
func (*CursorConfig) NewCursorWithoutRowLock ¶
func (c *CursorConfig) NewCursorWithoutRowLock(table *schema.Table, maxPk uint64) *Cursor
NewCursorWithoutRowLock returns a new Cursor with an embedded copy of this CursorConfig.
type DMLEvent ¶
type DMLEvent interface {
Database() string
Table() string
TableSchema() *schema.Table
AsSQLString(target *schema.Table) (string, error)
OldValues() RowData
NewValues() RowData
PK() (uint64, error)
}
func NewBinlogDMLEvents ¶
func NewBinlogDMLEvents(table *schema.Table, ev *replication.BinlogEvent) ([]DMLEvent, error)
func NewBinlogDeleteEvents ¶
func NewBinlogInsertEvents ¶
func NewBinlogUpdateEvents ¶
type DMLEventBase ¶
type DMLEventBase struct {
// contains filtered or unexported fields
}
The base of DMLEvent to provide the necessary methods. This desires a copy of the struct in case we want to deal with schema changes in the future.
func (*DMLEventBase) Database ¶
func (e *DMLEventBase) Database() string
func (*DMLEventBase) Table ¶
func (e *DMLEventBase) Table() string
func (*DMLEventBase) TableSchema ¶
func (e *DMLEventBase) TableSchema() *schema.Table
type DataIterator ¶
type DataIterator struct {
DB *sql.DB
Tables []*schema.Table
Concurrency int
ErrorHandler ErrorHandler
CursorConfig *CursorConfig
CurrentState *DataIteratorState
// contains filtered or unexported fields
}
func (*DataIterator) AddBatchListener ¶
func (d *DataIterator) AddBatchListener(listener func(*RowBatch) error)
func (*DataIterator) AddDoneListener ¶
func (d *DataIterator) AddDoneListener(listener func() error)
func (*DataIterator) Initialize ¶
func (d *DataIterator) Initialize() error
func (*DataIterator) Run ¶
func (d *DataIterator) Run()
type DataIteratorState ¶
type DataIteratorState struct {
// contains filtered or unexported fields
}
func (*DataIteratorState) CompletedTables ¶
func (this *DataIteratorState) CompletedTables() map[string]bool
func (*DataIteratorState) EstimatedPKProcessedPerSecond ¶
func (this *DataIteratorState) EstimatedPKProcessedPerSecond() float64
func (*DataIteratorState) LastSuccessfulPrimaryKeys ¶
func (this *DataIteratorState) LastSuccessfulPrimaryKeys() map[string]uint64
func (*DataIteratorState) MarkTableAsCompleted ¶
func (this *DataIteratorState) MarkTableAsCompleted(table string)
func (*DataIteratorState) TargetPrimaryKeys ¶
func (this *DataIteratorState) TargetPrimaryKeys() map[string]uint64
func (*DataIteratorState) UpdateLastSuccessfulPK ¶
func (this *DataIteratorState) UpdateLastSuccessfulPK(table string, pk uint64)
func (*DataIteratorState) UpdateTargetPK ¶
func (this *DataIteratorState) UpdateTargetPK(table string, pk uint64)
type DatabaseConfig ¶
type DatabaseConfig struct {
Host string
Port uint16
User string
Pass string
Collation string
Params map[string]string
TLS *TLSConfig
}
func (*DatabaseConfig) MySQLConfig ¶
func (c *DatabaseConfig) MySQLConfig() (*mysql.Config, error)
func (*DatabaseConfig) Validate ¶
func (c *DatabaseConfig) Validate() error
type ErrorHandler ¶
type Ferry ¶
type Ferry struct {
*Config
SourceDB *sql.DB
TargetDB *sql.DB
BinlogStreamer *BinlogStreamer
BinlogWriter *BinlogWriter
DataIterator *DataIterator
BatchWriter *BatchWriter
ErrorHandler ErrorHandler
Throttler Throttler
Tables TableSchemaCache
StartTime time.Time
DoneTime time.Time
OverallState string
// contains filtered or unexported fields
}
func (*Ferry) FlushBinlogAndStopStreaming ¶
func (f *Ferry) FlushBinlogAndStopStreaming()
After you have stopped writing to the source and made sure that all in-flight transactions to the source are completed, call this method to ensure that the binlog streaming has caught up and to stop the binlog streaming.
This method will not actually shut down the BinlogStreamer immediately. You will know that the BinlogStreamer has finished when .Run() returns.
func (*Ferry) Initialize ¶
Initialize all the components of Ghostferry and connect to the Database
func (*Ferry) Run ¶
func (f *Ferry) Run()
Spawns the background tasks that actually perform the run, then waits for the background tasks to finish.
func (*Ferry) RunStandaloneDataCopy ¶
func (*Ferry) Start ¶
Determine the binlog coordinates and table mapping for the pending Ghostferry run.
func (*Ferry) WaitUntilBinlogStreamerCatchesUp ¶
func (f *Ferry) WaitUntilBinlogStreamerCatchesUp()
func (*Ferry) WaitUntilRowCopyIsComplete ¶
func (f *Ferry) WaitUntilRowCopyIsComplete()
Call this method and perform the cutover after this method returns.
type GaugeMetric ¶
type GaugeMetric struct {
MetricBase
Value float64
}
type IncompleteVerificationError ¶
type IncompleteVerificationError struct{}
func (IncompleteVerificationError) Error ¶
func (e IncompleteVerificationError) Error() string
type IterativeVerifier ¶
type IterativeVerifier struct {
CursorConfig *CursorConfig
BinlogStreamer *BinlogStreamer
TableSchemaCache TableSchemaCache
SourceDB *sql.DB
TargetDB *sql.DB
Tables []*schema.Table
IgnoredTables []string
DatabaseRewrites map[string]string
TableRewrites map[string]string
Concurrency int
// contains filtered or unexported fields
}
func (*IterativeVerifier) GetHashes ¶
func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (map[uint64][]byte, error)
func (*IterativeVerifier) Initialize ¶
func (v *IterativeVerifier) Initialize() error
func (*IterativeVerifier) Result ¶
func (v *IterativeVerifier) Result() (VerificationResultAndStatus, error)
func (*IterativeVerifier) SanityCheckParameters ¶
func (v *IterativeVerifier) SanityCheckParameters() error
func (*IterativeVerifier) StartInBackground ¶
func (v *IterativeVerifier) StartInBackground() error
func (*IterativeVerifier) VerifyBeforeCutover ¶
func (v *IterativeVerifier) VerifyBeforeCutover() error
func (*IterativeVerifier) VerifyDuringCutover ¶
func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error)
func (*IterativeVerifier) VerifyOnce ¶
func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error)
func (*IterativeVerifier) Wait ¶
func (v *IterativeVerifier) Wait()
type LagThrottler ¶
type LagThrottler struct {
ThrottlerBase
PauserThrottler
DB *sql.DB
// contains filtered or unexported fields
}
func NewLagThrottler ¶
func NewLagThrottler(config *LagThrottlerConfig) (*LagThrottler, error)
func (*LagThrottler) Throttled ¶
func (t *LagThrottler) Throttled() bool
type LagThrottlerConfig ¶
type LagThrottlerConfig struct {
Connection DatabaseConfig
MaxLag int
Query string
UpdateInterval string
}
type MetricBase ¶
type Metrics ¶
type Metrics struct {
Prefix string
DefaultTags []MetricTag
Sink chan interface{}
// contains filtered or unexported fields
}
func SetGlobalMetrics ¶
func (*Metrics) AddConsumer ¶
func (m *Metrics) AddConsumer()
func (*Metrics) DoneConsumer ¶
func (m *Metrics) DoneConsumer()
func (*Metrics) StopAndFlush ¶
func (m *Metrics) StopAndFlush()
type PKPositionLog ¶
type PanicErrorHandler ¶
type PanicErrorHandler struct {
Ferry *Ferry
// contains filtered or unexported fields
}
func (*PanicErrorHandler) Fatal ¶
func (this *PanicErrorHandler) Fatal(from string, err error)
type PauserThrottler ¶
type PauserThrottler struct {
ThrottlerBase
// contains filtered or unexported fields
}
func (*PauserThrottler) SetPaused ¶
func (t *PauserThrottler) SetPaused(paused bool)
func (*PauserThrottler) Throttled ¶
func (t *PauserThrottler) Throttled() bool
type ReverifyBatch ¶
type ReverifyBatch struct {
Pks []uint64
Table TableIdentifier
}
type ReverifyEntry ¶
type ReverifyStore ¶
type ReverifyStore struct {
MapStore map[TableIdentifier]map[uint64]struct{}
BatchStore []ReverifyBatch
RowCount uint64
EmitLogPerRowCount uint64
// contains filtered or unexported fields
}
func NewReverifyStore ¶
func NewReverifyStore() *ReverifyStore
func (*ReverifyStore) Add ¶
func (r *ReverifyStore) Add(entry ReverifyEntry)
func (ReverifyStore) FlushAndBatchByTable ¶
func (r ReverifyStore) FlushAndBatchByTable(batchsize int) []ReverifyBatch
type RowBatch ¶
type RowBatch struct {
// contains filtered or unexported fields
}
func (*RowBatch) AsSQLQuery ¶
func (*RowBatch) TableSchema ¶
func (*RowBatch) ValuesContainPk ¶
type SqlDBWithFakeRollback ¶
func (*SqlDBWithFakeRollback) Rollback ¶
func (d *SqlDBWithFakeRollback) Rollback() error
type SqlPreparer ¶
both `sql.Tx` and `sql.DB` allow a SQL query to be `Prepare`d
type SqlPreparerAndRollbacker ¶
type SqlPreparerAndRollbacker interface {
SqlPreparer
Rollback() error
}
sql.DB does not implement Rollback, but one can use SqlDBWithFakeRollback to perform a no-op.
type Status ¶
type Status struct {
GhostferryVersion string
SourceHostPort string
TargetHostPort string
OverallState string
StartTime time.Time
CurrentTime time.Time
TimeTaken time.Duration
ETA time.Duration
BinlogStreamerLag time.Duration
PKsPerSecond uint64
AutomaticCutover bool
BinlogStreamerStopRequested bool
LastSuccessfulBinlogPos mysql.Position
TargetBinlogPos mysql.Position
Throttled bool
CompletedTableCount int
TotalTableCount int
TableStatuses []*TableStatus
AllTableNames []string
AllDatabaseNames []string
VerifierSupport bool
VerifierAvailable bool
VerificationStarted bool
VerificationDone bool
VerificationResult VerificationResult
VerificationErr error
}
func FetchStatus ¶
type TLSConfig ¶
type TableFilter ¶
type TableIdentifier ¶
A comparable and lightweight type that stores the schema and table name.
func NewTableIdentifierFromSchemaTable ¶
func NewTableIdentifierFromSchemaTable(table *schema.Table) TableIdentifier
type TableSchemaCache ¶
func LoadTables ¶
func LoadTables(db *sql.DB, tableFilter TableFilter) (TableSchemaCache, error)
func (TableSchemaCache) AllTableNames ¶
func (c TableSchemaCache) AllTableNames() (tableNames []string)
func (TableSchemaCache) AsSlice ¶
func (c TableSchemaCache) AsSlice() (tables []*schema.Table)
type TableStatus ¶
type ThrottlerBase ¶
type ThrottlerBase struct {
// contains filtered or unexported fields
}
func (*ThrottlerBase) Disabled ¶
func (t *ThrottlerBase) Disabled() bool
func (*ThrottlerBase) SetDisabled ¶
func (t *ThrottlerBase) SetDisabled(disabled bool)
type TimerMetric ¶
type TimerMetric struct {
MetricBase
Value time.Duration
}
type VerificationResult ¶
func (VerificationResult) Error ¶
func (e VerificationResult) Error() string
type VerificationResultAndStatus ¶
type VerificationResultAndStatus struct {
VerificationResult
StartTime time.Time
DoneTime time.Time
}
func (VerificationResultAndStatus) IsDone ¶
func (r VerificationResultAndStatus) IsDone() bool
func (VerificationResultAndStatus) IsStarted ¶
func (r VerificationResultAndStatus) IsStarted() bool
type Verifier ¶
type Verifier interface {
// Start the verifier in the background during the cutover phase.
// Traditionally, this is called from within the ControlServer.
//
// This method may be called multiple times and it's up to the verifier
// to decide if it is possible to re-run the verification.
StartInBackground() error
// Wait for the verifier until it finishes verification after it was
// started with StartInBackground.
//
// A verification is "done" when it verified the dbs (either
// correct or incorrect) OR when it experiences an error.
Wait()
// Returns the result and the status of the verification.
// To check the status, call IsStarted() and IsDone() on
// VerificationResultAndStatus.
//
// If the verification has been completed successfully (without errors) and
// the data checks out to be "correct", the result will be
// VerificationResult{true, ""}, with error = nil.
// Otherwise, the result will be VerificationResult{false, "message"}, with
// error = nil.
//
// If the verification is "done" but experienced an error during the check,
// the result will be VerificationResult{}, with err = yourErr.
Result() (VerificationResultAndStatus, error)
}
The sole purpose of this interface is to make it easier for one to implement their own strategy for verification and hook it up with the ControlServer. If there is no such need, one does not need to implement this interface.
type WorkerPool ¶
func (*WorkerPool) Run ¶
func (p *WorkerPool) Run(n int) ([]interface{}, error)
Returns a list of results of the same size as the concurrency number. Returns the first error that occurs during the run. Also, as soon as a single worker errors, all workers terminate.