无论是B/S还是C/S架构,如果你用的是长连接,那么心跳是必不可少的。Netty提供了对心跳机制的天然支持,今天结合例子特地学习了一下。
首先,我们来设想一下何时需要发送心跳。假设你做的是一款棋牌类小游戏,那么当玩家登陆游戏后肯定是先进入大厅,再选择一张合适的桌子正式开始游戏。此时玩家的客户端与服务器建立的这一次session(会话)应该是长久保持着,如果服务器端保存着大量的session,那么整个服务器就会越来越卡,最终整个服务都会挂掉。
为了预防这种情况,我们需要清理掉一些已经不用的或者理论上不会再用的session,比如:在手机上,如果我们在游戏中,突然接到一个电话或者退回桌面,这个时候我们的游戏客户端理论上就不会再主动向我们发送任何消息。这时候,心跳就派上用场了。
心跳,是为了证明自己还活着。因此,这里的心跳,说白了就是客户端向服务器端发送一次请求,服务器端相应,这样客户端就知道了服务器端是alive(活着的);服务器端向客户端发送一次心跳,客户端相应,这样服务器端就知道了客户端是alive。
知道了心跳的大致概念,那现在我们就需要知道Netty中是如何实现心跳,这就引出了两个类:IdleStateHandler、ChannelInboundHandlerAdapter
IdleStateHandler
大致作用
当连接的空闲时间(无论是读或者是写)太长时,都会触发IdleStateEvent事件。你可以写一个类继承ChannelInboundHandlerAdapter,重写userEventTriggered方法,来处理这类空闲事件。
知道了其大致作用,那么接下来就看看我们到底该如何使用了。
IdleStateHandler有3个构造方法,主要针对这4个属性,分别是:1
2
3
4private final boolean observeOutput;// 是否考虑出站时较慢的情况。默认值是false(一般不考虑)。
private final long readerIdleTimeNanos; // 读事件空闲时间,0 代表禁用事件
private final long writerIdleTimeNanos;// 写事件空闲时间,0 代表禁用事件
private final long allIdleTimeNanos; //读或写空闲时间,0 代表禁用事件
上面的三个时间,默认是秒,你也可以在构造的时候指定。
当你在pipeline中加入了该handler之后:
pipeline.addLast(new IdleStateHandler(30, 90, 0)); // 这个代表只考虑读空闲30秒或写空闲90秒的情况
则会先调用handlerAdded
方法:1
2
3
4
5
6
7
8
9
10
11
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
// channelActive() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead.
initialize(ctx);
} else {
// channelActive() event has not been fired yet. this.channelActive() will be invoked
// and initialization will occur there.
}
}
如果channel正常,则调用initialize
方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29private byte state; // 0 - none, 1 - initialized, 2 - destroyed
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1: // 避免重复添加
case 2: // 如果处于destoryed状态,则不需要添加
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos(); // 当前时间
// 添加相应的定时调度任务
if (readerIdleTimeNanos > 0) {
// readerIdleTimeNanos时间后,执行ReaderIdleTimeoutTask里面的方法
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
ReaderIdleTimeoutTask、WriterIdleTimeoutTask、AllIdleTimeoutTask
均继承自类AbstractIdleTask
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private abstract static class AbstractIdleTask implements Runnable {
private final ChannelHandlerContext ctx;
AbstractIdleTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public void run() {
if (!ctx.channel().isOpen()) {
return;
}
run(ctx);
}
// 子类需要实现的方法
protected abstract void run(ChannelHandlerContext ctx);
}
以ReaderIdleTimeoutTask
为例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) { // 如果不在读(channelRead时会被置为true,cahnnelReadComplete时会被置为false)
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) { // 说明读空闲时间达到或超过预设时间
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
// firstReaderIdleEvent,是否是第一次读空闲事件(该标志位会在下一次channelRead触发时改成true,所以应该理解在一次读取完成后,这个读空闲事件是不是第一次)
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
// 生成一个IdleStateEvent对象
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
// 找到下一个ChannelInboundHandler类(或其子类)的handler,触发其userEventTrigger(可以参考AbstractChannelHandlerContext的fireUserEventTriggered方法)
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else { // 要么正在读,要么读空闲时间小于预设时间
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
schedule
方法可以理解为将定时调度事件放进一个队列当中(我是在AbstractScheduledEventExecutor
里找到的scheduledTaskQueue().add(task);
,但这里面的代码我还没看明白,有兴趣的你可以自己研究,研究完后如果有空可在下方评论)。channelIdle(ctx, event)
方法时找到下一个ChannelInboundHandler类(或其子类)的handler,因此你写的继承自ChannelInboundHandler的handler,一定要添加在IdleStateHandler
的后面,比如:1
2pipeline.addLast(new IdleStateHandler(30, 90, 0));
pipeline.addLast(heartbeatHandler);
ChannelInboundHandler
它就很简单了,因为上面说了,channelIdle会调用ChannelInboundHandler的userEventTrigger,所以你只要自己写一个类继承ChannelInboundHandler,并重写它的userEventTrigger方法。比如:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32// 用Sharable是因为我的每一个pipeline中用的都是同样的handler
public class NettyHeartbeatHandler extends ChannelInboundHandlerAdapter {
private final IHeartbeatFactory factory;
public NettyHeartbeatHandler(IHeartbeatFactory factory) {
Preconditions.checkNotNull(factory);
this.factory = factory;
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (!(evt instanceof IdleStateEvent)) {
super.userEventTriggered(ctx, evt);
return;
}
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) { // 如果是读空闲,则关闭当前会话
ctx.close(); // 此时会触发下一个ChannelOutboundHandler的close方法,你可以在自己写的handler中进行断线操作
} else if (event.state() == IdleState.WRITER_IDLE) {
// 如果是写空闲,则向客户端发送心跳请求包,如果客户端不返回心跳相应包,则说明客户端断线,下一次就将触发读空闲事件。这也是为了向客户端证明服务器端alive
ctx.writeAndFlush(
new BinaryWebSocketFrame(
Unpooled.copiedBuffer(factory.getHeartbeatRequest().toByteArray())
)
);
}
}
}
因此,以上就是关于用Netty实现心跳的简单介绍。其中带大家重点看了服务器端应该在什么情况下发起一次心跳请求,应该是长久没有收到消息时(可能是有业务含义的消息或者是一个心跳包)。如果大家有什么想法可以在下方评论。