Dubbo系列-心跳机制

首先参考文章:聊聊 TCP 长连接和心跳那些事

TCP 中的 KeepAlive 机制:

KeepAlive 并不是 TCP 协议的一部分,但是大多数操作系统都实现了这个机制(所以需要在操作系统层面设置 KeepAlive 的相关参数)。KeepAlive 机制开启后,在一定时间内(一般时间为 7200s,参数 tcp_keepalive_time)在链路上没有数据传送的情况下,TCP 层将发送相应的 KeepAlive 探针以确定连接可用性,探测失败后重试 10(参数 tcp_keepalive_probes)次,每次间隔时间 75s(参数 tcp_keepalive_intvl),所有探测失败后,才认为当前连接已经不可用。

既然操作系统已经做了KeepAlive,为什么我们要在应用层面做KeepAlive呢?主要体现在三个方面:

  • KeepAlive 的开关是在应用层开启的,但是具体参数(如重试测试,重试间隔时间)的设置却是操作系统级别的,位于操作系统的 /etc/sysctl.conf 配置中,这对于应用来说不够灵活。
  • KeepAlive 的保活机制只在链路空闲的情况下才会起到作用,假如此时有数据发送,且物理链路已经不通,操作系统这边的链路状态还是 ESTABLISHED,这时会发生什么?自然会走 TCP 重传机制,要知道默认的 TCP 超时重传,指数退避算法也是一个相当长的过程。
  • KeepAlive 本身是面向网络的,并不面向于应用,当连接不可用,可能是由于应用本身的 GC 频繁,系统 load 高等情况,但网络仍然是通的,此时,应用已经失去了活性,连接应该被认为是不可用的。

Dubbo的心跳机制

Dubbo的客户端和服务端都会进行心跳的检查,客户端在HeaderExchangeClient中开启,服务端在HeaderExchangeServer中开启。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HeaderExchangeClient implements ExchangeClient {

private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HeaderExchangeServer implements ExchangeServer {
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels());
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
}

客户端和服务端都是启动HeartBeatTask来进行心跳的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
final class HeartBeatTask implements Runnable {
@Override
public void run() {
try {
//当前时间
long now = System.currentTimeMillis();
//循环所有的channel
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try {
//获得最近一次读数据的时间
Long lastRead = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
//最近一次写数据的时间
Long lastWrite = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
//写或者读的时间建个如果大于heartbeat的话,发送一个心跳包
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
//写或者读的时间建个如果大于heartbeatTimeout的话,客户端重新连接,服务端断开连接
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
if (channel instanceof Client) {
try {
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
} else {
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
}

其中heartbeat默认60*1000,heartbeatTimeout默认3倍的heartbeat,且heartbeatTimeout必须要大于2倍的heartbeat,否则启动会报错。