store

package
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2025 License: GPL-3.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrStoreNotOpen is returned when a Store is not open.
	ErrStoreNotOpen = errors.New("store not open")

	// ErrStoreOpen is returned when a Store is already open.
	ErrStoreOpen = errors.New("store already open")

	// ErrNotReady is returned when a Store is not ready to accept requests.
	ErrNotReady = errors.New("store not ready")

	// ErrNotLeader is returned when a node attempts to execute a leader-only
	// operation.
	ErrNotLeader = errors.New("not leader")

	// ErrNotSingleNode is returned when a node attempts to execute a single-node
	// only operation.
	ErrNotSingleNode = errors.New("not single-node")

	// ErrStaleRead is returned if executing the query would violate the
	// requested freshness.
	ErrStaleRead = errors.New("stale read")

	// ErrOpenTimeout is returned when the Store does not apply its initial
	// logs within the specified time.
	ErrOpenTimeout = errors.New("timeout waiting for initial logs application")

	// ErrWaitForRemovalTimeout is returned when the Store does not confirm removal
	// of a node within the specified time.
	ErrWaitForRemovalTimeout = errors.New("timeout waiting for node removal confirmation")

	// ErrWaitForLeaderTimeout is returned when the Store cannot determine the leader
	// within the specified time.
	ErrWaitForLeaderTimeout = errors.New("timeout waiting for leader")

	// ErrInvalidBackupFormat is returned when the requested backup format
	// is not valid.
	ErrInvalidBackupFormat = errors.New("invalid backup format")

	// ErrInvalidVacuum is returned when the requested backup format is not
	// compatible with vacuum.
	ErrInvalidVacuum = errors.New("invalid vacuum")

	// ErrLoadInProgress is returned when a load is already in progress and the
	// requested operation cannot be performed.
	ErrLoadInProgress = errors.New("load in progress")

	// ErrNotImplemented is returned when a function has no implementation yet.
	// It will only exist while this application is under development.
	ErrNotImplemented = errors.New("not implemented")

	// ErrDatabaseNotOpen is returned when the database is closed.
	ErrDatabaseNotOpen = errors.New("database is not open")
)

Functions

func GetNodeAPIAddr

func GetNodeAPIAddr(addr string, retries int, timeout time.Duration) (string, error)

func IsNewNode

func IsNewNode(raftDir string) bool

IsNewNode checks whether this is a new or a pre-existing node.

func IsStaleRead

func IsStaleRead(
	leaderLastContact time.Time,
	lastFSMUpdateTime time.Time,
	lastAppendedAtTime time.Time,
	fsmIndex uint64,
	commitIndex uint64,
	freshness int64,
	strict bool,
) bool

IsStaleRead returns whether a read is stale.

func ResetStats

func ResetStats()

ResetStats resets the expvar stats for this module. Mostly for test purposes.

Types

type ClusterState

type ClusterState int

ClusterState defines the possible Raft states the current node can be in

const (
	Leader ClusterState = iota
	Follower
	Candidate
	Shutdown
	Unknown
)

Represents the Raft cluster states

type Config

type Config struct {
	Dir string    // The working directory for raft.
	Tn  Transport // The underlying Transport for raft.
	ID  string    // Node ID.
}

type FSM

type FSM struct {
	// contains filtered or unexported fields
}

FSM is a wrapper around the Store which implements raft.FSM.

func NewFSM

func NewFSM(s *Store) *FSM

NewFSM returns a new FSM.

func (*FSM) Apply

func (f *FSM) Apply(l *raft.Log) interface{}

Apply applies a Raft log entry to the Store.

func (*FSM) Restore

func (f *FSM) Restore(rc io.ReadCloser) error

Restore restores the Store from a snapshot.

func (*FSM) Snapshot

func (f *FSM) Snapshot() (raft.FSMSnapshot, error)

Snapshot returns a Snapshot of the Store

type FSMSnapshot

type FSMSnapshot struct {
	Finalizer func() error
	OnFailure func()

	raft.FSMSnapshot
	// contains filtered or unexported fields
}

FSMSnapshot is a wrapper around raft.FSMSnapshot which adds an optional Finalizer, instrumentation, and logging.

func (*FSMSnapshot) Persist

func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) (retError error)

Persist writes the snapshot to the given sink.

func (*FSMSnapshot) Release

func (f *FSMSnapshot) Release()

Release performs any final cleanup once the Snapshot has been persisted.

type Layer

type Layer interface {
	net.Listener
	Dial(address string, timeout time.Duration) (net.Conn, error)
}

Layer is the interface expected by the Store for network communication between nodes, which is used for Raft distributed consensus.

type NodeTransport

type NodeTransport struct {
	*raft.NetworkTransport
	// contains filtered or unexported fields
}

NodeTransport is a wrapper around the Raft NetworkTransport, which allows custom configuration of the InstallSnapshot method.

func NewNodeTransport

func NewNodeTransport(transport *raft.NetworkTransport) *NodeTransport

NewNodeTransport returns an initialized NodeTransport.

func (*NodeTransport) Close

func (n *NodeTransport) Close() error

Close closes the transport

func (*NodeTransport) CommandCommitIndex

func (n *NodeTransport) CommandCommitIndex() uint64

CommandCommitIndex returns the index of the latest committed log entry which is applied to the FSM.

func (*NodeTransport) Consumer

func (n *NodeTransport) Consumer() <-chan raft.RPC

Consumer returns a channel of RPC requests to be consumed.

func (*NodeTransport) InstallSnapshot

InstallSnapshot is used to push a snapshot down to a follower. The data is read from the ReadCloser and streamed to the client.

func (*NodeTransport) LeaderCommitIndex

func (n *NodeTransport) LeaderCommitIndex() uint64

LeaderCommitIndex returns the index of the latest committed log entry which is known to be replicated to the majority of the cluster.

func (*NodeTransport) Stats

func (n *NodeTransport) Stats() map[string]interface{}

Stats returns the current stats of the transport.

type PragmaCheckRequest

type PragmaCheckRequest proto.Request

type Server

type Server struct {
	ID       string `json:"id,omitempty"`
	Addr     string `json:"addr,omitempty"`
	Suffrage string `json:"suffrage,omitempty"`
}

Server represents another node in the cluster.

func NewServer

func NewServer(id, addr string, voter bool) *Server

NewServer returns an initialized Server.

type Servers

type Servers []*Server

Servers is a set of Servers.

func (Servers) Contains

func (s Servers) Contains(id string) bool

Contains returns whether the given node, as specified by its Raft ID, is a member of the set of servers.

func (Servers) IsReadOnly

func (s Servers) IsReadOnly(id string) (readOnly bool, found bool)

IsReadOnly returns whether the given node, as specified by its Raft ID, is a read-only (non-voting) node. If no node is found with the given ID then found will be false.

func (Servers) Len

func (s Servers) Len() int

func (Servers) Less

func (s Servers) Less(i, j int) bool

func (Servers) Swap

func (s Servers) Swap(i, j int)

type SnapshotStore

type SnapshotStore interface {
	raft.SnapshotStore

	// FullNeeded returns true if a full snapshot is needed.
	FullNeeded() (bool, error)

	// SetFullNeeded explicitly sets that a full snapshot is needed.
	SetFullNeeded() error

	// Stats returns stats about the Snapshot Store.
	Stats() (map[string]interface{}, error)
}

SnapshotStore is the interface Snapshot stores must implement.

type Store

type Store struct {
	ShutdownOnRemove     bool
	SnapshotThreshold    uint64
	SnapshotInterval     time.Duration
	LeaderLeaseTimeout   time.Duration
	HeartbeatTimeout     time.Duration
	ElectionTimeout      time.Duration
	ApplyTimeout         time.Duration
	RaftLogLevel         string
	NoFreeListSync       bool
	AutoVacInterval      time.Duration
	AutoOptimizeInterval time.Duration

	BootstrapExpect int

	// Node-reaping configuration
	ReapTimeout         time.Duration
	ReapReadOnlyTimeout time.Duration
	// contains filtered or unexported fields
}

Wire Store is a BBolt/badgerDB database, where all changes are made via Raft consensus.

func New

func New(ly Layer, c *Config) *Store

New allocates a new Store in memory and initializes it.

func (*Store) Addr

func (s *Store) Addr() string

Addr returns the address of the store.

func (*Store) Backup

func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error)

Backup writes a consistent snapshot of the underlying database to dst. This can be called while writes are being made to the system. The backup may fail if the system is actively snapshotting. The client can just retry in this case.

func (*Store) Bootstrap

func (s *Store) Bootstrap(servers ...*Server) error

Bootstrap executes a cluster bootstrap on this node, using the given Servers as the configuration.

func (*Store) Close

func (s *Store) Close(wait bool) (retErr error)

Close closes the store. If wait is true, it waits for a graceful shutdown. Note: this functionality is incomplete.

func (*Store) CommitIndex

func (s *Store) CommitIndex() (uint64, error)

func (*Store) Committed

func (s *Store) Committed(timeout time.Duration) (uint64, error)

Committed blocks until the local commit index is greater than or equal to the Leader index, as checked when the function is called. It returns the committed index. If the Leader index is 0, then the system waits until the commit index is at least 1.

func (*Store) Execute

func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteQueryResponse, error)

Execute executes queries that return no rows, but do modify the database.

func (*Store) GetFromDatabase

func (s *Store) GetFromDatabase(key string) (string, error)

GetFromDatabase retrieves a value from the Badger database by key. Returns the value as a string

func (*Store) HasLeader

func (s *Store) HasLeader() bool

HasLeader returns true if the cluster has a leader, false otherwise.

func (*Store) HasLeaderID

func (s *Store) HasLeaderID() bool

HasLeaderID returns true if the cluster has a leader ID, false otherwise.

func (*Store) ID

func (s *Store) ID() string

ID returns the Raft ID of the store.

func (*Store) IsLeader

func (s *Store) IsLeader() bool

IsLeader returns true if the node is the cluster leader; otherwise it returns false.

func (*Store) IsVoter

func (s *Store) IsVoter() (bool, error)

IsVoter returns true if the current node is a voter in the cluster. If there is no reference to the current node in the current cluster configuration then false will also be returned.

func (*Store) Join

func (s *Store) Join(jr *commandProto.JoinRequest) error

Join handles a request to join this store.

func (*Store) LeaderAddr

func (s *Store) LeaderAddr() (string, error)

LeaderAddr returns the address of the current leader. Returns a blank string if there is no leader or if the Store is not open.

func (*Store) LeaderCommitIndex

func (s *Store) LeaderCommitIndex() (uint64, error)

LeaderCommitIndex returns the Raft leader commit index, as indicated by the latest AppendEntries RPC. If this node is the Leader then the commit index is returned directly from the Raft object.

func (*Store) LeaderID

func (s *Store) LeaderID() (string, error)

LeaderID returns the node ID of the Raft leader. Returns a blank string if there is no leader, or an error.

func (*Store) LeaderWithID

func (s *Store) LeaderWithID() (string, string)

LeaderWithID is used to return the current leader address and ID of the cluster. It may return empty strings if there is no current leader or the leader is unknown.

func (*Store) Load

func (s *Store) Load(lr *proto.LoadRequest) error

Load loads an entire BadgerDB file into the database, sending the request through the Raft log.

func (*Store) Nodes

func (s *Store) Nodes() ([]*Server, error)

Nodes returns the slice of nodes in the cluster, sorted by ID ascending.

func (*Store) Notify

func (s *Store) Notify(nr *commandProto.NotifyRequest) error

Notify notifies this Store that a node is ready for bootstrapping at the given address. Once the number of known nodes reaches the expected level bootstrapping will be attempted using this Store. "Expected level" includes this node, so this node must self-notify to ensure the cluster bootstraps with the *advertised Raft address* which the Store doesn't know about.

Notifying is idempotent. A node may repeatedly notify the Store without issue.

func (*Store) Open

func (s *Store) Open() (retError error)

Open opens the store.

func (*Store) Path

func (s *Store) Path() string

Path returns the path to the store's storage directory.

func (*Store) Query

func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error)

func (*Store) Ready

func (s *Store) Ready() bool

func (*Store) RegisterReadyChannel

func (s *Store) RegisterReadyChannel(ch <-chan struct{})

RegisterReadyChannel registers a channel that must be closed before the store is considered "ready" to serve requests.

func (*Store) Remove

func (s *Store) Remove(rn *commandProto.RemoveNodeRequest) error

func (*Store) Snapshot

func (s *Store) Snapshot(n uint64) (retError error)

Snapshot performs a snapshot, leaving n trailing logs behind. If n is greater than zero, that many logs are left in the log after snapshotting. If n is zero, then the number set at Store creation is used. Finally, once this function returns, the trailing log configuration value is reset to the value set at Store creation.

func (*Store) State

func (s *Store) State() ClusterState

State returns the current node's Raft state

func (*Store) Stats

func (s *Store) Stats() (map[string]interface{}, error)

Stats returns stats for the store. Not complete: does not include badger db stats

func (*Store) Stepdown

func (s *Store) Stepdown(wait bool) error

Stepdown forces this node to relinquish leadership to another node in the cluster. If this node is not the leader, and 'wait' is true, an error will be returned.

func (*Store) StoreInDatabase

func (s *Store) StoreInDatabase(key, value string) error

StoreInDatabase stores a value in the Badger database.

func (*Store) WaitForCommitIndex

func (s *Store) WaitForCommitIndex(idx uint64, timeout time.Duration) error

WaitForCommitIndex blocks until the local Raft commit index is equal to or greater than the given index, or the timeout expires.

func (*Store) WaitForLeader

func (s *Store) WaitForLeader(timeout time.Duration) (string, error)

WaitForLeader blocks until a leader is detected, or the timeout expires.

func (*Store) WaitForRemoval

func (s *Store) WaitForRemoval(id string, timeout time.Duration) error

WaitForRemoval blocks until a node with the given ID is removed from the cluster or the timeout expires.

type StoreNew

type StoreNew struct {
	Mu sync.Mutex
	// contains filtered or unexported fields
}

func NewStore

func NewStore() *StoreNew

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport is the network service provided to Raft, and wraps a Listener.

func NewTransport

func NewTransport(ly Layer) *Transport

NewTransport returns an initialized Transport.

func (*Transport) Accept

func (t *Transport) Accept() (net.Conn, error)

Accept waits for the next connection.

func (*Transport) Addr

func (t *Transport) Addr() net.Addr

Addr returns the binding address of the transport.

func (*Transport) Close

func (t *Transport) Close() error

Close closes the transport

func (*Transport) Dial

func (t *Transport) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error)

Dial creates a new network connection.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL