persistence

package
v2.13.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 10, 2026 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package persistence provides the persistence layer for CommitDB.

The persistence layer is backed by Git, using go-git for storage. Every write operation creates a Git commit, providing full version control and history tracking.

Memory Persistence

For testing or ephemeral databases:

p, err := persistence.NewMemoryPersistence()
if err != nil {
    log.Fatal(err)
}

File Persistence

For persistent storage:

p, err := persistence.NewFilePersistence("/path/to/data", nil)
if err != nil {
    log.Fatal(err)
}

Transaction Batching

For improved write performance, use TransactionBuilder:

txn, _ := p.BeginTransaction()
txn.AddWrite("db", "table", "key1", data1)
txn.AddWrite("db", "table", "key2", data2)
result, _ := txn.Commit(identity)

Indexing

The persistence layer supports B-tree indexes for faster queries:

im := persistence.NewIndexManager(&p, identity)
idx, _ := im.CreateIndex("idx_name", "db", "table", "column", false)
keys := idx.Lookup("value")

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNotInitialized = errors.New("persistence layer not initialized")
	ErrRepoNotFound   = errors.New("repository not found")
)

Functions

This section is empty.

Types

type AuthType

type AuthType string

AuthType defines the type of authentication

const (
	AuthTypeNone  AuthType = "none"
	AuthTypeToken AuthType = "token"
	AuthTypeSSH   AuthType = "ssh"
	AuthTypeBasic AuthType = "basic"
)

type Index

type Index struct {
	Name     string              `json:"name"`
	Database string              `json:"database"`
	Table    string              `json:"table"`
	Column   string              `json:"column"`
	Unique   bool                `json:"unique"`
	Entries  map[string][]string `json:"entries"` // column value -> list of primary keys
}

Index represents a B-tree index on a column

func (*Index) Delete

func (idx *Index) Delete(columnValue, primaryKey string)

Delete removes an entry from the index

func (*Index) Insert

func (idx *Index) Insert(columnValue, primaryKey string) error

Insert adds an entry to the index

func (*Index) Lookup

func (idx *Index) Lookup(columnValue string) []string

Lookup finds primary keys for a given column value

func (*Index) LookupRange

func (idx *Index) LookupRange(minValue, maxValue string) []string

LookupRange finds primary keys within a range (inclusive)

type IndexManager

type IndexManager struct {
	// contains filtered or unexported fields
}

IndexManager manages indexes for a persistence layer

func NewIndexManager

func NewIndexManager(persistence *Persistence, identity core.Identity) *IndexManager

NewIndexManager creates a new index manager

func (*IndexManager) CreateIndex

func (im *IndexManager) CreateIndex(name, database, table, column string, unique bool) (*Index, error)

CreateIndex creates a new index on a column

func (*IndexManager) DropIndex

func (im *IndexManager) DropIndex(database, table, column string) error

DropIndex removes an index

func (*IndexManager) GetIndex

func (im *IndexManager) GetIndex(database, table, column string) (*Index, bool)

GetIndex retrieves an existing index

func (*IndexManager) LoadIndexes

func (im *IndexManager) LoadIndexes(database, table string, columns []core.Column) error

LoadIndexes loads all indexes from storage for a table using the plumbing API.

func (*IndexManager) RebuildIndex

func (im *IndexManager) RebuildIndex(idx *Index, getRecordValue func(pk string) (string, bool)) error

RebuildIndex rebuilds an index by scanning all records

func (*IndexManager) SaveIndex

func (im *IndexManager) SaveIndex(idx *Index) error

SaveIndex persists an index to storage (public wrapper)

type MergeOptions

type MergeOptions struct {
	Strategy MergeStrategy
}

MergeOptions configures merge behavior

func DefaultMergeOptions

func DefaultMergeOptions() MergeOptions

DefaultMergeOptions returns the default merge options (row-level merge)

type MergeResult

type MergeResult struct {
	Transaction   Transaction
	MergedRecords int
	FastForward   bool
	Conflicts     []RecordConflict // conflicts that were auto-resolved (LWW)
	Unresolved    []RecordConflict // conflicts requiring manual resolution
	MergeID       string           // ID to resume pending merge (empty if complete)
	Pending       bool             // true if merge is pending manual resolution
}

MergeResult contains information about a completed or pending merge

type MergeStrategy

type MergeStrategy string

MergeStrategy defines how to handle merge

const (
	// MergeStrategyFastForwardOnly only allows fast-forward merges
	MergeStrategyFastForwardOnly MergeStrategy = "fast-forward-only"
	// MergeStrategyRowLevel performs row-level merge for diverged branches (LWW auto-resolve)
	MergeStrategyRowLevel MergeStrategy = "row-level"
	// MergeStrategyManual pauses on conflicts for manual resolution
	MergeStrategyManual MergeStrategy = "manual"
)

type Operation

type Operation struct {
	Type     OperationType
	Database string
	Table    string
	Key      string
	Data     []byte
}

Operation represents a single write operation in a transaction

type OperationType

type OperationType int
const (
	WriteOp OperationType = iota
	DeleteOp
)

type PendingMerge

type PendingMerge struct {
	MergeID       string            `json:"merge_id"`
	HeadCommit    string            `json:"head_commit"`
	SourceCommit  string            `json:"source_commit"`
	SourceBranch  string            `json:"source_branch"`
	BaseCommit    string            `json:"base_commit"`
	Resolved      map[string][]byte `json:"resolved"`       // key -> resolved data
	Unresolved    []RecordConflict  `json:"unresolved"`     // remaining conflicts
	MergedRecords map[string][]byte `json:"merged_records"` // non-conflicting merged data
	CreatedAt     time.Time         `json:"created_at"`
}

PendingMerge stores state for a merge awaiting manual conflict resolution

type Persistence

type Persistence struct {
	// contains filtered or unexported fields
}

func NewFilePersistence

func NewFilePersistence(baseDir string, gitURL *string) (Persistence, error)

func NewMemoryPersistence

func NewMemoryPersistence() (Persistence, error)

func (*Persistence) AbortMerge

func (p *Persistence) AbortMerge() error

AbortMerge cancels a pending merge

func (*Persistence) AddRemote

func (p *Persistence) AddRemote(name, url string) error

AddRemote adds a named remote to the repository

func (*Persistence) BeginTransaction

func (p *Persistence) BeginTransaction() (*TransactionBuilder, error)

BeginTransaction creates a new transaction builder for batching operations

func (*Persistence) Branch

func (p *Persistence) Branch(name string, from *Transaction) error

Branch creates a new branch at current HEAD or at a specific transaction

func (*Persistence) Checkout

func (p *Persistence) Checkout(name string) error

Checkout switches to an existing branch

func (*Persistence) CompleteMerge

func (p *Persistence) CompleteMerge(identity core.Identity) (Transaction, error)

CompleteMerge finishes a pending merge after all conflicts are resolved

func (*Persistence) CopyRecords

func (p *Persistence) CopyRecords(srcDatabase, srcTable, dstDatabase, dstTable string, identity core.Identity) (txn Transaction, err error)

CopyRecords copies all records from source table to destination table in a single atomic transaction. This is memory-efficient: records are streamed row-by-row from source and written to dest without loading all into memory.

func (*Persistence) CopyRecordsDirect

func (p *Persistence) CopyRecordsDirect(srcDatabase, srcTable, dstDatabase, dstTable string, identity core.Identity) (Transaction, error)

CopyRecordsDirect copies records between tables using the low-level plumbing API. It uses a batch tree update for efficient multi-record operations.

func (*Persistence) CreateDatabase

func (p *Persistence) CreateDatabase(database core.Database, identity core.Identity) (txn Transaction, err error)

func (*Persistence) CreateShare

func (p *Persistence) CreateShare(name, url string, auth *RemoteAuth, identity core.Identity) error

CreateShare creates a new share by cloning an external repository

func (*Persistence) CreateTable

func (p *Persistence) CreateTable(table core.Table, identity core.Identity) (txn Transaction, err error)

func (*Persistence) CreateView

func (p *Persistence) CreateView(view core.View, identity core.Identity) (txn Transaction, err error)

CreateView stores a view definition

func (*Persistence) CurrentBranch

func (p *Persistence) CurrentBranch() (string, error)

CurrentBranch returns the name of the current branch

func (*Persistence) DeleteBranch

func (p *Persistence) DeleteBranch(name string) error

DeleteBranch deletes a branch

func (*Persistence) DeleteMaterializedViewData

func (p *Persistence) DeleteMaterializedViewData(database, viewName string, identity core.Identity) (txn Transaction, err error)

DeleteMaterializedViewData removes cached data for a materialized view

func (*Persistence) DeletePathDirect

func (p *Persistence) DeletePathDirect(paths []string, identity core.Identity, message string) (Transaction, error)

DeletePathDirect deletes one or more paths from the repository using plumbing API

func (*Persistence) DeleteRecord

func (p *Persistence) DeleteRecord(database string, table string, key string, identity core.Identity) (txn Transaction, err error)

func (*Persistence) DeleteRecordDirect

func (p *Persistence) DeleteRecordDirect(database, table, key string, identity core.Identity) (Transaction, error)

DeleteRecordDirect deletes a record using low-level plumbing API

func (*Persistence) DropDatabase

func (p *Persistence) DropDatabase(name string, identity core.Identity) (txn Transaction, err error)

func (*Persistence) DropShare

func (p *Persistence) DropShare(name string, identity core.Identity) error

DropShare removes a share

func (*Persistence) DropTable

func (p *Persistence) DropTable(database string, table string, identity core.Identity) (txn Transaction, err error)

func (*Persistence) DropView

func (p *Persistence) DropView(database, name string, identity core.Identity) (txn Transaction, err error)

DropView removes a view definition

func (*Persistence) Fetch

func (p *Persistence) Fetch(remoteName string, auth *RemoteAuth) error

Fetch fetches refs from a remote without merging

func (*Persistence) GetDatabase

func (p *Persistence) GetDatabase(name string) (d *core.Database, err error)

func (*Persistence) GetMaterializedViewDataAtTransaction

func (p *Persistence) GetMaterializedViewDataAtTransaction(database, viewName, transactionID string) ([]map[string]string, error)

GetMaterializedViewDataAtTransaction reads cached data for a materialized view at a specific transaction

func (*Persistence) GetPendingMerge

func (p *Persistence) GetPendingMerge() *PendingMerge

GetPendingMerge returns the current pending merge, if any

func (*Persistence) GetRecord

func (p *Persistence) GetRecord(database string, table string, key string) (data []byte, exists bool)

func (*Persistence) GetRecordAtTransaction

func (p *Persistence) GetRecordAtTransaction(database, table, key, transactionID string) ([]byte, bool, error)

GetRecordAtTransaction reads a record as it existed at a specific transaction (commit).

func (*Persistence) GetRecordDirect

func (p *Persistence) GetRecordDirect(database, table, key string) ([]byte, bool)

GetRecordDirect reads a record directly from the Git tree (bypasses worktree filesystem)

func (*Persistence) GetSharePath

func (p *Persistence) GetSharePath(name string) (string, error)

GetSharePath returns the filesystem path to a share

func (*Persistence) GetTable

func (p *Persistence) GetTable(database string, table string) (t *core.Table, err error)

func (*Persistence) GetTableAtTransaction

func (p *Persistence) GetTableAtTransaction(database, table, transactionID string) (*core.Table, error)

GetTableAtTransaction retrieves table metadata as it existed at a specific transaction.

func (*Persistence) GetView

func (p *Persistence) GetView(database, name string) (*core.View, error)

GetView retrieves a view definition

func (*Persistence) IsInitialized

func (p *Persistence) IsInitialized() bool

IsInitialized returns true if the persistence layer has a valid repository

func (*Persistence) IsShare

func (p *Persistence) IsShare(name string) bool

IsShare checks if a database name refers to a share

func (*Persistence) LatestTransaction

func (p *Persistence) LatestTransaction() Transaction

func (*Persistence) ListBranches

func (p *Persistence) ListBranches() ([]string, error)

ListBranches returns all branch names

func (*Persistence) ListDatabases

func (p *Persistence) ListDatabases() []string

func (*Persistence) ListEntriesDirect

func (p *Persistence) ListEntriesDirect(dirPath string) ([]TreeEntry, error)

ListEntriesDirect lists directory entries directly from the Git tree

func (*Persistence) ListRecordKeys

func (p *Persistence) ListRecordKeys(database string, table string) []string

func (*Persistence) ListRecordKeysDirect added in v2.10.0

func (p *Persistence) ListRecordKeysDirect(database, table string) []string

ListRecordKeysDirect lists all record keys in a table using a single tree traversal. More efficient than ListEntriesDirect because it avoids separate HEAD resolution.

func (*Persistence) ListRecordsAtTransaction

func (p *Persistence) ListRecordsAtTransaction(database, table, transactionID string) ([]string, error)

ListRecordsAtTransaction lists all records in a table as they existed at a specific transaction.

func (*Persistence) ListRemotes

func (p *Persistence) ListRemotes() ([]Remote, error)

ListRemotes returns all configured remotes

func (*Persistence) ListShares

func (p *Persistence) ListShares() ([]Share, error)

ListShares returns all configured shares

func (*Persistence) ListTables

func (p *Persistence) ListTables(database string) []string

func (*Persistence) ListTransactions

func (p *Persistence) ListTransactions(limit int) ([]Transaction, error)

ListTransactions returns the most recent transactions (commits)

func (*Persistence) ListViews

func (p *Persistence) ListViews(database string) ([]core.View, error)

ListViews returns all views in a database

func (*Persistence) Lock

func (p *Persistence) Lock()

Lock acquires a write lock for exclusive write operations

func (*Persistence) Merge

func (p *Persistence) Merge(source string, identity core.Identity) (Transaction, error)

Merge merges the source branch into the current branch. Uses row-level merge strategy for diverged branches (Last-Writer-Wins).

func (*Persistence) MergeWithOptions

func (p *Persistence) MergeWithOptions(source string, identity core.Identity, opts MergeOptions) (MergeResult, error)

MergeWithOptions merges source branch into current with specified options

func (*Persistence) OpenSharePersistence

func (p *Persistence) OpenSharePersistence(shareName string) (*Persistence, error)

OpenSharePersistence opens a read-only persistence for a share's repository

func (*Persistence) Pull

func (p *Persistence) Pull(remoteName, branch string, auth *RemoteAuth) error

Pull pulls changes from a remote and merges into current branch

func (*Persistence) Push

func (p *Persistence) Push(remoteName, branch string, auth *RemoteAuth) error

Push pushes a branch to a remote

func (*Persistence) RLock

func (p *Persistence) RLock()

RLock acquires a read lock for concurrent read operations

func (*Persistence) RUnlock

func (p *Persistence) RUnlock()

RUnlock releases the read lock

func (*Persistence) ReadFileDirect

func (p *Persistence) ReadFileDirect(filePath string) ([]byte, error)

ReadFileDirect reads a file directly from the Git tree (bypasses worktree filesystem)

func (*Persistence) ReadMaterializedViewData

func (p *Persistence) ReadMaterializedViewData(database, viewName string) ([]map[string]string, error)

ReadMaterializedViewData reads cached data for a materialized view

func (*Persistence) Recover

func (p *Persistence) Recover(name string) error

func (*Persistence) RemoveRemote

func (p *Persistence) RemoveRemote(name string) error

RemoveRemote removes a remote from the repository

func (*Persistence) RenameBranch

func (p *Persistence) RenameBranch(oldName, newName string) error

RenameBranch renames a branch

func (*Persistence) ResolveConflict

func (p *Persistence) ResolveConflict(database, table, key string, resolution []byte) error

ResolveConflict resolves a single conflict in a pending merge

func (*Persistence) Restore

func (p *Persistence) Restore(asof Transaction, database *string, table *string) error

func (*Persistence) SaveRecord

func (p *Persistence) SaveRecord(database string, table string, records map[string][]byte, identity core.Identity) (txn Transaction, err error)

func (*Persistence) SaveRecordDirect

func (p *Persistence) SaveRecordDirect(database, table string, records map[string][]byte, identity core.Identity) (Transaction, error)

SaveRecordDirect saves records using the low-level plumbing API (no worktree). It uses a batch tree update for efficient multi-record operations.

func (*Persistence) Scan

func (p *Persistence) Scan(database string, table string, filterExpr *func(key string, value []byte) bool) iter.Seq2[string, []byte]

func (*Persistence) ScanDirect added in v2.10.0

func (p *Persistence) ScanDirect(database, table string) iter.Seq2[string, []byte]

ScanDirect scans all records in a table using a single tree traversal. This is significantly faster than listing keys and reading each record individually, because it resolves HEAD → commit → tree only once, then reads all blob entries. Data is read into memory under RLock, then yielded after the lock is released, allowing callers to acquire write locks during iteration (e.g., UPDATE/DELETE).

func (*Persistence) Snapshot

func (p *Persistence) Snapshot(name string, asof *Transaction) error

func (*Persistence) SyncShare

func (p *Persistence) SyncShare(name string, auth *RemoteAuth) error

SyncShare pulls latest changes from the share's remote

func (*Persistence) TransactionsFrom

func (p *Persistence) TransactionsFrom(asof string) []Transaction

func (*Persistence) TransactionsSince

func (p *Persistence) TransactionsSince(asof time.Time) []Transaction

func (*Persistence) Unlock

func (p *Persistence) Unlock()

Unlock releases the write lock

func (*Persistence) UpdateTable

func (p *Persistence) UpdateTable(table core.Table, identity core.Identity, message string) (txn Transaction, err error)

UpdateTable updates an existing table's schema

func (*Persistence) UpdateView

func (p *Persistence) UpdateView(view core.View, identity core.Identity) (txn Transaction, err error)

UpdateView updates a view definition (used for refresh timestamps)

func (*Persistence) WriteFileDirect

func (p *Persistence) WriteFileDirect(filePath string, data []byte, identity core.Identity, message string) (Transaction, error)

WriteFileDirect writes a single file to the repository using plumbing API

func (*Persistence) WriteMaterializedViewData

func (p *Persistence) WriteMaterializedViewData(database, viewName string, rows []map[string]string, identity core.Identity) (txn Transaction, err error)

WriteMaterializedViewData stores cached data for a materialized view

type RecordConflict

type RecordConflict struct {
	Database  string
	Table     string
	Key       string
	BaseVal   []byte // nil if record didn't exist at base
	HeadVal   []byte // nil if deleted in HEAD
	SourceVal []byte // nil if deleted in SOURCE
	Resolved  []byte // the resolved value (LWW winner)
}

RecordConflict represents a conflict where both branches modified the same record

type Remote

type Remote struct {
	Name string
	URLs []string
}

Remote represents a Git remote

type RemoteAuth

type RemoteAuth struct {
	Type       AuthType
	Token      string // For token auth
	KeyPath    string // For SSH key auth
	Passphrase string // For SSH key with passphrase
	Username   string // For basic auth
	Password   string // For basic auth
}

RemoteAuth holds authentication configuration for remote operations

type Share

type Share struct {
	Name      string `json:"name"`
	URL       string `json:"url"`
	CommitRef string `json:"commit_ref,omitempty"` // Last synced commit
}

Share represents an external database reference

type SharesConfig

type SharesConfig struct {
	Shares []Share `json:"shares"`
}

SharesConfig holds all shares metadata

type Transaction

type Transaction struct {
	ID      string
	When    time.Time
	Author  string // "Name <email>" format
	Message string // Commit message
}

func (Transaction) String

func (transaction Transaction) String() string

type TransactionBuilder

type TransactionBuilder struct {
	// contains filtered or unexported fields
}

TransactionBuilder allows batching multiple write operations into a single commit

func (*TransactionBuilder) AddDelete

func (tb *TransactionBuilder) AddDelete(database, table, key string) error

AddDelete adds a delete operation to the transaction batch

func (*TransactionBuilder) AddWrite

func (tb *TransactionBuilder) AddWrite(database, table, key string, data []byte) error

AddWrite adds a write operation to the transaction batch

func (*TransactionBuilder) Commit

func (tb *TransactionBuilder) Commit(identity core.Identity) (Transaction, error)

Commit applies all batched operations in a single Git commit using the plumbing API. It uses a batch tree update for efficient multi-operation commits.

func (*TransactionBuilder) OperationCount

func (tb *TransactionBuilder) OperationCount() int

OperationCount returns the number of pending operations

func (*TransactionBuilder) Rollback

func (tb *TransactionBuilder) Rollback()

Rollback discards all batched operations without committing

type TreeChange

type TreeChange struct {
	Path     string        // File path (e.g., "db/table/key")
	BlobHash plumbing.Hash // Blob hash to set (ZeroHash = delete)
	IsDelete bool          // True if this is a deletion
}

TreeChange represents a single change to apply to a tree

type TreeEntry

type TreeEntry struct {
	Name  string
	IsDir bool
}

TreeEntry represents a directory entry from the Git tree

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL