dataplane

package
v0.0.0-...-120320f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2025 License: MIT Imports: 41 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CurrentBufferFormatVersion = "0.3.0"

	BufferStatusComplete BufferStatus = "complete"
	BufferStatusFailed   BufferStatus = "failed"

	HashChainHeader  = "x-ms-meta-cumulative_hash_chain"
	ContentMD5Header = "Content-MD5"
	ErrorCodeHeader  = "x-ms-error-code"

	StartMetadataBlobName = ".bufferstart"
	EndMetadataBlobName   = ".bufferend"
)
View Source
const (
	DefaultReadDop  = 32
	MaxRetries      = 6
	ResponseTimeout = 100 * time.Second
)
View Source
const (
	DefaultWriteDop              = 16
	DefaultBlockSize             = 4 * 1024 * 1024
	EncodedHashChainInitialValue = "MDAwMDAwMDAwMDAwMDAwMA=="
)
View Source
const (
	AuthorizationPermissionMismatchErrorCode = "AuthorizationPermissionMismatch"
)
View Source
const CurrentSasVersion = "0.1.0"

Variables

View Source
var (
	ErrNotFound          = errors.New("not found")
	ErrBufferFailedState = errors.New("the buffer is in a permanently failed state")
)
View Source
var (
	ErrInvalidSas          = errors.New("the SAS token is not valid")
	ErrSasActionNotAllowed = errors.New("the requested action is not permissed with the given SAS token")
)
View Source
var (
	DefaultFlushInterval = 1 * time.Second
)
View Source
var (
	ErrAccessStringNotUrl = errors.New("the buffer access string is invalid. It must be a URL or the path of a file whose contents is a URL")
)

Functions

func AddCommonBlobRequestHeaders

func AddCommonBlobRequestHeaders(header http.Header)

func DownloadBlob

func DownloadBlob(ctx context.Context, metrics *TransferMetrics, containerClient *ContainerClient, blobPath string, waitForBlob *atomic.Bool, blobNumber *int64, finalBlobNumber *atomic.Int64) (*readData, *time.Time, error)

func Gen

func Gen(byteCount int64, outputWriter io.Writer) error

func GetBufferAccessUrlFromFile

func GetBufferAccessUrlFromFile(filename string) (*url.URL, error)

func GetFreePort

func GetFreePort() (port int, err error)

func MakeBlobPath

func MakeBlobPath(blobNumber int64) string

func NewSshTunnelPool

func NewSshTunnelPool(ctx context.Context, sshParams client.SshParams, count int) *sshTunnelPool

func ParseBufferAccessUrl

func ParseBufferAccessUrl(accessString string) (*url.URL, error)

func Read

func Read(ctx context.Context, container *Container, outputWriter io.Writer, options ...ReadOption) error

func RelayInputServer

func RelayInputServer(
	ctx context.Context,
	listeners []net.Listener,
	bufferId string,
	outputWriter io.Writer,
	validateSignatureFunc ValidateSignatureFunc,
) error

func RelayOutputServer

func RelayOutputServer(
	ctx context.Context,
	listeners []net.Listener,
	containerId string,
	inputReaderChan <-chan ValueOrError[io.ReadCloser],
	validateSignatureFunc ValidateSignatureFunc,
) error

func RequestNewBufferAccessUrl

func RequestNewBufferAccessUrl(ctx context.Context, bufferId string, writable bool, accessTtl string) (*url.URL, error)

func ValidateSas

func ValidateSas(containerId string, action SasAction, queryString url.Values, validateSignature ValidateSignatureFunc) error

func Write

func Write(ctx context.Context, container *Container, inputReader io.Reader, options ...WriteOption) error

If invalidHashChain is set to true, the value of the hash chain attached to the blob will always be the initial value. This should only be set for testing.

Types

type BufferBlob

type BufferBlob struct {
	BlobNumber int64
	Contents   []byte
	Error      error

	// For Writing
	PreviousCumulativeHash chan string
	CurrentCumulativeHash  chan string

	// For Reading
	EncodedMD5Hash      string
	EncodedMD5ChainHash string
}

type BufferEndMetadata

type BufferEndMetadata struct {
	Status BufferStatus `json:"status"`
}

type BufferStartMetadata

type BufferStartMetadata struct {
	Version string `json:"version"`
}

type BufferStatus

type BufferStatus string

type Container

type Container struct {
	// contains filtered or unexported fields
}

func NewContainer

func NewContainer(accessUrl *url.URL) *Container

func NewContainerFromAccessFile

func NewContainerFromAccessFile(ctx context.Context, filename string) (*Container, error)

func NewContainerFromAccessString

func NewContainerFromAccessString(ctx context.Context, accessString string) (*Container, error)

func NewContainerFromBufferId

func NewContainerFromBufferId(ctx context.Context, bufferId string, writeable bool, accessTtl string) (*Container, error)

func (*Container) GetContainerName

func (c *Container) GetContainerName() string

func (*Container) NewContainerClient

func (c *Container) NewContainerClient(httpClient *retryablehttp.Client) *ContainerClient

func (*Container) Scheme

func (c *Container) Scheme() string

func (*Container) SupportsRelay

func (c *Container) SupportsRelay() bool

type ContainerClient

type ContainerClient struct {
	// contains filtered or unexported fields
}

func (*ContainerClient) Do

func (*ContainerClient) NewNonRetryableRequestWithRelativeUrl

func (c *ContainerClient) NewNonRetryableRequestWithRelativeUrl(ctx context.Context, method string, relativeUrl string, body io.Reader) *http.Request

func (*ContainerClient) NewRequestWithRelativeUrl

func (c *ContainerClient) NewRequestWithRelativeUrl(ctx context.Context, method string, relativeUrl string, body any) *retryablehttp.Request

type DownloadProgressReader

type DownloadProgressReader struct {
	Reader          io.ReadCloser
	TransferMetrics *TransferMetrics
}

func (*DownloadProgressReader) Close

func (pr *DownloadProgressReader) Close() error

func (*DownloadProgressReader) Read

func (pr *DownloadProgressReader) Read(p []byte) (int, error)

type InvalidAccessUrlError

type InvalidAccessUrlError struct {
	Reason string
}

func (*InvalidAccessUrlError) Error

func (e *InvalidAccessUrlError) Error() string

type MergedContext

type MergedContext struct {
	context.Context // The context that is used for deadlines and cancellation
	// contains filtered or unexported fields
}

func (*MergedContext) Value

func (c *MergedContext) Value(key any) any

type PartiallyBufferedReader

type PartiallyBufferedReader struct {
	io.Reader
	// contains filtered or unexported fields
}

PartiallyBufferedReader is an io.Reader that stores the first N bytes read from the underlying reader as they are read, so that it can be rewound and read again as long as no more than N bytes have been read.

func NewPartiallyBufferedReader

func NewPartiallyBufferedReader(r io.Reader, capacity int) *PartiallyBufferedReader

func (*PartiallyBufferedReader) Read

func (r *PartiallyBufferedReader) Read(p []byte) (n int, err error)

func (*PartiallyBufferedReader) Rewind

func (r *PartiallyBufferedReader) Rewind() error

type ReadOption

type ReadOption func(o *readOptions)

func WithReadDop

func WithReadDop(dop int) ReadOption

func WithReadHttpClient

func WithReadHttpClient(httpClient *retryablehttp.Client) ReadOption

type ReaderWithMetrics

type ReaderWithMetrics struct {
	// contains filtered or unexported fields
}

func (*ReaderWithMetrics) Read

func (c *ReaderWithMetrics) Read(p []byte) (n int, err error)

type SasAction

type SasAction int
const (
	SasActionRead   SasAction = iota
	SasActionCreate SasAction = iota
)

type TransferMetrics

type TransferMetrics struct {
	// contains filtered or unexported fields
}

func NewTransferMetrics

func NewTransferMetrics(ctx context.Context) *TransferMetrics

func (*TransferMetrics) EnsureStarted

func (ts *TransferMetrics) EnsureStarted(startTime *time.Time)

func (*TransferMetrics) Stop

func (ts *TransferMetrics) Stop()

func (*TransferMetrics) UpdateCompleted

func (ts *TransferMetrics) UpdateCompleted(byteCount uint64, completedBuffers uint64)

Called when one or more buffers have been completely transferred.

func (*TransferMetrics) UpdateInFlight

func (ts *TransferMetrics) UpdateInFlight(byteCount uint64)

Called when HTTP body data is being read or written. Note that because of retries, this …

type UploadProgressReader

type UploadProgressReader struct {
	Reader          *bytes.Reader
	TransferMetrics *TransferMetrics
}

func (*UploadProgressReader) Len

func (ts *UploadProgressReader) Len() int

Len implements retryablehttp.LenReader.

func (*UploadProgressReader) Read

func (pr *UploadProgressReader) Read(p []byte) (int, error)

type ValidateSignatureFunc

type ValidateSignatureFunc func(data []byte, signature []byte) bool

func CreateSignatureValidationFunc

func CreateSignatureValidationFunc(primarySigningPublicKeyPath, secondarySigningPublicKeyPath string) (ValidateSignatureFunc, error)

type ValueOrError

type ValueOrError[A any] struct {
	Value A
	Err   error
}

type WriteOption

type WriteOption func(o *writeOptions)

func WithWriteBlockSize

func WithWriteBlockSize(blockSize int) WriteOption

func WithWriteDop

func WithWriteDop(dop int) WriteOption

func WithWriteFlushInterval

func WithWriteFlushInterval(flushInterval time.Duration) WriteOption

func WithWriteHttpClient

func WithWriteHttpClient(httpClient *retryablehttp.Client) WriteOption

func WithWriteMetadataEndWriteTimeout

func WithWriteMetadataEndWriteTimeout(timeout time.Duration) WriteOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL