Documentation ¶
Overview ¶
HarmonyQuery provides database abstractions over SP-wide Postgres-compatible instance(s).
Features ¶
- Rolling to secondary database servers on connection failure
- Convenience features for Go + SQL
- Prevention of SQL injection vulnerabilities
- Monitors behavior via Prometheus stats and logging of errors
Usage ¶
1. Processes should use New() to instantiate a *DB and keep it.
2. Consumers can use this *DB concurrently.
3. Creating and changing tables & views should happen in the ./sql/ folder.
4. Name the file with today's date in the format YYYYMMDD.sql (ex: 20231231.sql for the year's last day).
   a. CREATE TABLE should NOT have a schema:
      GOOD: CREATE TABLE foo ();
      BAD: CREATE TABLE me.foo ();
   b. Schema is managed for you. It provides isolation for integration tests & multi-use.
   c. Git merges: all files run once, so old-after-new is OK when there are no dependencies.
   d. NEVER change shipped SQL files. Have later files make corrections.
   e. Anything not yet run will be run, so an older date making it to master is OK.
5. Write SQL with context, raw strings, and args:
name := "Alice"
var ID int
err := db.QueryRow(ctx, "SELECT id FROM people WHERE first_name=$1", name).Scan(&ID)
fmt.Println(ID)
Note: Scan() is column-oriented, while Select() & StructScan() are field name/tag oriented.
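The migration rules in the usage list (date-named files, each run exactly once, older dates still applied if they arrive late) can be sketched as follows. This is an illustration only: `pendingMigrations` and the `applied` set are hypothetical, and this document does not describe how HarmonyQuery records which files have already run.

```go
package main

import (
	"sort"
	"strings"
)

// pendingMigrations returns the .sql files that have not been applied yet,
// in ascending name order. Because names are zero-padded dates
// (YYYYMMDD.sql), lexical order equals chronological order.
// The function name and the "applied" bookkeeping are assumptions.
func pendingMigrations(files []string, applied map[string]bool) []string {
	var pending []string
	for _, f := range files {
		// Skip non-SQL files and anything that already ran.
		if !strings.HasSuffix(f, ".sql") || applied[f] {
			continue
		}
		pending = append(pending, f)
	}
	sort.Strings(pending)
	return pending
}
```

Under this scheme a file dated before the newest applied file is still picked up, which matches the rule that an older date reaching master late is acceptable.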
Index ¶
- Constants
- Variables
- func IsErrDDLConflict(err error) bool
- func IsErrSerialization(err error) bool
- func IsErrUniqueContraint(err error) bool
- func Iter[T any](db *DB, ctx context.Context, errOut *error, sql rawStringOnly, args ...any) iter.Seq[T]
- func TxIter[T any](tx *Tx, errOut *error, sql rawStringOnly, args ...any) iter.Seq[T]
- type Config
- type DB
- func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error), ...) (didCommit bool, retErr error)
- func (db *DB) DowngradeTo(ctx context.Context, dateNum int) error
- func (db *DB) Exec(ctx context.Context, sql rawStringOnly, arguments ...any) (count int, err error)
- func (db *DB) GetRoutableIP() (string, error)
- func (db *DB) ITestDeleteAll()
- func (db *DB) Query(ctx context.Context, sql rawStringOnly, arguments ...any) (*Query, error)
- func (db *DB) QueryRow(ctx context.Context, sql rawStringOnly, arguments ...any) Row
- func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error
- type ITestID
- type PoolConfig
- type Qry
- type Query
- type Row
- type TransactionOption
- type TransactionOptions
- type Tx
- func (t *Tx) Exec(sql rawStringOnly, arguments ...any) (count int, err error)
- func (t *Tx) Query(sql rawStringOnly, arguments ...any) (*Query, error)
- func (t *Tx) QueryRow(sql rawStringOnly, arguments ...any) Row
- func (t *Tx) Select(sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error
- func (t *Tx) SendBatch(ctx context.Context, b *pgx.Batch) (pgx.BatchResults, error)
Constants ¶
const SQL_START = ctxkey("sqlStart")
const SQL_STRING = ctxkey("sqlString")
Variables ¶
var DBMeasures = struct {
	Hits            *stats.Int64Measure
	TotalWait       *stats.Int64Measure
	Waits           prometheus.Histogram
	OpenConnections *stats.Int64Measure
	Errors          *stats.Int64Measure
	WhichHost       prometheus.Histogram
}{
	Hits:      stats.Int64(pre+"hits", "Total number of uses.", stats.UnitDimensionless),
	TotalWait: stats.Int64(pre+"total_wait", "Total delay. A numerator over hits to get average wait.", stats.UnitMilliseconds),
	Waits: prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    pre + "waits",
		Buckets: waitsBuckets,
		Help:    "The histogram of waits for query completions.",
	}),
	OpenConnections: stats.Int64(pre+"open_connections", "Total connection count.", stats.UnitDimensionless),
	Errors:          stats.Int64(pre+"errors", "Total error count.", stats.UnitDimensionless),
	WhichHost: prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    pre + "which_host",
		Buckets: whichHostBuckets,
		Help:    "The index of the hostname being used",
	}),
}
DBMeasures groups all db metrics.
var DefaultHostEnv = "HARMONYQUERY_HOSTS"
var DefaultSchema = "curio"
var ITestUpgradeFunc func(*pgxpool.Pool, string, string)
Functions ¶
func IsErrDDLConflict ¶
IsErrDDLConflict returns true if the error is a DDL conflict (object already exists or doesn't exist)
func IsErrSerialization ¶
func IsErrUniqueContraint ¶
func Iter ¶
func Iter[T any](db *DB, ctx context.Context, errOut *error, sql rawStringOnly, args ...any) iter.Seq[T]
Iter returns an iterator that scans each row into a value of type T. T is typically a struct with fields matching the query's columns (via name or `db` tags), but a single-column result can scan into a primitive type. The underlying cursor is closed automatically when the iterator finishes or the caller breaks out of the loop.
On any error (query, scan, or cursor), iteration stops and *errOut is set.
Example:
var err error
for user := range harmonyquery.Iter[User](db, ctx, &err, "SELECT name, id FROM users WHERE active = $1", true) {
	fmt.Println(user.Name, user.ID)
}
if err != nil {
	return err
}
func TxIter ¶
TxIter is the transaction variant of Iter. It returns an iterator that scans each row into a value of type T within an open transaction.
Example:
db.BeginTransaction(ctx, func(tx *Tx) (bool, error) {
	var err error
	for user := range harmonyquery.TxIter[User](tx, &err, "SELECT name, id FROM users") {
		fmt.Println(user.Name, user.ID)
	}
	return err == nil, err
})
Types ¶
type Config ¶
type Config struct {
	// Hosts is a list of hostnames of nodes running YugabyteDB
	// in a cluster. Only 1 is required.
	Hosts []string
	// The Yugabyte server's username with full credentials to operate on Lotus' Database. Blank for default.
	Username string
	// The password for the related username. Blank for default.
	Password string
	// The database (logical partition) within Yugabyte. Blank for default.
	Database string
	// The port to find Yugabyte. Blank for default.
	Port string
	// Load Balance the connection over multiple nodes
	LoadBalance bool
	// SSL Mode for the connection
	SSLMode string
	// Schema to use for the connection
	Schema string
	// ApplicationName is sent to PostgreSQL as the application_name connection parameter,
	// visible in pg_stat_activity. Defaults to the binary name (os.Args[0]).
	ApplicationName string
	SqlEmbedFS *embed.FS
	DowngradeEmbedFS *embed.FS
	ITestID ITestID
	*PoolConfig // Set all or nothing. We use every value.
}
type DB ¶
type DB struct {
	BTFPOnce sync.Once
	BTFP     uintptr // A PC only in your stack when you call BeginTransaction()
	// contains filtered or unexported fields
}
func New ¶
func New(hosts []string, username, password, database, port string, loadBalance bool, itestID ITestID) (*DB, error)
New should be called once per binary to establish the connection pool. log() is for errors. It returns a connection to a database whose schema has been upgraded. This entry point serves both production and integration tests, so it leans on dependency injection (DI).
func NewFromConfig ¶
func (*DB) BeginTransaction ¶
func (db *DB) BeginTransaction(ctx context.Context, f func(*Tx) (commit bool, err error), opt ...TransactionOption) (didCommit bool, retErr error)
BeginTransaction is how you access transactions using this library. The entire transaction happens in the function passed in. The function must return commit as true, or a rollback will occur. If you want to retry when there is a DB serialization error, test the returned error with IsErrSerialization().
func (*DB) DowngradeTo ¶
DowngradeTo downgrades the database schema to a previous date (when an upgrade was applied). Note: these dates (YYYYMMDD) are not the SQL date but the date the user did an upgrade.
func (*DB) Exec ¶
Exec executes changes (INSERT, DELETE, or UPDATE). Note: CREATE & DROP statements should be permanent, so express them in the ./sql/ files (next number) instead.
func (*DB) GetRoutableIP ¶
func (*DB) ITestDeleteAll ¶
func (db *DB) ITestDeleteAll()
ITestDeleteAll will delete everything created for "this" integration test. This must be called at the end of each integration test.
func (*DB) Query ¶
Query allows iterating returned values to save memory consumption with the downside of needing to `defer q.Close()`. For a simpler interface, try Select(). Next() must be called to advance the row cursor, including the first time. Ex:
q, err := db.Query(ctx, "SELECT id, name FROM users")
handleError(err)
defer q.Close()
for q.Next() {
	var id int
	var name string
	handleError(q.Scan(&id, &name))
	fmt.Println(id, name)
}
func (*DB) QueryRow ¶
QueryRow gets 1 row using column order matching. This is a timesaver for the special case of wanting only the first row returned. Ex:
var name, pet string
var ID = 123
err := db.QueryRow(ctx, "SELECT name, pet FROM users WHERE ID=$1", ID).Scan(&name, &pet)
func (*DB) Select ¶
func (db *DB) Select(ctx context.Context, sliceOfStructPtr any, sql rawStringOnly, arguments ...any) error
Select scans multiple rows into a slice using name matching. Ex:
type user struct {
	Name   string
	ID     int
	Number string `db:"tel_no"`
}
var users []user
pet := "cat"
err := db.Select(ctx, &users, "SELECT name, id, tel_no FROM customers WHERE pet=$1", pet)
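To make "name matching" concrete, the sketch below maps column names onto struct fields with the standard `reflect` package. It assumes that a `db` tag takes precedence and that untagged fields match case-insensitively on the field name; that rule is an assumption, since this document does not spell out the exact algorithm, and `columnToField` is a hypothetical helper.

```go
package main

import (
	"reflect"
	"strings"
)

// columnToField maps each column name to the index of the struct field it
// would fill. structPtr must be a pointer to a struct. A `db` tag wins;
// otherwise the field name is compared case-insensitively. Columns with no
// matching field are omitted. The matching rule is an assumption.
func columnToField(structPtr any, columns []string) map[string]int {
	t := reflect.TypeOf(structPtr).Elem()
	byName := make(map[string]int, t.NumField())
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		name := f.Tag.Get("db")
		if name == "" {
			name = f.Name
		}
		byName[strings.ToLower(name)] = i
	}
	out := make(map[string]int, len(columns))
	for _, c := range columns {
		if i, ok := byName[strings.ToLower(c)]; ok {
			out[c] = i
		}
	}
	return out
}
```

With the `user` struct from the example, the columns name, id, and tel_no would resolve to fields 0, 1, and 2, regardless of the order in which the SELECT lists them.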
type PoolConfig ¶
type Query ¶
type Query struct {
	Qry
}
Query offers Next/Err/Close/Scan/Values
func (*Query) StructScan ¶
StructScan allows scanning a single row into a struct. This improves efficiency of processing large result sets by avoiding the need to allocate a slice of structs.
type TransactionOption ¶
type TransactionOption func(*TransactionOptions)
func OptionRetry ¶
func OptionRetry() TransactionOption
type TransactionOptions ¶
type TransactionOptions struct {
	RetrySerializationError bool
}
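TransactionOption and TransactionOptions follow Go's functional-options pattern. The sketch below re-declares both types locally so that it compiles on its own; the body of OptionRetry and the `applyOptions` helper are assumptions about how such options are typically folded together, not the library's source.

```go
package main

// Local re-declarations mirroring the documented types, so that this sketch
// is self-contained.
type TransactionOptions struct {
	RetrySerializationError bool
}

type TransactionOption func(*TransactionOptions)

// OptionRetry mirrors the documented constructor; its body is an assumption.
func OptionRetry() TransactionOption {
	return func(o *TransactionOptions) { o.RetrySerializationError = true }
}

// applyOptions is a hypothetical helper showing how a variadic option list
// can be folded into a single TransactionOptions value.
func applyOptions(opts ...TransactionOption) TransactionOptions {
	var o TransactionOptions // zero value: no retry
	for _, opt := range opts {
		opt(&o)
	}
	return o
}
```

A call such as db.BeginTransaction(ctx, f, OptionRetry()) would then pass the option through the variadic opt parameter shown in the BeginTransaction signature.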