disk

package
v0.2.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2020 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package disk is a generated protocol buffer package.

It is generated from these files:
	disk.proto

It has these top-level messages:
	IndexEntries
	IndexEntry

Package disk provides local disk storage

Index

Constants

View Source
const (
	DefaultFileBufferSize  = 4 * 1024  // 4KB, same as bufio defaultBufSize, InfluxDB uses 1MB
	IndexOfIndexUnitLength = 8 + 4 + 4 // id + offset + length
	FooterLength           = 25
)
View Source
const (
	// Version is current supported file format version
	Version byte = 1
	// MagicNumber is 'xephon-k', 8 bytes stored big-endian in a uint64, used to identify the file without relying on its extension
	MagicNumber uint64 = 0x786570686F6E2D6B
)
View Source
const (
	MinimalSingleFileSize = 64 * 1024 * 1024
)

Variables

View Source
var (
	ErrNoData       = fmt.Errorf("no data written, can't write index")
	ErrNotFinalized = fmt.Errorf("index is not written, the file is unreadable")
	ErrFinalized    = fmt.Errorf("index is already written, this file can no longer be updated")
)
View Source
var (
	ErrInvalidLengthDisk = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowDisk   = fmt.Errorf("proto: integer overflow")
)

Functions

func DecodeBlock

func DecodeBlock(p []byte, meta common.SeriesMeta) (common.Series, error)

func EncodeBlockTo

func EncodeBlockTo(series common.Series, opt EncodingOption, w io.Writer) (int, error)

EncodeBlockTo encodes the series data points and writes them to the underlying writer. It does not return bytes, to avoid needless copying when concatenating the encoded time and values.

func IsMagic

func IsMagic(buf []byte) bool

func IsValidFormat

func IsValidFormat(buf []byte) bool

func MagicBytes

func MagicBytes() []byte

Types

type Config added in v0.1.0

type Config struct {
	Folder               string                 `yaml:"folder" json:"folder"`
	ConcurrentWriteFiles int                    `yaml:"concurrentWriteFiles" json:"concurrentWriteFiles"`
	SingleFileSize       int                    `yaml:"singleFileSize" json:"singleFileSize"`
	FileBufferSize       int                    `yaml:"fileBufferSize" json:"file_buffer_size"`
	Encoding             map[string]string      `yaml:"encoding" json:"encoding"`
	XXX                  map[string]interface{} `yaml:",inline"`
}

TODO: encoding

func NewConfig added in v0.1.0

func NewConfig() Config

func (*Config) Apply added in v0.1.0

func (c *Config) Apply() error

func (*Config) UnmarshalYAML added in v0.1.0

func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error

func (*Config) Validate added in v0.1.0

func (c *Config) Validate() error

type DataFileIndexWriter

type DataFileIndexWriter interface {
	// TODO: we need to record information like int/double, precision, min, max time etc.
	Add(series common.Series, offset uint64, size uint32, minTime int64, maxTime int64) error
	SortedID() []common.SeriesID
	Len() int
	// NOTE: the Must prefix means we will panic if a non-existent ID is used
	MustEntries(id common.SeriesID) *IndexEntries
	WriteAll(io.Writer) (length uint32, indexOffset uint32, errs error)
}

type DataFileReader

type DataFileReader interface {
	ReadIndexOfIndexes() error
	ReadAllIndexEntries() error
	SeriesCount() int
	Close() error
	PrintAll()
}

type DataFileWriter

type DataFileWriter interface {
	// WriteHeader writes the magic number and version; it will be called automatically, once, by WriteSeries
	WriteHeader() error
	WriteSeries(series common.Series) error
	// Finalized reports whether the index is written and the file can be closed
	Finalized() bool
	// WriteIndex writes index data and trailing magic number into the end of the file (buffer). You can only call it once
	WriteIndex() error
	// Flush flushes data in the buffer to disk
	Flush() error
	// Close closes the underlying file. The file must be finalized, otherwise it can't be read
	Close() error
}

DataFileWriter writes data to disk; the index is at the end of the file for locating data blocks. It is NOT thread safe.

type EncodingOption

type EncodingOption struct {
	TimeCodec        byte
	IntValueCodec    byte
	DoubleValueCodec byte
}

TODO: allow adaptive encoding

func NewEncodingOption

func NewEncodingOption(options ...func(*EncodingOption)) (EncodingOption, error)

NewEncodingOption follows the functional options pattern; see https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis

type IndexEntries

type IndexEntries struct {
	// TODO: index entries may have the overall aggregation of all the index entries, like min, max, time
	common.SeriesMeta `protobuf:"bytes,1,opt,name=Meta,embedded=Meta" json:"Meta"`
	Entries           []IndexEntry `protobuf:"bytes,2,rep,name=Entries" json:"Entries"`
}

func (*IndexEntries) Descriptor

func (*IndexEntries) Descriptor() ([]byte, []int)

func (*IndexEntries) Marshal

func (m *IndexEntries) Marshal() (dAtA []byte, err error)

func (*IndexEntries) MarshalTo

func (m *IndexEntries) MarshalTo(dAtA []byte) (int, error)

func (*IndexEntries) ProtoMessage

func (*IndexEntries) ProtoMessage()

func (*IndexEntries) Reset

func (m *IndexEntries) Reset()

func (*IndexEntries) Size

func (m *IndexEntries) Size() (n int)

func (*IndexEntries) String

func (m *IndexEntries) String() string

func (*IndexEntries) Unmarshal

func (m *IndexEntries) Unmarshal(dAtA []byte) error

type IndexEntriesWrapper

type IndexEntriesWrapper struct {
	// contains filtered or unexported fields
}

type IndexEntry

type IndexEntry struct {
	Offset    uint64 `protobuf:"varint,1,opt,name=Offset,proto3" json:"Offset,omitempty"`
	BlockSize uint32 `protobuf:"varint,2,opt,name=BlockSize,proto3" json:"BlockSize,omitempty"`
	MinTime   int64  `protobuf:"varint,3,opt,name=minTime,proto3" json:"minTime,omitempty"`
	MaxTime   int64  `protobuf:"varint,4,opt,name=maxTime,proto3" json:"maxTime,omitempty"`
}

func (*IndexEntry) Descriptor

func (*IndexEntry) Descriptor() ([]byte, []int)

func (*IndexEntry) Marshal

func (m *IndexEntry) Marshal() (dAtA []byte, err error)

func (*IndexEntry) MarshalTo

func (m *IndexEntry) MarshalTo(dAtA []byte) (int, error)

func (*IndexEntry) ProtoMessage

func (*IndexEntry) ProtoMessage()

func (*IndexEntry) Reset

func (m *IndexEntry) Reset()

func (*IndexEntry) Size

func (m *IndexEntry) Size() (n int)

func (*IndexEntry) String

func (m *IndexEntry) String() string

func (*IndexEntry) Unmarshal

func (m *IndexEntry) Unmarshal(dAtA []byte) error

type LocalDataFileIndexWriter

type LocalDataFileIndexWriter struct {
	// contains filtered or unexported fields
}

func NewLocalFileIndexWriter

func NewLocalFileIndexWriter() *LocalDataFileIndexWriter

func (*LocalDataFileIndexWriter) Add

func (idx *LocalDataFileIndexWriter) Add(series common.Series, offset uint64, size uint32, minTime int64, maxTime int64) error

func (*LocalDataFileIndexWriter) Len

func (idx *LocalDataFileIndexWriter) Len() int

func (*LocalDataFileIndexWriter) MustEntries

func (idx *LocalDataFileIndexWriter) MustEntries(id common.SeriesID) *IndexEntries

func (*LocalDataFileIndexWriter) SortedID

func (idx *LocalDataFileIndexWriter) SortedID() []common.SeriesID

func (*LocalDataFileIndexWriter) WriteAll

func (idx *LocalDataFileIndexWriter) WriteAll(w io.Writer) (length uint32, indexOffset uint32, errs error)

type LocalDataFileReader

type LocalDataFileReader struct {
	// contains filtered or unexported fields
}

func NewLocalDataFileReader

func NewLocalDataFileReader(f *os.File) (*LocalDataFileReader, error)

func (*LocalDataFileReader) Close

func (reader *LocalDataFileReader) Close() error

func (*LocalDataFileReader) PrintAbstract added in v0.1.0

func (reader *LocalDataFileReader) PrintAbstract()

func (*LocalDataFileReader) PrintAll

func (reader *LocalDataFileReader) PrintAll()

func (*LocalDataFileReader) ReadAllIndexEntries

func (reader *LocalDataFileReader) ReadAllIndexEntries() error

func (*LocalDataFileReader) ReadIndexOfIndexes

func (reader *LocalDataFileReader) ReadIndexOfIndexes() error

func (*LocalDataFileReader) SeriesCount

func (reader *LocalDataFileReader) SeriesCount() int

type LocalDataFileWriter

type LocalDataFileWriter struct {
	// contains filtered or unexported fields
}

func NewLocalFileWriter

func NewLocalFileWriter(f *os.File, bufferSize int, encodingOpt EncodingOption) (*LocalDataFileWriter, error)

func NewLocalFileWriter(w io.WriteCloser, bufferSize int) *LocalDataFileWriter {

func (*LocalDataFileWriter) Close

func (writer *LocalDataFileWriter) Close() error

func (*LocalDataFileWriter) Finalized

func (writer *LocalDataFileWriter) Finalized() bool

func (*LocalDataFileWriter) Flush

func (writer *LocalDataFileWriter) Flush() error

func (*LocalDataFileWriter) WriteHeader

func (writer *LocalDataFileWriter) WriteHeader() error

func (*LocalDataFileWriter) WriteIndex

func (writer *LocalDataFileWriter) WriteIndex() error

func (*LocalDataFileWriter) WriteSeries

func (writer *LocalDataFileWriter) WriteSeries(series common.Series) error

type Store

type Store struct {
	// contains filtered or unexported fields
}

func CreateStore added in v0.1.0

func CreateStore(config Config) (*Store, error)

func NewDiskStore

func NewDiskStore(config Config) (*Store, error)

func (*Store) QuerySeries

func (store *Store) QuerySeries(queries []common.Query) ([]common.QueryResult, []common.Series, error)

func (*Store) Shutdown

func (store *Store) Shutdown()

func (*Store) StoreType

func (store *Store) StoreType() string

func (*Store) WriteDoubleSeries

func (store *Store) WriteDoubleSeries(series []common.DoubleSeries) error

func (*Store) WriteIntSeries

func (store *Store) WriteIntSeries(series []common.IntSeries) error

type StoreMap

type StoreMap struct {
	// contains filtered or unexported fields
}

Source Files

  • config.go
  • data_reader.go
  • data_writer.go
  • disk.pb.go
  • doc.go
  • encoding.go
  • pkg.go
  • reader.go
  • store.go

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL