Documentation
¶
Index ¶
- Variables
- func GetNodeAPIAddr(addr string, retries int, timeout time.Duration) (string, error)
- func IsNewNode(raftDir string) bool
- func IsStaleRead(leaderLastContact time.Time, lastFSMUpdateTime time.Time, ...) bool
- func ResetStats()
- type ClusterState
- type Config
- type FSM
- type FSMSnapshot
- type Layer
- type NodeTransport
- func (n *NodeTransport) Close() error
- func (n *NodeTransport) CommandCommitIndex() uint64
- func (n *NodeTransport) Consumer() <-chan raft.RPC
- func (n *NodeTransport) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, args *raft.InstallSnapshotRequest, ...) error
- func (n *NodeTransport) LeaderCommitIndex() uint64
- func (n *NodeTransport) Stats() map[string]interface{}
- type PragmaCheckRequest
- type Server
- type Servers
- type SnapshotStore
- type Store
- func (s *Store) Addr() string
- func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error)
- func (s *Store) Bootstrap(servers ...*Server) error
- func (s *Store) Close(wait bool) (retErr error)
- func (s *Store) CommitIndex() (uint64, error)
- func (s *Store) Committed(timeout time.Duration) (uint64, error)
- func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse, error)
- func (s *Store) GetFromDatabase(key string) (string, error)
- func (s *Store) HasLeader() bool
- func (s *Store) HasLeaderID() bool
- func (s *Store) ID() string
- func (s *Store) IsLeader() bool
- func (s *Store) IsVoter() (bool, error)
- func (s *Store) Join(jr *commandProto.JoinRequest) error
- func (s *Store) LeaderAddr() (string, error)
- func (s *Store) LeaderCommitIndex() (uint64, error)
- func (s *Store) LeaderID() (string, error)
- func (s *Store) LeaderWithID() (string, string)
- func (s *Store) Load(lr *proto.LoadRequest) error
- func (s *Store) Nodes() ([]*Server, error)
- func (s *Store) Notify(nr *commandProto.NotifyRequest) error
- func (s *Store) Open() (retError error)
- func (s *Store) Path() string
- func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error)
- func (s *Store) Ready() bool
- func (s *Store) RegisterReadyChannel(ch <-chan struct{})
- func (s *Store) Remove(rn *commandProto.RemoveNodeRequest) error
- func (s *Store) Request(eqr *commandProto.ExecuteQueryRequest) ([]*commandProto.ExecuteQueryResponse, error)
- func (s *Store) Snapshot(n uint64) (retError error)
- func (s *Store) State() ClusterState
- func (s *Store) Stats() (map[string]interface{}, error)
- func (s *Store) Stepdown(wait bool) error
- func (s *Store) StoreInDatabase(key, value string) error
- func (s *Store) WaitForCommitIndex(idx uint64, timeout time.Duration) error
- func (s *Store) WaitForLeader(timeout time.Duration) (string, error)
- func (s *Store) WaitForRemoval(id string, timeout time.Duration) error
- type StoreNew
- type Transport
Constants ¶
This section is empty.
Variables ¶
var ( // ErrStoreNotOpen is returned when a Store is not open. ErrStoreNotOpen = errors.New("store not open") // ErrStoreOpen is returned when a Store is already open. ErrStoreOpen = errors.New("store already open") // ErrNotReady is returned when a Store is not ready to accept requests. ErrNotReady = errors.New("store not ready") // ErrNotLeader is returned when a node attempts to execute a leader-only // operation. ErrNotLeader = errors.New("not leader") // ErrNotSingleNode is returned when a node attempts to execute a single-node // only operation. ErrNotSingleNode = errors.New("not single-node") // ErrStaleRead is returned if executing the query would violate the // requested freshness. ErrStaleRead = errors.New("stale read") // ErrOpenTimeout is returned when the Store does not apply its initial // logs within the specified time. ErrOpenTimeout = errors.New("timeout waiting for initial logs application") // ErrWaitForRemovalTimeout is returned when the Store does not confirm removal // of a node within the specified time. ErrWaitForRemovalTimeout = errors.New("timeout waiting for node removal confirmation") // ErrWaitForLeaderTimeout is returned when the Store cannot determine the leader // within the specified time. ErrWaitForLeaderTimeout = errors.New("timeout waiting for leader") // ErrInvalidBackupFormat is returned when the requested backup format // is not valid. ErrInvalidBackupFormat = errors.New("invalid backup format") // ErrInvalidVacuum is returned when the requested backup format is not // compatible with vacuum. ErrInvalidVacuum = errors.New("invalid vacuum") // ErrLoadInProgress is returned when a load is already in progress and the // requested operation cannot be performed. 
ErrLoadInProgress = errors.New("load in progress") // ErrNotImplemented is returned when the function has no implementation. // It will only exist while this application is under development. ErrNotImplemented = errors.New("not implemented") // ErrDatabaseNotOpen is returned when the database is not open. ErrDatabaseNotOpen = errors.New("database is not open") )
Functions ¶
func GetNodeAPIAddr ¶
func IsStaleRead ¶
func IsStaleRead( leaderLastContact time.Time, lastFSMUpdateTime time.Time, lastAppendedAtTime time.Time, fsmIndex uint64, commitIndex uint64, freshness int64, strict bool, ) bool
IsStaleRead returns whether a read is stale.
func ResetStats ¶
func ResetStats()
ResetStats resets the expvar stats for this module. Mostly for test purposes.
Types ¶
type ClusterState ¶
type ClusterState int
ClusterState defines the possible Raft states the current node can be in
const ( Leader ClusterState = iota Follower Candidate Shutdown Unknown )
Represents the Raft cluster states
type FSM ¶
type FSM struct {
// contains filtered or unexported fields
}
FSM is a wrapper around the Store which implements raft.FSM.
type FSMSnapshot ¶
type FSMSnapshot struct {
Finalizer func() error
OnFailure func()
raft.FSMSnapshot
// contains filtered or unexported fields
}
FSMSnapshot is a wrapper around raft.FSMSnapshot which adds an optional Finalizer, instrumentation, and logging.
func (*FSMSnapshot) Persist ¶
func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) (retError error)
Persist writes the snapshot to the given sink.
func (*FSMSnapshot) Release ¶
func (f *FSMSnapshot) Release()
Release performs any final cleanup once the Snapshot has been persisted.
type Layer ¶
Layer is the interface expected by the Store for network communication between nodes, which is used for Raft distributed consensus.
type NodeTransport ¶
type NodeTransport struct {
*raft.NetworkTransport
// contains filtered or unexported fields
}
NodeTransport is a wrapper around the Raft NetworkTransport, which allows custom configuration of the InstallSnapshot method.
func NewNodeTransport ¶
func NewNodeTransport(transport *raft.NetworkTransport) *NodeTransport
NewNodeTransport returns an initialized NodeTransport.
func (*NodeTransport) CommandCommitIndex ¶
func (n *NodeTransport) CommandCommitIndex() uint64
CommandCommitIndex returns the index of the latest committed log entry which is applied to the FSM.
func (*NodeTransport) Consumer ¶
func (n *NodeTransport) Consumer() <-chan raft.RPC
Consumer returns a channel of RPC requests to be consumed.
func (*NodeTransport) InstallSnapshot ¶
func (n *NodeTransport) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, args *raft.InstallSnapshotRequest, resp *raft.InstallSnapshotResponse, data io.Reader) error
InstallSnapshot is used to push a snapshot down to a follower. The data is read from the given io.Reader and streamed to the client.
func (*NodeTransport) LeaderCommitIndex ¶
func (n *NodeTransport) LeaderCommitIndex() uint64
LeaderCommitIndex returns the index of the latest committed log entry which is known to be replicated to the majority of the cluster.
func (*NodeTransport) Stats ¶
func (n *NodeTransport) Stats() map[string]interface{}
Stats returns the current stats of the transport.
type PragmaCheckRequest ¶
type Server ¶
type Server struct {
ID string `json:"id,omitempty"`
Addr string `json:"addr,omitempty"`
Suffrage string `json:"suffrage,omitempty"`
}
Server represents another node in the cluster.
type Servers ¶
type Servers []*Server
Servers is a set of Server values.
func (Servers) Contains ¶
Contains returns whether the given node, as specified by its Raft ID, is a member of the set of servers.
func (Servers) IsReadOnly ¶
IsReadOnly returns whether the given node, as specified by its Raft ID, is a read-only (non-voting) node. If no node is found with the given ID then found will be false.
type SnapshotStore ¶
type SnapshotStore interface {
raft.SnapshotStore
// FullNeeded returns true if a full snapshot is needed.
FullNeeded() (bool, error)
// SetFullNeeded explicitly sets that a full snapshot is needed.
SetFullNeeded() error
// Stats returns stats about the Snapshot Store.
Stats() (map[string]interface{}, error)
}
SnapshotStore is the interface Snapshot stores must implement.
type Store ¶
type Store struct {
ShutdownOnRemove bool
SnapshotThreshold uint64
SnapshotInterval time.Duration
LeaderLeaseTimeout time.Duration
HeartbeatTimeout time.Duration
ElectionTimeout time.Duration
ApplyTimeout time.Duration
RaftLogLevel string
NoFreeListSync bool
AutoVacInterval time.Duration
AutoOptimizeInterval time.Duration
BootstrapExpect int
// Node-reaping configuration
ReapTimeout time.Duration
ReapReadOnlyTimeout time.Duration
// contains filtered or unexported fields
}
Wire Store is a BBolt/badgerDB database, where all changes are made via Raft consensus.
func (*Store) Backup ¶
Backup writes a consistent snapshot of the underlying database to dst. This can be called while writes are being made to the system. The backup may fail if the system is actively snapshotting. The client can just retry in this case.
func (*Store) Bootstrap ¶
Bootstrap executes a cluster bootstrap on this node, using the given Servers as the configuration.
func (*Store) Close ¶
Close closes the store. If wait is true, waits for a graceful shutdown. Note: this functionality is incomplete.
func (*Store) CommitIndex ¶
func (*Store) Committed ¶
Committed blocks until the local commit index is greater than or equal to the Leader index, as checked when the function is called. It returns the committed index. If the Leader index is 0, then the system waits until the commit index is at least 1.
func (*Store) Execute ¶
func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse, error)
Execute executes queries that return no rows, but do modify the database.
func (*Store) GetFromDatabase ¶
GetFromDatabase retrieves a value from the Badger database by key. Returns the value as a string.
func (*Store) HasLeaderID ¶
HasLeaderID returns true if the cluster has a leader ID, false otherwise.
func (*Store) IsVoter ¶
IsVoter returns true if the current node is a voter in the cluster. If there is no reference to the current node in the current cluster configuration then false will also be returned.
func (*Store) Join ¶
func (s *Store) Join(jr *commandProto.JoinRequest) error
Join handles a request from a node to join this Store.
func (*Store) LeaderAddr ¶
LeaderAddr returns the address of the current leader. Returns a blank string if there is no leader or if the Store is not open.
func (*Store) LeaderCommitIndex ¶
LeaderCommitIndex returns the Raft leader commit index, as indicated by the latest AppendEntries RPC. If this node is the Leader then the commit index is returned directly from the Raft object.
func (*Store) LeaderID ¶
LeaderID returns the node ID of the Raft leader. Returns a blank string if there is no leader, or an error.
func (*Store) LeaderWithID ¶
LeaderWithID is used to return the current leader address and ID of the cluster. It may return empty strings if there is no current leader or the leader is unknown.
func (*Store) Load ¶
func (s *Store) Load(lr *proto.LoadRequest) error
Load loads an entire BadgerDB file into the database, sending the request through the Raft log.
func (*Store) Notify ¶
func (s *Store) Notify(nr *commandProto.NotifyRequest) error
Notify notifies this Store that a node is ready for bootstrapping at the given address. Once the number of known nodes reaches the expected level bootstrapping will be attempted using this Store. "Expected level" includes this node, so this node must self-notify to ensure the cluster bootstraps with the *advertised Raft address* which the Store doesn't know about.
Notifying is idempotent. A node may repeatedly notify the Store without issue.
func (*Store) RegisterReadyChannel ¶
func (s *Store) RegisterReadyChannel(ch <-chan struct{})
RegisterReadyChannel registers a channel that must be closed before the store is considered "ready" to serve requests.
func (*Store) Remove ¶
func (s *Store) Remove(rn *commandProto.RemoveNodeRequest) error
func (*Store) Request ¶
func (s *Store) Request(eqr *commandProto.ExecuteQueryRequest) ([]*commandProto.ExecuteQueryResponse, error)
func (*Store) Snapshot ¶
Snapshot performs a snapshot, leaving n trailing logs behind. If n is greater than zero, that many logs are left in the log after snapshotting. If n is zero, then the number set at Store creation is used. Finally, once this function returns, the trailing log configuration value is reset to the value set at Store creation.
func (*Store) State ¶
func (s *Store) State() ClusterState
State returns the current node's Raft state
func (*Store) Stats ¶
Stats returns stats for the store. Not complete: it does not include BadgerDB stats.
func (*Store) Stepdown ¶
Stepdown forces this node to relinquish leadership to another node in the cluster. If this node is not the leader, and 'wait' is true, an error will be returned.
func (*Store) StoreInDatabase ¶
StoreInDatabase stores a value in the Badger database under the given key.
func (*Store) WaitForCommitIndex ¶
WaitForCommitIndex blocks until the local Raft commit index is equal to or greater than the given index, or the timeout expires.
func (*Store) WaitForLeader ¶
WaitForLeader blocks until a leader is detected, or the timeout expires.
type Transport ¶
type Transport struct {
// contains filtered or unexported fields
}
Transport is the network service provided to Raft, and wraps a Listener.
func NewTransport ¶
NewTransport returns an initialized Transport.