Dubbo系列 - QOS

QOS是Dubbo在2.5.8加入的新的telnet命令,具体使用方式可以看官方文档:
在线运维命令 - QOS

QOS服务启动的入口在QosProtocolWrapperQosProtocolWrapperProtocol的包装类,当通过adaptive方式来调用RegistryProtocol或者和其他协议的实现类时,会先执行QosProtocolWrapper的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
startQosServer(url);
return protocol.refer(type, url);
}
return protocol.refer(type, url);
}

QosProtocolWrapperexportrefer时,判断如果是registry协议,调用startQosServer方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void startQosServer(URL url) {
try {
boolean qosEnable = url.getParameter(QOS_ENABLE,true);
if (!qosEnable) {
logger.info("qos won't be started because it is disabled. " +
"Please check dubbo.application.qos.enable is configured either in system property, " +
"dubbo.properties or XML/spring boot configuration.");
return;
}
//hasStarted是AtomicBoolean类型,通过CAS操作来保证只会启动一次服务。
if (!hasStarted.compareAndSet(false, true)) {
return;
}

int port = url.getParameter(QOS_PORT, DEFAULT_PORT);
boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP,"false"));
Server server = com.alibaba.dubbo.qos.server.Server.getInstance();
server.setPort(port);
server.setAcceptForeignIp(acceptForeignIp);
server.start();

} catch (Throwable throwable) {
logger.warn("Fail to start qos server: ", throwable);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void start() throws Throwable {
if (!hasStarted.compareAndSet(false, true)) {
return;
}
boss = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-boss", true));
worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
}
});
try {
serverBootstrap.bind(port).sync();
logger.info("qos-server bind localhost:" + port);
} catch (Throwable throwable) {
logger.error("qos-server can not bind localhost:" + port, throwable);
throw throwable;
}
}

通过netty来启动服务,最后业务处理在QosProcessHandler中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class QosProcessHandler extends ByteToMessageDecoder {

private ScheduledFuture<?> welcomeFuture;

private String welcome;
// true means to accept foreign IP
private boolean acceptForeignIp;

public static String prompt = "dubbo>";

public QosProcessHandler(String welcome, boolean acceptForeignIp) {
this.welcome = welcome;
this.acceptForeignIp = acceptForeignIp;
}

@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
//建立连接时打印LOGO
welcomeFuture = ctx.executor().schedule(new Runnable() {

@Override
public void run() {
if (welcome != null) {
ctx.write(Unpooled.wrappedBuffer(welcome.getBytes()));
ctx.writeAndFlush(Unpooled.wrappedBuffer(prompt.getBytes()));
}
}

}, 500, TimeUnit.MILLISECONDS);
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 1) {
return;
}

// read one byte to guess protocol
final int magic = in.getByte(in.readerIndex());

ChannelPipeline p = ctx.pipeline();
p.addLast(new LocalHostPermitHandler(acceptForeignIp));
//取第一个字节,如果是G或者P,则判断是HTTP服务,否则是telnet服务。
//移除当前的handler,加入具体的协议的handler
if (isHttp(magic)) {
// no welcome output for http protocol
if (welcomeFuture != null && welcomeFuture.isCancellable()) {
welcomeFuture.cancel(false);
}
p.addLast(new HttpServerCodec());
p.addLast(new HttpObjectAggregator(1048576));
p.addLast(new HttpProcessHandler());
p.remove(this);
} else {
p.addLast(new LineBasedFrameDecoder(2048));
p.addLast(new StringDecoder(CharsetUtil.UTF_8));
p.addLast(new StringEncoder(CharsetUtil.UTF_8));
p.addLast(new IdleStateHandler(0, 0, 5 * 60));
p.addLast(new TelnetProcessHandler());
p.remove(this);
}
}

// G for GET, and P for POST
private static boolean isHttp(int magic) {
return magic == 'G' || magic == 'P';
}
}

QosProcessHandler主要判断协议,然后加入相应的处理器。我们重点看TelnetProcessHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

if (StringUtils.isBlank(msg)) {
ctx.writeAndFlush(QosProcessHandler.prompt);
} else {
CommandContext commandContext = TelnetCommandDecoder.decode(msg);
commandContext.setRemote(ctx.channel());

try {
String result = commandExecutor.execute(commandContext);
if (StringUtils.equals(QosConstants.CLOSE, result)) {
ctx.writeAndFlush(getByeLabel()).addListener(ChannelFutureListener.CLOSE);
} else {
ctx.writeAndFlush(result + QosConstants.BR_STR + QosProcessHandler.prompt);
}
} catch (NoSuchCommandException ex) {
ctx.writeAndFlush(msg + " :no such command");
ctx.writeAndFlush(QosConstants.BR_STR + QosProcessHandler.prompt);
log.error("can not found command " + commandContext, ex);
} catch (Exception ex) {
ctx.writeAndFlush(msg + " :fail to execute commandContext by " + ex.getMessage());
ctx.writeAndFlush(QosConstants.BR_STR + QosProcessHandler.prompt);
log.error("execute commandContext got exception " + commandContext, ex);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public String execute(CommandContext commandContext) throws NoSuchCommandException {
BaseCommand command = null;
try {
command = ExtensionLoader.getExtensionLoader(BaseCommand.class).getExtension(commandContext.getCommandName());
} catch (Throwable throwable) {
//can't find command
}
if (command == null) {
throw new NoSuchCommandException(commandContext.getCommandName());
}
return command.execute(commandContext, commandContext.getArgs());
}

TelnetProcessHandler解析命令之后,调用commandExecutor.execute方法,而commandExecutor.execute方法则是去找具体的BaseCommand扩展。

Dubbo一共有5个命令实现:

Help: 打印帮助信息
Ls: 打印所有的服务
Offline: 下线服务
Online: 上线服务
Quit: 关闭连接

举例看下Offline的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Cmd(name = "offline", summary = "offline dubbo", example = {
"offline dubbo",
"offline xx.xx.xxx.service"
})
public class Offline implements BaseCommand {
private Logger logger = LoggerFactory.getLogger(Offline.class);
private RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();

@Override
public String execute(CommandContext commandContext, String[] args) {
logger.info("receive offline command");
String servicePattern = ".*";
if (args != null && args.length > 0) {
servicePattern = args[0];
}
boolean hasService = false;

List<ProviderModel> providerModelList = ApplicationModel.allProviderModels();
for (ProviderModel providerModel : providerModelList) {
if (providerModel.getServiceName().matches(servicePattern)) {
hasService = true;
Set<ProviderInvokerWrapper> providerInvokerWrapperSet = ProviderConsumerRegTable.getProviderInvoker(providerModel.getServiceName());
for (ProviderInvokerWrapper providerInvokerWrapper : providerInvokerWrapperSet) {
if (!providerInvokerWrapper.isReg()) {
continue;
}
Registry registry = registryFactory.getRegistry(providerInvokerWrapper.getRegistryUrl());
registry.unregister(providerInvokerWrapper.getProviderUrl());
providerInvokerWrapper.setReg(false);
}
}
}

if (hasService) {
return "OK";
} else {
return "service not found";
}
}
}

Offline主要是从ApplicationModel.allProviderModels()获取所有的服务,然后调用registry.unregister方法注销服务。