相对于单例数据库的查询操作,分布式数据查询会有很多技术难题。本文记录 Mysql 分库分表 和 Elasticsearch Join 查询的实现思路,了解分布式场景数据处理的设计方案。
/**
* 执行查询 SQL 切入点,从这里可以完整 debug 执行流程
* @see ShardingPreparedStatement#execute()
* @see ParsingSQLRouter#route(String, List, SQLStatement) Join 查询实际涉及哪些表,就是在路由规则里匹配得出来的。
*/
public boolean execute() throws SQLException {
try {
// 根据参数(决定分片)和具体的SQL 来匹配相关的实际 Table。
Collection<PreparedStatementUnit> preparedStatementUnits = route();
// 使用线程池,分发执行和结果归并。
return new PreparedStatementExecutor(getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits).execute();
} finally {
JDBCShardingRefreshHandler.build(routeResult, connection).execute();
clearBatch();
}
}
# 打印的代码,就是在上述route 得出 ExecutionUnits 后,打印的
sharding.jdbc.config.sharding.props.sql.show=true
-- 参数不明,不能定位分片的情况
select * from order o inner join order_item oi on o.order_id = oi.order_id
-- 路由结果
-- Actual SQL: db1 ::: select * from order_1 o inner join order_item_1 oi on o.order_id = oi.order_id
-- Actual SQL: db1 ::: select * from order_1 o inner join order_item_0 oi on o.order_id = oi.order_id
-- Actual SQL: db1 ::: select * from order_0 o inner join order_item_1 oi on o.order_id = oi.order_id
-- Actual SQL: db1 ::: select * from order_0 o inner join order_item_0 oi on o.order_id = oi.order_id
-- Actual SQL: db0 ::: select * from order_1 o inner join order_item_1 oi on o.order_id = oi.order_id
-- Actual SQL: db0 ::: select * from order_1 o inner join order_item_0 oi on o.order_id = oi.order_id
-- Actual SQL: db0 ::: select * from order_0 o inner join order_item_1 oi on o.order_id = oi.order_id
-- Actual SQL: db0 ::: select * from order_0 o inner join order_item_0 oi on o.order_id = oi.order_id
二、Elasticsearch Join 查询场景
/**
* Execute the ActionRequest and returns the REST response using the channel.
* @see ElasticDefaultRestExecutor#execute
* @see ESJoinQueryActionFactory#createJoinAction Join 算法选择
*/
public void execute(Client client, Map<String, String> params, QueryAction queryAction, RestChannel channel) throws Exception{
// sql parse
SqlElasticRequestBuilder requestBuilder = queryAction.explain();
// join 查询
if(requestBuilder instanceof JoinRequestBuilder){
// join 算法选择。包括:HashJoinElasticExecutor、NestedLoopsElasticExecutor
// 如果关联条件为等值(Condition.OPEAR.EQ),则使用 HashJoinElasticExecutor
ElasticJoinExecutor executor = ElasticJoinExecutor.createJoinExecutor(client,requestBuilder);
executor.run();
executor.sendResponse(channel);
}
// 其他类型查询 ...
}
EXPLAIN FORMAT=JSON
SELECT * FROM
sale_line_info u
JOIN sale_line_manager o ON u.sale_line_code = o.sale_line_code;
{
"query_block": {
"select_id": 1,
// 使用的join 算法:nested_loop
"nested_loop": [
// 涉及join 的表以及对应的 key,其他的信息与常用explain 类似
{
"table": {
"table_name": "o",
"access_type": "ALL"
}
},
{
"table": {
"table_name": "u",
"access_type": "ref"
}
}
]
}
}
Elasticsearch Nested类型
[1] 如何在分布式数据库中实现 Hash Join:https://zhuanlan.zhihu.com/p/35040231
[2]