flow

README

Flow - Lazy Stream Processing for Go

A powerful, functional stream processing library for Go with lazy evaluation.

Features

  • Lazy evaluation - operations are only executed when needed
  • Universal ForEach - accepts ANY function through reflection
  • Chainable operations - fluent API for elegant code
  • Zero dependencies - only uses Go standard library

Installation

go get github.com/MirrexOne/Flow

Requires Go 1.23+ for iterator support.

Quick Start

package main

import (
    "fmt"
    . "github.com/MirrexOne/Flow"
)

func main() {
    // Multiple ways to create flows:
    
    // From slice
    s := []int{1, 2, 3, 4, 5}
    NewFlow(s).ForEach(fmt.Print)  // Output: 12345
    
    // Variadic constructor
    Of(1, 2, 3, 4, 5).ForEach(func(x int) {
        fmt.Print(x, " ")
    })  // Output: 1 2 3 4 5
    
    // Complex pipeline
    Values(1, 2, 3, 4, 5).
        Filter(func(x int) bool { return x%2 == 0 }).
        Map(func(x int) int { return x * x }).
        ForEach(func(x int) {
            fmt.Printf("Result: %d\n", x)
        })
}

Performance

Benchmarks
Key metrics (measured on an AMD Ryzen 5 7600X):
┌──────────────────────────────────────────────┐
│ Simple Operations (10 items)                 │
│   Default Loop:     3 ns/op    0 allocs      │
│   Flow Reduce:     91 ns/op    2 allocs      │
│   > Overhead: ~90 ns                         │
│                                              │
│ Complex Pipeline (100 items)                 │
│   Traditional:     39 ns/op    0 allocs      │
│   Flow Pipeline:  758 ns/op    6 allocs      │
│   Flow Lazy:      311 ns/op    9 allocs      │
│   > Lazy is 2.4x faster than full pipeline   │
└──────────────────────────────────────────────┘
Why Flow is Fast
  1. Lazy Evaluation - Only processes what's needed
  2. Zero-copy operations - Minimal memory allocations
  3. Optimized hot paths - Critical sections hand-tuned

API Overview

Stream Creation
// Universal constructor - works with any input type
FlowOf[int](42)                         // Single value
FlowOf[int](1, 2, 3, 4, 5)              // Variadic arguments  
FlowOf[int]([]int{1, 2, 3})             // From slice
FlowOf[int]([3]int{1, 2, 3})            // From array
FlowOf[int](ch)                         // From channel
FlowOf[int](anotherFlow)                // From existing Flow
FlowOf[string](map[string]int{...})     // Map keys
FlowOf[int](map[string]int{...})        // Map values

// Alternative constructors
NewFlow([]int{1, 2, 3})        // From slice
Of(1, 2, 3)                    // Variadic arguments
Values(1, 2, 3)                // Alternative variadic
Single(42)                     // Single value
Empty[int]()                   // Empty flow

// Generators
Range(1, 10)                   // Numbers from 1 to 9
Infinite(func(i int) T)        // Infinite stream
FromChannel(ch)                // From channel
FromFunc(generator)            // Custom generator

// Backward compatibility
From([]int{1, 2, 3})           // Alias for NewFlow

Intermediate Operations (Lazy)

These operations return a new Flow and are not executed until a terminal operation is called:

.Filter(predicate)             // Keep matching elements
.Map(mapper)                   // Transform elements (same type)
.Take(n)                       // First n elements
.Skip(n)                       // Skip first n elements
.TakeWhile(predicate)          // Take while condition is true
.SkipWhile(predicate)          // Skip while condition is true
.Peek(action)                  // Debug/side effects
.Concat(other)                 // Append another flow
.Merge(others...)              // Merge multiple flows

// Standalone functions (type transformations)
MapTo(flow, mapper)            // Transform to different type
Distinct(flow)                 // Remove duplicates
FlatMap(flow, mapper)          // Flatten nested flows
Chunk(flow, size)              // Group into fixed-size chunks
Window(flow, size, step)       // Sliding/tumbling windows
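
A hedged sketch of what "lazy" means for the operations listed above (it builds on the dot import from the Quick Start; treat the counts in the comments as indicative rather than exact): nothing executes until a terminal operation runs, and a short-circuiting operation such as Take keeps upstream work to a minimum.

pulled := 0

pipeline := Range(1, 1_000_000).
    Peek(func(x int) { pulled++ }).                   // counts elements actually drawn from the source
    Filter(func(x int) bool { return x%2 == 0 }).
    Take(3)

fmt.Println(pulled)             // 0 - no terminal operation has run yet

fmt.Println(pipeline.Collect()) // [2 4 6]
fmt.Println(pulled)             // about 6 - far fewer than a million elements were processed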

Terminal Operations (Execute)

These operations consume the stream and produce a result:

.ForEach(fn)                   // Execute function for each element (ANY function!)
.ForEachFunc(fn)               // Type-safe version (faster)
.Collect()                     // Gather into slice
.Count()                       // Count elements
.Reduce(initial, reducer)      // Combine elements
.First()                       // Get first element
.Last()                        // Get last element
.AnyMatch(predicate)           // Check if any match
.AllMatch(predicate)           // Check if all match
.NoneMatch(predicate)          // Check if none match
.FindFirst(predicate)          // Find first matching element
.ToChannel(bufferSize)         // Convert to channel

// Standalone terminal operations
GroupBy(flow, keyFunc)         // Group by key into map
Partition(flow, predicate)     // Split into matching/non-matching

Complete Examples

Basic Pipeline
Range(1, 11).
    Filter(func(x int) bool { return x%2 == 0 }).
    Map(func(x int) int { return x * x }).
    Take(3).
    ForEach(func(x int) { fmt.Println(x) })
// Output: 4, 16, 36
Working with Structs
type Person struct {
    Name string
    Age  int
}

Of(
    Person{"Alice", 25},
    Person{"Bob", 30},
    Person{"Charlie", 35},
).
    Filter(func(p Person) bool { return p.Age > 25 }).
    ForEach(func(p Person) {
        fmt.Printf("%s is %d\n", p.Name, p.Age)
    })
Infinite Streams
// Fibonacci sequence
Infinite(func(i int) int {
    if i < 2 { return i }
    a, b := 0, 1
    for j := 2; j <= i; j++ {
        a, b = b, a+b
    }
    return b
}).Take(10).Collect()
// [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
Advanced Operations
// FlatMap
words := Of("hello", "world")
FlatMap(words, func(word string) Flow[rune, rune] {
    return NewFlow([]rune(word))
}).Collect()  // ['h','e','l','l','o','w','o','r','l','d']

// Chunk
Chunk(Range(1, 11), 3).ForEach(func(chunk []int) {
    fmt.Println(chunk)
})  // [1,2,3] [4,5,6] [7,8,9] [10]

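// Combine two flows into pairs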
names := Of("Alice", "Bob")
ages := Of(25, 30)
Combine(names, ages).ForEach(func(pair Pair[string, int]) {
    fmt.Printf("%s: %d\n", pair.First, pair.Second)
})

// CombineWith for custom combination
CombineWith(names, ages, func(name string, age int) string {
    return fmt.Sprintf("%s is %d", name, age)
}).ForEach(fmt.Println)

// Merge multiple flows
flow1 := NewFlow([]int{1, 2, 3})
flow2 := NewFlow([]int{4, 5, 6})
flow1.Merge(flow2).Collect()  // [1, 2, 3, 4, 5, 6]

// GroupBy operation
people := []Person{{Name: "Alice", Age: 25}, {Name: "Bob", Age: 30}, {Name: "Charlie", Age: 25}}
byAge := GroupBy(NewFlow(people), func(p Person) int { return p.Age })
// Result: map[25:[{Alice 25} {Charlie 25}] 30:[{Bob 30}]]

// Partition operation
evens, odds := Partition(Range(1, 11), func(x int) bool { return x%2 == 0 })
// evens: [2, 4, 6, 8, 10], odds: [1, 3, 5, 7, 9]

// Window operation (sliding windows)
Window(Range(1, 6), 3, 1).ForEach(func(window []int) {
    fmt.Println(window)
})  // [1,2,3] [2,3,4] [3,4,5]

Performance Deep Dive

Detailed Benchmarks
  Operation           Time       Memory    Allocs  vs Loop

Small Data (10 items)
  Default loop*       2.3 ns     0 B       0       1.0x
  Flow.Reduce()       53 ns      64 B      2       23x
  Flow.Take(3)        100 ns     280 B     7       44x

Medium Data (100 items)
  Default loop*       23 ns      0 B       0       1.0x
  Flow full pipeline  465 ns     192 B     6       20x
  Flow with lazy eval 161 ns     344 B     9       7x

Large Data (1000 items)
  Default loop*       230 ns     0 B       0       1.0x
  Filter              2,562 ns   112 B     4       11x
  Map                 2,976 ns   112 B     4       13x
  Distinct            888 ns     704 B     11      4x
  Chunk(10)           3,956 ns   8,232 B   106     17x

*Estimated for equivalent operations
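
These figures come from the author's machine. As a rough, hedged way to check them locally, Go's standard benchmark tooling can be run against the module (the concrete benchmark names depend on the repository's test files):

go test -bench=. -benchmem ./...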

When to Use Flow vs Loops
// USE FLOW when:
// - Readability matters
// - Complex transformations
// - Lazy evaluation needed
// - Working with streams
result := flow.NewFlow(data).
    Filter(isValid).
    Map(transform).
    Take(10).
    Collect()

// USE LOOPS when:
// - Ultra-hot path (< 100ns)
// - Simple iteration
// - Zero-alloc required
for _, v := range data {
    sum += v
}

License

MIT License

Author

Created by MirrexOne

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func CollectAny

func CollectAny(f Flow[any, any]) []any

CollectAny collects Flow[any, any] into []any

func GroupBy

func GroupBy[T, R any, K comparable](f Flow[T, R], keyFunc func(T) K) map[K][]T

GroupBy groups elements by a key function. Returns a map where keys are the result of the keyFunc and values are slices of elements. This is a terminal operation that consumes the entire stream.

Example:

people := []Person{{Name: "Alice", Age: 25}, {Name: "Bob", Age: 30}, {Name: "Charlie", Age: 25}}
byAge := flow.GroupBy(flow.NewFlow(people), func(p Person) int { return p.Age })
// Result: map[25:[{Alice 25} {Charlie 25}] 30:[{Bob 30}]]

func Partition

func Partition[T, U any](f Flow[T, U], predicate func(T) bool) (matching []T, notMatching []T)

Partition splits a flow into two based on a predicate. Returns two slices: elements that match the predicate and elements that don't. This is a terminal operation that consumes the entire stream.

Example:

evens, odds := flow.Partition(flow.Range(1, 11), func(x int) bool { return x%2 == 0 })
// evens: [2, 4, 6, 8, 10]
// odds: [1, 3, 5, 7, 9]

func ReduceAny

func ReduceAny(f Flow[any, any], initial any, reducer func(accumulator, element any) any) any

ReduceAny reduces Flow[any, any] using any types

Types

type Flow

type Flow[T, R any] struct {
	// contains filtered or unexported fields
}

Flow represents a lazy stream of elements that can be processed functionally. The zero value is not usable; use constructor functions like From, Range, etc.
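
As a hedged illustration of that note (not an example shipped with the package), prefer a constructor over the zero value:

// var f flow.Flow[int, int] // zero value: no source attached, not usable

f := flow.Range(1, 4)        // constructors return a ready-to-use Flow
fmt.Println(f.Collect())     // [1 2 3]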

func Chunk

func Chunk[T, R any](f Flow[T, R], size int) Flow[[]T, []T]

Chunk groups elements into slices of specified size. The last chunk may have fewer elements if the stream size is not divisible by the chunk size.

Example:

chunks := flow.Chunk(flow.Range(1, 11), 3)
// Produces: [1,2,3], [4,5,6], [7,8,9], [10]

func Combine

func Combine[T, U, R1, R2 any](f1 Flow[T, R1], f2 Flow[U, R2]) Flow[Pair[T, U], Pair[T, U]]

Combine merges two flows into pairs. The resulting flow ends when either input flow ends.

Example:

names := flow.NewFlow([]string{"Alice", "Bob"})
ages := flow.NewFlow([]int{25, 30})
pairs := flow.Combine(names, ages)
// Produces: {First: "Alice", Second: 25}, {First: "Bob", Second: 30}

func CombineWith

func CombineWith[T, U, V, R1, R2 any](f1 Flow[T, R1], f2 Flow[U, R2], combiner func(T, U) V) Flow[V, V]

CombineWith merges two flows using a custom combiner function. This provides more flexibility than Combine by allowing custom result types. The resulting flow ends when either input flow ends.

Example:

names := flow.NewFlow([]string{"Alice", "Bob"})
ages := flow.NewFlow([]int{25, 30})
people := flow.CombineWith(names, ages, func(name string, age int) string {
    return fmt.Sprintf("%s is %d years old", name, age)
})
// Produces: "Alice is 25 years old", "Bob is 30 years old"

func Distinct

func Distinct[T comparable, R any](f Flow[T, R]) Flow[T, R]

Distinct removes duplicate elements from the stream. Requires the type to be comparable. This is a lazy operation but requires memory to track seen elements.

Example:

unique := flow.Distinct(flow.NewFlow([]int{1, 2, 2, 3, 3, 3, 4}))

func Empty

func Empty[T any]() Flow[T, T]

Empty creates an empty Flow with no elements.

Example:

flow.Empty[int]().Count() // Returns 0

func FlatMap

func FlatMap[T, U, R1, R2 any](f Flow[T, R1], mapper func(T) Flow[U, R2]) Flow[U, R2]

FlatMap transforms each element to a Flow and flattens the results. Useful for working with nested structures.

Example:

words := flow.NewFlow([]string{"hello", "world"})
letters := flow.FlatMap(words, func(word string) flow.Flow[rune, rune] {
    return flow.NewFlow([]rune(word))
})

func FlowOf

func FlowOf[T any](source any, rest ...any) Flow[T, T]

FlowOf creates a Flow from various input types using reflection. It's a universal constructor that intelligently handles:

  • single values: FlowOf(42)
  • variadic arguments: FlowOf(1, 2, 3)
  • slices: FlowOf([]int{1, 2, 3})
  • channels: FlowOf(ch)
  • existing Flows: FlowOf(anotherFlow)
  • arrays: FlowOf([3]int{1, 2, 3})
  • maps (keys): FlowOf(map[string]int{"a": 1})

The type parameter T must be explicitly specified.

Example:

FlowOf[int](42)                     // single value
FlowOf[int](1, 2, 3)                // variadic
FlowOf[int]([]int{1, 2, 3})         // slice
FlowOf[string](ch)                  // channel
FlowOf[Person](existingFlow)        // existing flow

func FromChannel

func FromChannel[T any](ch <-chan T) Flow[T, T]

FromChannel creates a Flow from a channel. The Flow will consume values from the channel until it's closed.

Example:

ch := make(chan int)
go func() {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch)
}()
flow.FromChannel(ch).ForEach(fmt.Println)

func FromFunc

func FromFunc[T any](generator func(yield func(T, T) bool)) Flow[T, T]

FromFunc creates a Flow from a generator function. The generator should call yield for each element and return when yield returns false.

Example:

flow.FromFunc(func(yield func(int, int) bool) {
    for i := 0; i < 10; i++ {
        if !yield(i, i) {
            return
        }
    }
})

func FromSlice

func FromSlice[T any](values []T) Flow[T, T]

func GroupByFlow

func GroupByFlow[T, R any, K comparable](f Flow[T, R], keyFunc func(T) K) Flow[KeyValue[K, []T], KeyValue[K, []T]]

GroupByFlow is a lazy version of GroupBy that returns a Flow of groups. Each group is represented as a KeyValue pair containing the key and slice of values. This is useful when you want to process groups lazily.

Example:

people := []Person{{Name: "Alice", Age: 25}, {Name: "Bob", Age: 30}, {Name: "Charlie", Age: 25}}
groups := flow.GroupByFlow(flow.NewFlow(people), func(p Person) int { return p.Age })
groups.ForEach(func(kv KeyValue[int, []Person]) {
    fmt.Printf("Age %d: %v\n", kv.Key, kv.Value)
})

func Infinite

func Infinite[T any](generator func(index int) T) Flow[T, T]

Infinite creates an infinite Flow using a generator function. The generator receives the current index starting from 0. Use Take() or other limiting operations to avoid infinite loops.

Example:

flow.Infinite(func(i int) int { return i * i }).Take(5).Collect()
// Returns: [0, 1, 4, 9, 16]

func MapTo

func MapTo[T, U, R any](f Flow[T, R], mapper func(T) U) Flow[U, U]

MapTo transforms each element to a different type. This is a lazy operation - the mapper is not called until the stream is consumed. Since Go doesn't support method-level type parameters, this is a standalone function.

Example:

strings := flow.MapTo(flow.Range(1, 6), func(x int) string {
    return fmt.Sprintf("Number: %d", x)
})

func Merge

func Merge[T, R any](flows ...Flow[T, R]) Flow[T, R]

Merge combines multiple flows into a single flow. Unlike Combine, this concatenates flows sequentially rather than pairing elements. Elements from all flows are yielded in the order they appear. Can be called without arguments, in which case it returns an empty flow.

Example:

flow1 := flow.NewFlow([]int{1, 2, 3})
flow2 := flow.NewFlow([]int{4, 5, 6})
flow3 := flow.NewFlow([]int{7, 8, 9})
merged := flow.Merge(flow1, flow2, flow3)
// Produces: 1, 2, 3, 4, 5, 6, 7, 8, 9

// Can also chain with method:
merged2 := flow1.Merge(flow2, flow3)
// Produces: 1, 2, 3, 4, 5, 6, 7, 8, 9

func NewFlow

func NewFlow[T any](values []T) Flow[T, T]

NewFlow creates a new Flow from a slice. The slice is not copied, so modifications to it may affect the stream.

Example:

numbers := []int{1, 2, 3, 4, 5}
flow.NewFlow(numbers).ForEach(fmt.Println)
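
A hedged sketch of the aliasing note above (the exact output depends on when the stream is consumed, since evaluation is lazy):

numbers := []int{1, 2, 3}
f := flow.NewFlow(numbers)

numbers[0] = 99          // mutate the shared backing slice before the terminal operation
fmt.Println(f.Collect()) // likely [99 2 3] - the slice is shared, not copied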

func Of

func Of[T any](values ...T) Flow[T, T]

Of creates a Flow from variadic arguments.

Example:

flow.Of(1, 2, 3, 4, 5).ForEach(fmt.Println)
flow.Of("hello", "world").ForEach(fmt.Println)

func Range

func Range(start, end int) Flow[int, int]

Range creates a Flow of integers from start (inclusive) to end (exclusive).

Example:

flow.Range(1, 6).ForEach(fmt.Print) // Output: 12345

func Single

func Single[T any](value T) Flow[T, T]

Single creates a Flow with a single value.

Example:

flow.Single(42).ForEach(fmt.Println)

func Values

func Values[T any](values ...T) Flow[T, T]

func Window

func Window[T, U any](f Flow[T, U], size, step int) Flow[[]T, U]

Window creates sliding windows of elements. Each window contains 'size' elements, and windows overlap by 'size-step' elements. If step equals size, windows don't overlap (tumbling windows).

Example:

// Sliding window with size=3, step=1
windows := flow.Window(flow.Range(1, 6), 3, 1)
// Produces: [1,2,3], [2,3,4], [3,4,5]

// Tumbling window with size=3, step=3
windows := flow.Window(flow.Range(1, 10), 3, 3)
// Produces: [1,2,3], [4,5,6], [7,8,9]

func (Flow[T, R]) AllMatch

func (f Flow[T, R]) AllMatch(predicate func(T) bool) bool

AllMatch checks if all elements match the predicate. This is a TERMINAL operation - it stops at the first non-match.

Example:

allPositive := flow.NewFlow([]int{1, 2, 3}).AllMatch(func(x int) bool { return x > 0 })

func (Flow[T, R]) AnyMatch

func (f Flow[T, R]) AnyMatch(predicate func(T) bool) bool

AnyMatch checks if any element matches the predicate. This is a TERMINAL operation - it stops at the first match.

Example:

hasEven := flow.NewFlow([]int{1, 3, 5, 6}).AnyMatch(func(x int) bool { return x%2 == 0 })
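
Because it short-circuits, AnyMatch can even terminate on an infinite source as long as a matching element eventually appears (a small sketch, not an example shipped with the package):

// Stops as soon as the first element greater than 100 is produced.
found := flow.Infinite(func(i int) int { return i }).AnyMatch(func(x int) bool { return x > 100 })
fmt.Println(found) // true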

func (Flow[T, R]) Collect

func (f Flow[T, R]) Collect() []T

Collect gathers all elements into a slice. This is a TERMINAL operation - it consumes the entire stream.

Example:

numbers := flow.Range(1, 6).Collect() // Returns []int{1, 2, 3, 4, 5}

func (Flow[T, R]) Concat

func (f Flow[T, R]) Concat(other Flow[T, R]) Flow[T, R]

Concat appends another Flow to this one. This is a lazy operation - the second flow is not consumed until needed.

Example:

first := flow.NewFlow([]int{1, 2, 3})
second := flow.NewFlow([]int{4, 5, 6})
combined := first.Concat(second) // [1, 2, 3, 4, 5, 6]

func (Flow[T, R]) Count

func (f Flow[T, R]) Count() int

Count returns the number of elements in the stream. This is a TERMINAL operation - it consumes the entire stream.

Example:

count := flow.Range(1, 100).Filter(func(x int) bool { return x%7 == 0 }).Count()

func (Flow[T, R]) Filter

func (f Flow[T, R]) Filter(predicate func(T) bool) Flow[T, R]

Filter returns a Flow containing only elements that match the predicate. This is a lazy operation - the predicate is not called until the stream is consumed.

Example:

flow.Range(1, 10).Filter(func(x int) bool { return x%2 == 0 })

func (Flow[T, R]) FindFirst

func (f Flow[T, R]) FindFirst(predicate func(T) bool) (T, bool)

FindFirst returns the first element matching the predicate. This is a TERMINAL operation - it stops at the first match.

Example:

if val, ok := flow.Range(1, 20).FindFirst(func(x int) bool { return x > 10 }); ok {
    fmt.Printf("Found: %d\n", val)
}

func (Flow[T, R]) First

func (f Flow[T, R]) First() (T, bool)

First returns the first element if it exists. This is a TERMINAL operation - it may consume only one element.

Example:

if val, ok := flow.Range(10, 20).First(); ok {
    fmt.Printf("First: %d\n", val)
}

func (Flow[T, R]) ForEach

func (f Flow[T, R]) ForEach(fn any)

ForEach executes the given function for each element in the stream. This is a TERMINAL operation - it consumes the stream immediately. Accepts ANY function through reflection for maximum flexibility. For better performance with known function types, use ForEachFunc.

Example:

flow.NewFlow([]int{1, 2, 3}).ForEach(fmt.Print)        // Works with fmt.Print!
flow.NewFlow([]int{1, 2, 3}).ForEach(fmt.Println)      // Works with fmt.Println!
flow.NewFlow([]int{1, 2, 3}).ForEach(customFunc)       // Works with any function!

func (Flow[T, R]) ForEachFunc

func (f Flow[T, R]) ForEachFunc(action func(T))

ForEachFunc is a type-safe, optimized version of ForEach. Use this for better performance when the function type is known at compile time. This version doesn't use reflection and is significantly faster.

Example:

flow.Range(1, 6).ForEachFunc(func(x int) {
    fmt.Println(x * x)
})

func (Flow[T, R]) Last

func (f Flow[T, R]) Last() (T, bool)

Last returns the last element if it exists. This is a TERMINAL operation - it consumes the entire stream.

Example:

if val, ok := flow.Range(10, 20).Last(); ok {
    fmt.Printf("Last: %d\n", val)
}

func (Flow[T, R]) Limit

func (f Flow[T, R]) Limit(n int) Flow[T, R]

func (Flow[T, R]) Map

func (f Flow[T, R]) Map(mapper any) Flow[any, any]

Map transforms each element using the provided mapper function. Returns Flow[any, any] to allow chaining with any output type. Optimized with minimal type assertions for common patterns.

Example:

flow.CollectAny(flow.NewFlow([]int{1, 2, 3}).Map(func(x int) int { return x * 2 }))
flow.CollectAny(flow.NewFlow([]int{1, 2, 3}).Map(func(x int) string { return strconv.Itoa(x) }))

func (Flow[T, R]) Merge

func (f Flow[T, R]) Merge(others ...Flow[T, R]) Flow[T, R]

Merge combines this flow with others into a single flow. Similar to Concat, but can merge multiple flows at once. This is a lazy operation - flows are consumed sequentially.

Example:

flow1 := flow.NewFlow([]int{1, 2, 3})
flow2 := flow.NewFlow([]int{4, 5, 6})
flow3 := flow.NewFlow([]int{7, 8, 9})
merged := flow1.Merge(flow2, flow3) // [1, 2, 3, 4, 5, 6, 7, 8, 9]

// Can also be used without arguments (returns the same flow)
same := flow1.Merge() // [1, 2, 3]

func (Flow[T, R]) NoneMatch

func (f Flow[T, R]) NoneMatch(predicate func(T) bool) bool

NoneMatch checks if no elements match the predicate. This is a TERMINAL operation - it stops at the first match.

Example:

noneNegative := flow.NewFlow([]int{1, 2, 3}).NoneMatch(func(x int) bool { return x < 0 })

func (Flow[T, R]) Offset

func (f Flow[T, R]) Offset(n int) Flow[T, R]

Offset is an alias for Skip. It discards the first n elements from the stream. If the stream has fewer than n elements, an empty stream is returned.

Example:

flow.Range(1, 10).Offset(5) // Stream of 6, 7, 8, 9

func (Flow[T, R]) Peek

func (f Flow[T, R]) Peek(action func(T)) Flow[T, R]

Peek performs an action on each element without consuming the stream. Useful for debugging or side effects like logging. The action is called lazily as elements are consumed.

Example:

flow.Range(1, 6).
    Peek(func(x int) { fmt.Printf("Processing: %d\n", x) }).
    Filter(func(x int) bool { return x%2 == 0 }).
    Collect()

func (Flow[T, R]) Reduce

func (f Flow[T, R]) Reduce(initial T, reducer func(accumulator, element T) T) T

Reduce combines all elements using the reducer function. This is a TERMINAL operation - it consumes the entire stream. The initial value is used as the starting accumulator.

Example:

sum := flow.Range(1, 6).Reduce(0, func(acc, x int) int { return acc + x })
product := flow.Range(1, 6).Reduce(1, func(acc, x int) int { return acc * x })

func (Flow[T, R]) Skip

func (f Flow[T, R]) Skip(n int) Flow[T, R]

Skip discards the first n elements from the stream. If the stream has fewer than n elements, an empty stream is returned.

Example:

flow.Range(1, 10).Skip(5) // Stream of 6, 7, 8, 9

func (Flow[T, R]) SkipWhile

func (f Flow[T, R]) SkipWhile(predicate func(T) bool) Flow[T, R]

SkipWhile skips elements while the predicate is true. This is a lazy operation - starts yielding when predicate returns false.

Example:

flow.Range(1, 10).SkipWhile(func(x int) bool { return x < 5 })

func (Flow[T, R]) Take

func (f Flow[T, R]) Take(n int) Flow[T, R]

Take limits the stream to the first n elements. If the stream has fewer than n elements, all elements are included.

Example:

flow.Infinite(func(i int) int { return i }).Take(5)

func (Flow[T, R]) TakeWhile

func (f Flow[T, R]) TakeWhile(predicate func(T) bool) Flow[T, R]

TakeWhile takes elements while the predicate is true. This is a lazy operation - stops when predicate returns false.

Example:

flow.Range(1, 10).TakeWhile(func(x int) bool { return x < 5 })

func (Flow[T, R]) ToChannel

func (f Flow[T, R]) ToChannel(bufferSize int) <-chan T

ToChannel sends all elements to a new channel. The channel is created with the specified buffer size. The channel is closed after all elements are sent. This is a TERMINAL operation that runs in a goroutine.

Example:

ch := flow.Range(1, 6).ToChannel(2)
for val := range ch {
    fmt.Println(val)
}

type KeyValue

type KeyValue[K comparable, V any] struct {
	Key   K
	Value V
}

KeyValue represents a key-value pair. Used by GroupByFlow and other key-value operations.

type Pair

type Pair[T, U any] struct {
	First  T
	Second U
}

Pair represents a pair of values. Used by the Combine function.
