👆 这是第 330 篇不掺水的原创,想要了解更多,请戳下方卡片关注我们吧~
本文将以 Dubbo 源码为例,和您讨论如何使用 Lambda 和面向对象两种方法,对 Java 应用进行重构。并以实例展示了两者结合,写出简洁优雅的代码。
Java 8 的 Lambda 新特性也可以帮助提升代码的可读性:
在回调场景,可以使用 Lambda 简化代码。
比如,我们在使用 JDBC 的时候,资源用完要马上释放,否则会资源泄漏,如代码示例。
public String query(String sql) {
connection = getConnection();
statement = connection.getStatement(sql);
ResultSet rs = statement.executeQuery();
String result;
try {
while (rs.next()) {
//执行具体操作
result = rs.getString("name");
}
} finally {
rs.close();
statement.close;
}
}
query 函数中,除了取结果的 result = rs.getString("name") 这句,其他都是样板代码。这个时候,是不是可以使用模板方法的设计模式解决,抽象一个 AbstractQuery,并使用不同的子类实现查询,如下:
public class AbstractQuery {
public String query(String sql) {
connection = getConnection();
statement = connection.getStatement(sql);
ResultSet rs = statement.executeQuery();
String result;
try {
while (rs.next()) {
//执行具体操作
result = getResult(rs);
}
return result;
} finally {
rs.close();
statement.close;
}
}
abstract String getResult();
}
public class NameQuery extends AbstractQuery {
public String query(ResultSet rs) {
return rs.getString("name");
}
public static void main(String[] args) {
new NameQuery().query("xxxxx");
}
}
显然,这种方式不可取,因为查询的 sql 是动态的,得写无数类去实现。
这个时候,我们可以使用函数式来实现。
public class JbdcUtil {
public String query(String sql, Function<ResultSet, String> resFunc) {
connection = getConnection();
statement = connection.getStatement(sql);
ResultSet rs = statement.executeQuery();
String result;
try {
while (rs.next()) {
//执行具体操作
result = resFunc.apply(rs);
}
return result;
} finally {
rs.close();
statement.close;
}
}
public static void main(String[] args) {
String result = JbdcUtil.query("select xxxxx", rs ->rs.getString("name"));
}
}
这个样例中,使用了 Jdk 自带的 Function 来实现,因为 Function 的 apply 方法一个参数一个返回,和场景对应。
通常,我们可以用以下几步来确定函数:
1、查看需要的方法数量和是否需要返回
2、根据1的结果,选择 jdk 自带函数,没有合适的则自己写。
@FunctionalInterface
public interface Function<T, R> {
//根据参数和返回类型确定范型类型。上面示例中,我们需要根据 ResultSet 参数,获取 String 的返回
//则在参数中写 Function<ResultSet, String> resFunc ,从而与 apply 方法对应
R apply(T t);
}
JbdcUtil.query("xxxxx", rs ->rs.getString("name")); 这里的实参 ,-> 前面的代表参数,后面则是 apply 的 override 实现,如果不习惯,可以写成以下形式,idea可以自动帮助简写。
String result = JbdcUtil.query("select xxxxx", new Function<ResultSet, String>() {
@Override
public String apply(ResultSet rs) {
return rs.getString("name");
}
});
Dubbo 在 3.2 中,引入了完整的可观测性特性。可观测指标,需要对业务代码进行埋点,为统一埋点形式,使用了事件机制。如服务暴露时,需要获取向注册中心的注册 rt 延时、及异常次数指标。
// Provider 启动,向注册中心注册当前实例
public void register() {
ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
}
事件机制埋点
public void register() {
MetricsEventBus.post(new RegisterEvent(System.currentMills()));
try {
ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
} catch (Exception e) {
MetricsEventBus.error(new RegisterEvent(System.currentMills()));
throw e;
}
MetricsEventBus.finish(new RegisterEvent(System.currentMills()));
}
可以看到,加入事件后,比初始代码多了不少样板代码。使用 lambda 优化如下
public void register() {
MetricsEventBus.post(new RegisterEvent(),
() -> {
ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
}
);
}
//使用 jdk 自带 Supplier 函数,因为没有参数,只有返回
public class MetricsEventBus {
public static <T> T post(MetricsEvent event, Supplier<T> targetSupplier) {
dispatcher.publishEvent(event);
T result;
try {
result = targetSupplier.get();
} catch (Throwable e) {
dispatcher.publishErrorEvent(event);
throw e;
}
return result;
}
}
以上代码对 Dubbo 源码进行了精简,保留了主要逻辑。
Dubbo 收集的指标维度及类型非常的多,比如一次 rpc 请求,需要统计成功、失败次数,rt 方面,需要统计单次 rt,最近一次 rt,平均 rt,总 rt 等,需要一些容器来存放数据。
// lastRT, totalRT, rtCount, avgRT share a container, can utilize the system cache line
private final ConcurrentMap<M, AtomicLongArray> rtSample = new ConcurrentHashMap<>();
private final ConcurrentMap<M, LongAccumulator> minRT = new ConcurrentHashMap<>();
private final ConcurrentMap<M, LongAccumulator> maxRT = new ConcurrentHashMap<>();
private final ConcurrentMap<K, ConcurrentMap<M, AtomicLongArray>> rtGroupSample = new ConcurrentHashMap<>();
private final ConcurrentMap<K, ConcurrentMap<M, LongAccumulator>> groupMinRT = new ConcurrentHashMap<>();
private final ConcurrentMap<K, ConcurrentMap<M, LongAccumulator>> groupMaxRT = new ConcurrentHashMap<>();
这些指标数据,有许多相同之处,也有部分差异,比如:
初始化也不相同:不通类型的数据,初始化动作不同。如 int num = 0; long num = 0L; AtomicLongArray = new AtomicLongArray(4);
计算方法不同:比较多的是自增,有取当前值(比如最近一次请求的rt), 也有比较大小(比如rt的最大值,每次需要当前值和集合中的最大值进行比较),还有计算平均值等等。
导出方法不同:集合中数据,需要导出成统一格式(Dubbo 使用了 micrometer)。因为本身数据格式差异,导出方法也有相应差异。
这一系列过程中,很容易把代码写出面向过程,比如对请求 rt 指标的初始化和计算:
public void addRT(S source, Long rt) {
MetricsCountSampleConfigurer<S, K, M> sampleConfigure = new MetricsCountSampleConfigurer<>();
sampleConfigure.setSource(source);
this.rtConfigure(sampleConfigure);
M metric = sampleConfigure.getMetric();
// 初始化 AtomicLongArray 类型(不存在时)
AtomicLongArray rtCalculator = ConcurrentHashMapUtils.computeIfAbsent(this.rtSample, metric, k -> new AtomicLongArray(4));
// 设置 rt 类型的值(last类型rt)
rtCalculator.set(0, rt);
// 自增类型的 rt 更新(累加类型rt)
rtCalculator.addAndGet(1, rt);
rtCalculator.incrementAndGet(2);
// 计算 rt 最小值
LongAccumulator min = ConcurrentHashMapUtils.computeIfAbsent(minRT, metric, k -> new LongAccumulator(Long::min, Long.MAX_VALUE));
min.accumulate(rt);
LongAccumulator max = ConcurrentHashMapUtils.computeIfAbsent(maxRT, metric, k -> new LongAccumulator(Long::max, Long.MIN_VALUE));
max.accumulate(rt);
sampleConfigure.setRt(rt);
sampleConfigure.getFireEventHandler().accept(sampleConfigure);
}
以上代码来自 SimpleMetricsCountSampler.addRT(),较明显的面向过程写法,把各容器相同阶段(初始化、计算,这里不包含导出)的不同计算操作(赋值、累加、平均值等),耦合在一个方法中。如果增加了一种容器及新类型计算(假如中位数),只能在addRt方法修改。面向过程的代码特点是,容易出 bug 且不易维护。
1、容器类的抽象
请求指标代码中,对不同数据容器,简单地使用 Map 存储。map 能存储数据,但是没有数据的处理能力,只能依赖调用代码执行处理。我们可以使用功能更全面的独立对象来表示容器。
public class LongContainer<N extends Number> extends ConcurrentHashMap<String, N> {
/**
* 指标的 key 类型,比如注册、订阅、通知变更等
*/
private final transient MetricsKeyWrapper metricsKeyWrapper;
/**
* 初始化函数
*/
private final transient Function<String, N> initFunc;
/**
* 计算函数
*/
private final transient BiConsumer<Long, N> consumerFunc;
/**
* 导出函数
*/
private transient Function<String, Long> valueSupplier;
public LongContainer(MetricsKeyWrapper metricsKeyWrapper, Supplier<N> initFunc, BiConsumer<Long, N> consumerFunc) {
this.metricsKeyWrapper = metricsKeyWrapper;
this.initFunc = s -> initFunc.get();
this.consumerFunc = consumerFunc;
this.valueSupplier = k -> this.get(k).longValue();
}
//省略其他代码
}
以上容器类,包含了数据和数据处理函数,代替之前的 Map。
结合函数式编程,对容器类进行初始化,从而简化后续的计算、导出过程。
public class RegistryStatComposite implements MetricsExport {
public Map<ApplicationType, Map<String, AtomicLong>> applicationNumStats = new ConcurrentHashMap<>();
public Map<ServiceType, Map<ServiceKeyMetric, AtomicLong>> serviceNumStats = new ConcurrentHashMap<>();
//rt 容器变为 LongContainer 的集合,更方便遍历及统一处理
public List<LongContainer<? extends Number>> rtStats = new ArrayList<>();
public RegistryStatComposite() {
for (ApplicationType type : ApplicationType.values()) {
// Application key and increment val
applicationNumStats.put(type, new ConcurrentHashMap<>());
}
for (ServiceType type : ServiceType.values()) {
// Service key
serviceNumStats.put(type, new ConcurrentHashMap<>());
}
// App-level
rtStats.addAll(initStats(OP_TYPE_REGISTER, false));
rtStats.addAll(initStats(OP_TYPE_SUBSCRIBE, false));
rtStats.addAll(initStats(OP_TYPE_NOTIFY, false));
// Service-level
rtStats.addAll(initStats(OP_TYPE_REGISTER_SERVICE, true));
rtStats.addAll(initStats(OP_TYPE_SUBSCRIBE_SERVICE, true));
//如果需要新增指标类型,此处增加即可
}
//初始化容器,设置初始、计算及导出函数
private List<LongContainer<? extends Number>> initStats(String registryOpType, boolean isServiceLevel) {
List<LongContainer<? extends Number>> singleRtStats = new ArrayList<>();
singleRtStats.add(new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_LAST, isServiceLevel)));
singleRtStats.add(new LongAccumulatorContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_MIN, isServiceLevel), new LongAccumulator(Long::min, Long.MAX_VALUE)));
singleRtStats.add(new LongAccumulatorContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_MAX, isServiceLevel), new LongAccumulator(Long::max, Long.MIN_VALUE)));
singleRtStats.add(new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_SUM, isServiceLevel), (responseTime, longAccumulator) -> longAccumulator.addAndGet(responseTime)));
// AvgContainer 比较特殊,存储了总数,但是导出函数是平均数计算函数
AtomicLongContainer avgContainer = new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_AVG, isServiceLevel), (k, v) -> v.incrementAndGet());
avgContainer.setValueSupplier(applicationName -> {
LongContainer<? extends Number> totalContainer = rtStats.stream().filter(longContainer -> longContainer.isKeyWrapper(MetricsKey.METRIC_RT_SUM, registryOpType)).findFirst().get();
AtomicLong totalRtTimes = avgContainer.get(applicationName);
AtomicLong totalRtSum = (AtomicLong) totalContainer.get(applicationName);
return totalRtSum.get() / totalRtTimes.get();
});
singleRtStats.add(avgContainer);
return singleRtStats;
}
}
指标计算:
public void calcApplicationRt(String applicationName, String registryOpType, Long responseTime) {
for (LongContainer container : appRtStats.stream().filter(longContainer -> longContainer.specifyType(registryOpType)).collect(Collectors.toList())) {
Number current = (Number) ConcurrentHashMapUtils.computeIfAbsent(container, applicationName, container.getInitFunc());
//使用容器的计算函数,执行埋点后的指标计算
container.getConsumerFunc().accept(responseTime, current);
}
}
可以看出,数据容器,在使用对象代替 map 后,看上去变的精简,可维护性也上升了。
如果你觉得这篇内容对你挺有启发,我想邀请你帮我两件小事
1.点个「在看」,让更多人也能看到这篇内容(点了「在看」,bug -1 😊)
招贤纳士
政采云技术团队(Zero),Base 杭州,一个富有激情和技术匠心精神的成长型团队。规模 500 人左右,在日常业务开发之外,还分别在云原生、区块链、人工智能、低代码平台、中间件、大数据、物料体系、工程平台、性能体验、可视化等领域进行技术探索和实践,推动并落地了一系列的内部技术产品,持续探索技术的新边界。此外,团队还纷纷投身社区建设,目前已经是 google flutter、scikit-learn、Apache Dubbo、Apache Rocketmq、Apache Pulsar、CNCF Dapr、Apache DolphinScheduler、alibaba Seata 等众多优秀开源社区的贡献者。
如果你想改变一直被事折腾,希望开始折腾事;如果你想改变一直被告诫需要多些想法,却无从破局;如果你想改变你有能力去做成那个结果,却不需要你;如果你想改变你想做成的事需要一个团队去支撑,但没你带人的位置;如果你想改变本来悟性不错,但总是有那一层窗户纸的模糊……如果你相信相信的力量,相信平凡人能成就非凡事,相信能遇到更好的自己。如果你希望参与到随着业务腾飞的过程,亲手推动一个有着深入的业务理解、完善的技术体系、技术创造价值、影响力外溢的技术团队的成长过程,我觉得我们该聊聊。任何时间,等着你写点什么,发给 zcy-tc@cai-inc.com