抛开业务场景聊接口性能就是耍流氓。时效预估接口依赖于很多数据源:模型基础数据、模型兜底数据、仓库数据、SPU类目数据、卖家信息数据等,如何快速批量获取到内存中进行逻辑运算,是性能提升的关键。
单个SKU时效查询都达到了76.5ms,以商详浮层页30个现货SKU时效批量查询估算,一次请求需要76.5*30=2295ms,这是不可接受的,性能提升刻不容缓。
序号 | 方案描述 | 优点 | 缺点 | 结论 |
1 | 离线处理好后刷MySQL | 现有方案,无开发成本 | 查询性能一般 | 查询性能不满足要求,不采用 |
2 | 离线处理好后刷到Redis | 查询性能好 | 数据量过大时成本较高 | 采用 |
3 | 离线处理好后刷到本地内存 | 查询性能很好 | 对数据量有限制 | 模型数据量约为15G,方案不可行 |
序号 | 查询描述 | 外部域 | 优化方案 | 原因 |
1 | 城市名称转code | TMS | 本地缓存 | 由于城市名称和code 的映射关系数据仅约20K左右,可以在应用启动时请求一次后放入本地缓存。另外城市名称和code发生变化的频率很低,通过jetcache的@CacheRefresh每隔8小时自动刷新完全满足要求 |
2 | 获取卖家信息 | 商家 | Redis缓存 | 由于得物全量卖家数据量较大,不适合放在本地缓存,且卖家信息是低频变化数据,可以采用T+1同步到Redis |
3 | 获取商品类目 | 商品 | Redis缓存 | 同样商品类目数据也是低频变化数据,采用T+1同步到Redis |
27ms仍然没法满足要求。当前的瓶颈在查询Redis上(耗时占比96%),是否可以再进一步优化?
// pipeline查询类
public class RedisBasePipelineRegister {
// 存放查到的数据
private ThreadLocal<Map<String, Object>> context = ThreadLocal.withInitial(HashMap::new);
// 查询
public void fetch(final RedisConsumers redisConsumers){
if (redisConsumers.isNotEmpty()){
List<Object> ret = redisClient.executePipelined((RedisCallback<Object>) connection -> {
connection.openPipeline();
redisConsumers.get().forEach(t -> t.accept(connection));
return null;
});
addValueToContext(ret,redisConsumers.getKeyList());
}
}
/**
* 将pipeline查到的数据存入threadlocal中
* 注意,redis读取的数据可能是空的,如果是空,会填充一个null obj,这样可以防止后面用的时候,发现thread local里面没有数据,重新查redis
*/
private void addValueToContext(List<Object> val, List<String> keys) {
Map<String, Object> t = context.get();
IntStream.range(0, keys.size())
.forEach(i -> t.put(keys.get(i), val.get(i) == null ? NULL_OBJ : val.get(i)));
}
public Object get(String key) {
return context.get().get(key);
}
}
// redis查询类
public class RedisClient {
// threadlocal没查到,再查redis(兜底)
public Object get(String key) {
Object value = Optional.ofNullable(redisBatchPipelineRegister.get(key)).orElseGet(()->
redisTemplate.opsForValue().get(key)
);
return value;
}
}
即使pipeline部分失败后,可用Redis单指令查询作为兜底。
自此,进入并发区域。
Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务的结果合并成总的计算结果。ForkJoinPool的工作窃取是指在每个线程中会维护一个队列来存放需要被执行的任务。当线程自身队列中的任务都执行完毕后,它会从别的线程中拿到未被执行的任务并帮助它执行,充分利用多核CPU的优势。下图为ForkJoinPool执行示意:
优势区域 | 实际分析 | 结论 | |
ForkJoinPool | ForkJoinPool能用使用数据有限的线程来完成非常多的父子关系任务。由于工作窃取机制,在多任务且任务分配不均情况具有优势。 | 1.不存在父子关系任务。 2.获取不同出价类型的时效RT相近,不存在任务分配不均匀情况。 | 不采用 |
ThreadPoolExecutor | ThreadPoolExecutor不会像ForkJoinPool一样创建大量子任务,不会进行大量GC,因此单线程或任务分配均匀情况下具有优势。 | 采用 |
选定ThreadPoolExecutor后,需要考虑如何设计参数。根据实际情况分析,交易请求时效QPS峰值为1000左右,而我们一个请求一般会拆分3~5个线程任务,不考虑机器数的情况下,每秒任务数量:taskNum = 3000~5000。单个任务耗时taskCost = 0.01s 。上游容忍最大响应时间 responseTime = 0.015s。
1)核心线程数 = 每秒任务数 * 单个任务耗时 corePoolSize = taskNum * taskCost = (3000 ~ 5000) * 0.01 = 30 ~ 50,取40 2)任务队列容量 = 核心线程数 / 单个任务耗时 * 容忍最大响应时间 queueCapacity = corePoolSize / taskCost * responseTime = 40 / 0.01 * 0.015 = 60 3)最大线程数 = (每秒最大任务数 - 任务队列容量)* 每个任务耗时 maxPoolSize = (5000 - 60) * 0.01 ≈ 50
按单机300QPS(高于预估峰值QPS两倍左右)进行压测,接口性能和线程池运行状态均满足。
// 并行时效预估类
public class ConcurrentEstimateCaller {
// 自定义线程池
private Executor executor;
// 时效预估策略工厂
private EstimateStrategyFactory estimateStrategyFactory;
//存放异步返回的结果,KEY为出价类型,VALUE为对应的时效结果
private ConcurrentHashMap<String, CompletableFuture<List<PromiseEstimateRes>>> futures = new ConcurrentHashMap<>();
// 提交并行任务
public ConcurrentEstimateCaller submit(PromiseEstimateAggreRequest request) {
for (String scene : request.getMap().keySet()) {
futures.put(scene, CompletableFuture.supplyAsync(() -> {
EstimateStrategy estimateStrategy = estimateStrategyFactory.getStrategy(scene);
if (estimateStrategy != null) {
Result<PromiseEstimateBatchResponse> tmp =
estimateStrategy.promiseEstimateBatch(EstimateAggreConvertor.INSTANCE.convertBatchRequest(request, scene));
if (Result.SUCCESS_CODE.equals(tmp.getCode())) {
return tmp.getData().getEstimateRes();
}
}
return null;
}, executor));
}
}
// 指定时间内等待和获取所有子任务返回结果
public Map<String, List<PromiseEstimateRes>> join(long timeout, TimeUnit unit) throws Exception {
// 等待所有的子任务执行完成
CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[]{})).get(timeout, unit);
Map<String, List<PromiseEstimateRes>> res = new HashMap<>();
for (Map.Entry<String, CompletableFuture<List<PromiseEstimateRes>>> entry : futures.entrySet()) {
if (entry.getValue().get() != null) {
res.put(entry.getKey(), entry.getValue().get());
}
}
return res;
}
}
*文/Yang