postgres

package
v0.0.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package postgres provides database operations for the poutbox outbox system. It handles connection management, schema migration, and logical replication for processing jobs.

Example usage with polling mode:

db, err := postgres.Connect(ctx, connStr)
defer db.Close()

err = postgres.Migrate(ctx, db)

Example usage with logical replication mode:

db, err := postgres.Connect(ctx, connStr)
defer db.Close()

err = postgres.Migrate(ctx, db)
err = postgres.InitializeLogicalReplication(ctx, db)

stream, err := postgres.NewReplicationStream(ctx, replConnStr, "0/0")
defer stream.Close(ctx)

for event, err := range stream.Events(ctx) {
	if err != nil {
		break
	}
	// Process event.Change or event.Keepalive
}

Index

Constants

View Source
const (
	// PublicationName is the publication used for logical replication.
	PublicationName = "poutbox_immediate_pub"
	// ReplicationSlot is the replication slot name for tracking LSN progress.
	ReplicationSlot = "poutbox_immediate_slot"
)
View Source
const (
	// ReplicationEventTypeKeepalive indicates a keepalive message from the server.
	ReplicationEventTypeKeepalive = "keepalive"
	// ReplicationEventTypeInsert indicates an insert change event.
	ReplicationEventTypeInsert = "insert"
)

Variables

This section is empty.

Functions

func Connect

func Connect(ctx context.Context, connStr string) (*sql.DB, error)

Connect establishes a connection to Postgres using the given connection string. It configures the connection pool with a maximum of 25 open connections, 5 idle connections, a 5-minute connection lifetime, and a 2-minute idle timeout.

func InitializeLogicalReplication

func InitializeLogicalReplication(ctx context.Context, db *sql.DB) error

InitializeLogicalReplication creates the publication and replication slot if they don't exist. Idempotent: safe to call multiple times.

func Migrate

func Migrate(ctx context.Context, db *sql.DB) error

Migrate creates the database schema from the embedded schema.sql file.

Types

type DBTX

type DBTX interface {
	ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
	PrepareContext(context.Context, string) (*sql.Stmt, error)
	QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
	QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}

type EnqueueScheduledParams

type EnqueueScheduledParams struct {
	Payload     string
	ScheduledAt time.Time
}

type GetImmediateJobsParams

type GetImmediateJobsParams struct {
	LastProcessedAt            time.Time
	LastProcessedTransactionID int64
	LastProcessedID            int64
	BatchSize                  int32
}

type GetScheduledJobsReadyRow

type GetScheduledJobsReadyRow struct {
	ID          int64
	Payload     string
	ScheduledAt time.Time
}

type InsertDeadLetterBatchParams

type InsertDeadLetterBatchParams struct {
	Ids           []int64
	Payloads      []string
	ErrorMessages []string
	RetryCounts   []int32
}

type InsertDeadLetterParams

type InsertDeadLetterParams struct {
	ID           int64
	Payload      string
	ErrorMessage sql.NullString
	RetryCount   int32
}

type InsertFailedBatchParams

type InsertFailedBatchParams struct {
	Ids           []int64
	Payloads      []string
	ErrorMessages []string
	RetryCounts   []int32
}

type InsertFailedParams

type InsertFailedParams struct {
	ID           int64
	Payload      string
	ErrorMessage sql.NullString
	RetryCount   int32
}

type InsertPartitionMetaParams

type InsertPartitionMetaParams struct {
	PartitionName string
	RangeStart    time.Time
	RangeEnd      time.Time
}

type KeepaliveRequest

type KeepaliveRequest struct {
	// ServerWALEnd is the server's current write-ahead log position.
	ServerWALEnd LSN
	// ReplyRequested indicates if the client should send a status update.
	ReplyRequested bool
}

KeepaliveRequest represents a server keepalive message.

type LSN

type LSN = pglogrepl.LSN

LSN is a Postgres log sequence number representing a position in the WAL.

type ListPartitionsRow

type ListPartitionsRow struct {
	PartitionName string
	RangeStart    time.Time
	RangeEnd      time.Time
}

type LogicalReplChange

type LogicalReplChange struct {
	// ID is the job identifier from the database.
	ID int64
	// Payload is the job data as bytes (typically JSON).
	Payload []byte
	// CreatedAt is when the change was created in the database.
	CreatedAt time.Time
	// TransactionID is the transaction ID that produced this change.
	TransactionID int64
	// CommitLsn is the LSN as stored in the immediate table.
	CommitLsn string
	// LSN is the write-ahead log position of this change.
	LSN LSN
}

LogicalReplChange represents a database change captured via logical replication.

type MessageParser

type MessageParser interface {
	// Parse decodes raw WAL data into a pglogrepl message.
	Parse(walData []byte) (pglogrepl.Message, error)
	// ParseRelation extracts table metadata from a relation message.
	ParseRelation(msg *pglogrepl.RelationMessage) *RelationMetadata
	// ParseInsert extracts change data from an insert message.
	ParseInsert(msg *pglogrepl.InsertMessageV2, relMeta *RelationMetadata, lsn LSN) (*LogicalReplChange, error)
}

MessageParser converts WAL messages into logical replication changes.

type PartitionOps

type PartitionOps struct {
	// contains filtered or unexported fields
}

PartitionOps manages table partitions for the immediate queue. It creates new partitions and removes old ones based on time windows.

func NewPartitionOps

func NewPartitionOps(db *sql.DB) *PartitionOps

NewPartitionOps creates a new PartitionOps instance with the given database.

func (*PartitionOps) Run

func (p *PartitionOps) Run(ctx context.Context, from time.Time, to time.Time, interval time.Duration, cutoffTime time.Time) error

Run creates partitions covering the range from the from time to the to time at the given interval, and drops partitions whose end time is before cutoffTime. All operations are wrapped in a transaction with locking for safety.

type PgoutputParser

type PgoutputParser struct{}

PgoutputParser implements MessageParser using the pgoutput plugin format.

func NewPgoutputParser

func NewPgoutputParser() *PgoutputParser

NewPgoutputParser creates a new pgoutput message parser.

func (*PgoutputParser) Parse

func (p *PgoutputParser) Parse(walData []byte) (pglogrepl.Message, error)

Parse decodes raw WAL data into a pglogrepl message.

func (*PgoutputParser) ParseInsert

func (p *PgoutputParser) ParseInsert(msg *pglogrepl.InsertMessageV2, relMeta *RelationMetadata, lsn LSN) (*LogicalReplChange, error)

ParseInsert extracts change data from an insert message into a LogicalReplChange.

func (*PgoutputParser) ParseRelation

func (p *PgoutputParser) ParseRelation(msg *pglogrepl.RelationMessage) *RelationMetadata

ParseRelation extracts table metadata from a relation message.

type PoutboxCursor

type PoutboxCursor struct {
	ID                         int32
	LastProcessedID            int64
	LastProcessedAt            time.Time
	LastProcessedTransactionID int64
	LastLsn                    string
	UpdatedAt                  time.Time
}

type PoutboxDeadLetter

type PoutboxDeadLetter struct {
	ID           int64
	Payload      string
	ErrorMessage sql.NullString
	RetryCount   int32
	FailedAt     time.Time
	DeadAt       time.Time
}

type PoutboxFailed

type PoutboxFailed struct {
	ID           int64
	Payload      string
	ErrorMessage sql.NullString
	RetryCount   int32
	FailedAt     time.Time
	NextRetryAt  time.Time
}

type PoutboxImmediate

type PoutboxImmediate struct {
	ID            int64
	Payload       string
	CreatedAt     time.Time
	TransactionID int64
	CommitLsn     string
}

type PoutboxPartitionMetum

type PoutboxPartitionMetum struct {
	PartitionName string
	RangeStart    time.Time
	RangeEnd      time.Time
	CreatedAt     time.Time
}

type PoutboxScheduled

type PoutboxScheduled struct {
	ID          int64
	Payload     string
	ScheduledAt time.Time
	CreatedAt   time.Time
}

type Queries

type Queries struct {
}

func New

func New() *Queries

func (*Queries) DeleteFailed

func (q *Queries) DeleteFailed(ctx context.Context, db DBTX, id int64) error

func (*Queries) DeleteFailedBatch

func (q *Queries) DeleteFailedBatch(ctx context.Context, db DBTX, ids []int64) error

func (*Queries) DeleteImmediate

func (q *Queries) DeleteImmediate(ctx context.Context, db DBTX, id int64) error

func (*Queries) DeletePartitionMeta

func (q *Queries) DeletePartitionMeta(ctx context.Context, db DBTX, partitionName string) error

func (*Queries) DeleteScheduledBatch

func (q *Queries) DeleteScheduledBatch(ctx context.Context, db DBTX, ids []int64) error

func (*Queries) EnqueueImmediate

func (q *Queries) EnqueueImmediate(ctx context.Context, db DBTX, payload string) (int64, error)

func (*Queries) EnqueueScheduled

func (q *Queries) EnqueueScheduled(ctx context.Context, db DBTX, arg EnqueueScheduledParams) (int64, error)

func (*Queries) GetCursor

func (q *Queries) GetCursor(ctx context.Context, db DBTX) (PoutboxCursor, error)

func (*Queries) GetFailedJobsReady

func (q *Queries) GetFailedJobsReady(ctx context.Context, db DBTX, batchSize int32) ([]PoutboxFailed, error)

func (*Queries) GetImmediateJobs

func (q *Queries) GetImmediateJobs(ctx context.Context, db DBTX, arg GetImmediateJobsParams) ([]PoutboxImmediate, error)

func (*Queries) GetScheduledJobsReady

func (q *Queries) GetScheduledJobsReady(ctx context.Context, db DBTX, batchSize int32) ([]GetScheduledJobsReadyRow, error)

func (*Queries) InsertDeadLetter

func (q *Queries) InsertDeadLetter(ctx context.Context, db DBTX, arg InsertDeadLetterParams) error

func (*Queries) InsertDeadLetterBatch

func (q *Queries) InsertDeadLetterBatch(ctx context.Context, db DBTX, arg InsertDeadLetterBatchParams) error

func (*Queries) InsertFailed

func (q *Queries) InsertFailed(ctx context.Context, db DBTX, arg InsertFailedParams) error

func (*Queries) InsertFailedBatch

func (q *Queries) InsertFailedBatch(ctx context.Context, db DBTX, arg InsertFailedBatchParams) error

func (*Queries) InsertPartitionMeta

func (q *Queries) InsertPartitionMeta(ctx context.Context, db DBTX, arg InsertPartitionMetaParams) error

func (*Queries) ListPartitions

func (q *Queries) ListPartitions(ctx context.Context, db DBTX) ([]ListPartitionsRow, error)

func (*Queries) UpdateCursor

func (q *Queries) UpdateCursor(ctx context.Context, db DBTX, arg UpdateCursorParams) error

func (*Queries) UpdateFailed

func (q *Queries) UpdateFailed(ctx context.Context, db DBTX, arg UpdateFailedParams) error

type RelationMetadata

type RelationMetadata struct {
	// ID is the relation OID.
	ID uint32
	// Name is the table name.
	Name string
	// Columns describes the table columns.
	Columns []*pglogrepl.RelationMessageColumn
}

RelationMetadata describes a database table for logical replication.

type ReplicationEvent

type ReplicationEvent struct {
	// Type is the event type: "keepalive" or "insert".
	Type string
	// Change contains the logical replication change data for insert events.
	Change *LogicalReplChange
	// Keepalive contains the server keepalive request for keepalive events.
	Keepalive *KeepaliveRequest
}

ReplicationEvent represents an event received from logical replication. Either Change or Keepalive will be set depending on Type.

type ReplicationStream

type ReplicationStream struct {
	// contains filtered or unexported fields
}

ReplicationStream receives logical replication events from Postgres. It parses WAL messages and tracks relation metadata.

func NewReplicationStream

func NewReplicationStream(ctx context.Context, connStr string, startLSN string) (*ReplicationStream, error)

NewReplicationStream creates a new replication stream starting at the given LSN. Connects to Postgres, identifies the system, and starts replication.

func (*ReplicationStream) Close

func (rs *ReplicationStream) Close(ctx context.Context) error

Close closes the replication stream and releases the connection.

func (*ReplicationStream) Events

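Events returns an iterator of replication events from the stream, intended for use in a range loop that receives an event and an error on each iteration (for example, for event, err := range stream.Events(ctx)). It yields either insert changes or keepalive messages as they arrive.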

func (*ReplicationStream) SendKeepalive

func (rs *ReplicationStream) SendKeepalive(ctx context.Context, walApplyPosition LSN) error

SendKeepalive sends a status update to the server with the current apply position. Uses a separate timeout to avoid losing messages if the context is cancelled.

type UpdateCursorParams

type UpdateCursorParams struct {
	LastProcessedID            int64
	LastProcessedAt            time.Time
	LastProcessedTransactionID int64
	LastLsn                    string
}

type UpdateFailedParams

type UpdateFailedParams struct {
	ErrorMessage sql.NullString
	RetryCount   int32
	ID           int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL