Dubbo系列-心跳机制
首先参考文章:聊聊 TCP 长连接和心跳那些事
TCP 中的 KeepAlive 机制:
KeepAlive 并不是 TCP 协议的一部分,但是大多数操作系统都实现了这个机制(所以需要在操作系统层面设置 KeepAlive 的相关参数)。KeepAlive 机制开启后,在一定时间内(一般时间为 7200s,参数 tcp_keepalive_time)在链路上没有数据传送的情况下,TCP 层将发送相应的 KeepAlive 探针以确定连接可用性,探测失败后重试 10(参数 tcp_keepalive_probes)次,每次间隔时间 75s(参数 tcp_keepalive_intvl),所有探测失败后,才认为当前连接已经不可用。
既然操作系统已经做了KeepAlive,为什么我们要在应用层面做KeepAlive呢?主要体现在三个方面:
- KeepAlive 的开关是在应用层开启的,但是具体参数(如重试测试,重试间隔时间)的设置却是操作系统级别的,位于操作系统的 /etc/sysctl.conf 配置中,这对于应用来说不够灵活。
- KeepAlive 的保活机制只在链路空闲的情况下才会起到作用,假如此时有数据发送,且物理链路已经不通,操作系统这边的链路状态还是 ESTABLISHED,这时会发生什么?自然会走 TCP 重传机制,要知道默认的 TCP 超时重传,指数退避算法也是一个相当长的过程。
- KeepAlive 本身是面向网络的,并不面向于应用,当连接不可用,可能是由于应用本身的 GC 频繁,系统 load 高等情况,但网络仍然是通的,此时,应用已经失去了活性,连接应该被认为是不可用的。
Dubbo的心跳机制
Dubbo的客户端和服务端都会进行心跳的检查,客户端在HeaderExchangeClient
中开启,服务端在HeaderExchangeServer
中开启。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class HeaderExchangeClient implements ExchangeClient { private void startHeartbeatTimer() { stopHeartbeatTimer(); if (heartbeat > 0) { heartbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels() { return Collections.<Channel>singletonList(HeaderExchangeClient.this); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class HeaderExchangeServer implements ExchangeServer { private void startHeartbeatTimer() { stopHeartbeatTimer(); if (heartbeat > 0) { heartbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels() { return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels()); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } } }
|
客户端和服务端都是启动HeartBeatTask
来进行心跳的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| final class HeartBeatTask implements Runnable { @Override public void run() { try { long now = System.currentTimeMillis(); for (Channel channel : channelProvider.getChannels()) { if (channel.isClosed()) { continue; } try { Long lastRead = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_READ_TIMESTAMP); Long lastWrite = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); if ((lastRead != null && now - lastRead > heartbeat) || (lastWrite != null && now - lastWrite > heartbeat)) { Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setEvent(Request.HEARTBEAT_EVENT); channel.send(req); if (logger.isDebugEnabled()) { logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); } } if (lastRead != null && now - lastRead > heartbeatTimeout) { logger.warn("Close channel " + channel + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms"); if (channel instanceof Client) { try { ((Client) channel).reconnect(); } catch (Exception e) { } } else { channel.close(); } } } catch (Throwable t) { logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); } } } catch (Throwable t) { logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t); } } }
|
其中heartbeat
默认60*1000,heartbeatTimeout
默认3倍的heartbeat
,且heartbeatTimeout
必须要大于2倍的heartbeat
,否则启动会报错。