dubbo系列(四)服务端调用流程解析 [toc]
流程解析 通过服务暴露流程,我们知道了dubbo服务端暴露dubbo协议的服务是在DubboProtocol类中进行的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 @Override public <T> Exporter<T> export (Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false ); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0 ) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded." )); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } openServer(url); optimizeSerialization(url); return exporter; } private void openServer (URL url) { String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true ); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null ) { serverMap.put(key, createServer(url)); } else { server.reset(url); } } } private ExchangeServer createServer (URL url) { url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0 ) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
我们这里先不管Invoker
对象是做什么的,可以看到在暴露服务前,dubbo先将invoker
对象封装进DubboExporter
对象中,然后存到一个map
对象里。在最后调用bind
接口时,除了传入url
外,还会传入requestHandler
对象,我们先看下requestHandler
对象是做什么的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public Object reply (ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods" ); boolean hasMethod = false ; if (methodsStr == null || methodsStr.indexOf("," ) == -1 ) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split("," ); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true ; break ; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null ; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } @Override public void received (Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super .received(channel, message); } } @Override public void connected (Channel channel) throws RemotingException { invoke(channel, Constants.ON_CONNECT_KEY); } @Override public void disconnected (Channel channel) throws RemotingException { if (logger.isInfoEnabled()) { logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, Constants.ON_DISCONNECT_KEY); } private void invoke (Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null ) { try { received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } } } private Invocation createInvocation (Channel channel, URL url, String methodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0 ) { return null ; } RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0 ], new Object[0 ]); invocation.setAttachment(Constants.PATH_KEY, url.getPath()); invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); if (url.getParameter(Constants.STUB_EVENT_KEY, false )) { invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; } };
通过requestHandler
的实现方法我们可以看到它主要用来处理各种请求回调,我们重点关注reply
的回调方法,主要做了3件事情:
从exporterMap获取具体的invoker对象。
判断调用的method是否存在。
调用invoker.invoke
。
我们继续深入Exchangers.bind
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static ExchangeServer bind (URL url, ExchangeHandler handler) throws RemotingException { if (url == null ) { throw new IllegalArgumentException("url == null" ); } if (handler == null ) { throw new IllegalArgumentException("handler == null" ); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange" ); return getExchanger(url).bind(url, handler); } public static Exchanger getExchanger (URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } public static Exchanger getExchanger (String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
Exchangers.bind
主要做了2件事情:
获取Exchanger
对象,这里根据SPI默认获取HeaderExchanger
,实际上Exchanger
也只有这么一个实现类。
调用Exchanger
的bind
方法,返回ExchangeServer
对象。
我们再看HeaderExchanger
的bind
代码。
1 2 3 4 @Override public ExchangeServer bind (URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
HeaderExchanger
主要3件事情:
封装handler
对象,外面分别套了一个HeaderExchangeHandler
和DecodeHandler
。
Transporters.bind
开启服务,传入url
和ChannelHandler
两个参数,返回Server
。
初始化HeaderExchangeServer
对象并返回。
我们先看HeaderExchangeServer
做了什么事情。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public HeaderExchangeServer (Server server) { if (server == null ) { throw new IllegalArgumentException("server == null" ); } this .server = server; this .heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0 ); this .heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3 ); if (heartbeatTimeout < heartbeat * 2 ) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2" ); } startHeartbeatTimer(); } private void startHeartbeatTimer () { stopHeartbeatTimer(); if (heartbeat > 0 ) { heartbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels () { return Collections.unmodifiableCollection( HeaderExchangeServer.this .getChannels()); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } } private void stopHeartbeatTimer () { try { ScheduledFuture<?> timer = heartbeatTimer; if (timer != null && !timer.isCancelled()) { timer.cancel(true ); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } finally { heartbeatTimer = null ; } } @Override public void close () { doClose(); server.close(); } ...
HeaderExchangeServer
主要是封装了server对象,并通过ScheduledExecutorService
来定时向客户端发送心跳。心跳在这里不详细分析,我们继续深入Transporters.bind
看服务是如何暴露的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static Server bind (URL url, ChannelHandler... handlers) throws RemotingException { if (url == null ) { throw new IllegalArgumentException("url == null" ); } if (handlers == null || handlers.length == 0 ) { throw new IllegalArgumentException("handlers == null" ); } ChannelHandler handler; if (handlers.length == 1 ) { handler = handlers[0 ]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); } public static Transporter getTransporter () { return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }
Transporters.bind
主要做的两件事情:
通过SPI
找到Transporter
对象,这里默认使用NettyTransporter
。
调用NettyTransporter.bind
1 2 3 4 @Override public Server bind (URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); }
NettyTransporter.bind
方法初始化了一个NettyServer
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public NettyServer (URL url, ChannelHandler handler) throws RemotingException { super (url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } @Override protected void doOpen () throws Throwable { bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(1 , new DefaultThreadFactory("NettyServerBoss" , true )); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker" , true )); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this ); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel (NioSocketChannel ch) throws Exception { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this ); ch.pipeline() .addLast("decoder" , adapter.getDecoder()) .addLast("encoder" , adapter.getEncoder()) .addLast("handler" , nettyServerHandler); } }); ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
NettyServer
主要做的几件事情:
初始化时对ChannelHandler
进行封装。
初始化NettyServerHandler
,并传入NettyServer
自身。
初始化netty
的ServerBootstrap
,注册3三个ChannelHandler
。
启动服务。
dubbo
编解码相关的以后单独讲,我们继续深入nettyServerHandler
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class NettyServerHandler extends ChannelDuplexHandler { private final ChannelHandler handler; public NettyServerHandler (URL url, ChannelHandler handler) { if (url == null ) { throw new IllegalArgumentException("url == null" ); } if (handler == null ) { throw new IllegalArgumentException("handler == null" ); } this .url = url; this .handler = handler; } ... @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { handler.received(channel, msg); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } ... @Override public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { super .write(ctx, msg, promise); NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { handler.sent(channel, msg); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } ... }
NettyServerHandler
继承netty
的ChannelDuplexHandler
,实现了channelRead
和write
两个方法,最终调用ChannelHandler
的received
和sent
方法。NettyServerHandler
可以看成是netty
的channelHandler
和dubbo
的channelHandler
的转换。
根据类图可以看到NettyServer
实现了dubbo
的channelHandler
,其sent
方法和received
方法主要在父类AbstractPeer
中实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public abstract class AbstractPeer implements Endpoint , ChannelHandler { public AbstractPeer (URL url, ChannelHandler handler) { if (url == null ) { throw new IllegalArgumentException("url == null" ); } if (handler == null ) { throw new IllegalArgumentException("handler == null" ); } this .url = url; this .handler = handler; } @Override public void sent (Channel ch, Object msg) throws RemotingException { if (closed) { return ; } handler.sent(ch, msg); } @Override public void received (Channel ch, Object msg) throws RemotingException { if (closed) { return ; } handler.received(ch, msg); }
AbstractPeer
调用ChannelHandler
的sent
和received
方法。而ChannelHandler
是在NettyServer
初始化时传入的。
1 2 3 public NettyServer (URL url, ChannelHandler handler) throws RemotingException { super (url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }
ExecutorUtil.setThreadName
在url加了一个线程名的参数DubboServerHandler
,在传入handler
前,ChannelHandlers.wrap
对handler
进行了封装。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class ChannelHandlers { private static ChannelHandlers INSTANCE = new ChannelHandlers(); protected ChannelHandlers () { } public static ChannelHandler wrap (ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected static ChannelHandlers getInstance () { return INSTANCE; } static void setTestingChannelHandlers (ChannelHandlers instance) { INSTANCE = instance; } protected ChannelHandler wrapInternal (ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); } }
在wrapInternal
方法中,对handler
进行了3次的封装:
Dispatcher.dispatch
HeartbeatHandler
MultiMessageHandler
我们依次来看这3个ChannelHandler
各做什么事情。首先Dispatcher
是个SPI,默认会使用AllDispatcher
。
1 2 3 4 5 6 7 8 9 10 public class AllDispatcher implements Dispatcher { public static final String NAME = "all" ; @Override public ChannelHandler dispatch (ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class AllChannelHandler extends WrappedChannelHandler { ... @Override public void received (Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if (request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return ; } } throw new ExecutionException(message, channel, getClass() + " error when process received event ." , t); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class ChannelEventRunnable implements Runnable {@Override public void run () { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break ; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break ; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break ; default : logger.warn("unknown state: " + state + ", message is " + message); } } } }
AllChannelHandler
做的事情其实就是将具体的业务处理代码放入线程池。ChannelEventRunnable
中的代码看起来有点怪异,其实是CPU分支预测的优化,具体查看优化技巧:提前if判断帮助CPU分支预测
这里可以总结一下dubbo
的几个线程池和Dispatcher
。
Dispatcher
all:所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等。
direct:所有消息都不派发到线程池,全部在IO线程上直接执行。
message:只有请求响应消息派发到线程池,其他连接断开事件、心跳等消息,直接在IO线程上执行。
execution:只请求消息派发到线程池,不含响应,响应和其他连接断开事件、心跳等消息,直接在IO线程上执行。
connection:在IO线程上,将连接断开事件放入队列,有序逐个执行,其他消息派发到线程池。
ThreadPool
fixed:固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
cached:缓存线程池,空闲一分钟自动删除,需要时重建。
limited:可伸缩线程池,但池中的线程只会增长不会收缩。(为避免收缩时突然来了大流量引起的性能问题)。
接下来看HeartbeatHandler
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class HeartbeatHandler extends AbstractChannelHandlerDelegate { ... @Override public void received (Channel channel, Object message) throws RemotingException { setReadTimestamp(channel); if (isHeartbeatRequest(message)) { Request req = (Request) message; if (req.isTwoWay()) { Response res = new Response(req.getId(), req.getVersion()); res.setEvent(Response.HEARTBEAT_EVENT); channel.send(res); if (logger.isInfoEnabled()) { int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0 ); if (logger.isDebugEnabled()) { logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period" + (heartbeat > 0 ? ": " + heartbeat + "ms" : "" )); } } } return ; } if (isHeartbeatResponse(message)) { if (logger.isDebugEnabled()) { logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName()); } return ; } handler.received(channel, message); } private boolean isHeartbeatRequest (Object message) { return message instanceof Request && ((Request) message).isHeartbeat(); } ... }
HeartbeatHandler
负责处理接收到的心跳。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class MultiMessageHandler extends AbstractChannelHandlerDelegate { public MultiMessageHandler (ChannelHandler handler) { super (handler); } @SuppressWarnings("unchecked") @Override public void received (Channel channel, Object message) throws RemotingException { if (message instanceof MultiMessage) { MultiMessage list = (MultiMessage) message; for (Object obj : list) { handler.received(channel, obj); } } else { handler.received(channel, message); } } }
MultiMessageHandler
如果是批量请求,则依次对请求调用下一个Handler来处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class DecodeHandler extends AbstractChannelHandlerDelegate { private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class); public DecodeHandler (ChannelHandler handler) { super (handler); } @Override public void received (Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); } if (message instanceof Request) { decode(((Request) message).getData()); } if (message instanceof Response) { decode(((Response) message).getResult()); } handler.received(channel, message); } private void decode (Object message) { if (message != null && message instanceof Decodeable) { try { ((Decodeable) message).decode(); if (log.isDebugEnabled()) { log.debug("Decode decodeable message " + message.getClass().getName()); } } catch (Throwable e) { if (log.isWarnEnabled()) { log.warn("Call Decodeable.decode failed: " + e.getMessage(), e); } } } } }
接下来会调用DecodeHandler
,DecodeHandler
调用message
的decode
方法进行序列化。
HeaderExchangeHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class HeaderExchangeHandler implements ChannelHandlerDelegate { @Override public void received (Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0 ) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } }
最后经过层层过滤回到DubboProtocol
的ExchangeHandler
上,通过invoker.invoke(Invocation)
调到具体的业务代码上。
invoker 通过调试,可以看到invoker是个链式的结构,最终调用到了具体的服务方法上。我们先看ProtocolFilterWrapper
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class ProtocolFilterWrapper implements Protocol { @Override public <T> Exporter<T> export (Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } private static <T> Invoker<T> buildInvokerChain (final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1 ; i >= 0 ; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface () { return invoker.getInterface(); } @Override public URL getUrl () { return invoker.getUrl(); } @Override public boolean isAvailable () { return invoker.isAvailable(); } @Override public Result invoke (Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy () { invoker.destroy(); } @Override public String toString () { return invoker.toString(); } }; } } return last; } }
ProtocolFilterWrapper
当协议不是registry时,调用buildInvokerChain
方法。buildInvokerChain
获取所有的Filter,循环构建调用链。 Dubbo对于服务端的Filter
有:
com.alibaba.dubbo.rpc.filter.EchoFilter com.alibaba.dubbo.rpc.filter.ClassLoaderFilter com.alibaba.dubbo.rpc.filter.GenericFilter com.alibaba.dubbo.rpc.filter.ContextFilter com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter com.alibaba.dubbo.rpc.filter.TimeoutFilter com.alibaba.dubbo.monitor.support.MonitorFilter com.alibaba.dubbo.rpc.filter.ExceptionFilter
每个Filter
的作用不详细展开。
接下来的InvokerDelegete
和DelegateProviderMetaDataInvoker
是没有什么业务的委托类,最后看JavassistProxyFactory
。
在ServiceConfig
的doExportUrlsFor1Protocol()
中,第一次创建了Invoker
对象。
1 2 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this );
其中proxyFactory
默认使用,JavassistProxyFactory
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class JavassistProxyFactory extends AbstractProxyFactory { @Override @SuppressWarnings("unchecked") public <T> T getProxy (Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } @Override public <T> Invoker<T> getInvoker (T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$' ) < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke (T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
JavassistProxyFactory
使用Wrapper.getWrapper
创建Wrapper
对象,然后创建AbstractProxyInvoker
对象,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public abstract class Wrapper { public static Wrapper getWrapper (Class<?> c) { while (ClassGenerator.isDynamicClass(c)) c = c.getSuperclass(); if (c == Object.class) return OBJECT_WRAPPER; Wrapper ret = WRAPPER_MAP.get(c); if (ret == null ) { ret = makeWrapper(c); WRAPPER_MAP.put(c, ret); } return ret; } private static Wrapper makeWrapper (Class<?> c) { ... ClassLoader cl = ClassHelper.getClassLoader(c); ... ClassGenerator cc = ClassGenerator.newInstance(cl); ... Class<?> wc = cc.toClass(); ... return (Wrapper) wc.newInstance(); } }
Wrapper
通过javassist动态生成Wrapper
类,将文件反编译,可以看到代理类的源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 public class Wrapper0 extends Wrapper implements DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; public String[] getPropertyNames() { return pns; } public boolean hasProperty (String var1) { return pts.containsKey(var1); } public Class getPropertyType (String var1) { return (Class)pts.get(var1); } public String[] getMethodNames() { return mns; } public String[] getDeclaredMethodNames() { return dmns; } public void setPropertyValue (Object var1, String var2, Object var3) { try { DemoService var4 = (DemoService)var1; } catch (Throwable var6) { throw new IllegalArgumentException(var6); } throw new NoSuchPropertyException("Not found property \"" + var2 + "\" filed or setter method in class com.guhailin.sample.DemoService." ); } public Object getPropertyValue (Object var1, String var2) { try { DemoService var3 = (DemoService)var1; } catch (Throwable var5) { throw new IllegalArgumentException(var5); } throw new NoSuchPropertyException("Not found property \"" + var2 + "\" filed or setter method in class com.guhailin.sample.DemoService." ); } public Object invokeMethod (Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException { DemoService var5; try { var5 = (DemoService)var1; } catch (Throwable var8) { throw new IllegalArgumentException(var8); } try { if ("sayHello" .equals(var2) && var3.length == 1 ) { return var5.sayHello((String)var4[0 ]); } } catch (Throwable var9) { throw new InvocationTargetException(var9); } throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.guhailin.sample.DemoService." ); } public Wrapper0 () { } }
由此可以看到代理类是如何调用实体类的。
总结 线程模型
在使用默认AllDispatcher
的情况下,Dubbo
的线程模型为 1+M+N 的模型,即1个acceptor线程,M个IO线程,N个业务线程。
调用栈 1 2 3 4 5 6 7 8 9 10 11 12 graph TB boss[boss] --> worker[worker] worker --> nettyServerhandler[nettyServerhandler] nettyServerhandler --> nettyServer[nettyServer] nettyServer --> MultiMessageHandler[MultiMessageHandler] MultiMessageHandler --> HeartBeatHhandler[HeartBeatHhandler] HeartBeatHhandler --> AllChannelHandler[AllChannelHandler] AllChannelHandler --> ThreadPool[ThreadPool] ThreadPool --> DecodeHandler[DecodeHandler] DecodeHandler --> HeaderExchangehandler[HeaderExchangehandler] HeaderExchangehandler --> DubboProtocol.ExchangeHandler[DubboProtocol.ExchangeHandler] DubboProtocol.ExchangeHandler --> Invoker[Invoker]