导读
JSF作为京东内部日常最常用的RPC组件,通过JSF扩展,为大家提供JSF扩展思路及本地联调实践。
在项目开发中,研发同学会经历下面的过程:
图1 研发日常开发周期
但是针对团队的测试环境,涉及到几个痛点:
多人联调使用时尤其是跨部门联调时,强依赖环境稳定,频繁启停发布或者由于某些原因没有启动成功,影响的不是一个人的,而是多方的
jsf接口逻辑较多,联调环节存在隐晦的bug时,大部分做法是打点日志重启再次调用排查,如果没有发现问题,继续反复加日志重启排查。这样的过程是苦恼且费时的,终于找到问题了,进行修复,但是修复完成又懒得把排查时添加的日志删了。这一系列问题是由于不能打断点
联调的双方有一方没有测试环境,想联调只能上uat
上面列举的痛点,大家或多或少都有体会,尤其是测试同学体更多一些。其实在项目周期中,将问题暴露的越早,项目时间越可控。为了不在痛,引出本章的主角:jsf本地联调工具。
对于设计工具,有两个设计准则:
非侵入,不影响项目代码
简单,只需要简单的配置就可以看到效果,目的就是要做到简单好用
首先看下整体架构图,这里面分成三层:
黄色代表jsf调用者
蓝色代表redis中转者
绿色代表jsf提供者
为什么要使用中间件作为中转,这是由于内网环境下,两台电脑是无法pin通的,所以无法进行直连,所以需要中转来做数据传输。
图2 JSF本地联调工具调用关系
看过架构图大家会对工具有个大体的了解,利用redis的发布订阅模式来完成jsf请求到响应的数据传递工作。除此之外要解决的问题还有很多,比如如何做到非侵入、如何在线程中发出请求后同步得到结果、数据传输过程中使用哪种序列化方式、怎么做到简单配置即可使用等等,针对这些问题绘制了思维脑图,方便整理思路。
下面是对工具的思维脑图:
图3 JSF本地联调工具设计的思维脑图
通过思维脑图,列出了解决这些问题的办法,下面就一一讲解。
3.1 连通方式
3.1.1 中间件
针对本地环境无法连通的难点,有句老话:逢山开路,遇水搭桥。这是一种精神,也是这个难点的解决方案。
内网环境下,两个电脑无法连通,但是每台电脑都可以和中间件连通。通过中间件作为连通两个本地环境的桥梁,做到连通。
图4 JSF本地联调工具核心思路
通过上图可以分析,中间件需要具备的能力就是发布订阅,那么能想到的就是jmq和redis。与其每次使用工具时都去配置topic、用户名、密码、链接地址等等,还不如在组件里集成好,并固定一组请求和响应topic,让使用者无感的使用。基于这个思路,继续进行假设:现在有A机器调用者,B机器和C机器是提供者,B和C都在本地启动了,A想和B进行联调,如何保证B可以接收到消息呢?
这就分两种情况:
使用jmq的情况
两台提供者同时启动,如果用户相同,那么相当于在同一组消费topic,A请求打到B机器还是C机器无法确定。但是jmq支持广播模式,可以实现B机器和C机器同时受到消息。那如何确定B才是真是本次请求的目标机器呢?可以在jsf请求消息体里携带目的机器的ip信息,当B和C同时接到消息后,判断是否与自身ip匹配,匹配就说明这次请求是自己该处理的,否则不处理即可。
图5 MQ方式本地联调交互图
使用redis的情况
redis的订阅发布模式,本身就是广播模式,只要订阅的机器都可以收到请求。多个提供者时处理办法和上面的一致即可
这样通过jmq或者redis都可以实现想要的效果了,那么到底选择哪中更加合适呢?那就要从jmq和redis特点来说明:
先说jmq中间件,jmq的topic需要在jmq平台进行申请,如果联调双方有一方无法使用工具里集成的jmq环境进行连通,那就要自定义jmq环境,并且还需要在jmq平台申请topic。再一点,也是重要的一点,jmq在广播模式下,由于消费者是拉取模式,如果第一次判断队列里没有数据,则第二次轮询的时间会比较慢。这样就会导致使用这个jsf联调工具时,一次请求响应的时间比较长,测试结果,一次请求响应可以达到30s以上,这还是比较难接受的
再说redis,redis发布订阅模式不需要通过平台申请topic这一步。redis会判断订阅的topic是否存在,不存在就自己创建一个,当没有订阅者时会自己删除topic,这个实现的目的就是节约内存。同时redis基于内存实现,响应速度非常快,经过测试,可以达到秒级甚至更快
通过上面的分析决定使用redis作为本工具的中间件。
3.1.2 序列化
这是几乎所有通过网络传输都需要考虑的问题,使用jsf时,jsf支持的序列化方式很多,在公司使用比较多的就是默认的方式msgpack和hessian。msgpack携带的数据更少,性能更好。而hessian更加灵活,但是性能不如msgpack。
如果使用redis,redis本身支持的序列化方式里没有msgpack和hessian,但是使用redistemplete时,可以通过扩展的方式对序列化进行自定义,可以自己根据msgpack和hessian进行序列化。
但是问题又来了,redistemplete在springboot启动时,需要定义好针对哪种类型数据使用哪种序列化方式,也就是很难做到像jsf那样,根据请求。目前做法是针对msgpack和hessian通过不同的topic区分,请求时根据携带的序列化方式发送给不同序列化方式的topic。
redis序列化扩展需要实现RedisSerializer<T>泛型接口,实现serialize和deserialize方法即可。结合了jsf提供序列化工具进行扩展。
public class RedisJsfSerializer<T> implements RedisSerializer<T> {
private final JavaType javaType;
private final Codec codec;
public RedisJsfSerializer(Class<T> type, Constants.CodecType codecType) {
this.javaType = getJavaType(type);
this.codec = CodecFactory.getInstance(codecType);
}
public byte[] serialize(T t) throws SerializationException {
if (t == null) {
return new byte[0];
}
try {
return this.codec.encode(t);
} catch (Exception ex) {
throw new SerializationException("Could not write codec: " + ex.getMessage(), ex);
}
}
public T deserialize(byte[] bytes) throws SerializationException {
try {
return (T)codec.decode(bytes, javaType.getRawClass());
} catch (Exception e) {
return (T)codec.decode(bytes, javaType.getRawClass().getTypeName());
}
}
protected JavaType getJavaType(Class<?> clazz) {
return TypeFactory.defaultInstance().constructType(clazz);
}
}
构造中传入的是jsf定义的序列化类型,通过CodecFactory.getInstance(codecType)获取对应的序列化器。
这里需要注意的是,如果是msypack方式对ResponseMessage进行反序列化时,jsf提供了一个反序列化模板,如果不使用反序列化模板,则反序列化会报错,所以在上面deserialize方法中对应做了处理。
有了序列化器,就可以针对jsf请求和响应提供hessian和msgpack两种序列化处理。
public class RedisSerializerConfiguration {
@Bean
public RedisSerializer<RequestMessage> hessianRequestSerializer() {
return new RedisJsfSerializer<>(RequestMessage.class, Constants.CodecType.hessian);
}
@Bean
public RedisSerializer<ResponseMessage> hessianResponseSerializer() {
return new RedisJsfSerializer<>(ResponseMessage.class, Constants.CodecType.hessian);
}
@Bean
public RedisSerializer<RequestMessage> msgpackRequestSerializer() {
return new RedisJsfSerializer<>(RequestMessage.class, Constants.CodecType.msgpack);
}
@Bean
public RedisSerializer<ResponseMessage> msgpackResponseSerializer() {
return new RedisJsfSerializer<>(ResponseMessage.class, Constants.CodecType.msgpack);
}
@Bean
public RedisSerializer<String> stringRedisSerializer() {
return new StringRedisSerializer();
}
}
通过上面的配置,就可以在监听redis topic和发送topic根据序列化方式使用不同的序列化对象处理了。
3.1.3 异步转同步
在发送一个请求之后要同步等待结果的返回,现在发送请求变成的想redis发送一条消息,返回是需要监听响应topic拿到的。如何做到同步等待响应结果呢?
第一个就是CountDownLatch,先通过伪代码看下思路:
//创建
CountDownLatch countDownLatch = new CountDownLatch(1);
//监听响应topic
subscribe(responseMessage -> countDownLatch.countDown());
//发送请求topic
publish(requestMessage);
//等待结果
countDownLatch.await(10, TimeUnit.SECONDS);
这样就可以在收到响应的时候结束等待,继续流程了。
这里使用CountDownLatch有个好处就是可以设置等待时长,这个与jsf设置超时时长很相似。
3.2 触发时机
通过上面的准备工作基本捋清了工具使用的关键技术。下面就要考虑在什么时机触发工具,让工具帮完成调用工作。
这里就说道了jsf的扩展性,每次jsf调用过程都会经过一些列过滤器,无论是调用者还是提供者。
借助jsf官方文档上的图来看下。
图6 JSF filter调用顺序图
既然是这样,扩展一个filter,每次调用者调用方法时,就可以通过自定义filter拦截请求,完成通过中间件通信的目的。
至于如何扩展filter,在写工具的时候jsf文档上还没有说明,现在文档已经更新了,大家可以参阅jsf文档。
3.3 易用性
1. 首先就是不需要使用者去了解内部细节,引入就能用。之前可以看到项目里会有引入通用jar包后,需要手动显式在xml中配置对应bean才能正常使用这个功能。除此之外,通过springbootstarter方式直接将对应bean引入到容器中,这不乏是一种好的方案。
2. 其次就是jsf中的组件是否由于spring托管,只有被spring容器托管,才能通过简单配置来控制插件行为,比如插件是否开启。
最简单的验证方式就是项目启动后查看jsf内置filter是否可以从spring容器中拿到
这里用一个集成jsf的springboot项目启动后,从容器中获取jsf内置系统时间检查过滤器。并未能获取到
图7 查看JSF内部对象是否被Sping容器管理
这个问题也可以解决,通过提供一个静态工具类,在容器启动时拿到spring应用上下文对象就可以了。例如:
4j
public class SpringUtils implements ApplicationContextAware {
private static ApplicationContext applicationContext;
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if(SpringUtils.applicationContext == null) {
SpringUtils.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 通过name获取 Bean.
* @param name
* @return
*/
public static <T> T getBean(String name,Class<T> clazz){
if(getApplicationContext() == null) {
return null;
}
try {
return getApplicationContext().getBean(name,clazz);
} catch (Exception e) {
log.warn("通过spring获取配置参数异常", e);
}
return null;
}
}
这样通过这个工具就可以从容器中拿到配置参数,被任何非spring容器中的类使用了。
3.4 整体流程图
说明:
虚线部分是jsf原调用流程,工具不会通过的
蓝线部分是工具走的路线,线上的描述序号为执行顺序
图8 JSF本地联调工具调用顺序图
由于篇幅有限,这里只展示关键代码。
提供者订阅
"jsf.plugin.filter.local.invoke",name = "enable",havingValue = "true") (prefix =
({JsfLocalInvokeProperties.class})
public class JsfFilterRedisConfiguration {
...
/**
* jsf 提供者接收消息监听器适配器
* @param messageSubscribe
* @param requestMessageRedisSerializer
* @return
*/
public MessageListenerAdapter messageListenerAdapter(MessageSubscribe<RequestMessage> messageSubscribe,RedisSerializer<RequestMessage> requestMessageRedisSerializer) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(messageSubscribe, "receiveMessage");
messageListenerAdapter.setSerializer(requestMessageRedisSerializer);
messageListenerAdapter.afterPropertiesSet();
return messageListenerAdapter;
}
/**
* redis 订阅容器
* @param redisConnectionFactory
* @param jsfLocalInvokeProperties
* @param messageListenerAdapter
* @return
*/
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,JsfLocalInvokeProperties jsfLocalInvokeProperties,MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
if(RoleEnum.PRODUCER.getCode().equals(jsfLocalInvokeProperties.getRole())) {
container.addMessageListener(messageListenerAdapter, new PatternTopic(LOCAL_INVOKE_REDIS_CUSTOMER_TOPIC));
}
return container;
}
}
调用者调用Filter
public class JsfLocalInvokeFilterByRedis extends AbstractFilter {
private volatile JsfLocalInvokeProperties jsfLocalInvokeProperties;
public static final String REGISTRY_CLASS_NAME = "com.jd.jsf.service.RegistryService";
private volatile RedisMessageListenerContainer redisMessageListenerContainer;
private volatile RedisSerializer<ResponseMessage> redisSerializer;
private volatile PublishMessageBean<RequestMessage> publishMessageBean;
public ResponseMessage invoke(RequestMessage requestMessage) {
if(getJsfLocalInvokeProperties() == null
|| !getJsfLocalInvokeProperties().isEnable()
|| !addAddressAndPort(requestMessage)) {
return this.getNext().invoke(requestMessage);
}
//排除注册、心跳接口
if(REGISTRY_CLASS_NAME.equals(requestMessage.getClassName())) {
return this.getNext().invoke(requestMessage);
}
CountDownLatch countDownLatch = new CountDownLatch(1);
//hessian对alias要求group:version 没有就补全
if(!verifyAlias(requestMessage.getAlias())) {
requestMessage.setAlias(requestMessage.getAlias() + LOCAL_INVOKE_JSF_VERSION);
requestMessage.getInvocationBody().addAttachment(LOCAL_INVOKE_JSF_VERSION_ADDED, 1);
}
log.info("开启内网jsf调用,requestMessage = {}", JSON.toJSONString(requestMessage));
//topic之间关联值
UUID uuid = UUID.randomUUID();
//添加生产者监听
AtomicReference<ResponseMessage> messageAtomicReference = new AtomicReference<>();
MessageListenerAdapter listenerAdapter = addListener(message -> {
log.debug("收到生产者回复,message = {}", message);
messageAtomicReference.set(message);
countDownLatch.countDown();
});
requestMessage.getInvocationBody().addAttachment(LOCAL_INVOKE_UUID, uuid.toString());
//发送消费者topic
getPublishMessageBean().publish(LOCAL_INVOKE_REDIS_CUSTOMER_TOPIC, requestMessage);
//等结果
try {
countDownLatch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("JsfLocalInvokeFilter等待提供者结果被中断", e);
}
//拿到结果
ResponseMessage responseMessage;
if (messageAtomicReference.get() != null) {
responseMessage = messageAtomicReference.get();
} else {
//给默认值
responseMessage = MessageBuilder.buildResponse(requestMessage);
}
//停止并取消订阅生产者监听
removeListener(listenerAdapter);
return responseMessage;
}
...
}
jsf提供者接收请求
4j
public class ProviderMessageSubscribe implements MessageSubscribe<RequestMessage>{
private final PublishMessageBean<ResponseMessage> publishMessageBean;
public ProviderMessageSubscribe(PublishMessageBean<ResponseMessage> publishMessageBean) {
this.publishMessageBean = publishMessageBean;
}
public void receiveMessage(RequestMessage requestMessage) {
modifyAlias(requestMessage);
log.info("生产者收到消息:{}", JSON.toJSONString(requestMessage));
ProviderConfig providerConfig = getProviderConfig(requestMessage);
if(providerConfig != null && isSameIp(requestMessage)) {
log.debug("ProviderTopicListener invokeJsf find jsf interface");
ProviderProxyInvoker invoker = new ProviderProxyInvoker(providerConfig);
ResponseMessage responseMessage = invoker.invoke(requestMessage);
publishMessageBean.publish(LOCAL_INVOKE_REDIS_PROVIDER_TOPIC, responseMessage);
} else {
log.warn("没有提供者或非调用者配置的ip,不处理");
}
}
private void modifyAlias(RequestMessage requestMessage) {
String alias = requestMessage.getAlias();
boolean versionAdded = requestMessage.getInvocationBody().getAttachments().containsKey(LOCAL_INVOKE_JSF_VERSION_ADDED);
if(versionAdded) {
requestMessage.setAlias(alias.substring(0, alias.indexOf(LOCAL_INVOKE_JSF_VERSION)));
}
}
private ProviderConfig getProviderConfig(RequestMessage requestMessage) {
List<ProviderConfig> providerConfigs = JSFContext.getProviderConfigs();
for (ProviderConfig providerConfig : providerConfigs) {
if(requestMessage.getAlias().equals(providerConfig.getAlias()) && requestMessage.getClassName().equals(providerConfig.getInterfaceId())) {
return providerConfig;
}
}
return null;
}
private boolean isSameIp(RequestMessage requestMessage) {
final Object attachmentsObj = requestMessage.getInvocationBody().getAttachment(LOCAL_INVOKE_REMOTE_ATTACHMENT);
if(attachmentsObj == null) {
return true;
}
try {
HashSet<String> attachmentSet = (HashSet<String>)attachmentsObj;
return attachmentSet.contains(JSFContext.getLocalHost());
} catch (Exception e) {
return true;
}
}
}
主要分成三步:
第一步
引入jar包
<dependency>
<groupId>com.jxd</groupId>
<artifactId>spring-boot-starter-jsf-plugin</artifactId>
<version>0.0.2-SNAPSHOT</version>
</dependency>
第二步
在任何springboot可以检测到的配置中添加配置,例如application.properties文件中进行配置。
下面表格,除了前三个之外,关于redis的相关的非必要不需要配置。
第三步
下面就只剩jsf配置了,我们知道jsf想要通过直连方式进行调用,需要配置url,本插件也是同样配置即可。
下面举个例子。
如果你是提供者,配置如下:
<jsf:provider id="userApiProvider" interface="com.zhongyouex.order.api.a.UserApi" alias="${jsf.provider.alias}"
ref="userApiImpl" server="jsf"/>
如果你是调用者,配置如下:
<jsf:consumer id="userApi" interface="com.zhongyouex.order.api.a.UserApi"
protocol="jsf" alias="zyex-order-lv" timeout="10000"
retries="0" check="false" url="jsf://10.0.193.90:22000">
</jsf:consumer>
说明:url是提供者的ip即可,如果配置多个ip,且多个ip同时本地启动,则会广播打到全部ip上,但是只会接收一条响应。
就这三步即可实现jsf提供者与调用者的本地联调,是不是很方便。
这个小工具其实并不复杂,也没有多么高大上,只是为了解决研发过程中的小问题,方便大家使用。其实每位研发在开发的过程可能遇到不顺手的工具亦或者是工具平台某些功能不好用,这都是正常的,毕竟没有十全十美的。只要有槽点,不停留在槽点上,而是利用现有资源思考是否解决掉,并着手去实现。这样既可以解决问题,又可以锻炼自身技术,双重帮助。
对于方案或者实现细节可能会有瑕疵或者考虑不周的地方,欢迎大家指点,一起讨论,感谢阅读。