一、背景
另一方面解决抢占问题,提高开发效率。在多业务线多需求同时开发时,都需要各自一套环境隔离,相互不造成影响,但测试环境的机器是有限,就会造成大家相互抢占,或者共用同一环境,造成测试效率降低。因此多泳道方案提出一种规范,收回开发人员部署主泳道权限,由系统定期自动部署最新稳定代码,而开发人员只在分支泳道部署变动服务,其余全部依赖主泳道中服务,减少部署量和资源浪费,由于分支泳道非常轻量,可以非常快捷创建和销毁。
架构主要包括三部分,网关层,RPC 层和数据层。
网关层主要负责环境识别与环境标识注入,前台通过测试域名隔离环境,例如:b2.missfresh.net/xx,请求到网关后解析出环境标识 b2,然后植入 HTTP Header 中,往下透传。
RPC 层主要负责服务发现与选择,环境标识透传等。通过服务发现找到对应环境下服务,再通过自定义路由策略选择指定服务执行,并把环境标识继续透传给下游。
数据层主要负责测试环境数据隔离与共用。
逻辑结构主要分为主泳道和分支泳道。
主泳道部署全链路稳定代码,作为公共环境,承载其他环境缺省服务,保证请求链路通畅。
分支泳道只需部署改动服务,未改动服务使用主泳道中服务,比如底层的商品,库存等服务,发号器,推送等组件,目的是减少公共服务的维护成本,提高使用效率等。
根据以上流程,需要对组件做一些改动:
网关新增环境标识注入。测试环境使用开源的流量网关(Kong),然后额外定制一个插件,解析域名并把环境标识注入到 HTTP Header 中,往下透传。
链路标识的透传。使用开源的链路追踪系统(Pinpoint),新增或者增强链路透传插件,把链路标识透传下去。选择 Pinpoint 是因为它以 JavaAgent 方式嵌入到服务中,对服务是无感知的,可以结合部署系统做到无感知升级,比使用 SDK 方式更友好。
服务的感知。共用一套 Zookeeper,保证各泳道服务被及时发现,各泳道服务注册时带上环境标识。
服务的选择。使用 Dubbom 新增的路由策略,根据服务自身环境标识和链路中的环境标识做匹配选择。
数据存储。不同的需求需要隔离级别不同,如果多环境共用底层数据,则代码中使用域名配置数据库,由 DNS 服务指向同一套数据库,例如:配置 b2.mysql.missfresh.net 与 b15.mysql.missfresh.net 域名指向同一实例 IP;如果多套环境隔离底层数据,则 MySQL,Redis 需要封装一套 SDK,通过环境标识把数据写到不同的库或者实例,RocketMQ 则需要封装一套 SDK,通过环境标识把消息发往不同的队列,ElasticSearch 则可以在上一层封装一套网关,通过网关的路由功能转发到不同索引或者实例。
dubbo://127.0.0.1:10080/com.missfresh.xxxxService?anyhost=true&application=mryx&bean.name=ServiceBean:com.missfresh.xxxxService:1.0&default.dispatcher=message&default.service.filter=notice&default.threadpool=fixed&default.threads=300&default.timeout=1000&dubbo=2.0.2&interface=com.missfresh.mpush.xxxxService&logger=slf4j&methods=xxxx&pid=1®istry=127.0.0.1:2181&revision=1.0.0&side=provider×tamp=1622925206798&version=1.0&zone=b2
/**
* 根据当前环境生成唯一标识
* @return {当前环境}-{是否基准环境}-{PID}-{自增保证唯一}
*/
public static String genInstanceName() {
String instanceName = String.valueOf(UtilAll.getPid()) + SPLIT + COUNT.incrementAndGet();
instanceName = (Boolean.TRUE.toString().equalsIgnoreCase(benchmark) ? "1" : "0") + SPLIT + instanceName;
instanceName = (StringUtils.isNotEmpty(zone) ? zone : DEFAULT_ZONE) + SPLIT + instanceName;
return instanceName;
}
protected List<MessageQueue> groupByZone(List<MessageQueue> mqs) {
// 优先从链路中获取环境标识
String zone = Extractor.getGray();
List<MessageQueue> localQueueList = new ArrayList<>(mqs.size());
List<MessageQueue> benchmarkQueueList = new ArrayList<>(mqs.size());
for (MessageQueue messageQueue : mqs) {
String[] brokerNameArray = messageQueue.getBrokerName().split(MryxConfig.SPLIT);
String queuePrefix = brokerNameArray[0];
if (zone.equalsIgnoreCase(queuePrefix)) {
// 当前环境队列
localQueueList.add(messageQueue);
} else if (brokerNameArray.length > 2 && RocketMQConfig.IS_BENCHMARK.equals(brokerNameArray[1])) {
// 基准环境队列
benchmarkQueueList.add(messageQueue);
}
}
if (!localQueueList.isEmpty()) {
return localQueueList;
}
if (!benchmarkQueueList.isEmpty()) {
return benchmarkQueueList;
}
return mqs;
}
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
// 根据messageQuery中brokerName的环境标识分组
Map<String/*machine zone */, List<MessageQueue>> mr2Mq = new TreeMap<>();
for (MessageQueue mq : mqAll) {
String brokerMachineZone = machineRoomResolver.brokerDeployIn(mq);
mr2Mq.putIfAbsent(brokerMachineZone, new ArrayList<>());
mr2Mq.get(brokerMachineZone).add(mq);
}
// 根据clientId的环境标识分组
Map<String/*machine zone */, List<String/*clientId*/>> mr2c = new TreeMap<>();
// 基准环境的clientId
List<String> benchmarkClientIds = new ArrayList<>();
for (String cid : cidAll) {
String consumerMachineZone = machineRoomResolver.consumerDeployIn(cid);
mr2c.putIfAbsent(consumerMachineZone, new ArrayList<>());
mr2c.get(consumerMachineZone).add(cid);
if (machineRoomResolver.consumerIsBenchmark(cid)) {
benchmarkClientIds.add(cid);
}
}
List<MessageQueue> allocateResults = new ArrayList<>();
// 1、匹配同机房的队列
String currentMachineZone = machineRoomResolver.consumerDeployIn(currentCID);
List<MessageQueue> mqInThisMachineZone = mr2Mq.remove(currentMachineZone);
List<String> consumerInThisMachineZone = mr2c.get(currentMachineZone);
if (mqInThisMachineZone != null && !mqInThisMachineZone.isEmpty()) {
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineZone, consumerInThisMachineZone));
}
// 寻找没有匹配上zone的MessageQueueList
for (String machineZone : mr2Mq.keySet()) {
if (mr2c.containsKey(machineZone)) {
continue;
}
// 2、如果存在基准环境consumer,则把没有消费者的messageQueue分配给基准环境
if (!benchmarkClientIds.isEmpty()) {
if (machineRoomResolver.consumerIsBenchmark(currentCID)) {
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineZone), benchmarkClientIds));
}
} else {
// 3、如果没有基准环境,则没有消费者的messageQueue再次分配给consumer
allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineZone), cidAll));
}
}
return allocateResults;
}