dubbo源码分析系列(4)dubbo通信设计

#1 系列目录

#2 NIO 通信层的抽象

目前 dubbo 已经集成的有 netty、mina、grizzly。先来通过案例简单了解下 netty、mina 编程(grizzly 没有了解过)

##2.1 netty 和 mina 的简单案例

netty 原本是 jboss 开发的,后来单独出来了,所以会有两种版本就是 org.jboss.netty 和 io.netty 两种包类型的,而 dubbo 内置的是前者。目前还不是很熟悉,可能稍有差别,但是整体大概都是一样的。

我们先来看下 io.netty 的案例:

public static void main(String[] args){ EventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap=new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TcpServerHandler()); } }); ChannelFuture f=serverBootstrap.bind(8080).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally {  
        workerGroup.shutdownGracefully();  
        bossGroup.shutdownGracefully();  
    }  
}

mina 的案例:

public static void main(String[] args) throws IOException{ IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter( new TextLineCodecFactory(Charset.forName("UTF-8"),"\r\n", "\r\n"))); acceptor.setHandler(new TcpServerHandler()); acceptor.bind(new InetSocketAddress(8080));  
}

两者都是使用 Reactor 模型结构。而最原始 BIO 模型如下:

原始BIO模型

每来一个 Socket 连接都为该 Socket 创建一个线程来处理。由于总线程数有限制,导致 Socket 连接受阻,所以 BIO 模型并发量并不大

Rector 多线程模型如下,更多信息见 Netty 系列之 Netty 线程模型

Rector多线程模型

用一个 boss 线程,创建 Selector,用于不断监听 Socket 连接、客户端的读写操作等

用一个线程池即 workers,负责处理 Selector 派发的读写操作。

由于 boss 线程可以接收更多的 Socket 连接,同时可以充分利用线程池中的每个线程,减少了 BIO 模型下每个线程为单独的 socket 的等待时间。

##2.2 服务器端如何集成 netty 和 mina

先来简单总结下上述 netty 和 mina 的相似之处,然后进行抽象概括成接口

  • 1 各自有各自的编程启动方式

  • 2 都需要各自的 ChannelHandler 实现,用于处理各自的 Channel 或者 IoSession 的连接、读写等事件。

    对于 netty 来说: 需要继承 org.jboss.netty.channel.SimpleChannelHandler(或者其他方式),来处理 org.jboss.netty.channel.Channel 的连接读写事件

    对于 mina 来说:需要继承 org.apache.mina.common.IoHandlerAdapter(或者其他方式),来处理 org.apache.mina.common.IoSession 的连接读写事件

为了统一上述问题,dubbo 需要做如下事情:

  • 1 定义 dubbo 的 com.alibaba.dubbo.remoting.Channel 接口

    • 1.1 针对 netty,上述 Channel 的实现为 NettyChannel,内部含有一个 netty 自己的 org.jboss.netty.channel.Channel channel 对象,即该 com.alibaba.dubbo.remoting.Channel 接口的功能实现全部委托为底层的 org.jboss.netty.channel.Channel channel 对象来实现

    • 1.2 针对 mina,上述 Channel 实现为 MinaChannel,内部包含一个 mina 自己的 org.apache.mina.common.IoSession session 对象,即该 com.alibaba.dubbo.remoting.Channel 接口的功能实现全部委托为底层的 org.apache.mina.common.IoSession session 对象来实现

  • 2 定义自己的 com.alibaba.dubbo.remoting.ChannelHandler 接口,用于处理 com.alibaba.dubbo.remoting.Channel 接口的连接读写事件,如下所示

    public interface ChannelHandler { void connected(Channel channel) throws RemotingException; void disconnected(Channel channel) throws RemotingException; void sent(Channel channel, Object message) throws RemotingException; void received(Channel channel, Object message) throws RemotingException; void caught(Channel channel, Throwable exception) throws RemotingException;
    
    }
    
    • 2.1 先定义用于处理 netty 的 NettyHandler,需要按照 netty 的方式继承 netty 的 org.jboss.netty.channel.SimpleChannelHandler,此时 NettyHandler 就可以委托 dubbo 的 com.alibaba.dubbo.remoting.ChannelHandler 接口实现来完成具体的功能,在交给 com.alibaba.dubbo.remoting.ChannelHandler 接口实现之前,需要先将 netty 自己的 org.jboss.netty.channel.Channel channel 转化成上述的 NettyChannel,见 NettyHandler

      public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { if (channel != null) { channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress()), channel); } handler.connected(channel); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); }
      } public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { channels.remove(NetUtils.toAddressString((InetSocketAddress) ctx.getChannel().getRemoteAddress())); handler.disconnected(channel); } finally {
              NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
          }
      }
      
    • 2.2 先定义用于处理 mina 的 MinaHandler,需要按照 mina 的方式继承 mina 的 org.apache.mina.common.IoHandlerAdapter,此时 MinaHandler 就可以委托 dubbo 的 com.alibaba.dubbo.remoting.ChannelHandler 接口实现来完成具体的功能,在交给 com.alibaba.dubbo.remoting.ChannelHandler 接口实现之前,需要先将 mina 自己的 org.apache.mina.common.IoSession 转化成上述的 MinaChannel,见 MinaHandler

      public void sessionOpened(IoSession session) throws Exception { MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler); try { handler.connected(channel); } finally { MinaChannel.removeChannelIfDisconnectd(session); }
      } public void sessionClosed(IoSession session) throws Exception { MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler); try { handler.disconnected(channel); } finally {
              MinaChannel.removeChannelIfDisconnectd(session);
          }
      }
      

做了上述事情之后,全部逻辑就统一到 dubbo 自己的 com.alibaba.dubbo.remoting.ChannelHandler 接口如何来处理自己的 com.alibaba.dubbo.remoting.Channel 接口。

这就需要看下 com.alibaba.dubbo.remoting.ChannelHandler 接口的实现有哪些:

ChannelHandler接口实现

  • 3 定义 Server 接口用于统一大家的启动流程

    先来看下整体的 Server 接口实现情况

    Server接口实现情况

    如 NettyServer 的启动流程: 按照 netty 自己的 API 启动方式,然后依据外界传递进来的 com.alibaba.dubbo.remoting.ChannelHandler 接口实现,创建出 NettyHandler,最终对用户的连接请求的处理全部交给 NettyHandler 来处理,NettyHandler 又交给了外界传递进来的 com.alibaba.dubbo.remoting.ChannelHandler 接口实现。

    至此就将所有底层不同的通信实现全部转化到了外界传递进来的 com.alibaba.dubbo.remoting.ChannelHandler 接口的实现上了。

    而上述 Server 接口的另一个分支实现 HeaderExchangeServer 则充当一个装饰器的角色,为所有的 Server 实现增添了如下功能:

    向该 Server 所有的 Channel 依次进行心跳检测:

    • 如果当前时间减去最后的读取时间大于 heartbeat 时间或者当前时间减去最后的写时间大于 heartbeat 时间,则向该 Channel 发送一次心跳检测
    • 如果当前时间减去最后的读取时间大于 heartbeatTimeout,则服务器端要关闭该 Channel,如果是客户端的话则进行重新连接(客户端也会使用这个心跳检测任务)

##2.3 客户端如何集成 netty 和 mina

服务器端了解了之后,客户端就也非常清楚了,整体类图如下:

Client接口实现情况

如 NettyClient 在使用 netty 的 API 开启客户端之后,仍然使用 NettyHandler 来处理。还是最终转化成 com.alibaba.dubbo.remoting.ChannelHandler 接口实现上了。

HeaderExchangeClient 和上面的 HeaderExchangeServer 非常类似,就不再提了。

我们可以看到这样集成完成之后,就完全屏蔽了底层通信细节,将逻辑全部交给了 com.alibaba.dubbo.remoting.ChannelHandler 接口的实现上了。从上面我们也可以看到,该接口实现也会经过层层装饰类的包装,才会最终交给底层通信。

如 HeartbeatHandler 装饰类:

public void sent(Channel channel, Object message) throws RemotingException { setWriteTimestamp(channel); handler.sent(channel, message);
} public void received(Channel channel, Object message) throws RemotingException { setReadTimestamp(channel); if (isHeartbeatRequest(message)) { Request req = (Request) message; if (req.isTwoWay()) { Response res = new Response(req.getId(), req.getVersion()); res.setEvent(Response.HEARTBEAT_EVENT); channel.send(res); if (logger.isInfoEnabled()) { int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); if(logger.isDebugEnabled()) { logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period" + (heartbeat > 0 ? ": " + heartbeat + "ms" : "")); } } } return; } if (isHeartbeatResponse(message)) { if (logger.isDebugEnabled()) { logger.debug( new StringBuilder(32) .append("Receive heartbeat response in thread ") .append(Thread.currentThread().getName()) .toString()); } return;
    }
    handler.received(channel, message);
}

就会拦截那些上述提到的心跳检测请求。更新该 Channel 的最后读写时间。

##2.4 同步调用和异步调用的实现

首先设想一下我们目前的通信方式,使用 netty mina 等异步事件驱动的通信框架,将 Channel 中信息都分发到 Handler 中去处理了,Handler 中的 send 方法只负责不断的发送消息,receive 方法只负责不断接收消息,这时候就产生一个问题:

客户端如何对应同一个 Channel 的接收的消息和发送的消息之间的匹配呢?

这也很简单,就需要在发送消息的时候,必须要产生一个请求 id,将调用的信息连同 id 一起发给服务器端,服务器端处理完毕后,再将响应信息和上述请求 id 一起发给客户端,这样的话客户端在接收到响应之后就可以根据 id 来判断是针对哪次请求的响应结果了。

来看下 DubboInvoker 中的具体实现

boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult();
} else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult();
} else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get();
}
  • 如果不需要返回值,直接使用 send 方法,发送出去,设置当期和线程绑定 RpcContext 的 future 为 null
  • 如果需要异步通信,使用 request 方法构建一个 ResponseFuture,然后设置到和线程绑定 RpcContext 中
  • 如果需要同步通信,使用 request 方法构建一个 ResponseFuture,阻塞等待请求完成

可以看到的是它把 ResponseFuture 设置到与当前线程绑定的 RpcContext 中了,如果我们要获取异步结果,则需要通过 RpcContext 来获取当前线程绑定的 RpcContext,然后就可以获取 Future 对象。如下所示:

String result1 = helloService.hello("World");
System.out.println("result :"+result1);
System.out.println("result : "+RpcContext.getContext().getFuture().get());

当设置成异步请求的时候,result1 则为 null, 然后通过 RpcContext 来获取相应的值。

然后我们来看下异步请求的整个实现过程,即上述 currentClient.request 方法的具体内容:

public ResponseFuture request(Object request, int timeout) throws RemotingException { Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send(req); }catch (RemotingException e) { future.cancel(); throw e; } return future;
}
  • 第一步:创建出一个 request 对象,创建过程中就自动产生了 requestId, 如下

    public class Request { private final long mId; private static final AtomicLong INVOKE_ID = new AtomicLong(0); public Request() { mId = newId(); } private static long newId() { return INVOKE_ID.getAndIncrement();
        }
    }
    
  • 第二步:根据 request 请求封装成一个 DefaultFuture 对象,通过该对象的 get 方法就可以获取到请求结果。该方法会阻塞一直到请求结果产生。同时 DefaultFuture 对象会被存至 DefaultFuture 类如下结构中:

    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
    

    key 就是请求 id

  • 第三步:将上述请求对象发送给服务器端,同时将 DefaultFuture 对象返给上一层函数,即 DubboInvoker 中,然后设置到当前线程中

  • 第四步:用户通过 RpcContext 来获取上述 DefaultFuture 对象来获取请求结果,会阻塞至服务器端返产生结果给客户端

  • 第五步:服务器端产生结果,返回给客户端会在客户端的 handler 的 receive 方法中接收到,接收到之后判别接收的信息是 Response 后,

    static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
    

    就会根据 response 的 id 从上述 FUTURES 结构中查出对应的 DefaultFuture 对象,并把结果设置进去。此时 DefaultFuture 的 get 方法则不再阻塞,返回刚刚设置好的结果。

至此异步通信大致就了解了,但是我们会发现一个问题:

当某个线程多次发送异步请求时,都会将返回的 DefaultFuture 对象设置到当前线程绑定的 RpcContext 中,就会造成了覆盖问题,如下调用方式:

String result1 = helloService.hello("World");
String result2 = helloService.hello("java");
System.out.println("result :"+result1);
System.out.println("result :"+result2);
System.out.println("result : "+RpcContext.getContext().getFuture().get());
System.out.println("result : "+RpcContext.getContext().getFuture().get());

即异步调用了 hello 方法,再次异步调用,则前一次的结果就被冲掉了,则就无法获取前一次的结果了。必须要调用一次就立马将 DefaultFuture 对象获取走,以免被冲掉。即这样写:

String result1 = helloService.hello("World");
Future<String> result1Future=RpcContext.getContext().getFuture();
String result2 = helloService.hello("java");
Future<String> result2Future=RpcContext.getContext().getFuture();
System.out.println("result :"+result1);
System.out.println("result :"+result2);
System.out.println("result : "+result1Future.get());
System.out.println("result : "+result2Future.get());

最后来张 dubbo 的解释图片:

同步转异步

#3 通信层与 dubbo 的结合

从上面可以了解到如何对不同的通信框架进行抽象,屏蔽底层细节,统一将逻辑交给 ChannelHandler 接口实现来处理。然后我们就来了解下如何与 dubbo 的业务进行对接,也就是在什么时机来使用上述通信功能:

##3.1 服务的发布过程使用通信功能

如 DubboProtocol 在发布服务的过程中:

  • 1 DubboProtocol 中有一个如下结构

    Map<String, ExchangeServer> serverMap
    

    在发布一个服务的时候会先根据服务的 url 获取要发布的服务所在的 host 和 port,以此作为 key 来从上述结构中寻找是否已经有对应的 ExchangeServer(上面已经说明)。

  • 2 如果没有的话,则会创建一个,创建过程如下:

    ExchangeServer server = Exchangers.bind(url, requestHandler);
    

    其中 requestHandler 就是 DubboProtocol 自身实现的 ChannelHandler。

    获取一个 ExchangeServer,它的实现主要是 Server 的装饰类,依托外部传递的 Server 来实现 Server 功能,而自己加入一些额外的功能,如 ExchangeServer 的实现 HeaderExchangeServer,就是加入了心跳检测的功能。

    所以此时我们可以自定义扩展功能来实现 Exchanger。接口定义如下:

    @SPI(HeaderExchanger.NAME)
    public interface Exchanger { @Adaptive({Constants.EXCHANGER_KEY}) ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException; @Adaptive({Constants.EXCHANGER_KEY}) ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
    
    }
    

    默认使用的就是 HeaderExchanger,它创建的 ExchangeServer 是 HeaderExchangeServer 如下所示:

    public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    
    }
    

    HeaderExchangeServer 仅仅是一个 Server 接口的装饰类,需要依托外部传递 Server 实现来完成具体的功能。此 Server 实现可以是 netty 也可以是 mina 等。所以我们可以自定义 Transporter 实现来选择不同底层通信框架,接口定义如下:

    @SPI("netty")
    public interface Transporter { @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind(URL url, ChannelHandler handler) throws RemotingException; @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException;
    
    }
    

    默认采用 netty 实现,如下:

    public class NettyTransporter implements Transporter { public static final String NAME = "netty"; public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener);
        }
    
    }
    

    至此就到了我们上文介绍的内容了。同时 DubboProtocol 的 ChannelHandler 实现经过层层装饰器包装,最终传给底层通信 Server。

    客户端发送请求给服务器端时,底层通信 Server 会将请求经过层层处理最终传递给 DubboProtocol 的 ChannelHandler 实现,在该实现中,会根据请求参数找到对应的服务器端本地 Invoker,然后执行,再将返回结果通过底层通信 Server 发送给客户端。

##3.2 客户端的引用服务使用通信功能

在 DubboProtocol 引用服务的过程中:

  • 1 使用如下方式创建 client

    ExchangeClient client=Exchangers.connect(url ,requestHandler);
    

    requestHandler 还是 DubboProtocol 中 ChannelHandler 实现。

    和 Server 类似,我们可以通过自定义 Exchanger 实现来创建出不同功能的 ExchangeClient。默认的 Exchanger 实现是 HeaderExchanger

    public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    
    }
    

    创建出来的 ExchangeClient 是 HeaderExchangeClient,它也是 Client 的包装类,仅仅在 Client 外层加上心跳检测的功能,向它所连接的服务器端发送心跳检测。

    HeaderExchangeClient 需要外界给它传一个 Client 实现,这是由 Transporter 接口实现来定的,默认是 NettyTransporter

    public class NettyTransporter implements Transporter { public static final String NAME = "netty"; public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener);
        }
    
    }
    

    创建出来的的 Client 实现是 NettyClient。

    同时 DubboProtocol 的 ChannelHandler 实现经过层层装饰器包装,最终传给底层通信 Client。

    客户端的 DubboInvoker 调用远程服务的时候,会将调用信息通过 ExchangeClient 发送给服务器端,然后返回一个 ResponseFuture,根据客户端选择的同步还是异步方式,来决定阻塞还是直接返回,这一部分在上文同步调用和异步调用的实现中已经详细说过了。

#4 结束语

本篇文章主要介绍了集成 netty 和 mina 的那一块的通信接口及实现的设计,下篇主要介绍编解码的过程

欢迎关注微信公众号:乒乓狂魔

微信公众号

首页 - Wiki
Copyright © 2011-2025 iteam. Current version is 2.144.0. UTC+08:00, 2025-07-09 22:16
浙ICP备14020137号-1 $访客地图$