cover_image

Apache Pulsar在微信实时推荐场景下的优化与实践

微信大数据团队 微信后台团队
2023年05月17日 03:20

背景

Pulsar 作为微信大数据平台 Gemini-2.0 的消息队列,支持微信实时大数据及推荐场景的业务。

Gemini-2.0 是微信内部的云原生大数据平台,构建在腾讯云 TKE 容器平台之上,为微信各业务提供大数据及 AI 计算的基础支撑,主要具备如下特性:

· 计算存储完全分离

· 大数据及AI计算框架的统一编排调度

· 优化后的高性能计算组件

· 灵活高效的扩展及二次开发能力

随着微信推荐类业务飞速发展,推荐系统已经全面进入了“大规模+全实时+深度学习”时代。

为了支持推荐系统的高要求,大数据平台必须提升数据处理能力。其中,消息队列(MQ)成为了大数据实时处理的“数据总线”,是推荐系统的“基石”,各组件和模块,实时往MQ大量写入和消费数据。在支撑大规模的数据写入与消费的同时,MQ还必须保持高可用性,并且能够友好支持上下游各组件和模块。

在消息队列的技术选型中,我们选取了 Apache Pulsar,主要考虑到Pulsar以下的优点:

  • 云原生特性。Pulsar 的云原生特性,包括分布式、弹性伸缩、读写分离等都在业务上云背景下体现出优势。Pulsar 逻辑层 Broker 无状态,直接提供服务。存储层 Bookie 有状态,但是节点对等,且自带多副本容灾;

  • 支持资源隔离。可以软隔离或硬隔离,避免不同业务之间互相影响;

  • 灵活的策略管控。Pulsar 支持对 Namespace/Topic 分别设置策略来满足业务场景需求;

  • 快速/便捷扩容。逻辑层 Broker 的无状态和负载均衡策略允许快速扩容,存储层 Bookie 节点之间互相对等也便于快速扩容,可以轻松应对流量暴涨场景;

  • 多语言客户端。可以丝滑对接各种AI组件,方便分析与算法工程师使用。

图片

Pulsar在设计上,有非常好的合理性,但在实际的开发与运营中,我们仍然面临高性能、高可用、易维护、低成本等一系列的挑战。

高性能

1. 负载均衡优化

在线上生产环境中,我们遇到了 Broker 负载反复波动的情况。而当时集群整体负载并不高,却存在少量 Broker 节点高负载。

图片

集群负载不均衡的维护主要有两点:

  1. 大流量场景下,集群的服务质量呈木桶效应。Pulsar Client 连接 Topic 时会连接所有的分区。那么对于这个 Topic,只要有分区被分配到了高负载的机器,此时就可能由于该分区的响应时延高进而影响到 Client 的服务性能;

  2. 反复负载均衡会增加 Client 和 Broker 之间的连接压力。尽管 Client 侧本身有很多保护措施保障稳定性,但反复断开重连会额外增加 Client 和 Broker 之间的连接压力,容易服务不稳定。


我们发现直接原因为负载均衡策略导致 Bundle(Pulsar集群负载的最小单元)反复卸载与加载,当时集群的负载均衡配置为:

loadManagerClassName=org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImplloadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedderloadBalancerBrokerThresholdShedderPercentage=10# 负责加载 Bundle 的处理类为:org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate

图片

以上图为例,假设三个 Broker 平均负载是 50%,则需要进行负载均衡的阈值就是 60%,即 Broker 中超出 60% 的部分负载需要均衡。理想状态下,Broker 1 的多余 20% 负载应该转移到 Broker 3 上,实际却转移到了 Broker 2 上。之后由于 Broker 2 超载所以又会卸载下来,再回到 Broker 1 上。结果流量就在 Broker 1 和 Broker 2 上反复横跳。

跟踪代码发现,负责加载 Bundle 的处理类(org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate)是根据消息量判断哪个 Broker 应该承载多余流量。但是实际生产环境中消息量与机器负载并不完全正相关,且负责卸载 Bundle 的处理类(org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder)是根据 CPU、出入流量、内存等多种指标平均加权得出 Broker 负载,所以 Bundle 的加载和卸载逻辑并不一致。与此同时,在选择候选 Broker 时,原生实现会过滤掉已加载 Namespace 下最多 Bundle 的 Broker,目的为尽可能让一个 Namespace 的数据平摊在多个 Broker 中,导致了本应由 Broker 3 加载流量却将 Broker 3 移除出了候选集。

对此我们进行了代码优化改进:

  • 统一 Bundle 的加载和卸载处理类关于 Broker 负载计算的逻辑;

  • 满足隔离策略条件下,允许选取所有 Broker 节点作为候选节点加载 Bundle;

  • 加载 Bundle 时,优先选择低于平均负载的节点。

优化后,整体集群流量均衡了很多,并且日均负载调整次数由原来的 1000 次降低至个位数,甚至全天都不会出现负载均衡调整。

图片

2. 追赶读优化

所谓追赶读(Catch-up Reads),其实就是消费历史数据。Broker 层本身存在 Cache,默认缓存各个 Topic 的最新部分消息。因此在消费历史数据时,读请求往往会穿透 Cache 到达 Bookie 层,既增加集群内部的穿梭流量,也会增加存储层的压力。追赶读严重时可能导致雪崩,消费者将始终无法追上消息生产速度。

推荐场景有着以下特征:

  • 消费任务数量众多。实时训练的模型众多,每一个模型都对应着一份订阅;

  • 消费速度参差不齐。受限于每个模型的资源情况与复杂度,消费速度并不总是能跟上消息生产速度,经常出现追赶读;

  • 消费任务进程重启。模型迭代迅速,每一次迭代都需要短暂停止消费,从而出现数据积压。


对此,社区原有的 Broker Cache 逻辑效果不佳。默认策略会找出当前消费不活跃(由阈值控制,Cursor 消费的消息数超过阈值即被认为是不活跃)的 Cursor,对 Cursor 之前的数据做驱逐。但是在我们的场景下,使用消息数阈值往往并不能总是满足我们的预期的。因为不同的 Topic 的消息大小是不一样的,如果一个 Topic 的消息很大,那么相对应的消息数阈值条件下所需要的 Cache 大小就会很大,很容易导致 Cache 空间紧缺引发数据驱逐,从而影响其他 Topic 的 Cache 命中率。

我们重写了 Cache 部分策略逻辑,支持 Topic 粒度配置 Cache 时长,按照实际业务场景分别配置 Cache 策略,在同等总 Cache 大小的情况下,Cache 命中率由 85% 提升至 95%,相对应地,存储层的读流量也大大降低。

图片

追赶读优化的最终目的就是减少存储层读数据的流量,那么除了提升 Cache 命中率,我们也重新审视了发出的读请求情况,看这里是否可以进一步减少读请求/流量。在稳定情况下,如果一个 Topic 有众多订阅,那么消费追不上的这部分订阅肯定都是一直在消费 TTL 附近的数据。在原有的实现中,每个订阅的读数据处理都是独立的,因此对于一直在消费 TTL 附近的这部分订阅,他们所请求的数据是存在交集的。那这里我们就可以考虑对这些存在交集的读请求进行缩减,从而进一步降低集群读流量。

我们在负责读 Bookie 数据这里,对于一个 Topic 发出的读请求全部管理起来。当一个读请求即将发出时,记录下该读请求以及其所访问的数据范围,在新的读请求即将发出时,检测是否存在已发出的读请求的数据范围中存在交集。如果存在,则拆分读请求后再访问,仅发出没有交集数据范围的读请求。

图片

优化后,我们发现线上环境中存在真实的追赶读场景,可过滤非常多的读请求,相对应地,存储层的读流量也进一步减少了。

3. 针对SSD调优

社区版本的Pulsar Helm Chart中,只允许Bookie的journal/ledger各使用一个PVC,即各只能使用一块盘。实际上,Bookie是支持多盘的,使用多盘可以充分利用与叠加多盘的IO能力。我们对Pulsar Helm Chart进行了改进,使得单台Bookie节点可以支持使用多块journal盘或者多块ledger盘,该特性也已合入社区。在实际生产环境中,我们建议journal与ledger不要共用一块硬盘,以免互相影响从而导致服务性能抖动。

在压测时,我们发现bookie吞吐上不去,并没有达到机器的各项性能指标,尤其是journal盘的IO能力没有被充分利用。实际跟踪代码发现,在Bookie中,一个journal目录对应着一个写入线程。因此可以尝试增加journal目录来增加journal的吞吐能力。为了更加充分利用SSD硬盘的IO,最好设置一块硬盘多个目录来提升吞吐,尤其是journal盘。调整过后,Bookie压测基本可以达到硬盘吞吐上限。

Bookie 自身存在 Compaction 机制,我们也对其进行了触发间隔与限速的调整,降低对线上流量的实际硬盘IO影响,实际观测可以避免读写时延毛刺。

我们也调整了 Bookie 的 read/write buffer 大小,适当调大 write buffer 和 rocksDB write buffer 提升写入性能,适当调小read buffer 避免大批量读引发读时延的增加,实际观测也可减少读时延毛刺。

writeBufferSizeBytes=67108864dbStorage_rocksDB_writeBufferSizeMB=128readBufferSizeBytes=4096

注意,以上参数需要结合实际业务场景与硬件情况进行调整。

高可用

1. 过载保护

任何服务都无法无限制地提供服务,Pulsar 也不例外。我们对线上集群的所有 Topic 都有配置生产速度、消费速度、订阅数、单分区流量等告警,及时发现各类异常。我们也会默认给每个 Topic 进行限速,防止异常流量拖跨集群。

在推荐场景下,一个数据流可能需要非常多的实验与测试,由此会导致非常多的订阅。我们针对具体的业务场景进行数据轻重分离,例如重要模型独立 Topic 提供服务,引导业务按具体场景拆分 Topic,避免单 Topic 流量过大。

除此之外,我们也对集群配置了自动扩容策略,当集群平均负载到达阈值后会触发自动扩容,规避过载风险。

2. 系统容灾

Pulsar 单集群内所有组件都进行多园区部署,避免单园区故障时影响服务。Bookie 层配置多园区机架感知,实现数据多园区备份。

对于Bookie容灾参数也进行调整,调大rereplicationEntryBatchSize,增加bookie在数据复制时的并行处理entry数,使得出现bookie宕机时ledger复制的速度,加速bookie的故障容灾效率。调整该参数时要留意常规情况下bookie的ledger盘的IO压力。如果本身ledger盘IO压力已较大,不宜将该参数值调高。

rereplicationEntryBatchSize=100

易维护

1. 日志采集与检索

我们将所有 Pulsar 组件都配置了日志采集,接入使用了腾讯云的日志服务,其底层为ES,接入后可以比较便利地检索日志。尤其是在节点较多的情况下,不再需要重新定位日志在哪台机器上,再一台台登机器看日志了。

图片

2. Prometheus集群化部署

我们最初是直接使用的Pulsar Helm Charts中提供的Prometheus方案,其为单点Prometheus实例的方式。随着数据和使用量的增加,Prometheus的硬盘占用以及内存占用都越来越大。最终,硬盘占用与内存占用超过了单节点上限。显然,单机的Prometheus已经满足不了我们的监控需求了。于是我们开始寻求分布式的Prometheus集群解决方案。

经过一段时间调研,我们最终选用了Kvass + Thanos + Prometheus方案。Kvass负责各个采集target的分发调度,Thanos负责整合各个Prometheus实例的查询结果,并且可支持将多个Prometheus集群融合在一起提供查询服务,最终解决了我们单实例Prometheus无法支撑更大数据量的问题。该方案可以进行横向扩展,理论上可支持无限容量与采集目标。

针对我们的实际线上场景,我们一个 Pulsar 集群会对应一套独立的 Prometheus 集群方案,进而实现数据隔离。同时,我们在 Kvass Coordinator 中集成了连接我们内部 CMDB 的能力,从而实现 Pulsar 集群动态扩缩容的感知,实现全自动化调度采集。

3. 告警监控

微信内部已有非常优秀的监控平台,提供了完善的监控与告警能力。我们从Prometheus集群中获取Pulsar集群的各项关键指标与用户相关指标,旁路一份进入内部监控平台,及时感知与处理集群异常。

针对用户的消费积压场景,我们也配置了用户级别的监控告警,及时提醒用户有消费积压并关注处理。

低成本

1. 集群网络优化

我们使用了 Pulsar 官网提供的 K8s Helm chart 部署方式。

图片

原生部署架构中,流量全部从 Proxy 代理层进入集群,经过 Broker 逻辑服务层写入 Bookie 存储层。Proxy 代理层代理客户端和 Broker 之间的连接,Broker 层管理 Topic 负责提供服务,Bookie 层负责持久化消息存储。在上图中,入流量和出流量分别用 In 和 Out 进行标记,Replica 是配置的副本。

在应用的过程中,我们发现了两个问题:首先 Proxy 代理了 Pulsar 客户端的请求,导致 Broker 无法获取客户端 IP,增加了运维难度;其次,当集群流量较大时,集群内部带宽会成为瓶颈。上图架构内,集群入流量为 (2+副本数)倍;出流量最大为 3 倍,Consumer、Proxy、Broker 和 Bookie 间分别有一倍流量,极端情况下(全部消费历史数据)流量会全量从 Bookie 流出。假设出入流量是 10 GBps,副本数为 3,那么在集群内入流量会放大为 50 GBps,出流量会放大为 30 GBps。另外默认情况下 Proxy 服务只有一个负载均衡器承载所有流量,带宽压力非常大。

这里可以看出由于 Proxy 层的存在,造成了很大流量浪费。而 Pulsar 实际上支持 Broker 直连,因此我们在此基础上进行了一些优化。

我们利用了腾讯云 K8s 集群的能力,给 Broker 配置了弹性网卡,并使 Broker 的 IP 暴露在集群外,可以被外部客户端直接访问。我们也给 Broker 服务配置了负载均衡器。这样客户端可以直接访问负载均衡器 IP,再经过 Pulsar 内部协议的 Lookup 操作找到要访问的 Topic 所处的 Broker,由此节省了 Proxy 带来的额外带宽消耗。我们经过实测,使用 Broker 直连的方式,可以减少 Pulsar 集群内部相当一部分的穿梭流量。

2. 应用非持久化Topic

Pulsar 有独特的非持久化(Non Persistent) Topic 特性。生产者和消费者是与 Broker 中的 Dispatcher 模块进行交互的,而持久化(Persistent) Topic 中生产者数据会通过 Dispatcher 进入 Managed Ledger 模块,再调用 Bookie 客户端与 Bookie 交互。非持久化 Topic 中的数据则不会进入 Managed Ledger,而是直接发送给消费者。如果消费者的内部队列已满则会出现数据丢失的现象。因此非持久化 Topic 的应用一定需要可容忍数据丢失。

图片

在大规模实时推荐场景中,存在场景其对应的实时模型训练时刻只需要最新的样本数据,不需要历史样本,此时就非常适合使用非持久化 Topic。非持久化 Topic 由于不需要与 Bookie 交互,不会有追赶读引发的一系列问题,对集群的带宽压力会明显降低,并且不存在存储压力,可节省大量存储资源。

我们在实际应用非持久化 Topic 的过程中进行了多项调优与bugfix,只要消费能力足够,即便是非持久化 Topic 也能做到几乎不丢数据。

3. 使用COS Offloader

当前由于业务需求以及性能考虑,bookie集群的journal和ledger都使用了SSD盘,但实际上业务本身也有需求希望能够存较长时间较大量的数据。如果所有数据都使用SSD进行存储则会产生较大的成本,因此我们需要做到业务需求与成本之间的平衡。

Pulsar 提供了分层存储(Tiered Storage)能力,可以将存储转移到廉价的存储层。Pulsar Offloader 可以将超过一定时长的 Ledger 搬运到远端存储,不再停留在 Bookie 层,由 Broker 接管这部分的数据管理。我们使用 Pulsar Offloader 的原因有:

  • Bookie Journal/Ledger 盘都使用 SSD,成本较高;

  • 业务需求存储时间长、数据存储量大;

  • 数据消费任务异常,需要容忍较长时间的数据 Backlog;

  • 数据回放需求。


腾讯云对象存储(COS)应用较多并且性价比较高,但Pulsar 社区版本并不支持COS,所以我们开发了云上 COS Offloader 插件并应用于线上。使用COS作为冷存长期存储,相对于存放于 Bookie 本机SSD的模式,总体成本大大降低。我们也已提PR将对应代码贡献给社区。

我们会持续往低成本方向进行优化。

总结

经过我们一系列的改造优化,Pulsar在高性能、高可用、易维护、低成本等方面,都有很大的提升,很好支持了“大规模+全实时+深度学习”类型的推荐系统。

在这过程中,我们也跟社区有充分的互动,累计递交20+ PR,并参与了Pulsar Meetup进行技术分享。

参考资料

  1. https://pulsar.apache.org/

  2. https://github.com/apache/pulsar-helm-chart

  3. https://github.com/tkestack/kvass

  4. https://github.com/thanos-io/thanos

大数据平台 · 目录
上一篇Apache Pulsar在微信实时推荐场景下的优化与实践下一篇WeOLAP 亚秒级实时数仓 —— BitBooster 10倍查询优化实践
继续滑动看下一个
微信后台团队
向上滑动看下一个