👆 这是第 345 篇不掺水的原创,想要了解更多,请戳下方卡片关注我们吧~
众所周知,作为一个出色的分布式消息中间件,RocketMQ 在全球范围内获得了广泛的应用,那么作为一个分布式消息中间件,最重要的是什么?
协议?持久化?消息分发实现?高可用?高可靠?
好的协议可以保证通讯的稳定,持久化可以保证数据的存储,消息分发实现可以结合多场景加速业务,高可用可以保证业务大量运行,高可靠可以保证业务的持续运行。
今天,我们想谈一谈 RocketMQ 的高可用机制
主从复制方式(非对称):这种方式下,一主多从的架构被广泛应用。消息中间件的写入操作只能在主节点上进行,而读取操作在所有节点上都可以执行。主节点负责数据的同步复制到所有从节点上,当主节点出现故障时,一个备份从节点会自动地竞选为新的主节点,以保证系统的持续运行。这种方式下,主节点和从节点之间的延时可能会导致数据不一致或者消息丢失。
对等网络方式(对称):这种方式下,每个节点都可以承担读写操作的任务,数据可以通过节点之间的同步来保持一致性。每个节点的能力相同,不存在主从的概念。当一个节点出现故障时,其他节点会自动接管它的任务,因此系统没有单点故障。这种方式下,网络的复杂性和节点间的通信有可能造成性能瓶颈。
在4.5版本之前,RocketMQ 不支持节点的自动晋升,那么如果主节点挂了,未消费的数据会从从节点上被继续消费,但是这一组节点就失去了作用,无法再被写入,若集群中主节点数量较少,可能会引起故障,于是在4.5版本,升级支持 DLedger 模式完成自动选主。
我们现在看一下 DLedger 如何实现自动选主。
首先,我们要明白 DLedger 模式的本质便是使用了 Raft 算法,接着我们来看一下 RocketMQ 的代码实现。在启动 broke r时,会将 CommitLog 转换为 DLedgerCommitLog 类型,并添加 DLedgerRoleChangeHandler 处理器,那么我们来看看 CommitLog 与 DLedgerCommitLog 的区别
commitLog 是 RocketMQ 最核心的数据存储。它是一个顺序写的文件,用于存储 Producer 发送的消息和 Consumer 消费的消息,也就是全部通过消息中间件传递的消息。每一个写入 commitLog 的消息都会被分配一个唯一的 offset (偏移量),用于标识该条消息在 commitLog 中的位置。
commitLog 中消息的存储格式包括消息长度、消息属性(如是否压缩、是否顺序消费、是否是事务消息等)、消息体等信息。
public class CommitLog {
public final static int MESSAGE_MAGIC_CODE = -626843481;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 空文件结束标识
protected final static int BLANK_MAGIC_CODE = -875286124;
// 文件队列,用于存储在磁盘上的消息
protected final MappedFileQueue mappedFileQueue;
// 默认的消息存储对象
protected final DefaultMessageStore defaultMessageStore;
// 用于刷盘和提交的服务
private final FlushCommitLogService flushCommitLogService;
// 如果启用了TransientStorePool,我们必须在固定的时间内将消息刷新到FileChannel
private final FlushCommitLogService commitLogService;
// 消息发送的回调函数
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
// 用于存储每个 topic 的队列
protected HashMap<String/* topi c-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
// 确认偏移量
protected volatile long confirmOffset = -1L;
// 加锁期间的起始时间
private volatile long beginTimeInLock = 0;
// 消息发送的锁,用于防止并发发送。
protected final PutMessageLock putMessageLock;
}
DLedgerCommitLog 是 RocketMQ 用作持久化存储的一种实现方式,它基于Apache DistributedLog (DLedger) 实现了高可靠、高性能的分布式日志存储,也正是它,使得 CommitLog 拥有了选举复制的能力。
public class DLedgerCommitLog extends CommitLog {
// DLedger实例
private final DLedgerServer dLedgerServer;
// DLedger的配置信息
private final DLedgerConfig dLedgerConfig;
// 用于存储mmap文件的存储类
private final DLedgerMmapFileStore dLedgerFileStore;
// mmap文件列表
private final MmapFileList dLedgerFileList;
// id标识代理角色,0表示主,其他表示从
private final int id;
private final MessageSerializer messageSerializer;
// 进入DLedger锁的开始时间
private volatile long beginTimeInDledgerLock = 0;
// 分隔旧的commitlog和DLedgerCommitlog的偏移量
private long dividedCommitlogOffset = -1;
// 是否正在恢复旧的commitlog
private boolean isInrecoveringOldCommitlog = false;
}
了解完 CommitLog,我们回到 broker 的启动,核心类 BrokerController 的 initialize 方法,首先会根据配置生成一个 DefaultMessageStore,这里会判断当前是否支持 DLedgerCommitLog,如果支持,则会创建一个 DLedgerRoleChangeHandler 对象并注册为 leader 选举的回调方法。接着与老版本一致会创建一个 BrokerStats 对象和一个 MessageStorePluginContext 对象。最后,会将 CommitLogDispatcherCalcBitMap 对象添加到 MessageStore 的 DispatcherList 中。
public class BrokerController {
public boolean initialize() throws CloneNotSupportedException {
……
if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
// 如果支持DLegerCommitLog
if (messageStoreConfig.isEnableDLegerCommitLog()) {
// 创建DLedgerRoleChangeHandler
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
// 将CommitLog转换为DLegerCommitLog,并添加DLedgerRoleChangeHandler处理器
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
// 加载插件
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
}
……
}
}
接着我们进入 DLedgerCommitLog 的代码,这里可以看到使用了 openmessaging 包下的 DLedgerServer 组件
public class DLedgerCommitLog extends CommitLog {
@Override
public void start() {
// 启动dLedgerServer
dLedgerServer.startup();
}
}
基于 dLedgerLeaderElector 做了 Leader 选举的操作
public class DLedgerServer extends AbstractDLedgerServer {
public synchronized void startup() {
if (!isStarted) {
this.dLedgerStore.startup();
this.fsmCaller.ifPresent(x -> {
// 启动状态机调用程序并加载现有快照以进行数据恢复
x.start();
x.getSnapshotManager().loadSnapshot();
});
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.startup();
}
this.dLedgerEntryPusher.startup();
// 进行leader选举
this.dLedgerLeaderElector.startup();
executorService.scheduleAtFixedRate(this::checkPreferredLeader, 1000, 1000, TimeUnit.MILLISECONDS);
isStarted = true;
}
}
}
启动状态机
public class DLedgerLeaderElector {
public void startup() {
// 启动状态机
stateMaintainer.start();
for (RoleChangeHandler roleChangeHandler : roleChangeHandlers) {
roleChangeHandler.startup();
}
}
}
状态机
public class StateMaintainer extends ShutdownAbleThread {
public StateMaintainer(String name, Logger logger) {
super(name, logger);
}
@Override
public void doWork() {
try {
// 是否支持Leader选举
if (DLedgerLeaderElector.this.dLedgerConfig.isEnableLeaderElector()) {
DLedgerLeaderElector.this.refreshIntervals(dLedgerConfig);
// 状态机核心方法
DLedgerLeaderElector.this.maintainState();
}
sleep(10);
} catch (Throwable t) {
DLedgerLeaderElector.LOGGER.error("Error in heartbeat", t);
}
}
}
状态机核心方法,Raft 中的三个角色,Leader、Follower、Candidate,这里对三个角色不做过多叙述,可以参见《In Search of an Understandable Consensus Algorithm》 一文
public class DLedgerLeaderElector {
private void maintainState() throws Exception {
// Leader角色
if (memberState.isLeader()) {
maintainAsLeader();
} else if (memberState.isFollower()) {
// Follower角色
maintainAsFollower();
} else {
// Candidate角色
maintainAsCandidate();
}
}
}
接下来我们来看 raft 的核心实现,首先看 Leader 角色的实现代码
当上次发送心跳时间大于心跳包间隔时,会重新发送心跳
public class DLedgerLeaderElector {
private void maintainAsLeader() throws Exception {
// 上次发送心跳时间是否大于心跳包间隔时间
if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) {
// 任期
long term;
// 主节点id
String leaderId;
synchronized (memberState) {
if (!memberState.isLeader()) {
//stop sending
return;
}
term = memberState.currTerm();
leaderId = memberState.getLeaderId();
lastSendHeartBeatTime = System.currentTimeMillis();
}
// 发送心跳
sendHeartbeats(term, leaderId);
}
}
}
当我们看心跳代码前,首先回顾下 raft 理论中对于心跳返回的描述
再让我们回到代码
public class DLedgerLeaderElector {
private void sendHeartbeats(long term, String leaderId) throws Exception {
……
for (String id : memberState.getPeerMap().keySet()) {
if (memberState.getSelfId().equals(id)) {
continue;
}
HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
heartBeatRequest.setGroup(memberState.getGroup());
heartBeatRequest.setLocalId(memberState.getSelfId());
heartBeatRequest.setRemoteId(id);
// 主节点id
heartBeatRequest.setLeaderId(leaderId);
// 任期
heartBeatRequest.setTerm(term);
// 异步发送心跳
CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
future.whenComplete((HeartBeatResponse x, Throwable ex) -> {
try {
if (ex != null) {
memberState.getPeersLiveTable().put(id, Boolean.FALSE);
throw ex;
}
// 获取心跳结果
switch (DLedgerResponseCode.valueOf(x.getCode())) {
// 成功
case SUCCESS:
succNum.incrementAndGet();
break;
// 主节点的Term小于从节点
case EXPIRED_TERM:
maxTerm.set(x.getTerm());
break;
// 从节点的主节点非当前节点
case INCONSISTENT_LEADER:
inconsistLeader.compareAndSet(false, true);
break;
// 从节点尚未准备完毕
case TERM_NOT_READY:
notReadyNum.incrementAndGet();
break;
default:
break;
}
……
} catch (Throwable t) {
LOGGER.error("heartbeat response failed", t);
} finally {
allNum.incrementAndGet();
if (allNum.get() == memberState.peerSize()) {
beatLatch.countDown();
}
}
});
}
long voteResultWaitTime = 10;
beatLatch.await(heartBeatTimeIntervalMs - voteResultWaitTime, TimeUnit.MILLISECONDS);
Thread.sleep(voteResultWaitTime);
// 当从节点返回的term大于自身时,直接退化为candidate
if (maxTerm.get() > term) {
LOGGER.warn("[{}] currentTerm{} is not the biggest={}, deal with it", memberState.getSelfId(), term, maxTerm.get());
changeRoleToCandidate(maxTerm.get());
return;
}
// 当半数以上正常返回心跳时,Leader状态正常,重置心跳时间
if (memberState.isQuorum(succNum.get())) {
lastSuccHeartBeatTime = System.currentTimeMillis();
} else {
LOGGER.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",
memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));
// 当正常心跳 + 未准备的心跳大于半数时,立即发送心跳
if (memberState.isQuorum(succNum.get() + notReadyNum.get())) {
lastSendHeartBeatTime = -1;
// 当从节点中有其他主节点时,直接退化为candidate
} else if (inconsistLeader.get()) {
changeRoleToCandidate(term);
// 如果上次心跳包时间大于3次心跳间隔时间,直接退化为candidate
} else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > (long) maxHeartBeatLeak * heartBeatTimeIntervalMs) {
changeRoleToCandidate(term);
}
}
}
}
从简单上来说,Leader 只做了一件事,那就是发送心跳,根据心跳结果判断服务是否正常及自己的地位。接着让我们看 Follower 做了什么,Follower 在选举中的流程比较简单
public class DLedgerLeaderElector {
private void maintainAsFollower() {
// 如果上次心跳时间大于2次心跳间隔
if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2L * heartBeatTimeIntervalMs) {
synchronized (memberState) {
// 如果当前角色是Follower,并且心跳大于3次心跳间隔,升级到candidate
if (memberState.isFollower() && DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > (long) maxHeartBeatLeak * heartBeatTimeIntervalMs) {
LOGGER.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());
changeRoleToCandidate(memberState.currTerm());
}
}
}
}
}
最后我们来看 Candidate 角色,这块的代码比较多,让我们逐行来进行分析
public class DLedgerLeaderElector {
private void maintainAsCandidate() throws Exception {
// 如果当前时间小于下次发起投票时间或者不应该立即发起投票,返回
if (System.currentTimeMillis() < nextTimeToRequestVote && !needIncreaseTermImmediately) {
return;
}
// 任期
long term;
// Leader节点的投票任期
long ledgerEndTerm;
// 日志的最大索引
long ledgerEndIndex;
// 如果不是Candidate,直接返回
if (!memberState.isCandidate()) {
return;
}
synchronized (memberState) {
if (!memberState.isCandidate()) {
return;
}
// 如果上次投票结果是等待下一轮或者需要立即投票
if (lastParseResult == VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT || needIncreaseTermImmediately) {
// 记录当前任期
long prevTerm = memberState.currTerm();
// 记录下一任期
term = memberState.nextTerm();
LOGGER.info("{}_[INCREASE_TERM] from {} to {}", memberState.getSelfId(), prevTerm, term);
// 将上次投票结果重置为等待重新投票
lastParseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
} else {
term = memberState.currTerm();
}
// 设置日志的最大索引
ledgerEndIndex = memberState.getLedgerEndIndex();
// 设置节点的投票任期
ledgerEndTerm = memberState.getLedgerEndTerm();
}
// 需要立即投票
if (needIncreaseTermImmediately) {
// 设置下次投票超时时间,这中间为了减少同时发起的概率,会有随机时间
nextTimeToRequestVote = getNextTimeToRequestVote();
// 重置需要立即投票标识
needIncreaseTermImmediately = false;
return;
}
// 发起投票申请,包含当前任期,自身维护的最大任期,自身维护的最大日志索引,直接给自己投票
final List<CompletableFuture<VoteResponse>> quorumVoteResponses = voteForQuorumResponses(term, ledgerEndTerm, ledgerEndIndex);
……
for (CompletableFuture<VoteResponse> future : quorumVoteResponses) {
future.whenComplete((VoteResponse x, Throwable ex) -> {
try {
if (ex != null) {
throw ex;
}
LOGGER.info("[{}][GetVoteResponse] {}", memberState.getSelfId(), JSON.toJSONString(x));
if (x.getVoteResult() != VoteResponse.RESULT.UNKNOWN) {
validNum.incrementAndGet();
}
synchronized (knownMaxTermInGroup) {
switch (x.getVoteResult()) {
// 赞成,成功数加一
case ACCEPT:
acceptedNum.incrementAndGet();
break;
// 被已有Leader的节点拒绝
case REJECT_ALREADY_HAS_LEADER:
alreadyHasLeader.compareAndSet(false, true);
break;
// 任期小于其他选举人
case REJECT_TERM_SMALL_THAN_LEDGER:
case REJECT_EXPIRED_VOTE_TERM:
if (x.getTerm() > knownMaxTermInGroup.get()) {
// 维护最大任期
knownMaxTermInGroup.set(x.getTerm());
}
break;
// 任期小于对方
case REJECT_EXPIRED_LEDGER_TERM:
// 日志小于对方
case REJECT_SMALL_LEDGER_END_INDEX:
biggerLedgerNum.incrementAndGet();
break;
// 对方尚未准备完成
case REJECT_TERM_NOT_READY:
notReadyTermNum.incrementAndGet();
break;
// 已投票
case REJECT_ALREADY_VOTED:
// 拒绝接受领导
case REJECT_TAKING_LEADERSHIP:
default:
break;
}
}
// 如果已经有leader或已接受的投票数量满足 quorum 或者已接受和未准备好的数量之和满足 quorum,释放阻塞状态
if (alreadyHasLeader.get()
|| memberState.isQuorum(acceptedNum.get())
|| memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
voteLatch.countDown();
}
} catch (Throwable t) {
LOGGER.error("vote response failed", t);
} finally {
allNum.incrementAndGet();
// 所有异步请求结束时,释放阻塞状态
if (allNum.get() == memberState.peerSize()) {
voteLatch.countDown();
}
}
});
}
try {
// 生成一个随机数的阻塞时间
voteLatch.await(2000 + random.nextInt(maxVoteIntervalMs), TimeUnit.MILLISECONDS);
} catch (Throwable ignore) {
}
lastVoteCost = DLedgerUtils.elapsed(startVoteTimeMs);
VoteResponse.ParseResult parseResult;
if (knownMaxTermInGroup.get() > term) {
// 已知的最大任期比当前任期要大,则返回 WAIT_TO_VOTE_NEXT,并转变为Candidate
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
changeRoleToCandidate(knownMaxTermInGroup.get());
} else if (alreadyHasLeader.get()) {
// 已经存在Leader,则返回 WAIT_TO_VOTE_NEXT
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote() + (long) heartBeatTimeIntervalMs * maxHeartBeatLeak;
} else if (!memberState.isQuorum(validNum.get())) {
// 有效响应的数量无法满足 quorum,则返回 WAIT_TO_REVOTE
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote();
} else if (!memberState.isQuorum(validNum.get() - biggerLedgerNum.get())) {
// 有效响应的数量减去日志条目大于自身的数量无法满足 quorum,则返回 WAIT_TO_REVOTE
parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
nextTimeToRequestVote = getNextTimeToRequestVote() + maxVoteIntervalMs;
} else if (memberState.isQuorum(acceptedNum.get())) {
// 接受的投票数量满足 quorum,则本次投票通过
parseResult = VoteResponse.ParseResult.PASSED;
} else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
// 已接受和未准备好的数量之和满足 quorum,则立即进行投票
parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
} else {
parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
nextTimeToRequestVote = getNextTimeToRequestVote();
}
lastParseResult = parseResult;
LOGGER.info("[{}] [PARSE_VOTE_RESULT] cost={} term={} memberNum={} allNum={} acceptedNum={} notReadyTermNum={} biggerLedgerNum={} alreadyHasLeader={} maxTerm={} result={}",
memberState.getSelfId(), lastVoteCost, term, memberState.peerSize(), allNum, acceptedNum, notReadyTermNum, biggerLedgerNum, alreadyHasLeader, knownMaxTermInGroup.get(), parseResult);
if (parseResult == VoteResponse.ParseResult.PASSED) {
LOGGER.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);
// 如果是通过,则转变为Leader对象
changeRoleToLeader(term);
}
}
}
由于篇幅的问题,我们并没有一一将 Dledger 的核心代码在这里展现,在此仅展示了选主等基本流程,对于写入、复制、日志存储、消息传递等都不多加诉说。Dledger 算法的核心原理是 Raft 协议,当一个节点发起投票请求时,其他节点会收到请求并发送响应,响应的结果将根据投票数量判断是否达成共识。如果共识达成,则新的 leader 将被选举出来,同时新的日志将被追加到磁盘上。如果共识未达成,则需要等待一定时间后重新发起投票请求。
那么这写法有问题吗?看起来似乎非常完美,解决了选主的问题,但是同时给用户造成了很大的困扰,首先,Broker 的副本必须是三个及以上,副本的 ACK 必须遵循多数派协议,这一点造成了成本与性能损耗的上升,其次,这使得 RocketMQ 存在两套 HA 复制流程,且 Raft 模式下的复制无法利用 RocketMQ 原生的存储能力。
于是 RocketMQ 在 5.0 版本出了一个全新的模式,Controller 模式。一个基于 Raft 的一致性模块(DLedger Controller),并当作一个可选的选主组件,支持独立部署,也可以嵌入在 Nameserver 中,Broker 通过与 Controller 的交互完成 Master 的选举。
然而,即使是 5.0 中做了优化的 DLedger Controller,仍然存在一些问题,下面让我们看看这个模式带来的优缺点
本文简单介绍了 DLedger 基于 Raft 协议实现的 Leader 选举机制,让大家深入地理解分布式系统中的 Leader 选举过程。然而如何在实际场景下选取、优化和扩展分布式一致性算法,都是非常值得探讨的问题。
Apache RocketMQ
In Search of an Understandable Consensus Algorithm (Extended Version)
如果你觉得这篇内容对你挺有启发,我想邀请你帮我两件小事
1.点个「在看」,让更多人也能看到这篇内容(点了「在看」,bug -1 😊)
招贤纳士
政采云技术团队(Zero),Base 杭州,一个富有激情和技术匠心精神的成长型团队。规模 500 人左右,在日常业务开发之外,还分别在云原生、区块链、人工智能、低代码平台、中间件、大数据、物料体系、工程平台、性能体验、可视化等领域进行技术探索和实践,推动并落地了一系列的内部技术产品,持续探索技术的新边界。此外,团队还纷纷投身社区建设,目前已经是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等众多优秀开源社区的贡献者。
如果你想改变一直被事折腾,希望开始折腾事;如果你想改变一直被告诫需要多些想法,却无从破局;如果你想改变你有能力去做成那个结果,却不需要你;如果你想改变你想做成的事需要一个团队去支撑,但没你带人的位置;如果你想改变本来悟性不错,但总是有那一层窗户纸的模糊……如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。如果你希望参与到随着业务腾飞的过程,亲手推动一个有着深入的业务理解、完善的技术体系、技术创造价值、影响力外溢的技术团队的成长过程,我觉得我们该聊聊。任何时间,等着你写点什么,发给 zcy-tc@cai-inc.com