Application of Apache Flink in Kuaishou's Real-Time Multidimensional Analysis Scenarios
1. The application of Flink in Kuaishou's real-time multidimensional analysis scenarios.
2. Contents
01 The application scenarios and cluster size of Flink at Kuaishou.
02 The application of real-time multidimensional analysis scenarios.
03 SlimBase: embedded, shared state storage with less IO.
3. 01 The application scenarios and cluster size of Flink at Kuaishou.
4. The application of real-time multidimensional analysis scenarios.
Data access
Data compute
Data application
5. The application of real-time multidimensional analysis scenarios.
Calculate all kinds of metrics and monitoring items in real time.
Help the business make decisions and raise alarms in real time.
Data cleaning, splitting, joining, etc.
Data extraction, transformation, and loading.
Real-time business processing.
Business-specific processing, such as real-time scheduling.
6. The application of real-time multidimensional analysis scenarios.
Real-time quality monitoring of short video and live video
Calculate multiple live broadcast metrics in real time, such as interruptions, views, and failure rate.
App analysis
Calculate active, new, and retained users in real time by channel, platform, and version.
Real-time scheduling of live video CDN
Dispatch the CDN traffic ratio according to the quality of each CDN vendor.
Real-time data processing
Join the ad show stream with the ad click stream.
7. The size of Flink cluster.
Clusters are deployed on YARN.
The offline Flink cluster is isolated by YARN labels.
The real-time Flink cluster serves jobs with high stability requirements.
8. 02 The application of real-time multidimensional analysis scenarios.
9. The application of real-time multidimensional analysis scenarios.
Data volume at the ten-billion level.
Data modeling within 5 dimensions to calculate PV, UV, new-user, and retained-user metrics.
Fast graphical display of metric results.
10. Technical solutions
11. Technical Solutions
Learn from Kylin: precalculation.
12. Content security scope
KwaiBI: self-designed BI analysis tool.
• Configure the cube model: specify dimensions and metrics.
• Real-time graphical display for user analysis.
Use the real-time data warehouse system for data preprocessing.
Use Flink jobs to calculate metrics on all cube dimensions.
Store metric results in Kudu.
13. Data Preprocessing
Use the internal metadata system to provide unified schema services.
Everything is a table: Kafka topics, Redis tables, HBase tables, and so on.
More than half of the data is in protobuf or JSON format.
Use the real-time data warehouse for data cleaning and filtering.
14. Data Modeling Calculation
• Modeling: use CUBE or GROUPING SETS to specify dimension groups.
• Calculate hourly or daily cumulative metrics for unique, new, or retained users.
• Output results at a fixed time interval.
Aggregate by dimension
• Layer-by-layer dimension reduction.
    • Each layer reduces data volume by reusing the results of the layer above, but the DAG is complex.
• Two-layer dimension reduction.
    • Divide the calculation into a full-dimension layer and a remaining-dimension layer.
    • Aggregate the remaining dimension groups from the full-dimension layer's results, which simplifies the DAG.
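The two-layer approach can be illustrated with a small batch sketch. This is not the Flink job from the talk: the dimension names, the event records, and both function names are hypothetical, and the metric is a simple count (PV). The first layer aggregates raw events once on all dimensions; the second layer derives every smaller dimension group from that result alone.

```python
from collections import Counter
from itertools import combinations

DIMS = ("channel", "platform", "version")  # hypothetical dimension names

def full_dimension_layer(events):
    """Layer 1: aggregate raw events once, keyed by every dimension."""
    full = Counter()
    for event in events:
        full[tuple(event[d] for d in DIMS)] += 1
    return full

def remaining_dimension_layer(full):
    """Layer 2: build every smaller dimension group from layer 1 only."""
    cube = {}
    for size in range(len(DIMS)):
        for group in combinations(range(len(DIMS)), size):
            rolled = Counter()
            for key, count in full.items():
                rolled[tuple(key[i] for i in group)] += count
            cube[tuple(DIMS[i] for i in group)] = rolled
    return cube

# Made-up sample events for illustration.
events = [
    {"channel": "store", "platform": "ios", "version": "1.0"},
    {"channel": "store", "platform": "android", "version": "1.0"},
    {"channel": "web", "platform": "android", "version": "1.1"},
]
full = full_dimension_layer(events)
cube = remaining_dimension_layer(full)
```

Summing counts this way is only valid for additive metrics such as PV. Distinct-user metrics cannot be summed across groups, which is one reason a mergeable structure such as a bitmap is needed for UV.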
15. Data Modeling Calculation
Unique visitors metric calculation Example
16. Data Modeling Calculation
Dimension data skew
• Hash records of the same dimension into different buckets to pre-aggregate.
• Then fully aggregate by the same dimension.
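A minimal sketch of this two-stage aggregation, assuming a count metric and integer user IDs. The bucket count, record layout, and function names are illustrative assumptions, not details from the talk.

```python
from collections import Counter

NUM_BUCKETS = 4  # hypothetical bucket count

def pre_aggregate(records):
    """Stage 1: key by (dimension, bucket) so a hot dimension is spread out."""
    partial = Counter()
    for dimension, user_id in records:
        bucket = hash(user_id) % NUM_BUCKETS
        partial[(dimension, bucket)] += 1
    return partial

def full_aggregate(partial):
    """Stage 2: merge the per-bucket partial counts of each dimension."""
    total = Counter()
    for (dimension, _bucket), count in partial.items():
        total[dimension] += count
    return total

# "ios" is the skewed dimension in this made-up sample.
records = [("ios", uid) for uid in range(5)] + [("android", 7)]
partial = pre_aggregate(records)
total = full_aggregate(partial)
```

In a distributed job, stage 1 would run in parallel across the buckets, so no single task receives all records of a hot dimension; stage 2 then only sees at most `NUM_BUCKETS` partial results per dimension.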
Exact deduplication
• Use a bitmap to achieve exact deduplication.
• Use a dictionary service to map each device ID string to a long ID.
• Use a custom BitmapState that divides a large bitmap into multiple blocks.
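The idea can be sketched as follows. Both classes are hypothetical stand-ins: `DictionaryService` is an in-memory substitute for the real dictionary service, and `BlockBitmap` only illustrates splitting a bitmap into blocks; it is not the BitmapState implementation from the talk, and the block size is an assumption.

```python
class DictionaryService:
    """In-memory stand-in for the dictionary service (hypothetical)."""

    def __init__(self):
        self._ids = {}

    def lookup(self, device_id: str) -> int:
        # Assign the next dense integer ID on first sight of a device.
        return self._ids.setdefault(device_id, len(self._ids))


class BlockBitmap:
    """Bitmap split into fixed-size blocks; each block is an int bitset."""

    BLOCK_BITS = 1024  # hypothetical block size

    def __init__(self):
        self.blocks = {}  # block index -> int used as a bitset

    def add(self, long_id: int) -> bool:
        """Set the bit for long_id; return True if it was not set before."""
        index, offset = divmod(long_id, self.BLOCK_BITS)
        block = self.blocks.get(index, 0)
        mask = 1 << offset
        if block & mask:
            return False
        self.blocks[index] = block | mask
        return True

    def cardinality(self) -> int:
        return sum(bin(block).count("1") for block in self.blocks.values())


dictionary = DictionaryService()
bitmap = BlockBitmap()
for device in ["dev-a", "dev-b", "dev-a"]:
    bitmap.add(dictionary.lookup(device))
uv = bitmap.cardinality()
```

Because only the touched block changes on each `add`, a block-structured state can read and write one small block at a time rather than the whole bitmap.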
17. Data Modeling Calculation
New user metric calculation example
• Determine whether a user is new: use an async function to access external services.
• Calculate unique new users from the new-user data stream.
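A rough sketch of the asynchronous lookup pattern, using Python's asyncio in place of a Flink async function. The `KNOWN_USERS` set stands in for the external service, and all names here are hypothetical.

```python
import asyncio

KNOWN_USERS = {"u1", "u2"}  # hypothetical stand-in for the external service

async def is_new_user(user_id):
    await asyncio.sleep(0)  # stands in for a network round trip
    return user_id not in KNOWN_USERS

async def new_user_stream(user_ids):
    # Issue all lookups concurrently instead of blocking on each one.
    flags = await asyncio.gather(*(is_new_user(u) for u in user_ids))
    return [u for u, is_new in zip(user_ids, flags) if is_new]

events = ["u1", "u3", "u3", "u4"]
new_users = asyncio.run(new_user_stream(events))
unique_new_users = len(set(new_users))
```

The filtered stream still contains repeated events from the same new user, so the unique count is a separate deduplication step.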
18. Data Modeling Calculation
Retention metric calculation Example
19. Content security scope
Kudu characteristics
• Low-latency random reads and writes.
• Fast column scans.
• Suitable for real-time interactive analysis scenarios.
Storage
• Encode by dimension.
• Use time, dimension group, and dimension value group as the primary key.
• Partition by dimension group, dimension value group, and time.
20. KwaiBI Display
Use KwaiBI for interactive analysis
• Metric display.
• Users can choose dimensions arbitrarily.
21. 03 SlimBase: embedded, shared state storage with less IO.
22. Challenge we meet
[Diagram: Kwai App, AD Service, AD View Log, AD Click Log, Flink Interval-Join, Training sample]
Interval-join logic: join each ad-click-stream record in real time with the ad-view-stream records from the preceding 20 minutes.
Data scale: the ad-view-stream holds more than 1 TB over the last 20 minutes.
State backend: RocksDBStateBackend.
Checkpoint interval: 5 minutes.
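The interval-join condition can be written out in a small batch sketch. The join key (`ad_id`), the record layout, and the function name are assumptions for illustration; the real job is a streaming Flink interval join that keeps the buffered view records in state.

```python
from collections import defaultdict

WINDOW_SECONDS = 20 * 60  # views are joinable for 20 minutes before a click

def interval_join(views, clicks):
    """Join clicks with views of the same ad seen in the preceding window.

    views and clicks are iterables of (ad_id, timestamp_seconds).
    """
    views_by_ad = defaultdict(list)
    for ad_id, view_ts in views:
        views_by_ad[ad_id].append(view_ts)

    joined = []
    for ad_id, click_ts in clicks:
        for view_ts in views_by_ad[ad_id]:
            if click_ts - WINDOW_SECONDS <= view_ts <= click_ts:
                joined.append((ad_id, view_ts, click_ts))
    return joined

# Made-up sample records.
views = [("ad1", 0), ("ad1", 1000), ("ad2", 50)]
clicks = [("ad1", 1100), ("ad2", 2000)]
samples = interval_join(views, clicks)
```

Every view must stay available for 20 minutes in case a click arrives, which is why the buffered view stream dominates the state size in this scenario.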
23. Challenge we meet
Disk IO utilization is 70%, and 50% of that IO comes from compaction.
Back pressure
During checkpoints, disk IO utilization rises to 100%.
Checkpoints take 1 to 5 minutes longer than the checkpoint interval.
During a checkpoint, state produces 4x data-copy IO.
For large-scale writes to RocksDB, disk IO is 1 TB of local reads plus 3 TB of HDFS writes.
24. Solution
Write data directly into shared storage to avoid the data copy caused by checkpoints.
Use a compaction policy with less IO.
    • Use SizeTieredCompaction.
    • Take advantage of the characteristics of real-time data: use and improve FIFOCompaction.
Conclusion 1
Use HBase instead of RocksDB.
25. Solution
Heavyweight / distributed
• Relies on ZooKeeper, master, client, and RS (RegionServer).
• Resource isolation is hard: memory and CPU are shared by containers.
• Has network cost.
Lightweight / embedded
• Only a library.
• Resource isolation is easy: memory and CPU are isolated by the container.
• Has no network cost.
Conclusion 2
HBase has to be changed into an embedded shared storage.
26. Implement
Step1: Slim HBase
Step2: Implement adapter and operation interface
Step3: Implement SlimBaseStateBackend
27. Implement - Slim HBase
Trim the fat
• Cut out the client, ZooKeeper, and master roles, keeping only the RS.
• Cut out: ZK.
• Keep: Cache, Memstore, Compaction, Flusher, Fs.
Add lean meat
• Move the HFileCleaner module from the master to the RS.
• Implement the merge interface.
28. Implement - interface and adapter
Interface
• Similar to RocksDB, the logical view has two levels: DB and CF (column family).
• Basic operations: put, get, delete, merge.
• An added restore interface, used to recover from a snapshot.
Adapter
• A SlimBase DB is mapped to an HBase namespace.
• A ColumnFamily in SlimBase is mapped to an HBase table.
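The interface and the mapping can be pictured with an in-memory sketch. `SlimBaseSketch` and its method signatures are hypothetical, not SlimBase's real API; the merge semantics (appending an operand to a list) and the `snapshot` helper are assumptions added so that `restore` has something to recover from. The `namespace:table` naming follows HBase's usual convention.

```python
import copy

class SlimBaseSketch:
    """In-memory illustration of the DB/CF view mapped to namespace/table."""

    def __init__(self, db_name):
        self.namespace = db_name  # one DB maps to one namespace
        self.tables = {}          # "namespace:cf" -> {key: value}

    def _table(self, column_family):
        # One column family maps to one table inside the namespace.
        return self.tables.setdefault(f"{self.namespace}:{column_family}", {})

    def put(self, column_family, key, value):
        self._table(column_family)[key] = value

    def get(self, column_family, key):
        return self._table(column_family).get(key)

    def delete(self, column_family, key):
        self._table(column_family).pop(key, None)

    def merge(self, column_family, key, operand):
        # Assumed semantics: append the operand to a list value.
        table = self._table(column_family)
        table[key] = table.get(key, []) + [operand]

    def snapshot(self):
        return copy.deepcopy(self.tables)

    def restore(self, snapshot):
        self.tables = copy.deepcopy(snapshot)


db = SlimBaseSketch("job_state")
db.put("value_state", "k1", 42)
db.merge("list_state", "k1", "a")
db.merge("list_state", "k1", "b")
saved = db.snapshot()
db.delete("value_state", "k1")
db.restore(saved)
```

A merge operation lets list-like state be appended to without a read-modify-write cycle in the caller, which is how RocksDB-backed list state is typically written.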
29. Implement - SlimBaseStateBackend
• Implement multiple state types: List, Map, Value, and Reduce State.
• The snapshot and restore processes are changed.
30. Test Conclusion
Checkpoint/restore time: at the level of seconds.
Disk IO decreased by 66%.
Disk write bytes per second decreased by 50%.
CPU cost decreased by 33%.
[Chart: Disk Util, SlimDB vs. RocksDB, 66%]
31. Improvement in the future
1. Implement FIFOCompaction based on OldestUnexpiredTime (OUT), aiming for no disk IO.
    • FIFOCompaction: a TTL-based compaction policy with no disk IO.
    • OUT: setting OUT=t2 means all data before t2 has expired.
[Diagram: one sequence of Put win(t0~t2), Del win(t0~t2), Put win(t3~t5), Del win(t3~t5), Put win(t6~t8); another sequence of Put win(t0~t2), Put win(t3~t5), Put win(t6~t8)]
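A minimal sketch of the idea, assuming each store file records the newest timestamp it contains. `StoreFile` and `fifo_compact` are hypothetical names, and this is not HBase's or SlimBase's implementation. Files whose newest record is older than OUT are dropped whole, so nothing is read or rewritten.

```python
from dataclasses import dataclass

@dataclass
class StoreFile:
    name: str
    max_timestamp: int  # newest record timestamp in the file

def fifo_compact(files, oldest_unexpired_time):
    """Split files into kept and dropped without rewriting any data."""
    kept = [f for f in files if f.max_timestamp >= oldest_unexpired_time]
    dropped = [f for f in files if f.max_timestamp < oldest_unexpired_time]
    return kept, dropped

# Made-up files, one per window, with integer timestamps.
files = [StoreFile("f1", 2), StoreFile("f2", 5), StoreFile("f3", 8)]
kept, dropped = fifo_compact(files, oldest_unexpired_time=3)
```

With OUT advancing as windows close, expired windows disappear through file deletion alone, so neither delete markers nor merge-style compaction are required for them.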
32. Improvement in the future
2. Use InMemoryCompaction to decrease the cost of memstore flushes and compaction IO.
3.Support
to improve performance of scan
4. Support short-circuit reads.
5. Improve the HDFS replica write policy: write remote replicas directly to disk to increase the page cache hit rate.
33. Plan in the future
SlimBase will completely replace RocksDB across all businesses at Kuaishou.
34. THANKS