pipefn

package module
v0.0.0-...-88a9c35 Latest
Published: Mar 1, 2026 License: MIT Imports: 4 Imported by: 0

README

pipefn

pipefn is a Go library for building lazy, functional-style streaming pipelines on top of iter.Seq.

status

This project is still a work in progress. Expect bugs and possible API changes as it evolves.

Use in production at your own risk.

Documentation

Overview

Package pipefn provides functional-style and composable transformations for iter.Seq, enabling streaming pipelines without intermediate buffering.

This package is built around the concept of Pipes. A Pipe[T] represents a lazily evaluated stream of values of type T, paired with an internal error channel.

All transformations (Map, FlatMap, Filter, and more) are provided as package-level functions. Each transformation returns a new Pipe, allowing pipelines to be composed through simple chaining. Values are only produced when the resulting iter.Seq is iterated, making all pipelines demand-driven.

Errors produced by any stage flow through the pipe’s internal error channel. All transformations inherit their input Pipe’s error channel, so errors automatically propagate through the pipeline and can be consumed alongside values when the pipeline is processed.

Pipes are consumed using the Results, Values, ForEach, Collect, and CollectValues methods. Consuming a Pipe is a destructive operation: once a Pipe has been consumed, it cannot be reused or iterated again.

For more details on each type, function, and available transformation, please refer to the corresponding package-level documentation.

Example

Example demonstrates a relatively complex pipeline that parses items from (fake) event files.

package main

import (
	"fmt"
	"iter"
	"strings"
	"sync"

	"github.com/KasperOmsK/pipefn"
)

type Item struct {
	ID string
}

type Event struct {
	Type  string
	Items []Item
}

// Example demonstrates a relatively complex pipeline that parses items from (fake) event files.
func main() {
	// Wrap iter.Seqs into Pipes
	input1 := pipefn.From(IterateFile("events-2023.log"))
	input2 := pipefn.From(IterateFile("events-2024.log"))

	// Merge multiple pipes of the same type.
	p := pipefn.Merge(input1, input2)

	// Map applies deterministic transformations that cannot fail.
	// Transformations are simple functions, making it easy to reuse existing code.
	trimmed := pipefn.Map(p, strings.TrimSpace)

	// TryMap applies transformations that may fail.
	// Errors are forwarded to the Pipe's error channel.
	events := pipefn.TryMap(trimmed, func(line string) (Event, error) {
		return ParseEvent(line)
	})

	// All common FP-style transformations are available.

	purchases := pipefn.Filter(events, func(e Event) bool {
		return e.Type == "purchase"
	})

	items := pipefn.FlatMap(purchases, func(e Event) []Item {
		return e.Items
	})

	// TryMap can act as a validation filter with error reporting
	validItems := pipefn.TryMap(items, func(it Item) (Item, error) {
		if it.ID == "" {
			return Item{}, fmt.Errorf("missing item ID")
		}
		return it, nil
	})

	batches := pipefn.Chunk(validItems, 2)

	// Convert Pipe back to iter.Seq for consumption
	vals, errs := batches.Results()

	// Errors *must* be consumed concurrently to avoid blocking the pipeline.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range errs {
			fmt.Println("pipeline error:", err)
		}
	}()

	batchCount := 0
	for batch := range vals {
		fmt.Println("batch:", batchCount)
		err := ProcessBatch(batch)
		if err != nil {
			break // Stopping consumption halts the pipeline
		}
		batchCount++
	}
	wg.Wait()
}

func IterateFile(path string) iter.Seq[string] {
	return func(yield func(string) bool) {
		var lines []string

		switch path {
		case "events-2023.log":
			lines = []string{
				"purchase:1001,1002,1003",
				"refund:2001",
				"purchase:1004,1005",
				"purchase:", // rejected by ParseEvent: no items
			}
		case "events-2024.log":
			lines = []string{
				"purchase:3001,3002",
				"invalid-line-without-colon",
				"purchase:3003",
			}
		default:
			lines = []string{}
		}

		for _, line := range lines {
			if !yield(line) {
				return
			}
		}
	}
}

func ParseEvent(line string) (Event, error) {
	parts := strings.SplitN(line, ":", 2)
	if len(parts) != 2 {
		return Event{}, fmt.Errorf("invalid event format: %q", line)
	}

	eventType := strings.TrimSpace(parts[0])
	rawItems := strings.TrimSpace(parts[1])

	var items []Item
	if rawItems == "" {
		return Event{}, fmt.Errorf("invalid event format: no items")
	}

	for _, id := range strings.Split(rawItems, ",") {
		items = append(items, Item{
			ID: strings.TrimSpace(id),
		})
	}

	return Event{
		Type:  eventType,
		Items: items,
	}, nil
}

func ProcessBatch(items []Item) error {
	for _, it := range items {
		fmt.Println(it)
	}
	return nil
}

Types

type MapFunc

type MapFunc[In, Out any] func(in In) Out

MapFunc is a pure mapping function used by Map that transforms a value of type In into a value of type Out.

type Pipe

type Pipe[T any] struct {
	// contains filtered or unexported fields
}

Pipe represents a lazily-evaluated sequence of values of type T.

Each transformation (Map, TryMap, Flatten, etc.) produces a new Pipe that wraps the previous one.

Errors generated by fallible stages (e.g. TryMap) are sent on the shared error channel.

A Pipe is consumed when its sequence is iterated, typically through Results, Values, or ForEach. Once consumed, a Pipe cannot be reused. Refer to each consuming method's documentation for details on their behavior.

func Chunk

func Chunk[T any](p Pipe[T], chunkSize int) Pipe[[]T]

Chunk groups incoming values into slices of the given size and returns a Pipe producing those slices.

The final chunk may be smaller than chunkSize.

Chunk panics if chunkSize is not positive.

Errors from the input Pipe are preserved.

func Filter

func Filter[T any](p Pipe[T], predicate Predicate[T]) Pipe[T]

Filter returns a Pipe that yields only the values for which predicate returns true.

Errors from the input Pipe are preserved.

func FlatMap

func FlatMap[In, Out any](p Pipe[In], fn MapFunc[In, []Out]) Pipe[Out]

FlatMap transforms each input value using fn and returns a Pipe producing the flattened output values.

FlatMap is equivalent to calling Flatten(Map(p, fn)).

Errors from the input Pipe are preserved.

func FlatTryMap

func FlatTryMap[In, Out any](p Pipe[In], fn TryMapFunc[In, []Out]) Pipe[Out]

FlatTryMap transforms each input value using fn and returns a Pipe producing the flattened output values.

Any non-nil error returned by fn is forwarded to the Pipe’s error channel.

FlatTryMap is equivalent to calling Flatten(TryMap(p, fn)).

Errors from the input Pipe are preserved.

func Flatten

func Flatten[T any](p Pipe[[]T]) Pipe[T]

Flatten converts a Pipe of slices into a Pipe of their elements, emitting the items of each slice in order.

Errors from the input Pipe are preserved.

func From

func From[T any](seq iter.Seq[T]) Pipe[T]

From creates a new Pipe that produces values from the provided iter.Seq.

func GroupBy

func GroupBy[T any, K comparable](p Pipe[T], keyFunc func(T) K) Pipe[[]T]

GroupBy groups consecutive input values according to a key function and returns a Pipe producing slices of those grouped values.

GroupBy does not reorder values; it relies on the input Pipe already being ordered by the grouping key if consistent grouping is desired.

In other words, values are grouped only when they appear consecutively with the same key. When the key returned by keyFunc changes, the current group is emitted and a new group is started.

For example, given input values:

A, A, B, B, A

GroupBy will emit:

[A, A], [B, B], [A]

Errors from the input Pipe are preserved.

func GroupByAggregate

func GroupByAggregate[In any, K comparable, Out any](
	p Pipe[In],
	keyFunc func(In) K,
	initFunc func(first In) Out,
	updateFunc func(acc *Out, item In)) Pipe[Out]

GroupByAggregate groups input values by key and aggregates them using user-supplied initialization and update callbacks, producing one aggregated output value per group.

GroupByAggregate is equivalent to performing a GroupBy followed by a Map, but does so without allocating a slice for each group. This makes it preferred for pipelines where groups may be large.

initFunc is called when a new group starts. It receives the first value of the group and should return the initial accumulator for that group.

updateFunc is called for each value in the current group. It receives a pointer to the accumulator and the current input value, and should update the accumulator in place.

For example, to sum values in each group:

initFunc := func(v int) int {
    return 0 // start at 0
}

updateFunc := func(acc *int, v int) {
    *acc += v // add the value to the accumulator
}

Like GroupBy, GroupByAggregate does not reorder input values. The input Pipe must already be ordered by key if consistent aggregation per key is desired.

For example, with input values:

A1, A2, B1, B2, A3

GroupByAggregate will emit aggregated results for:

[A1, A2], [B1, B2], [A3]

Errors from the input Pipe are preserved.

func Map

func Map[In, Out any](p Pipe[In], fn MapFunc[In, Out]) Pipe[Out]

Map transforms each input value using fn and returns a new Pipe producing the mapped values.

Errors from the input Pipe are preserved.

func Merge

func Merge[T any](pipes ...Pipe[T]) Pipe[T]

Merge combines multiple pipes into a single pipe that yields all values produced by the input pipes.

Values from different pipes may appear in any order. Errors from all input pipes are merged into the returned pipe's error channel.

func TryMap

func TryMap[In, Out any](p Pipe[In], fn TryMapFunc[In, Out]) Pipe[Out]

TryMap transforms each input value using fn, forwarding any non-nil errors onto the Pipe's error channel and yielding only successful results.

Errors from the input Pipe are preserved.

func (Pipe[T]) Collect

func (p Pipe[T]) Collect() ([]T, []error)

Collect consumes the entire Pipe and returns two slices containing all emitted values and all emitted errors, respectively.

Values are collected in the order they are produced by the Pipe. Errors are collected in the order they are emitted.

Collect fully drains the Pipe; after calling it, the Pipe cannot be consumed again.

func (Pipe[T]) CollectValues

func (p Pipe[T]) CollectValues() []T

CollectValues consumes the entire Pipe and returns a slice containing all emitted values. Errors, if any, are ignored.

Values are collected in the order they are produced by the Pipe.

CollectValues fully drains the Pipe; after calling it, the Pipe cannot be consumed again.

func (Pipe[T]) ForEach

func (p Pipe[T]) ForEach(consumeFn func(item T), errorFn func(err error))

ForEach consumes all values from the Pipe, invoking consumeFn for each.

Errors produced by the pipe are forwarded to errorFn.

consumeFn and errorFn are executed concurrently. If shared state is accessed, the caller is responsible for ensuring proper synchronization.

ForEach blocks until all values and errors have been processed.

func (Pipe[T]) Results

func (p Pipe[T]) Results() (iter.Seq[T], <-chan error)

Results returns an iterator that yields the values produced by p, and a channel that emits the errors produced by p.

Iterating over the values iterator consumes p. Once iteration begins, p cannot be reused, restarted, or iterated again.

Callers must drain the errors channel. If errors are not consumed, the pipeline may block when a stage attempts to send an error.

Callers that wish to consume a pipe without having to handle errors should use p.Values() instead.

All errors emitted by Results will be of type *PipeError.

Typical usage:

values, errs := p.Results()

var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	for err := range errs {
		// Results only emits *PipeError values, so the assertion is safe.
		perr := err.(*PipeError)
		log.Printf("pipeline error: item %v: %s", perr.Item, perr.Reason)
	}
}()

for v := range values {
	fmt.Println("value:", v)
}
wg.Wait()
// Here, both values and errs have been fully drained.

func (*Pipe[T]) Tap

func (p *Pipe[T]) Tap(tapFn func(T))

Tap modifies p so that tapFn is called for each element that passes through p. The tap function is called before the element is yielded.

Calling Tap multiple times will chain tap functions in the order they were added.

Tap panics if tapFn is nil.

func (Pipe[T]) Values

func (p Pipe[T]) Values() (values iter.Seq[T])

Values returns the sequence of values produced by p.

Values behaves like Results, except that the error sequence is consumed internally and discarded. This allows callers who do not care about errors to ignore them while still ensuring the pipe completes correctly.

Iterating over the returned values sequence consumes p. Once iteration begins, p cannot be reused or iterated again.

Typical usage:

for v := range p.Values() {
	...
}

type PipeError

type PipeError struct {
	// The item that generated the error
	Item any
	// The actual error
	Reason error
}

PipeError represents an error that occurred inside a pipeline.

func (*PipeError) Error

func (pe *PipeError) Error() string

type Predicate

type Predicate[T any] func(item T) bool

Predicate represents a filtering function that returns true when the provided value should be included in the output stream.

type TryMapFunc

type TryMapFunc[In, Out any] func(in In) (Out, error)

TryMapFunc is a mapping function that may return an error.

Errors are forwarded to the Pipe's error channel while successful values continue through the pipeline.

Directories

Path Synopsis
internal
