为了满足业务的需求,信也科技基于SeaweedFS搭建了分布式文件存储服务,目前已在生产环境落地,陆续接入了部分业务场景,积累了一些经验。本人也在参与该项目的过程中也对SeaweedFS源码进行了深入研究,下面将会给大家分享SeaweedFS Volume节点数据存储设计,也会对其核心源码进行解读。
之前已经对SeaweedFS集群和Master节点进行了讲解,如果有对相关概念不了解的同学请看这几篇文章:分布式文件存储SeaweedFS简介与Mount功能原理解密、SeaweedFS Master节点Raft协议源码解析
SeaweedFS是一个分布式文件存储中间件,由Filer、Master、Volume三种节点组成集群。在文件上传流程中Filer节点负责将大文件切分成小的文件块,Master节点会为每一个小文件块分配文件ID和存储位置,Volume节点负责将文件块存入磁盘并维护索引。
基本概念
Volume节点:存储数据的节点。
Volume:数据节点管理文件的单元。本质是一个数据文件和一个索引文件。
Needle:表示一个文件块,存储在Volume的数据文件中。
Volume节点和Volume是两个比较容易混淆的概念:Volume节点是运行在服务器中的一个进程(对应代码中的VolumeServer结构体),Volume是Volume节点中为了方便管理文件而设计的结构体(对应代码中的Volume结构体)。在物理层面Volume就是两个文件:一个最大为30GB的数据文件,一个索引文件。用户上传的文件会封装为一个Needle然后以追加的方式写入数据文件中,并在索引文件中记录文件ID、起始Offset、文件Size,在下载时可以通过索引文件快速定位到Needle在数据文件中的位置。
type Volume struct {
// volume id
Id needle.VolumeId
dir string
dirIdx string
// 该volume所属bucket
Collection string
// 数据文件
DataBackend backend.BackendStorageFile
// needle 索引
nm NeedleMapper
tmpNm TempNeedleMapper
// 索引类型(内存+文件)
needleMapKind NeedleMapKind
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
noWriteLock sync.RWMutex
hasRemoteFile bool // if the volume has a remote file
MemoryMapMaxSizeMb uint32
super_block.SuperBlock
dataFileAccessLock sync.RWMutex
superBlockAccessLock sync.Mutex
// 异步操作队列
asyncRequestsChan chan *needle.AsyncRequest
// 数据最后修改、追加时间
lastModifiedTsSeconds uint64 // unix time in seconds
lastAppendAtNs uint64 // unix time in nanoseconds
lastCompactIndexOffset uint64
lastCompactRevision uint16
isCompacting bool
isCommitCompacting bool
volumeInfo *volume_server_pb.VolumeInfo
location *DiskLocation
lastIoError error
}
通过上面的简单介绍,我们已经了解了文件在单个节点上是如何存储的,但在分布式系统中,数据必须具有备份、容灾能力,所以下面将介绍SeaweedFS中提供的两种数据备份方式:多副本、纠删码(EC)。
与其他存储中间件相同,SeaweedFS提供多副本数据存储能力。在启动Master节点时可以通过-defaultReplication=XYZ配置副本数,其中xyz分别代表:
例如,我们在搭建集群时启动参数为-defaultReplication=020,代表在不同的机架额外维护两个副本,也就是总共存储三份数据。
多副本数据一致性
为了保障数据的一致性,需要满足公式 W + R > N,其中 W 代表写入几个副本才算写入成功,R 代表读取数据需要读取几个副本,N 代表数据总共有几个副本。在SeaweedFS的实现中 W = N、R = 1,也就是必须所有副本写入成功才认为写入成功,读取时只需要读任意副本即可。
文件上传代码实现
func ReplicatedWrite(masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) {
//check JWT
jwt := security.GetJwt(r)
// 计算副本数,并查找副本volume所在的节点
// check whether this is a replicated write request
var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" {
// this is the initial request
remoteLocations, err = GetWritableRemoteReplications(s, grpcDialOption, volumeId, masterFn)
if err != nil {
glog.V(0).Infoln(err)
return
}
}
// read fsync value
fsync := false
if r.FormValue("fsync") == "true" {
fsync = true
}
// volume节点先查找本地是否有对应的Volume,如果有先写入本地
if s.GetVolume(volumeId) != nil {
start := time.Now()
isUnchanged, err = s.WriteVolumeNeedle(volumeId, n, true, fsync)
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToLocalDisk).Observe(time.Since(start).Seconds())
if err != nil {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToLocalDisk).Inc()
err = fmt.Errorf("failed to write to local disk: %v", err)
glog.V(0).Infoln(err)
return
}
}
// 写入副本节点
if len(remoteLocations) > 0 { //send to other replica locations
start := time.Now()
// 分布式操作,并发写入其他多个副本
err = DistributedOperation(remoteLocations, func(location operation.Location) error {
u := url.URL{
Scheme: "http",
Host: location.Url,
Path: r.URL.Path,
}
q := url.Values{
"type": {"replicate"},
"ttl": {n.Ttl.String()},
}
if n.LastModified > 0 {
q.Set("ts", strconv.FormatUint(n.LastModified, 10))
}
if n.IsChunkedManifest() {
q.Set("cm", "true")
}
u.RawQuery = q.Encode()
pairMap := make(map[string]string)
if n.HasPairs() {
tmpMap := make(map[string]string)
err := json.Unmarshal(n.Pairs, &tmpMap)
if err != nil {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorUnmarshalPairs).Inc()
glog.V(0).Infoln("Unmarshal pairs error:", err)
}
for k, v := range tmpMap {
pairMap[needle.PairNamePrefix+k] = v
}
}
// volume server do not know about encryption
// TODO optimize here to compress data only once
uploadOption := &operation.UploadOption{
UploadUrl: u.String(),
Filename: string(n.Name),
Cipher: false,
IsInputCompressed: n.IsCompressed(),
MimeType: string(n.Mime),
PairMap: pairMap,
Jwt: jwt,
}
_, err := operation.UploadData(n.Data, uploadOption)
return err
})
stats.VolumeServerRequestHistogram.WithLabelValues(stats.WriteToReplicas).Observe(time.Since(start).Seconds())
if err != nil {
stats.VolumeServerRequestCounter.WithLabelValues(stats.ErrorWriteToReplicas).Inc()
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
glog.V(0).Infoln(err)
return false, err
}
}
return
}
func DistributedOperation(locations []operation.Location, op func(location operation.Location) error) error {
length := len(locations)
results := make(chan RemoteResult)
// 多个协程并发写入副本数据
for _, location := range locations {
go func(location operation.Location, results chan RemoteResult) {
results <- RemoteResult{location.Url, op(location)}
}(location, results)
}
ret := DistributedOperationResult(make(map[string]error))
for i := 0; i < length; i++ {
result := <-results
ret[result.Host] = result.Error
}
// 只要一个副本写入报错,则直接返回error
return ret.Error()
}
根据上面的一致性公式 W = N,数据写入需要所有副本都写入成功,那是不是意味着只要有一个节点宕机就无法写入数据了呢?当然不是。下面我们将会介绍SeaweedFS在多副本模式下如何保证服务高可用。
在多副本模式下,所有的Volume都会存储 N 个副本,副本会分散在多个 Volume 节点上,根据上面讲的一致性策略,读文件只需要读取一个副本,所以只要有一个副本存活就可以提供读服务。
如图,我们以3节点2副本举例,假设3号Volume节点宕机,那么Volume-2会被标记为只读状态。新的文件上传请求会被分配到Volume-1中,如果Volume-1也处于无法写入状态(比如写满了),那么会新建一个Volume-3并将新的文件写入Volume-3中。
多副本存储会导致存储空间翻倍,空间利用率低,例如:三副本存储空间利用率为33%。纠删码存储可以在实现数据备份、容灾的前提下提高空间利用率,降低存储成本。在实现时根据计算纠删码的时机不同可以分为在线、离线两种方式:
纠删码存储会将原始数据切分为 N 个大小相同的数据块,然后通过计算的到 M 个相同大小的校验块,将 N + M 个块均匀的存储在多台服务器中,即可实现丢失任意 M 个块都可以通过逆向计算恢复数据。在SeaweedFS中使用RS(10, 4)算法,也就是将原始数据平均分为 10 个数据块,然后通过纠删码计算得到 4 个校验块,任意丢失 4 个块都可以通过逆向计算恢复数据。RS(10, 4)算法只额外占用了40%的存储空间就可以容忍4个分区数据的丢失,空间利用率为70%,而多副本存储需要至少5个副本才能实现,空间利用率为20%。
SeaweedFS实现的是离线EC,也就是数据会先按照三副本的方式存储,通过定时任务的方式将存储空间达到95%(默认值为95)的Volume转换为EcVolume,计算过程如下:
在SeaweedFS设计中,纠删码存储的数据是不变的,所以不存在一致性问题,也不存在写操作高可用问题,只需要保证读高可用即可。上面其实也提到了,当节点宕机时可以通过计算实时恢复数据保证读高可用,所以接下来直接看代码。
func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
data = make([]byte, interval.Size)
// 如果要读取的数据在本节点,则直接读取
if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
if _, err = shard.ReadAt(data, actualOffset); err != nil {
glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
return
}
} else {
ecVolume.ShardLocationsLock.RLock()
sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
ecVolume.ShardLocationsLock.RUnlock()
// try reading directly
if hasShardIdLocation {
// 如果需要读取的数据存储在其他节点,则尝试从其他节点读取
_, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
if err == nil {
return
}
glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
}
// 从当前存活的节点中恢复数据
// try reading by recovering from other shards
_, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
if err == nil {
return
}
glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
}
return
}
func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
if err != nil {
return 0, false, fmt.Errorf("failed to create encoder: %v", err)
}
bufs := make([][]byte, erasure_coding.TotalShardsCount)
var wg sync.WaitGroup
ecVolume.ShardLocationsLock.RLock()
for shardId, locations := range ecVolume.ShardLocations {
// skip currnent shard or empty shard
if shardId == shardIdToRecover {
continue
}
if len(locations) == 0 {
glog.V(3).Infof("readRemoteEcShardInterval missing %d.%d from %+v", ecVolume.VolumeId, shardId, locations)
continue
}
// read from remote locations
wg.Add(1)
// 从存活节点读取数据
go func(shardId erasure_coding.ShardId, locations []pb.ServerAddress) {
defer wg.Done()
data := make([]byte, len(buf))
nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset)
if readErr != nil {
glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
forgetShardId(ecVolume, shardId)
}
if isDeleted {
is_deleted = true
}
if nRead == len(buf) {
bufs[shardId] = data
}
}(shardId, locations)
}
ecVolume.ShardLocationsLock.RUnlock()
wg.Wait()
// 恢复数据
if err = enc.ReconstructData(bufs); err != nil {
glog.V(3).Infof("recovered ec shard %d.%d failed: %v", ecVolume.VolumeId, shardIdToRecover, err)
return 0, false, err
}
glog.V(4).Infof("recovered ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
copy(buf, bufs[shardIdToRecover])
return len(buf), is_deleted, nil
}
本文简单介绍了Volume节点的设计,讲解了三副本存储和纠删码存储的一致性设计与高可用设计。但受篇幅所限,部分细节并没有过多的展开,例如:HayStack海量小文件存储设计、纠删码算法、在线纠删码、Volume状态维护等。我计划在后续的文章中逐步讲解,敬请期待。
LGW 信也科技基础架构研发专家,主要负责分布式文件存储的研发工作。