编辑:邵一帆、邵加佳
在线广告在互联网技术快速发展的过程中,已经形成了以人群为投放目标、以产品为导向的技术型投放模式。它不仅为广告主带来了以准确接触目标受众为方法论的全新营销渠道,也为互联网免费产品和媒体找到了规模化变现的手段。收钱吧秉持着“服务千万商家,全能生意帮手”的愿景,聚焦移动支付开拓一定的市场规模后,逐步开始探索商业化模式,收钱吧广告平台应运而生。
广告计费系统是整个广告平台非常重要的组成部分,在很多方面发挥着重要的支撑作用,例如:
本文着重讲述收钱吧在广告计费系统上的一些探索与实践。
上一代计费系统采用OLAP引擎Apache Druid为基础架构,采用ECS自建的方式,线上运行时发现不少缺点,比如维护成本高昂、故障率高、平滑升级版本难度大等。架构图如下:
曾经引发一次故障印象较为深刻,由于Druid内部原因导致Historical节点和ZooKeeper的连接session超时,导致Historical节点被ZooKeeper从服务注册中踢出,查询端无法获取到该节点的数据产生大量超时请求。
而在云计算蓬勃发展的当下,相比ECS本地自建的方式,Paas产品开箱即用、总成本更低、简单高效更灵活,更受大家的追捧。经过多番调研,结合收钱吧广告的实际业务需求,最终选择废弃Apache Druid改用Kafka Streams作为基础架构。
在收钱吧的流量场景下,广告投放系统从广告库中拉取广告,层层筛选过滤,挑选出合适的广告下发用户端进行展示。广告下发成功发送一次下发事件,广告成功展示则发送一次曝光事件,用户点击广告则发送一次点击事件,用户在广告页面产生特定的转化行为例如下单、注册,则由广告主在广告页内上报一次转化事件。
广告事件消息体包含了广告相关的很多信息,比如广告内容、出价类型、出价价格等广告信息,用户ID、IP等设备端信息,以及交易信息、商户信息、门店信息等等。这些事件消息写入Kafka消息队列后,除了给广告计费系统计算以外,还会在用户行为分析、数据报表、投放效果分析等诸多场景下发挥作用。
广告计费系统对查询接口响应时间、数据更新延迟时间比较敏感,这两个时间影响了广告投放下发广告的速度,也影响了广告投放匹配策略下的最终结果。广告计划的实时费用的及时性会直接对流量塑形、预算控制、防超投等方面产生影响。因此需要实现一套快速响应、低延迟的广告计费系统。
广告计费系统集成了Kafka Streams,它提供了对存储在Kafka内数据进行流式处理和分析的能力。流式处理就是利用连续计算来处理无限数据流的能力。
Kafka Streams作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式都交给开发者控制,调试、使用、打包部署都很方便。基于Kafka的Rebalance机制,Kafka Streams可以在线动态调整并行度。Kafka本身提供数据持久化,因此Kafka Streams提供滚动部署和滚动升级以及重新计算的能力。出于这几项优点才选择了将Kafka Streams集成到广告计费系统中。
在Kafka的分区机制下,单个消费者分配到n个分区时,Kafka Streams实例创建n个Stream Task,一个Stream Task对应一个分区,它是Kafka Streams的最小单元。每个Task包含了一个Topology,Topology由多个处理单元Processor组成。开发者构建多个Processor按照计算逻辑结构进行排布,最终组成满足计算结果的Topology。
广告费用查询需要满足查询指定时间跨度的广告计划的曝光、点击、花费等指标。为了提高查询性能,加上实际业务场景下的考量,采取了对明细事件数据做一次分钟级的预聚合。从存储了明细事件消息的Kafka Topic源头消费,提取事件消息中计费相关的字段,包括事件时间戳(单位毫秒)、事件类型(下发事件、曝光事件或点击事件等)、广告计划ID、出价类型(按曝光出价、按点击出价等)、出价价格(单价,如果是按曝光出价则曝光事件才会产生费用,如果是按点击出价则点击事件才会产生费用)。提取字段完成后,以广告计划ID和分钟时间戳作为key,将新消息写入Repartition Topic,这个Topic是由Kafka Streams自动创建的,目的就是为了将事件根据key再分区写入一个中间Topic,确保GroupBy相同的字段在同一个分区下,这样在聚合计算时各个分区的计算结果不会有关联关系,也就不会有并发问题产生。如下图所示:
输入侧消费Kafka Repartition Topic,每个Task对应的一个分区,读取到明细事件数据,根据自定义的聚合规则实现Reducer接口,广告此处聚合规则就是做求和操作,对相同Key的下发数、曝光数、点击数、花费进行求和。Repartition的过程保证了相同Key的数据只会被一个Task处理,求和过程规避了并发问题。Kafka Streams的State Store里持久化维护了一份临时数据结果,聚合结果发生变化的数据会发送到输出侧。如下图所示:
流式数据是在时间上无界的数据,而聚合操作只能作用在有界的数据集上。因此需要从无界的数据集上按特定的规则选取出有界的数据。窗口是一种很常用的设定计算边界的方式,不同的流式处理系统都有窗口机制。Kafka Streams提供了时间窗口windowedBy控制数据流聚合的时间,支持很多种类的时间窗口,此处我们采用了固定时间窗口的模式。提取事件时间戳作为窗口时间,时间戳在同一分钟下的事件归属在同一个时间窗口下,对于延迟抵达的事件保留一定的时间延迟容忍度,窗口关闭时的数据结果作为这一分钟的最终结果。
样例代码如下(代码中获取字段采用中文名是为了便于上下文理解并非实际代码如此):
final KStream<String, GenericRecord> stream = builder
.stream(sourceTopic);
Serde<TitanMetrics> titanMetricsSerde = StreamsSerdes.titanMetricsSerde();
KTable<Windowed<String>, TitanMetrics> metricsKTable = stream
//过滤非法广告事件
.filter((k, v) -> {
if (v.get("事件类型") == null || v.get("事件时间戳") == null) {
log.warn("consume invalid titan metrics message, k: {}, v: {}", k, v);
return false;
}
return true;
})
//提取字段
.flatMap((k, v) -> {
List<KeyValue<String, TitanMetrics>> result = new LinkedList<>();
String sliceId = String.valueOf(v.get("广告计划ID"));
String event = v.get("事件类型").toString();
Integer priceMode = (Integer) v.get("出价类型");
Long biddingPrice = (Long) v.get("出价价格");
TitanMetrics sm = TitanMetrics.transformSliceMetrics(sliceId, event, priceMode, biddingPrice);
if (sm != null) {
result.add(new KeyValue<>(sm.generateMetricsKey(), sm));
}
return result;
})
//以key进行GroupBy,写入指定的RepartitionTopic
.groupByKey(Grouped.with(partOfRepartitionTopic, Serdes.String(), titanMetricsSerde))
//固定时间窗口1分钟,窗口接受最大延迟4分钟的数据,否则丢弃
.windowedBy(TimeWindows.of(Duration.ofSeconds(60L)).grace(Duration.ofMinutes(4L)))
.reduce(TitanMetrics::sum,
//写入指定的ChangelogTopic
Materialized.<String, TitanMetrics, WindowStore<Bytes, byte[]>>as(partOfReduceChangelogTopic));
Kafka Streams的时间计算窗口会持续输出发生变更的数据结果。当新数据到来,进行聚合计算后得到新结果后会立即发送给下游,所以下游得到的是持续更新变化的数据结果。但我们期望写入MySQL的是一个分钟级的最终数据结果,而不是不断去Update,这么做会大幅减轻MySQL写入压力。Kafka Streams提供了suppress机制,可以suppress中间结果,当窗口关闭时才发送最终结果数据给下游。
数据查询时仍然需要获取当前的最新结果,持续更新的数据结果是必要的,如下图所示,拆分了两条并行流同时处理,一条流持续更新变化的持久化至Redis,另一条流只产出最终数据结果持久化至MySQL。
示例代码如下:
//最终数据结果流
metricsKTable.suppress(Suppressed.untilWindowCloses(unbounded()).withName(partOfSuppressChangelogTopic)).toStream()
.map((window, v) -> {
Window w = window.window();
v.setLast(true);
v.setStartTime(w.start());
v.setEndTime(w.end());
return KeyValue.pair(window.key() + ":" + w.start(), v);
})
.to(targetTopic, Produced.with(Serdes.String(), titanMetricsSerde));
//持续更新的数据结果流
metricsKTable.toStream()
.map((window, v) -> {
Window w = window.window();
v.setLast(false);
v.setStartTime(w.start());
v.setEndTime(w.end());
return KeyValue.pair(window.key() + ":" + w.start(), v);
})
.to(targetTopic, Produced.with(Serdes.String(), titanMetricsSerde));
Kafka Streams基于Kafka(版本大于0.11.0)提供的exactly-once(恰好一次)语义与事务机制来保证消息数据在多个Topic之间流转时的可靠传输。
Kafka的事务机制实现了
在Kafka这样的可靠保证下,才有了Kafka Streams的端到端exactly-once的可靠传输。
流量高峰期,计费服务查询QPS维持在5000左右,总平均响应耗时在3ms以内,99线在13ms左右。如图所示:
服务请求量:
平均响应时间和分位数:
单机POD容器CPU使用:
广告的根本目的是广告主通过媒体达到低成本的用户接触。收钱吧广告主要围绕支付后的流量场景,具备低成本触达支付后用户的能力,但用户支付后停留时间往往很短暂,利用精准营销,在第一时间吸引住用户的眼球,才能不断提高广告的转化效果,提升eCPM(千次展示期望收入,计算广告中最为核心的量化指标之一),创造更大的价值。广告计费系统在广告投放、展示、点击、转化等环节进行在线数据计算,再由数据驱动投放决策,为一系列用户与上下文的组合找到最合适的广告投放策略以优化整体广告活动的利润。广告计费系统作为广告的基础能力服务之一,只有满足数据准确、低延迟、高可用等要求,才能为精准营销铺一条宽阔的探索之路。
倪天熠,来自增值业务开发部