cover_image

Flink+Hologres在网校策略算法的实践和应用

王振宇 学而思网校技术团队
2021年09月10日 10:28

一、业务背景及传统架构分析

01 背景

网校的服务策略团队,专注于学员分班、师资调度、客服机器人等算法方向,该类业务场景下,需要实时获取用户的行为特征,通常是将行为日志以及相关数据库的Binlog写入kafka,再通过Flink消费Kafka数据产生实时行为特征或者统计指标后提供交互,这个过程中需要做几件事情,比如Preprocessing(预处理),Pre-aggregated(预聚合),在线训练过程中还需要关联一些维表或者聚合特征,这些特征可能会全量加载到计算节点里面,也有可能需要历史数据二次计算,就需要一个实时的OLAP平台和高并发的点查服务,形成一个交互过程,最后将实时产生的特征推到算法模块中。这个过程难点在于确定一个既可以提供实时的OLAP还能提供高并发点查服务数据库。

图片

02 传统架构分析

目前常用的大数据架构有Lambda架构和Kappa架构,两者在流处理方面多选用Flink、Storm作为实时计算框架,ClickHouse或者Druid等能构建index或者通过向量化计算的OLAP引擎做实时分析计算,通过Hbase提供点查询服务。但是有些场景又需要基于历史数据做全量分析,Lambda架构则把实时数据离线归档至Hive中,在Hive中做历史数据分析,Kappa架构则通过消息中间件缓存历史数据,需要分析历史数据时再次经过消息队列重播一次。


Lambda 架构

Lambda架构原始数据都是一个源头,例如用户行为日志、Binlog等,分别走了两条链路:一条是实时链路,通过流计算处理,把数据写入实时的存储系统;另一条链路就是离线链路,最典型的就是将数据写入至Hdfs,再通过查询层如Hive、Spark Sql对数据做分析处理。
Lambda 架构总共由三层系统组成:批处理层(Batch Layer),速度处理层(Speed Layer),以及用于响应查询的服务层(Serving Layer)。

图片

● 批处理层使用可处理大量数据的分布式处理系统预先计算结果,通过处理所有的已有历史数据来实现数据的准确性,输出通常存储在只读数据库中,数据结构一般不做改变,只是追加数据。批处理还负责创建和维护批处理视图(比如我们常做的Hive ETL ,统计一些数据,最后将结果保存在Hive表中或者数据库中,就属于批处理层)。

● 速度层通过提供最新数据的实时视图来最小化延迟。速度层所生成的数据视图可能不如批处理层最终生成的视图那样准确或完整,但它们几乎在收到数据后立即使用。而当同样的数据在批处理层处理完成后,在速度层的数据就可以被替代掉了。

● 所有在批处理层和速度层处理完的结果都输出存储在服务层中,服务层通过返回预先计算的数据视图或从速度层处理构建好数据视图来响应查询。


Lambda架构的每个存储产品都是一个数据孤岛,比如需要点查询服务,数据会在HBase里面存储一份;为了快速分析计算,数据需要在Druid或者Clickhouse里面存一份;为了离线计算又需要在Hive里面存储一份,这样带来的问题就是:

● 数据将会存储在多个系统中,增加冗余。

● 每个系统的数据格式不一致,数据需要做转换,增加维护成本,尤其是当业务到达一定量级时,维护成本剧增。

● 多个系统之间需要完全打通,不同的产品有不同的开发方式。


Kappa架构

为了降低开发和维护成本,在改进Lambda 架构的方案上提出了Kappa架构,Kappa架构删除了批处理层(Batch Layer),将数据通道以消息队列进行替代。对于Kappa架构来说,以流处理为主,但是数据却在数据湖层面进行了存储,当需要进行离线分析或者再次计算的时候,将数据湖的数据再次经过消息队列重播一次则可。

图片


Kappa架构的优点在于将实时和离线代码统一起来,方便维护而且统一了数据口径。而Kappa的缺点也很明显:

● 消息中间件缓存的数据量和回溯数据有性能瓶颈。算法一般需要使用180天内的数据,如果都存在消息中间件,无疑有非常大的压力。同时,一次性回溯订正180天级别的数据,对实时计算的资源消耗也非常大。

● 在实时数据处理时,遇到大量不同的实时流进行关联时,非常依赖实时计算系统的能力,很可能因为数据流先后顺序问题,导致数据丢失。


综上所述,Lambda架构保证了离线计算的稳定性,但双系统的维护成本高且两套代码也带来后期维护困难。Kappa架构在抛弃了离线数据处理模块的同时也抛弃了离线计算更加稳定可靠的特点。

二、Flink+Hologres实践

以网校行为特征生产场景为例,实时计算用户近一个月的行为特征,同时确保离线和实时数据一致性,传统的Lambda、Kappa架构满足这个场景是比较费力的,通过多方面调研,最后选型Flink+Hologres,Flink作流批一体计算引擎,Hologres作为OLAP引擎并提供点查服务,即日志采集至Kafka,Flink消费Kafka数据做实时预处理(比如实时ETL或者实时训练等),把处理的结果直接写入Hologres,Hologres提供维表关联点查、结果缓存、复杂实时交互、离线查询和联邦查询等,这样整个链路只需要通过Hologres来做唯一的数据入口和查询服务,无需对接其他系统。

图片

01 为什么选用Hologres

Hologres作为一款兼容PostgreSQL 11协议的一站式实时数仓,与大数据生态无缝打通,支持PB级数据高并发、低延时的分析处理,并且拥有同于Hbase的高并发写入、高性能点查等特性,又有类似Mysql管理便捷、查询简易等好处,还可以作为OLAP多维分析的数据库,在复杂的业务场景中Hologres的优势就表现得极为突出了。

02 开发流程

目前网校点击日志日均量级为亿级别,实时行为特征需要计算近30天的数据,但是实时计算30天的数据需要很多的资源且计算时间也较久,因此先单独聚合计算前29天的历史数据,再结合当天的实时数据产出30天聚合行为特征,以此来缩短计算时间并减少资源占用,最后通过T-Service平台提供服务,离线数据通过Airflow Datax Operator定时同步到Hive。

图片

数据接入

网校App端行为日志,正常日均量在1亿条左右,峰值时数据量大,单条实时写入速度不够,形成数据积压,造成延时消费,考虑批次写入,采用滚动窗口(WindowAll Tumbling)批次统计,窗口大小设置为10ms。

//窗口计算部分代码SingleOutputStreamOperator<List<ClickFeature>> apply = flatMap                .timeWindowAll(Time.milliseconds(10)) //10ms窗口                .apply(new AllWindowFunction<ClickFeature, List<ClickFeature>, TimeWindow>() {
//重写apply方法过滤空数据 @Override public void apply(TimeWindow timeWindow, Iterable<ClickFeature> iterable, Collector<List<ClickFeature>> collector) throws Exception { ArrayList<ClickFeature> clickFeatures = Lists.newArrayList(iterable); if (clickFeatures.size() > 0) { collector.collect(clickFeatures); } }

数据处理

App行为日志中有一部分数据是没有对应的埋点信息,针对此类数据埋点类型赋予默认值,其他数据按照埋点类型和数据量分为三部分(学习中心相关埋点、我的界面相关埋点和其他类型埋点)方便后期计算,减少不必要的聚合操作。


批量写入

Hologres 提供了实时写⼊服务 Holohub

1、Maven Pom

<dependency>  <groupId>com.aliyun.datahub</groupId>  <artifactId>aliyun-sdk-datahub-holo</artifactId>            <version>2.15.0-SNAPSHOT</version></dependency>

2、Jdk

Jdk: >= 1.8

3、批量Sink代码实例
自定义Flink Sink方法,继承 RichSinkFunction,重写 Open、Close 和 Invoke 方法。

● 重写 Open 方法连接 Hologres

public void open(Configuration parameters) throws Exception {    //建立hologres客户端    datahubClient = buildClient(endpoint, accessId, accessKey);    //获取结果表schema信息    schema = datahubClient.getTopic(database, tableName).getRecordSchema();    getRuntimeContext().addAccumulator("outAccumulateInsert", this.outAccumulateInsert);}

● 重写 Invoke 方法进行数据写入的相关操作

public void invoke(List<ClickFeature> value, Context context) throws Exception {                 List<RecordEntry> records = new ArrayList<RecordEntry>();         for (ClickFeature value_ : value) {                        TupleRecordData recordData = new TupleRecordData(schema);             //创建Datahub RecordEntry对象                          RecordEntry dhRecord = new RecordEntry();            //自定义setValuesForTableField方法,根据结果表字段赋值            TupleRecordData resultTupleRecordData = setValuesForTableField(recordData, value_);                                    dhRecord.setRecordData(resultTupleRecordData);            records.add(dhRecord);                }                try {            //批次写入           PutRecordsResult result = datahubClient.putRecords(database, tableName, records);                        int i = result.getFailedRecordCount();           //获取写入结果,判断是否要重写           if (i > 0) {                                retry(datahubClient, result.getFailedRecords(), retryTimes, database,  tableName);                        }                }         catch (DatahubClientException e) {                        System.out.println("requestId:" + e.getRequestId() + "\tmessage:" + e.getErrorMessage());                    }           }

● Hologres重写机制

public static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String database, String tableName) {                   boolean suc = false;          //判断是否需要重写              while (retryTimes != 0) {                           retryTimes = retryTimes - 1;                  //重写数据                         PutRecordsResult recordsResult = client.putRecords(database, tableName, records);                 //判断重写状态                           if (recordsResult.getFailedRecordCount() > 0) {                                 //再次重写retry(client,recordsResult.getFailedRecords(),retryTimes,database,tableName);                               }                          suc = true;                           break;                   }                   if (!suc) {                       System.out.println("retryFailure");                   }          }

4、注意事项

⼀批写⼊数据量不能超过4M, 凑批时注意控制⼤⼩留⼀些余量。


聚合计算

计算近30天的行为数据,如果30天的全量行为数据加载到内存中计算相当耗费资源,为减少资源开销,将整个聚合计算过程按以下拆分计算:

● Join需要的维度表。

● 明细数据按照字段类型拆分,分基础信息、学科类信息(课程、学科等)、统计类信息(关注、评论次数等)。

● 30天的聚合统计按时间拆分,前29天的数据计算一次,在结合当天实时数据产出用户实时近30天行为特征。

图片

产出特征

基础类特征:登陆终端系统、登陆终端类型、用户类型、省份等。


学科类:年级、学科、课程、课程类型、学年、学期、课程难度、课程类型1、课程类型2、课程类型3、上课排期、版本、直播、讲座等。


兴趣类:参与活动、找客服、加购物车、关注、评论、分享朋友圈等。


服务部署

线上:使用Airflow PostgresOperator 定时生产分钟级行为特征,通过集团中台的T-service部署服务。


离线:使用Airflow DataxOperator T+1同步Hologres中行为特征至Hive提供离线查询。


考虑到节约Hologres资源,对明细数据只保留近三个月内的,之前历史数据定期持久化到Hdfs中。

03 结束语

通过Hologres提供OLAP分析和点查服务,避免了实时数据同步到批处理系统中做分析计算、行为特征同步到KVStore中提供点查服务等操作,Hologres自身满足多个使用场景,极大减少了开发和维护成本。目前任务整体运行稳定,服务平均响应时长在20ms左右,后期会在KV查询上进一步优化,做到10ms内响应。

图片

致力于互联网教育技术的创新和推广

扫码

关注我们

@学而思网校

技术团队 

也许你也关注

网校学研新web直播架构升级演进之路

RTMP直播和Nginx-RTMP实践

网校技术名人堂MTSC大会之星



图片

分享、在看与点赞

只要你点,我们就是胖友

图片




继续滑动看下一个
学而思网校技术团队
向上滑动看下一个