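Dubbo Series - QOS

QOS is the new telnet command facility that Dubbo added in 2.5.8. For usage details, see the official documentation: Online Operations Commands - QOS.

The QOS server is started from QosProtocolWrapper. QosProtocolWrapper is a wrapper class for Protocol: when RegistryProtocol or any other protocol implementation is invoked through the adaptive extension mechanism, the corresponding QosProtocolWrapper method runs first.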
```java
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        startQosServer(invoker.getUrl());
        return protocol.export(invoker);
    }
    return protocol.export(invoker);
}

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        startQosServer(url);
        return protocol.refer(type, url);
    }
    return protocol.refer(type, url);
}
```
In both export and refer, QosProtocolWrapper checks whether the URL uses the registry protocol and, if so, calls startQosServer before delegating to the wrapped protocol.
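The wrapper idea can be sketched without any Dubbo dependency. The code below is only an illustration under stated assumptions: SimpleProtocol, DelegateProtocol and QosLikeWrapper are hypothetical stand-ins (not Dubbo types), URLs are reduced to a protocol string, and a counter takes the place of the real startQosServer call.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class WrapperSketch {
    // Hypothetical stand-in for Dubbo's Protocol, reduced to one method.
    interface SimpleProtocol {
        String export(String urlProtocol);
    }

    // Plays the role of the wrapped implementation (for example RegistryProtocol).
    static class DelegateProtocol implements SimpleProtocol {
        @Override
        public String export(String urlProtocol) {
            return "exported:" + urlProtocol;
        }
    }

    // Plays the role of QosProtocolWrapper: side effect first, then delegate.
    static class QosLikeWrapper implements SimpleProtocol {
        static final String REGISTRY_PROTOCOL = "registry";
        final AtomicInteger startAttempts = new AtomicInteger();
        private final SimpleProtocol protocol;

        QosLikeWrapper(SimpleProtocol protocol) {
            this.protocol = protocol;
        }

        @Override
        public String export(String urlProtocol) {
            if (REGISTRY_PROTOCOL.equals(urlProtocol)) {
                // Stands in for startQosServer(url) in the real wrapper.
                startAttempts.incrementAndGet();
            }
            return protocol.export(urlProtocol);
        }
    }

    public static void main(String[] args) {
        QosLikeWrapper wrapper = new QosLikeWrapper(new DelegateProtocol());
        System.out.println(wrapper.export("registry"));
        System.out.println(wrapper.export("dubbo"));
        System.out.println("start attempts: " + wrapper.startAttempts.get());
    }
}
```

Only the registry export triggers the side effect here; the dubbo export passes straight through to the delegate.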
```java
private void startQosServer(URL url) {
    try {
        boolean qosEnable = url.getParameter(QOS_ENABLE, true);
        if (!qosEnable) {
            logger.info("qos won't be started because it is disabled. " +
                    "Please check dubbo.application.qos.enable is configured either in system property, " +
                    "dubbo.properties or XML/spring boot configuration.");
            return;
        }

        if (!hasStarted.compareAndSet(false, true)) {
            return;
        }

        int port = url.getParameter(QOS_PORT, DEFAULT_PORT);
        boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
        Server server = com.alibaba.dubbo.qos.server.Server.getInstance();
        server.setPort(port);
        server.setAcceptForeignIp(acceptForeignIp);
        server.start();

    } catch (Throwable throwable) {
        logger.warn("Fail to start qos server: ", throwable);
    }
}
```
startQosServer reads the enable flag, port, and foreign-IP setting from the URL and then calls Server.start:

```java
public void start() throws Throwable {
    if (!hasStarted.compareAndSet(false, true)) {
        return;
    }

    boss = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-boss", true));
    worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(boss, worker);
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
    serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
        }
    });
    try {
        serverBootstrap.bind(port).sync();
        logger.info("qos-server bind localhost:" + port);
    } catch (Throwable throwable) {
        logger.error("qos-server can not bind localhost:" + port, throwable);
        throw throwable;
    }
}
```
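Both startQosServer and Server.start guard against repeated starts with hasStarted.compareAndSet(false, true). A minimal sketch of that idiom follows, assuming a hypothetical StartOnceSketch class in which a counter stands in for binding the port; it is not the Dubbo Server class.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class StartOnceSketch {
    private final AtomicBoolean hasStarted = new AtomicBoolean(false);
    private final AtomicInteger bindCount = new AtomicInteger();

    // Returns true only for the single caller that wins the compare-and-set.
    public boolean start() {
        if (!hasStarted.compareAndSet(false, true)) {
            return false; // another caller already started the server
        }
        bindCount.incrementAndGet(); // stands in for binding the port
        return true;
    }

    public int bindCount() {
        return bindCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        StartOnceSketch server = new StartOnceSketch();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(server::start);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // However the threads interleave, only one of them performs the bind.
        System.out.println("bind count: " + server.bindCount());
    }
}
```

Because the flag flips atomically, concurrent export and refer calls cannot start two servers, which is presumably why the real code checks it before touching the port.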
Server.start brings the server up with Netty; the actual request handling ends up in QosProcessHandler.
```java
public class QosProcessHandler extends ByteToMessageDecoder {

    private ScheduledFuture<?> welcomeFuture;

    private String welcome;
    private boolean acceptForeignIp;

    public static String prompt = "dubbo>";

    public QosProcessHandler(String welcome, boolean acceptForeignIp) {
        this.welcome = welcome;
        this.acceptForeignIp = acceptForeignIp;
    }

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        welcomeFuture = ctx.executor().schedule(new Runnable() {

            @Override
            public void run() {
                if (welcome != null) {
                    ctx.write(Unpooled.wrappedBuffer(welcome.getBytes()));
                    ctx.writeAndFlush(Unpooled.wrappedBuffer(prompt.getBytes()));
                }
            }

        }, 500, TimeUnit.MILLISECONDS);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 1) {
            return;
        }

        final int magic = in.getByte(in.readerIndex());

        ChannelPipeline p = ctx.pipeline();
        p.addLast(new LocalHostPermitHandler(acceptForeignIp));
        if (isHttp(magic)) {
            if (welcomeFuture != null && welcomeFuture.isCancellable()) {
                welcomeFuture.cancel(false);
            }
            p.addLast(new HttpServerCodec());
            p.addLast(new HttpObjectAggregator(1048576));
            p.addLast(new HttpProcessHandler());
            p.remove(this);
        } else {
            p.addLast(new LineBasedFrameDecoder(2048));
            p.addLast(new StringDecoder(CharsetUtil.UTF_8));
            p.addLast(new StringEncoder(CharsetUtil.UTF_8));
            p.addLast(new IdleStateHandler(0, 0, 5 * 60));
            p.addLast(new TelnetProcessHandler());
            p.remove(this);
        }
    }

    private static boolean isHttp(int magic) {
        return magic == 'G' || magic == 'P';
    }
}
```
QosProcessHandler mainly detects the protocol from the first byte and then adds the corresponding handlers to the pipeline. Let's focus on TelnetProcessHandler.
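The first-byte check is small enough to try in isolation. The sketch below reuses the isHttp rule from QosProcessHandler; the ProtocolSniffSketch class, the choosePipeline helper, and its "http" / "telnet" / "wait" labels are hypothetical, and the reading of 'G' and 'P' as the initials of HTTP methods such as GET, POST or PUT is an interpretation, not something the source states.

```java
import java.nio.charset.StandardCharsets;

public class ProtocolSniffSketch {
    // Same rule as QosProcessHandler.isHttp: the first byte is 'G' or 'P'.
    static boolean isHttp(int magic) {
        return magic == 'G' || magic == 'P';
    }

    // Looks at the first byte only, the way decode() peeks at readerIndex().
    static String choosePipeline(byte[] firstBytes) {
        if (firstBytes.length < 1) {
            return "wait"; // nothing readable yet, decode() simply returns
        }
        return isHttp(firstBytes[0]) ? "http" : "telnet";
    }

    public static void main(String[] args) {
        byte[] http = "GET /ls HTTP/1.1\r\n".getBytes(StandardCharsets.US_ASCII);
        byte[] telnet = "ls\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(choosePipeline(http));
        System.out.println(choosePipeline(telnet));
        System.out.println(choosePipeline(new byte[0]));
    }
}
```

One consequence of this rule is that any input starting with an uppercase G or P is routed to the HTTP handlers, so the scheme relies on telnet commands such as offline being typed in lowercase.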
```java
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

    if (StringUtils.isBlank(msg)) {
        ctx.writeAndFlush(QosProcessHandler.prompt);
    } else {
        CommandContext commandContext = TelnetCommandDecoder.decode(msg);
        commandContext.setRemote(ctx.channel());

        try {
            String result = commandExecutor.execute(commandContext);
            if (StringUtils.equals(QosConstants.CLOSE, result)) {
                ctx.writeAndFlush(getByeLabel()).addListener(ChannelFutureListener.CLOSE);
            } else {
                ctx.writeAndFlush(result + QosConstants.BR_STR + QosProcessHandler.prompt);
            }
        } catch (NoSuchCommandException ex) {
            ctx.writeAndFlush(msg + " :no such command");
            ctx.writeAndFlush(QosConstants.BR_STR + QosProcessHandler.prompt);
            log.error("can not found command " + commandContext, ex);
        } catch (Exception ex) {
            ctx.writeAndFlush(msg + " :fail to execute commandContext by " + ex.getMessage());
            ctx.writeAndFlush(QosConstants.BR_STR + QosProcessHandler.prompt);
            log.error("execute commandContext got exception " + commandContext, ex);
        }
    }
}
```
```java
public String execute(CommandContext commandContext) throws NoSuchCommandException {
    BaseCommand command = null;
    try {
        command = ExtensionLoader.getExtensionLoader(BaseCommand.class).getExtension(commandContext.getCommandName());
    } catch (Throwable throwable) {
    }
    if (command == null) {
        throw new NoSuchCommandException(commandContext.getCommandName());
    }
    return command.execute(commandContext, commandContext.getArgs());
}
```
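After parsing the command, TelnetProcessHandler calls commandExecutor.execute, which looks up the BaseCommand extension whose name matches the command and runs it. If no such extension exists, it throws NoSuchCommandException.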
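The parse-then-look-up flow can be approximated in plain Java. This is a rough sketch under several assumptions: a HashMap replaces ExtensionLoader, the whitespace split is a guess at what TelnetCommandDecoder does rather than its actual logic, and the Command interface and the echo command are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class CommandDispatchSketch {
    // Hypothetical, simplified counterpart of BaseCommand.
    interface Command {
        String execute(String[] args);
    }

    static class NoSuchCommandException extends Exception {
        NoSuchCommandException(String name) {
            super("no such command: " + name);
        }
    }

    // A plain map stands in for the extension lookup by command name.
    private static final Map<String, Command> COMMANDS = new HashMap<>();
    static {
        COMMANDS.put("help", args -> "help text");
        COMMANDS.put("echo", args -> String.join(" ", args)); // illustration only
    }

    // Assumed parsing: first token is the command name, the rest are arguments.
    static String dispatch(String line) throws NoSuchCommandException {
        String[] tokens = line.trim().split("\\s+");
        String[] args = Arrays.copyOfRange(tokens, 1, tokens.length);
        Command command = COMMANDS.get(tokens[0]);
        if (command == null) {
            throw new NoSuchCommandException(tokens[0]);
        }
        return command.execute(args);
    }

    // Mirrors how the telnet handler turns the exception into a reply.
    static String handle(String msg) {
        try {
            return dispatch(msg);
        } catch (NoSuchCommandException ex) {
            return msg + " :no such command";
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("echo a b"));
        System.out.println(handle("unknown x"));
    }
}
```

Looking commands up by name keeps the handler itself unaware of individual commands; in Dubbo the same effect comes from registering each BaseCommand as an extension.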
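Dubbo ships five command implementations:

- Help: prints help information
- Ls: lists all services
- Offline: takes services offline
- Online: brings services online
- Quit: closes the connection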
As an example, here is the implementation of Offline:
```java
@Cmd(name = "offline", summary = "offline dubbo", example = {
        "offline dubbo",
        "offline xx.xx.xxx.service"
})
public class Offline implements BaseCommand {
    private Logger logger = LoggerFactory.getLogger(Offline.class);
    private RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();

    @Override
    public String execute(CommandContext commandContext, String[] args) {
        logger.info("receive offline command");
        String servicePattern = ".*";
        if (args != null && args.length > 0) {
            servicePattern = args[0];
        }
        boolean hasService = false;

        List<ProviderModel> providerModelList = ApplicationModel.allProviderModels();
        for (ProviderModel providerModel : providerModelList) {
            if (providerModel.getServiceName().matches(servicePattern)) {
                hasService = true;
                Set<ProviderInvokerWrapper> providerInvokerWrapperSet = ProviderConsumerRegTable.getProviderInvoker(providerModel.getServiceName());
                for (ProviderInvokerWrapper providerInvokerWrapper : providerInvokerWrapperSet) {
                    if (!providerInvokerWrapper.isReg()) {
                        continue;
                    }
                    Registry registry = registryFactory.getRegistry(providerInvokerWrapper.getRegistryUrl());
                    registry.unregister(providerInvokerWrapper.getProviderUrl());
                    providerInvokerWrapper.setReg(false);
                }
            }
        }

        if (hasService) {
            return "OK";
        } else {
            return "service not found";
        }
    }
}
```
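Offline gets all exported services from ApplicationModel.allProviderModels(), treats the optional argument as a regular expression over service names (defaulting to .*, which matches everything), and calls registry.unregister for every matching provider that is still registered.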
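The matching and bookkeeping logic can be tried out without a registry. The following is a simplified sketch, not the Dubbo implementation: OfflineSketch is hypothetical, a map of service name to registered flag replaces ProviderModel and ProviderInvokerWrapper, a list records what would be passed to registry.unregister, and the service names are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OfflineSketch {
    // Service name -> whether it is currently registered (stand-in for isReg()).
    final Map<String, Boolean> registered = new LinkedHashMap<>();
    // Records the services that would be passed to registry.unregister.
    final List<String> unregisterCalls = new ArrayList<>();

    void addService(String name, boolean isRegistered) {
        registered.put(name, isRegistered);
    }

    String offline(String... args) {
        String servicePattern = ".*"; // default: match every service
        if (args != null && args.length > 0) {
            servicePattern = args[0];
        }
        boolean hasService = false;
        for (Map.Entry<String, Boolean> entry : registered.entrySet()) {
            if (entry.getKey().matches(servicePattern)) {
                hasService = true;
                if (!entry.getValue()) {
                    continue; // already offline, nothing to unregister
                }
                unregisterCalls.add(entry.getKey());
                entry.setValue(false);
            }
        }
        return hasService ? "OK" : "service not found";
    }

    public static void main(String[] args) {
        OfflineSketch sketch = new OfflineSketch();
        sketch.addService("com.example.DemoService", true);
        sketch.addService("com.example.OtherService", false);
        System.out.println(sketch.offline("com\\.example\\.Demo.*"));
        System.out.println(sketch.offline("no\\.such\\.Service"));
        System.out.println(sketch.unregisterCalls);
    }
}
```

As in the original, the reply is "OK" whenever a service name matches, even if that service was already offline and nothing was actually unregistered.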