再请教一下老师哈,有没有办法限制查询 dsl 语句长度,我们公司有个系统有时会传入 10 万个字符的 query,导致集群爆掉。
系统传入的查询请求中,可能包含非常长的 DSL 查询语句,导致 Elasticsearch 集群压力过大,甚至宕机。
经过研究 Elasticsearch 官方文档,结合 grok3(2025-02-20 初步可用,昨天不能用)、chatGPT o3-mini-high、deepseek 全球最厉害的三个模型调研,Elastic 官方并没有直接的解决方案。
仅仅 Elasticsearch 的 HTTP 接口支持配置最大请求体大小,可以间接限制 DSL 查询的长度。
http.max_content_length: 10kb
修改后需要重启 Elasticsearch 节点生效。
这种方式优点是:全局生效,所有请求都会受限,无需改动客户端。
但实话说缺点也很明显:限制的是整个请求体大小,不仅限于 DSL,可能影响其他正常的大请求(比如批量索引操作)。
且不够精细,可能需要根据实际情况调整阈值。
看来,还得外部加一层,这时候想到了 Nginx 以及最近研究的极限网关产品线。
极限网关能够在请求到达 ES 集群之前,对查询语句进行拦截和检查。通过设置最大查询长度限制,防止过长的查询语句导致集群资源消耗过多。
对传入的 DSL 语句进行预处理,拆分过大的查询语句,确保每个查询的大小都在可接受范围内。
需要设置查询长度限制——在极限网关上配置请求长度限制,超过指定长度的请求将被拒绝,避免传入过长的查询。
具体参考官方文档:
https://docs.infinilabs.com/gateway/main/zh/docs/references/flow/
限制 DSL 查询语句的长度可以通过以下两种方式进行:
基于请求上下文 (_ctx.request.body_length) 限制查询的大小。
极限网关支持条件类型 range,可以用于判断请求体的长度。如果查询的 DSL 长度超过设定的范围,极限网关可以选择拒绝该请求或者进行其他处理。
假设想要限制查询 DSL 长度不超过 1,000 字节(方便举例起见,大家根据业务调整即可),可以使用如下配置:
flow:
- name: limit_dsl_length
filter:
- basic_auth:
valid_users:
elastic: changeme
- elasticsearch:
elasticsearch: prod
- if:
range:
_ctx.request.body_length:
gte: 1000
then:
- echo:
message: 'Query DSL is too large. Please reduce the size.'
repeat: 1
else:
- echo:
messsage: 'Query DSL is ok.'#<继续执行其他过滤器
router:
- name: my_router
default_flow: limit_dsl_length
上述配置的解析如下:
range 条件用来判断 _ctx.request.body_length 是否大于等于 10,00 字节。
如果请求体的长度超过了 10,00 字节,则执行 then 部分,返回一条错误信息表示查询 DSL 过大。
如果请求体的长度在可接受范围内,else 部分将继续执行其他过滤器或流程。
通过这种方式,我们可以在极限网关中限制传入请求的 DSL 查询大小,避免集群过载。
def execute_query(es, index_name, dsl_query, verbose=False):
try:
start_time = time.time()
response = es.search(
index=index_name,
body=dsl_query
)
elapsed = time.time() - start_time
if verbose:
print( f"\n{'=' * 30} Query Execution Report {'=' * 30}" )
print( f"Index: {index_name}" )
print( f"Took: {response['took']}ms (Client-side: {elapsed * 1000:.2f}ms)" )
print( f"Total Hits: {response['hits']['total']['value']}" )
print( f"Shards: Success={response['_shards']['successful']} Failed={response['_shards']['failed']}" )
print( f"First 3 results:" )
for i, hit in enumerate( response['hits']['hits'][:3] ):
print( f"\nResult {i + 1}:" )
print( f"Score: {hit['_score']}" )
print( json.dumps( hit['_source'], indent=2, ensure_ascii=False ) )
return response
except Exception as e:
print( f"\nQuery execution failed: {str( e )}" )
return None
DSL 语句咱们自行构造就可以,大一些满足给定值 1000 的上限就可以。或者把 1000 改小也可以。
9200 端口(不走极限网关)访问结果如下:
如果把配置改成 8000 端口的极限网关,那么就会出问题。
Query execution failed: Unable to deserialize as JSON: b'{"took":0,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":0,"relation":"eq"},"max_score":null,"hits":[]}}
Query DSL is too large. Please reduce the size.'
到此为止,说明配置生效了!也就是说可以通过极限网关限制传输字符的多少,以防止集群爆掉。
类似问题,要转换思路,找到适合业务系统的方案。
相关推荐:
更短时间更快习得更多干货!
和全球超2000+ Elastic 爱好者一起精进!
elastic6.cn——ElasticStack进阶助手
抢先一步学习进阶干货!