100Gb/s
,早期使用OSS作为Checkpoint数据存储,单个Bucket 每 1P数据量只有免费带宽10Gb/s
,超出部分单独计费,当前规模每月需要增加1x w+/月。7x24
小时 Long run 的,挂了之后,就会有以下两个问题,首先给一个实际场景:一个消费上游 Kafka,使用 Set 去重计算 DAU 的实时任务。数据质量问题:当这个实时任务挂了之后恢复,Set空了,这时候任务再继续从上次失败的 Offset 消费 Kafka 产出数据,则产出的数据就是错误数据了
数据时效问题:一个实时任务,产出的指标是有时效性(主要是时延)要求的。你可以从今天 0 点开始重新消费,但是你回溯数据也是需要时间的。举例:中午 12 点挂了,实时任务重新回溯 12 个小时的数据能在 1 分钟之内完成嘛?大多数场景下是不能的!一般都要回溯几个小时,这就是实时场景中的数据时效问题。
而 Flink的Checkpoint就是把 Set 定期的存储到远程 HDFS 上,当任务挂了,我们的任务还可以从 HDFS 上面把这个数据给读回来,接着从最新的一个 Kafka Offset 继续计算就可以,这样即没有数据质量问题,也没有数据时效性问题。
将任务的本地状态数据,复制到一个远程的持久化存储(HDFS)空间上。
继续处理新流入的数据,包括刚才缓存起来的数据。
3.1 为什么用HDFS?
Flink 做为一个成熟的流计算引擎,对外宣称可以实现 Exactly Once。为了实现业务上的 Exactly Once,Flink 肯定不能丢数据,也就是状态数据必须保障高可靠性,而HDFS作为是一个分布式文件系统,具备高容错率、高吞吐量等特性,是业界使用最广泛的开源分布式文件系统,针对大状态的Checkpoint任务非常契合,带宽易扩展且成本低廉。
HDFS主要有如下几项特点:
和本地文件系统一样的目录树视图
Append Only 的写入(不支持随机写)
顺序和随机读
超大数据规模
易扩展,容错率高
3.2 得物自建HDFS架构
架构层面是典型的主从结构,架构见下图,核心思想是将文件按照固定大小进行分片存储,
主节点:称为 NameNode,主要存放诸如目录树、文件分片信息、分片存放位置等元数据信息
从节点:称为 DataNode,主要用来存分片数据
比如用户发出了一个1GB
的文件写请求给HDFS客户端,HDFS客户端会根据配置(默认是128MB
),对这个文件进行切分,HDFS客户端会切分成8个Block,然后询问NameNode应该将这些切分好的Block往哪几台DataNode上写,此后client端和NameNode分配的多个DataNode构成pipeline管道,开始以packet为单位向Datanode写数据。
4.1 集群规划
早期使用OSS的主要瓶颈在于带宽,为了匹配将大状态的任务从OSS迁移到Hdfs带宽需求,支撑写入流量100Gib+/s
,对比OSS的带宽成本,结合到成本与带宽瓶颈考虑,内部大数据d2s.5xlarge机型做了一次性能压测,单节点吞吐能达到12Gib/s
,按100Gib/s
预估,算上Buffer,3
副本集群需要xx台机器,满足现在的带宽及写入吞吐需求,最终选择d2s.5xlarge类型Ecs机器,对应实例详情如下:
4.2.1 Hdfs组件指标采集
_hadoop_namenode_metrics[指标分类(通常是MBean的名称)][指标名称]
4.2.2 指标采集架构
结合当前集群的规模,我们通过集中是Pull的方式采集架构,只需要启动时指定集群Namenode及Jn的Jmx的url信息,就能采集集群的所有组件的指标信息,这样当有集群扩展或变更时,会自动采集上报到apm里,方便运维,具体采集架构如下图:
监控:基于已采集汇报上的指标数据,目前配置了Namenode、Datanode组件核心指标监控大盘,包括HDFS节点健康状态、HDFS服务健康状态、数据块健康状态、节点的写入吞吐量等指标。
告警:当前监控数据已完成接入公司天眼监控平台,我们将影响hdfs服务可用性的指标统一配置了告警模版,比如集群总的写入带宽、Callqueue队列、DN存活数量、集群节点基础io值班等,可以动态覆盖多集群,实现定制化告警,更加灵活及方便感知问题,减少故障止损时长,满足线上HDFS稳定性保障SLA目标。
4.3.1 DN 心跳汇报于删除共用一把写锁问题
现象:自建Flink平台大部分大状态任务迁移后,自建HDFS集群节点整体的水位各个ecs的网络带宽峰值,出现偶发部分任务因checkpiont 写入失败问题,报错信息如下:
1. 根据客户端日志的堆栈信息,查看Namenode的日志找到对应的文件、块,发现了错误日志,文件块在写入成功后不能及时上报,块的状态一直处于not COMPLETE。
客户端向datanode写入块结束后,datanode通过IBR(增量块汇报)向namenode汇报新写入的块
namenode收到汇报后更新文件的块副本数,当文件块副本数>=1时,文件写入状态为COMPLETE
客户端写入结束后不断向namenode询问文件写入状态是否COMPLETE,失败5(默认)次后报错写入失败。
3. 我们根据猜测的方向,继续定位什么原因导致心跳阻塞了IBR汇报,于是在每台节点上,部署了脚本(见下图),根据Datanode的Jmx指标监听本节点心跳间隔,大于10s时就打印Datanode的Jstack。
Datanode 每个节点上的metric信息里包含心跳汇报间隔的数据。
4. 分析多个Jstack代码(具体内容见下),可以发现BPServiceActor线程被CommandProcessingThread线程阻塞,而CommandProcessingThread线程在调用invalidate()方法,而invalidate()是在调用删除操作。
heartbeating to ****:8020" #56 daemon prio=5 os_prio=0 tid=0x00007f8fc6417800 nid=0x77e0 waiting on condition [0x00007f8f933f5000]
WAITING (parking) :
at sun.misc.Unsafe.park(Native Method)
parking to wait for <0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
at org.apache.hadoop.hdfs.server.datanode.BPOfferService.writeLock(BPOfferService.java:118)
at org.apache.hadoop.hdfs.server.datanode.BPOfferService.updateActorStatesFromHeartbeat(BPOfferService.java:570)
at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.offerService(BPServiceActor.java:699)
at org.apache.hadoop.hdfs.server.datanode.BPServiceActor.run(BPServiceActor.java:879)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
None
processor" #54 daemon prio=5 os_prio=0 tid=0x00007f8fc640f800 nid=0x77de runnable [0x00007f8f935f7000]
RUNNABLE :
at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)
at java.io.File.isDirectory(File.java:858)
at java.io.File.toURI(File.java:741)
at org.apache.hadoop.hdfs.server.datanode.LocalReplica.getBlockURI(LocalReplica.java:256)
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2133)
at org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.invalidate(FsDatasetImpl.java:2099)
at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActive(BPOfferService.java:738)
at org.apache.hadoop.hdfs.server.datanode.BPOfferService.processCommandFromActor(BPOfferService.java:684)
at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processCommand(BPServiceActor.java:1359)
at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.lambda$enqueue$2(BPServiceActor.java:1405)
at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread$$Lambda$75/2086554487.run(Unknown Source)
at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.processQueue(BPServiceActor.java:1332)
at org.apache.hadoop.hdfs.server.datanode.BPServiceActor$CommandProcessingThread.run(BPServiceActor.java:1315)
Locked ownable synchronizers:
<0x00000007204cf938> (a java.util.concurrent.locks.ReentrantReadWriteLock$FairSync)
<0x0000000720e9d988> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
class BPOfferService {
private final Lock mWriteLock = mReadWriteLock.writeLock();
void writeLock() {
mWriteLock.lock();
}
void writeUnlock() {
mWriteLock.unlock();
}
void updateActorStatesFromHeartbeat(
BPServiceActor actor,
NNHAStatusHeartbeat nnHaState) {
writeLock();
try {
//... 心跳汇报
} finally {
writeUnlock();
}
}
boolean processCommandFromActor(DatanodeCommand cmd,
BPServiceActor actor) throws IOException {
assert bpServices.contains(actor);
// ...省略
writeLock();
try {
//...执行删除逻辑
} finally {
writeUnlock();
}
}
}
5. 确认问题:查看Namenode审计日志发现,集群持续有大量文件删除(Flink删除过期Checkpoint meta文件)操作,修改Datanode端代码,在调用processCommandFromActive方法超过一定10s后打印调用时长与CommandAction日志。查看datanode日志发现确实存在删除操作大于30s的情况,由此确认问题就是出现在删除操作耗时过长影响了Datanode的增量块汇报。
找到问题就是出现在BPServiceActor 线程做了太多的事,包含FBR、IBR、心跳汇报,而且心跳汇报和删除共同持有一把写锁,那解决方案一个就把这两把锁进行拆分,一个就是将IBR逻辑单独独立出来,不受心跳汇报影响。
而社区3.4.0版本已经将IBR从BPServiceActor 线程独立出来了,所有我们最终将HDFS-16016 patch 合并到自建Hdfs3.3.3版本中,IBR不会被invalidate()阻塞,问题得到根治!
总结:Oss的流量已从早期137Gib/s降低到30Gib/s左右(下图一),自建Hdfs集群峰值流量达到120Gb/s(下图二),且平稳运行
整个项目已完成全部大状态任务从Oss迁移到自建Hdfs,当前Hdfs集群规模xx台,成本x w/月,原OSS带宽费用报价1x w/月,相比节省xx w/月。
未来规划:对于全量 checkpoint 来说,TM 将每个 Checkpoint 内部的数据都写到同一个文件,而对于 RocksDBStateBackend 的增量 Checkpoint 来说,则会将每个 sst 文件写到一个分布式系统的文件内,当作业量很大,且作业的并发很大时,则会对底层 HDFS 形成非常大的压力,
1)大量的 RPC 请求会影响 RPC 的响应时间。
2)大量文件对 NameNode 内存造成很大压力。
针对上面的问题我们未来考虑引入小文件合并方案降低 HDFS 的压力,包括 RPC 压力以及 NameNode 内存的压力。
*文/希贤
“
扫码添加小助手微信
如有任何疑问,或想要了解更多技术资讯,请添加小助手微信: