Influxdb中TSM文件结构解析之WAL

存储在Influxdb中的数据类型 存储每条数据时的时间戳类型

  • time
Field字段的类型
  • interger - int64
  • unsigned - uint64
  • float64
  • boolean
  • string
Field字段的类型在源码中对应类型 对应的类型是Value,这是个interface,定义在tsdb/engine/tsm1/encoding.go
  • IntegerValue
  • UnsignedValue
  • FloatValue
  • BooleanValue
  • StringValue
    上面的每个类型都包括一个时间戳,这个时间戳就是这个数据被写入时的时间戳,我们看一下FloadValue的定义:
type FloatValue struct { unixnano int64 valuefloat64 }

编解码 每种类型在存储时都需要作编码,尽可能地作压缩,所有针对各个类型均提供了Encoder和Decoder。
这些Encoder负责将一组相同类型的Value作压缩编码,具体的编码算法我们这里不再展开。
我们针对FloatValue作一下分析encodeFloatBlockUsing
参数中values []Value就是一系列的FloatValue`,不仅包括Float值,还包括对应的时间戳,都需要被编码
func encodeFloatBlockUsing(buf []byte, values []Value, tsenc TimeEncoder, venc *FloatEncoder) ([]byte, error) { tsenc.Reset() venc.Reset()for _, v := range values { vv := v.(FloatValue) tsenc.Write(vv.unixnano) //使用TimeEncoder编码每个时间戳 venc.Write(vv.value) //使用FloatEncoder编码每个Float值 } venc.Flush()// Encoded timestamp values tb, err := tsenc.Bytes() if err != nil { return nil, err } // Encoded float values vb, err := venc.Bytes() if err != nil { return nil, err }// Prepend the first timestamp of the block in the first 8 bytes and the block // in the next byte, followed by the block // 将这一组FloatValue打包到一个Block return packBlock(buf, BlockFloat64, tb, vb), nil }

打包到DataBlock 【Influxdb中TSM文件结构解析之WAL】DataBlock是写入和读取TSM文件的最小单位,每个DataBlock里存储的都是同样类型的Value,每个DataBlock里的Value对应都是同一个写入的Key,这个Key是series key + field;
Influxdb算是列存储,在这里所有的Value是连续存在一起,这些Value对应的时间戳也是连续存在一起,这样更有利于作压缩

Influxdb中TSM文件结构解析之WAL
文章图片
influxdb_data_block.png
这个结构中并没有记录Values部分的长度,这是因为我们记录了时间戳部分的总长,在解析时间戳部分时候我们可以得知有几个时间戳,也就知道了有几个Value。
我们来看一下打包过程,结合上面的结构图,这个过程就很简单了:
func packBlock(buf []byte, typ byte, ts []byte, values []byte) []byte { // We encode the length of the timestamp block using a variable byte encoding. // This allows small byte slices to take up 1 byte while larger ones use 2 or more. sz := 1 + binary.MaxVarintLen64 + len(ts) + len(values) if cap(buf) < sz { buf = make([]byte, sz) } b := buf[:sz] b[0] = typ i := binary.PutUvarint(b[1:1+binary.MaxVarintLen64], uint64(len(ts))) i += 1// block is , , copy(b[i:], ts) // We don't encode the value length because we know it's the rest of the block after // the timestamp block. copy(b[i+len(ts):], values) return b[:i+len(ts)+len(values)] }

解包DataBlock 我们还以FloatValue为例
func DecodeFloatBlock(block []byte, a *[]FloatValue) ([]FloatValue, error) { // Block type is the next block, make sure we actually have a float block blockType := block[0] if blockType != BlockFloat64 { return nil, fmt.Errorf("invalid block type: exp %d, got %d", BlockFloat64, blockType) }// 跳过1字节的block type block = block[1:]tb, vb, err := unpackBlock(block) if err != nil { return nil, err }//计算有多少组Value sz := CountTimestamps(tb)if cap(*a) < sz { *a = make([]FloatValue, sz) } else { *a = (*a)[:sz] }tdec := timeDecoderPool.Get(0).(*TimeDecoder) vdec := floatDecoderPool.Get(0).(*FloatDecoder)var i int err = func(a []FloatValue) error { // Setup our timestamp and value decoders tdec.Init(tb) err = vdec.SetBytes(vb) if err != nil { return err }// Decode both a timestamp and value j := 0 for j < len(a) && tdec.Next() && vdec.Next() { a[j] = FloatValue{unixnano: tdec.Read(), value: vdec.Values()} j++ } i = j// Did timestamp decoding have an error? err = tdec.Error() if err != nil { return err }// Did float decoding have an error? return vdec.Error() }(*a)timeDecoderPool.Put(tdec) floatDecoderPool.Put(vdec)return (*a)[:i], err

Dabablock的其他操作
  • BlockType:取block byte buffer的第一个字节
func BlockType(block []byte) (byte, error) { blockType := block[0] switch blockType { case BlockFloat64, BlockInteger, BlockUnsigned, BlockBoolean, BlockString: return blockType, nil default: return 0, fmt.Errorf("unknown block type: %d", blockType) } }

  • BlockCount: 获取一个DabaBlock中包含的Value数量
func BlockCount(block []byte) int { if len(block) <= encodedBlockHeaderSize { panic(fmt.Sprintf("count of short block: got %v, exp %v", len(block), encodedBlockHeaderSize)) } // first byte is the block type tb, _, err := unpackBlock(block[1:]) if err != nil { panic(fmt.Sprintf("BlockCount: error unpacking block: %s", err.Error())) } return CountTimestamps(tb) }

  • DecodeBlock: 解码一个DabaBlock,根据BlockType的不同调用不同的Decode方法
func DecodeBlock(block []byte, vals []Value) ([]Value, error) { if len(block) <= encodedBlockHeaderSize { panic(fmt.Sprintf("decode of short block: got %v, exp %v", len(block), encodedBlockHeaderSize)) }blockType, err := BlockType(block) if err != nil { return nil, err }switch blockType { case BlockFloat64: var buf []FloatValue decoded, err := DecodeFloatBlock(block, &buf) if len(vals) < len(decoded) { vals = make([]Value, len(decoded)) } for i := range decoded { vals[i] = decoded[i] } return vals[:len(decoded)], err case BlockInteger: ... case BlockUnsigned: ... case BlockBoolean: ... case BlockString: ... default: panic(fmt.Sprintf("unknown block type: %d", blockType)) } }

WALEntry
  1. WAL在写入TSM文件时用作预写日志。
  2. 每个DB的每个RetentionPolicy下面的每个Shard下都有自己的一个单独的WAL文件目录,Influxdb在启动的配置文件中需设置单独的WAL目录,来存储所有Shard的WAL文件。
  3. 每个Shard都对应一个WAL目录,目录下有多个wal文件,每个称作一个WALSegment,默认大小是10M。文件命名规则是,以_开头,中间是ID,扩展名是wal, 比如 _00001.wal
  4. 每次写入WAL的内容称为一个WALEntry, 在写入和读取这个Entry时需要序列化和反序列化,我们先来看一下其定义:
type WALEntry interface { Type() WalEntryType// Entry的类型: WriteWALEntry, DeleteWALEntry, DeleteRangeWALEntry Encode(dst []byte) ([]byte, error) MarshalBinary() ([]byte, error) //使用上面的Encode方法作序列化 UnmarshalBinary(b []byte) error //反序列化 MarshalSize() int }

我们下面来分析一下具体的三种WALEntry
WriteWALEntry
  • 一组point组成一个WriteEALEntry,然后写入WALSegment;
    point是一个series对应的一些field的集合,每个point被唯一的series + timestamp标识,可以简单将point理解为就是一个insert语句写入的内容。
  • 定义:
type WriteWALEntry struct { Values map[string][]Value szint }

其中Valuse是个map,它的key是series key + field, 它的value是具有相同的key的所有field value; 其实就是把多个point按series key + field作了合并
  • 结构图

    Influxdb中TSM文件结构解析之WAL
    文章图片
    influxdb_write_wal_entry.png
  • 序列化Encode:完全按照上面的结构图来写入,比较清晰明了
func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { // 计算总大小,欲分配内存 encLen := w.MarshalSize() // Type (1), Key Length (2), and Count (4) for each key// allocate or re-slice to correct size if len(dst) < encLen { dst = make([]byte, encLen) } else { dst = dst[:encLen] }// Finally, encode the entry var n int var curType byte// 遍历Values,逐一编码 for k, v := range w.Values { // 确定field的类型 switch v[0].(type) { case FloatValue: curType = float64EntryType case IntegerValue: curType = integerEntryType case UnsignedValue: curType = unsignedEntryType case BooleanValue: curType = booleanEntryType case StringValue: curType = stringEntryType default: return nil, fmt.Errorf("unsupported value type: %T", v[0]) }// 写入类型 dst[n] = curType n++ // 写入key长度,key = series key + field binary.BigEndian.PutUint16(dst[n:n+2], uint16(len(k))) n += 2 // 写入 key n += copy(dst[n:], k)// 写入 value个数 binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(v))) n += 4// 逐一写入合部的value for _, vv := range v { binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.UnixNano())) n += 8switch vv := vv.(type) { case FloatValue: if curType != float64EntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.value)) n += 8 case IntegerValue: if curType != integerEntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value)) n += 8 case UnsignedValue: if curType != unsignedEntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value)) n += 8 case BooleanValue: if curType != booleanEntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } if vv.value { dst[n] = 1 } else { dst[n] = 0 } n++ case StringValue: if curType != stringEntryType { return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv) } binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(vv.value))) n += 4 n += copy(dst[n:], vv.value) default: return nil, fmt.Errorf("unsupported value found in %T slice: %T", v[0].Value(), vv) } } }return dst[:n], nil }

DeleteWALEntry
  • 删除Series的WALEntry
  • 定义:
type DeleteWALEntry struct { Keys [][]byte szint }

  • 结构图

    Influxdb中TSM文件结构解析之WAL
    文章图片
    influxdb_wal_delete_entry.png
  • 编码Encode, 各个key间以\n分隔
func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) { sz := w.MarshalSize()if len(dst) < sz { dst = make([]byte, sz) }var n int for _, k := range w.Keys { n += copy(dst[n:], k) n += copy(dst[n:], "\n") }// We return n-1 to strip off the last newline so that unmarshalling the value // does not produce an empty string return []byte(dst[:n-1]), nil }

DeleteRangeWALEntry
  • 删除某个时间范围内的series的WALEntry
  • 定义:
type DeleteRangeWALEntry struct { Keys[][]byte Min, Max int64// 开始时间戳和结束时间戳 szint }

  • 结构图

    Influxdb中TSM文件结构解析之WAL
    文章图片
    influxdb_delete_ragne_wal_entry.png
  • 编码Encode
func (w *DeleteRangeWALEntry) Encode(b []byte) ([]byte, error) { sz := w.MarshalSize()if len(b) < sz { b = make([]byte, sz) }// 写入开始和结束时间戳 binary.BigEndian.PutUint64(b[:8], uint64(w.Min)) binary.BigEndian.PutUint64(b[8:16], uint64(w.Max))i := 16 // 逐一写入key for _, k := range w.Keys { binary.BigEndian.PutUint32(b[i:i+4], uint32(len(k))) i += 4 i += copy(b[i:], k) }return b[:i], nil }

WALEntry的写入
  • 上面我们介绍了三种WALEntry,在序列化后就可以被写入到WALSegment文件中了,在写之前可能还需要作压缩
  • 写入时候为了读取时便于解析,还需要按一定格式写入
    1. 先写入 1字节 的 WALEntry类型
    2. 再写入 4字节 的 序列化后且作了压缩的WALEntry的长度
    3. 最后写入 序列化后且作了压缩的WALEntry的具体内容
  • 使用 WALSegmentWriter类来写入:
func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error { var buf [5]byte // 写入类型和具体内容的长度 buf[0] = byte(entryType) binary.BigEndian.PutUint32(buf[1:5], uint32(len(compressed)))if _, err := w.bw.Write(buf[:]); err != nil { return err }// 写入具体内容 if _, err := w.bw.Write(compressed); err != nil { return err }w.size += len(buf) + len(compressed)return nil }

WAL WAL封装了一个预写日志的所有操作,正如前面提到了,一个Shard对应一个WAL,一个WAL在写入时又会产生多个WALSegment。
我们来分析一下一些主要的方法:
Open操作 遍历一个Shard目录下的所有Segment文件,这些文件按id从小到大排序,作初始化操作
func (l *WAL) Open() error { l.mu.Lock() defer l.mu.Unlock() ..if err := os.MkdirAll(l.path, 0777); err != nil { return err }// 获取所有segment 文件列表,按id从小到大排序,最后一个就是当前正写入的文件 segments, err := segmentFileNames(l.path) if err != nil { return err }if len(segments) > 0 { // 最后一个就是当前正写入的文件 lastSegment := segments[len(segments)-1]// 获取最新的segment id id, err := idFromFileName(lastSegment) if err != nil { return err }// 初始化当前的segment id l.currentSegmentID = id stat, err := os.Stat(lastSegment) if err != nil { return err }if stat.Size() == 0 { // 如果文件大小为0, 删除 os.Remove(lastSegment) segments = segments[:len(segments)-1] } else { //为写入,打开该文件 fd, err := os.OpenFile(lastSegment, os.O_RDWR, 0666) if err != nil { return err } if _, err := fd.Seek(0, io.SeekEnd); err != nil { return err }// 初始化当前的SegmentWriter l.currentSegmentWriter = NewWALSegmentWriter(fd)// Reset the current segment size stat atomic.StoreInt64(&l.stats.CurrentBytes, stat.Size()) } }...l.closing = make(chan struct{})return nil }

writeToLog写入操作
func (l *WAL) writeToLog(entry WALEntry) (int, error) { // 从buytesPool获取byte slice, 避免反复重新分配内存 bytes := bytesPool.Get(entry.MarshalSize())// 将entry作编码,前面已经介绍过 b, err := entry.Encode(bytes) if err != nil { bytesPool.Put(bytes) return -1, err }// 使用snappy压缩强词编码后的entry内容 encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))compressed := snappy.Encode(encBuf, b) bytesPool.Put(bytes)syncErr := make(chan error)segID, err := func() (int, error) { l.mu.Lock() defer l.mu.Unlock()// Make sure the log has not been closed select { case <-l.closing: return -1, ErrWALClosed default: }// roll the segment file if needed if err := l.rollSegment(); err != nil { return -1, fmt.Errorf("error rolling WAL segment: %v", err) }// write and sync // 使用SegmentWriter来写入entry内容 if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil { return -1, fmt.Errorf("error writing WAL entry: %v", err) }select { case l.syncWaiters <- syncErr: default: return -1, fmt.Errorf("error syncing wal") }// 将执行file sync操作,刷到磁盘文件 l.scheduleSync()// Update stats for current segment size atomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))l.lastWriteTime = time.Now().UTC()return l.currentSegmentID, nil }()bytesPool.Put(encBuf)if err != nil { return segID, err }// schedule an fsync and wait for it to complete return segID, <-syncErr }

Cache
  1. 时序数据在写入时,会先写入到上面介绍的WAL,然后写入到Cache,最后按照一定的策略Flush到磁盘文件。现在我们来介绍这个Cache。
  2. 这个Cache里缓存的是什么呢?
  3. 这个Cache用什么结构来作内存存储?
    我们下面来一一解答这些问题:
Entry
  • 既然是Cache,那肯定是key-value结构,其中的key是series key + field name, 对应的value就是这个key所对应的若干个field value的集合,也就组合成了一个entry,我们来看下entry的定义:
type entry struct { musync.RWMutex values Values // All stored values.// The type of values stored. Read only so doesn't need to be protected by // mu. vtype byte }

由这个定义我们可知,同一个entry里面的所有value的类型都是相同的,都是这个 vtype里所保存的类型。
  • Entry的创建:使用[]Value来创建,比较简单,但需要先判断这组value的类型是否一致
func newEntryValues(values []Value) (*entry, error) { e := &entry{} e.values = make(Values, 0, len(values)) e.values = append(e.values, values...)// No values, don't check types and ordering if len(values) == 0 { return e, nil }// 个人感觉应该先校验这组value的类型是否一致,不一致就不要作上面的make, append了。 et := valueType(values[0]) for _, v := range values { // Make sure all the values are the same type if et != valueType(v) { return nil, tsdb.ErrFieldTypeConflict } }// Set the type of values stored. e.vtype = etreturn e, nil }

  • Entry的add操作:和上面的创建类似,需要先判断这组value的类型是否一致
  • Entry的去重操作,去掉Values中时间戳相同的value,只保留其中的一个
func (e *entry) deduplicate() { e.mu.Lock() defer e.mu.Unlock()if len(e.values) <= 1 { return } e.values = e.values.Deduplicate() }

实际上是调用了Values.Deduplicate,这个Values提供了若干实用的方法,比如去掉,过滤等。
  • Entry过滤:过滤掉在给定时间戳范围内的Value
func (e *entry) filter(min, max int64) { e.mu.Lock() if len(e.values) > 1 { e.values = e.values.Deduplicate() } e.values = e.values.Exclude(min, max) e.mu.Unlock() }

实际上是调用了Values.Exclude
storer
  • 上面解决了Cache存什么的问题,下面我们来解决怎么存的问题。这个存储器是storer,它是个interface,之前是实现了这个interface的struct都可以用来存Cache里的entry,我们先来看一下这个interface
type storer interface { entry(key []byte) *entry// Get an entry by its key. write(key []byte, values Values) (bool, error)// Write an entry to the store. add(key []byte, entry *entry)// Add a new entry to the store. remove(key []byte)// Remove an entry from the store. keys(sorted bool) [][]byte// Return an optionally sorted slice of entry keys. apply(f func([]byte, *entry) error) error// Apply f to all entries in the store in parallel. applySerial(f func([]byte, *entry) error) error // Apply f to all entries in serial. reset()// Reset the store to an initial unused state. split(n int) []storer// Split splits the store into n stores count() int// Count returns the number of keys in the store }

注释很清晰,我们这里不累述。
  • 那实际是用什么来存的呢? influxdb里实现了ring,它实现了这个storer的所有接口,定义在tsdb/engine/tsm1/ring.go中。简单来说,一个ring内部分为若干个固定数量的桶,这里叫partition, 一个key进来后,按key作hash,对桶的数量取模,确定好要存在哪个桶里,然后每个桶其实又是一个map,最后也就是将key-value存在这个桶的map里。因为cache的添加,读取可能很频繁且都需要加锁,分桶后,各个桶单独加锁,提升性能。代码很简单,这里不详述了。

    推荐阅读