2. 系统架构
注:所有节点都是有状态的。
FE(Frontend)负责管理元数据,管理客户端连接,进行查询规划、查询调度等工作。
Follower
Leader:Follower会通过类Paxos的BDBJE协议选主出一个Leader,所有事务的提交都是由Leader发起,并完成;
3. 存储架构
在StarRocks里,一张表的数据会被拆分成多个Tablet,而每个Tablet都会以多副本的形式存储在BE节点中,如下图:
Table数据划分 + Tablet三副本的数据分布:
StarRocks支持Hash分布、Range-Hash的组合数据分布(推荐)。
为了等到更高的性能,强烈建议使用Range-Hash的组合数据分布,即先分区后分桶的方式。
分区机制:高效过滤,提升查询性能。
分区类似分表,是对一个表按照分区键进行分割,可以按照时间分区,根据数据量按照天/月/年划分等等。可以利用分区裁剪对少数访问量,也可以根据数据的冷热程度把数据分到不同介质上。
分桶机制:充分发挥集群性能,避免热点问题。
使用分桶键Hash以后,把数据均匀的分布到所有的BE上,不要出现bucket数据倾斜的情况,分桶键的选择原则就是高基数的列或者多个列组合成为一个高基数的列,尽量将数据充分打散。
注:Bucket数量的需要适中,如果希望充分发挥性能可以设置为:BE数量 * CPU core/2,最好tablet控制在1GB左右,tablet太少并行度可能不够,太多可能远数据过多,底层scan并发太多性能下降。
Segment:如果一个Rowset数据量比较大,则拆分成多个Segment数据断落盘。
4. 需求背景
指标工厂服务主要面向业务人员,通过对业务指标的采集和处理,实时反映产品状态,为运营提供数据支撑、检测产品漏洞或服务异常、提供指标异常告警功能等。
业务指标埋点方式多样,并不局限于某种方式,只要符合埋点标识明确、业务参数丰富、数据满足可解析的基本要求皆可作为数据源,大致可以分为:SDK、MySQL BinLog、业务日志、阿里云ODPS数据分析。
各种监控图、报表展示、业务实时查询等,即较高的并非查询。
埋点数据数据量巨大,且对明细数据不要求溯源,直接做聚合计算,比如计算PV、UV场景;
数据可以通过配置动态分区来配置过期策略。
数据可以通过配置动态分区来配置过期策略。
需要实时的数据写入场景,我也沿用了业内流行的解决方案,即数据采集到 Kafka 之后,使用Flink做实时写入到StarRocks。StarRocks提供了非常好用的Flink-connector插件。
小tips:
1. 虽然StarRocks已经很好的优化了写入性能,当写入压力大,仍会出现写入拒绝,建议可适当增大单次导入数据量,降低频率,但同时也会导致数据落库延迟增加。所以需要做好一定的取舍,做到收益最大化。
2. Flink的sink端不建议配置过大,会引起并发事务过多的报错,建议每个flink任务source可以配置多些,sink的连接数不能过大。
内部系统业务看板,主要服务于全公司员工,提供项目及任务跟踪等功能。
当初数据库选型时,结合业务特点,用户需要动态、灵活的增删记录自己的任务,因而选择了JOSN 模型减少了应用程序代码和存储层之间的阻抗,选择MongoDB作为数据存储。
伴随着公司快速快发,当需要报表展示,特别是时间跨度比较大,涉及到多部门、多维度、细粒度等报表展示时,查询时间在MongoDB需要执行10s甚至更久。
调研了StarRocks、ClickHouse两款都是非常优秀的分析型数据库,在选型时,分析了业务应用场景,主要集中在单表聚合查询、多表关联查询、实时更新读写查询。维度表更新频繁,即存储在MySQL中,StarRocks比较好的支持外表关联查询,很大程度上降低了开发难度,最终决定选用StarRocks作为存储引擎。
改造前,MongoDB查询,写法复杂,多次查询。
db.time_note_new.aggregate(
[
{'$unwind': '$depart'},
{'$match': {
'depart': {'$in': ['部门id']},
'workday': {'$gte': 1609430400, '$lt': 1646064000},
'content.id': {'$in': ['事项id']},
'vacate_state': {'$in': [0, 1]}}
},
{'$group': {
'_id': '$depart',
'write_hour': {'$sum': '$write_hour'},
'code_count': {'$sum': '$code_count'},
'all_hour': {'$sum': '$all_hour'},
'count_day_user': {'$sum': {'$cond': [{'$eq': ['$vacate_state', 0]}, 1, 0]}},
'vacate_hour': {'$sum': {'$cond': [{'$eq': ['$vacate_state', 0]}, '$all_hour', 0]}},
'vacate_write_hour': {'$sum': {'$cond': [{'$eq': ['$vacate_state', 0]}, '$write_hour', 0]}}}
-- ... more field
},
{'$project': {
'_id': 1,
'write_hour': {'$cond': [{'$eq': ['$count_day_user', 0]}, 0, {'$divide': ['$vacate_write_hour', '$count_day_user']}]},
'count_day_user': 1,
'vacate_hour': 1,
'vacate_write_hour': 1,
'code_count': {'$cond': [{'$eq': ['$count_day_user', 0]}, 0, {'$divide': ['$code_count', '$count_day_user']}]},
'all_hour': {'$cond': [{'$eq': ['$count_day_user', 0]}, 0, {'$divide': ['$vacate_hour', '$count_day_user']}]}}
-- ... more field
}
]
)
改造后,直接兼容SQL,单次聚合。
WITH cont_time as (
SELECT b.depart_id, a.user_id, a.workday, a.content_id, a.vacate_state
min(a.content_second)/3600 AS content_hour,
min(a.write_second)/3600 AS write_hour,
min(a.all_second)/3600 AS all_hour
FROM time_note_report AS a
JOIN user_department AS b ON a.user_id = b.user_id
-- 更多维表关联
WHERE b.depart_id IN (?) AND a.content_id IN (?)
AND a.workday >= '2021-01-01' AND a.workday < '2022-03-31'
AND a.vacate_state IN (0, 1)
GROUP BY b.depart_id, a.user_id, a.workday, a.content_id,a.vacate_state
)
SELECT M.*, N.*
FROM (
SELECT t.depart_id,
SUM(IF(t.content_id = 14, t.content_hour, 0)) AS content_hour_14,
SUM(IF(t.content_id = 46, t.content_hour, 0)) AS content_hour_46,
-- ...more
FROM cont_time t
GROUP BY t.depart_id
) M
JOIN (
SELECT depart_id AS join_depart_id,
SUM(write_hour) AS write_hour,
SUM(all_hour) AS all_hour
-- 更多指标
FROM cont_time
GROUP BY depart_id
) N ON M.depart_id = N.join_depart_id
ORDER BY depart_id ASC
以查询报表2021/01/01~2022/03/01之间数据对比:
StarRocks: 1次查询聚合,可完全通过复杂SQL聚合函数计算,耗时 295ms
Mongodb: 需分2次查询+计算,共耗时3s+9s=12s
在使用StarRocks时遇到的一些报错和解决方案(网上资料较少的报错信息):
原因:超过了每个数据库中正在运行的导入作业的最大个数,默认值为100。可以通过调整max_running_txn_num_per_db参数来增加每次导入作业的个数,最好是通过调整作业提交批次。即攒批,减少并发。
if [[ $(ulimit -n) -lt 60000 ]]; then
ulimit -n 65535
fi
c. StarRocks 支持使用 Java 语言编写用户定义函数 UDF,在执行函数报错:“rpc failed, host: x.x.x.x”,be.out日志中报错:
start time: Tue Aug 9 19:05:14 CST 2022
Error occurred during initialization of VM
java/lang/NoClassDefFoundError: java/lang/Object
原因:在使用supervisor管理进程,需要注意增加JAVA_HOME环境变量,即使是BE节点也是需要调用Java的一些函数,也可以直接将BE启动脚本增加JAVA_HOME环境变量配置。
SQL > delete from tableName partition (p20220809,p20220810) where `c_time` > '2022-08-09 15:20:00' and `c_time` < '2022-08-10 15:20:00';
ERROR 1064 (HY000): Where clause only supports compound predicate, binary predicate, is_null predicate and in predicate
kafka log-4-FAIL, event: [thrd:x.x.x.x:9092/bootstrap]: x.x.x.x:9092/1: ApiVersionRequest failed: Local: Timed out: probably due to broker version < 0.10 (see api.version.request configuration) (after 10009ms in state APIVERSION_QUERY)
接下来我们会有更多业务接入 StarRocks,替换原有 OLAP 查询引擎;运用更多的业务场景,积累经验,提高集群稳定性。未来希望 StarRocks 优化提升主键模型内存占用,支持更灵活的部分列更新方式,持续优化提升 Bitmap 查询性能,同时优化多租户资源隔离。今后我们也会继续积极参与 StarRocks 的社区讨论,反馈业务场景。
*文/沈睿