orderedproc

v1.0.0 Latest
Published: Dec 28, 2025 License: MIT Imports: 6 Imported by: 0

README

orderedproc

Summary

The orderedproc module facilitates parallel processing of data while ensuring that results are output in the same order in which the input was provided.

Processing is done in batches, which is why batchSize is a required parameter. The concurrency parameter is the number of goroutines that will call your procFunc.

    import "github.com/cuberat-go/orderedproc"

    procFunc := func(item int) int {
        return item * 2
    }

    proc := orderedproc.NewOrderedProcessor(procFunc,
        batchSize, concurrency)

    go func() {
        for i := range size {
            proc.Add(i)
        }
        proc.Done()
    }()

    got := slices.Collect(proc.Results())

Calling Done() tells the processor that you have finished adding data. The Results() method returns an iterator over the results, in input order.

Example:

    size := 7 // input will be 0..6

    procFunc := func(item int) int {
        return item * 2
    }

    proc := orderedproc.NewOrderedProcessor(procFunc, 5, 3)

    go func() {
        for i := range size {
            proc.Add(i)
        }
        proc.Done()
    }()

    got := slices.Collect(proc.Results())
    fmt.Printf("Results: %v\n", got)
    // Output:
    // Results: [0 2 4 6 8 10 12]

If you prefer to iterate over batches of results, call the ResultBatches() method.

Documentation

Types

type OrderedBatch

type OrderedBatch[IN_T, OUT_T any] struct {
	// Items []*batchItem[IN_T, OUT_T]
	Items []OUT_T
}

type OrderedProcessor

type OrderedProcessor[IN_T, OUT_T any] struct {
	// contains filtered or unexported fields
}

An OrderedProcessor processes items in batches with a specified concurrency, ensuring that the results are output in the same order as the input items. Create one using NewOrderedProcessor().

func NewOrderedProcessor

func NewOrderedProcessor[IN_T, OUT_T any](
	procFunc ProcFunc[IN_T, OUT_T],
	batchSize int,
	concurrency int,
) *OrderedProcessor[IN_T, OUT_T]

Creates a new OrderedProcessor that calls procFunc on each item, processing in batches of batchSize using concurrency goroutines.

func (*OrderedProcessor[IN_T, OUT_T]) Add

func (obp *OrderedProcessor[IN_T, OUT_T]) Add(item IN_T)

Adds an item to be processed.

func (*OrderedProcessor[IN_T, OUT_T]) Done

func (obp *OrderedProcessor[IN_T, OUT_T]) Done()

Signals that no more items will be added.

func (*OrderedProcessor[IN_T, OUT_T]) ResultBatches

func (obp *OrderedProcessor[IN_T, OUT_T]) ResultBatches() iter.Seq[[]OUT_T]

Returns an iterator over batches of results.

Example

package main

import (
	"fmt"
	"slices"

	"github.com/cuberat-go/orderedproc"
)

func main() {
	size := 7 // input will be 0..6

	procFunc := func(item int) int {
		return item * 2
	}

	proc := orderedproc.NewOrderedProcessor(procFunc, 5, 3)

	go func() {
		for i := range size {
			proc.Add(i)
		}
		proc.Done()
	}()

	got := slices.Collect(proc.ResultBatches())
	fmt.Printf("Results: %v\n", got)
}
Output:

Results: [[0 2 4 6 8] [10 12]]

func (*OrderedProcessor[IN_T, OUT_T]) Results

func (obp *OrderedProcessor[IN_T, OUT_T]) Results() iter.Seq[OUT_T]

Returns an iterator over individual results.

Example

package main

import (
	"fmt"
	"slices"

	"github.com/cuberat-go/orderedproc"
)

func main() {
	size := 7 // input will be 0..6

	procFunc := func(item int) int {
		return item * 2
	}

	proc := orderedproc.NewOrderedProcessor(procFunc, 5, 3)

	go func() {
		for i := range size {
			proc.Add(i)
		}
		proc.Done()
	}()

	got := slices.Collect(proc.Results())
	fmt.Printf("Results: %v\n", got)
}
Output:

Results: [0 2 4 6 8 10 12]

type ProcFunc

type ProcFunc[IN_T, OUT_T any] func(IN_T) OUT_T

Type definition for the user-provided processing function.
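
The input and output types of a ProcFunc do not have to match. As a small sketch, the block below copies the documented type definition locally so that it compiles without the module. wordCount and applyAll are hypothetical names used only for illustration, and applyAll runs sequentially rather than in parallel.

```go
package main

import (
	"fmt"
	"strings"
)

// ProcFunc mirrors the documented type definition. It is redeclared here
// only so the sketch is self-contained.
type ProcFunc[IN_T, OUT_T any] func(IN_T) OUT_T

// wordCount is a hypothetical processing function from string to int.
func wordCount(line string) int {
	return len(strings.Fields(line))
}

// applyAll is a hypothetical sequential helper that shows how a ProcFunc
// maps each input item to one output item.
func applyAll[IN_T, OUT_T any](f ProcFunc[IN_T, OUT_T], items []IN_T) []OUT_T {
	out := make([]OUT_T, 0, len(items))
	for _, item := range items {
		out = append(out, f(item))
	}
	return out
}

func main() {
	lines := []string{"a b c", "", "hello world"}
	fmt.Println(applyAll(wordCount, lines)) // [3 0 2]
}
```

An ordinary function such as wordCount can be passed wherever a ProcFunc[string, int] is expected, and the type parameters are inferred from the arguments.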
