Netty 实现简单心跳机制分析

java哥 阅读:155 2021-03-31 12:53:35 评论:0

一、计算机网络通信之心跳概念
网络中的接收和发送数据都是使用操作系统中的SOCKET进行实现。但是如果此套接字已经断开,那发送数据和接收数据的时候就一定会有问题。可是如何判断这个套接字是否还可以使用呢?这个就需要在系统中创建心跳机制。其实TCP中已经为我们实现了一个叫做心跳的机制。如果你设置了心跳,那TCP就会在一定的时间(比如你设置的是3秒钟)内发送你设置的次数的心跳(比如说2次),并且此信息不会影响你自己定义的协议。所谓“心跳”就是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着。 以确保链接的有效性。

所谓的心跳包就是客户端定时发送简单的信息给服务器端告诉它我还在而已。代码就是每隔几分钟发送一个固定信息给服务端,服务端收到后回复一个固定信息如果服务端几分钟内没有收到客户端信息则视客户端断开。比如有些通信软件长时间不使用,要想知道它的状态是在线还是离线就需要心跳包,定时发包收包。发包方:可以是客户也可以是服务端,看哪边实现方便合理。一般是客户端。服务器也可以定时轮询发心跳下去。心跳包之所以叫心跳包是因为:它像心跳一样每隔固定时间发一次,以此来告诉服务器,这个客户端还活着。事实上这是为了保持长连接,至于这个包的内容,是没有什么特别规定的,不过一般都是很小的包,或者只包含包头的一个空包。

在TCP的机制里面,本身是存在有心跳包的机制的,也就是TCP的选项。系统默认是设置的是2小时的心跳频率。但是它检查不到机器断电、网线拔出、防火墙这些断线。而且逻辑层处理断线可能也不是那么好处理。一般,如果只是用于保活还是可以的。心跳包一般来说都是在逻辑层发送空的包来实现的。下一个定时器,在一定时间间隔下发送一个空包给客户端,然后客户端反馈一个同样的空包回来,服务器如果在一定时间内收不到客户端发送过来的反馈包,那就只有认定说掉线了。只需要send或者recv一下,如果结果为零,则为掉线。

但是,在长连接下,有可能很长一段时间都没有数据往来。理论上说,这个连接是一直保持连接的,但是实际情况中,如果中间节点出现什么故障是难以知道的。更要命的是,有的节点(防火墙)会自动把一定时间之内没有数据交互的连接给断掉。在这个时候,就需要我们的心跳包了,用于维持长连接,保活。在获知了断线之后,服务器逻辑可能需要做一些事情,比如断线后的数据清理呀,重新连接呀当然,这个自然是要由逻辑层根据需求去做了。总的来说,心跳包主要也就是用于长连接的保活和断线处理。一般的应用下,判定时间在30-40秒比较不错。如果实在要求高,那就在6-9秒。

二、Netty 心跳协议实现

使用TCP协议层的Keeplive机制,但是该机制默认的心跳时间是2小时,依赖操作系统实现不够灵活;

心跳机制一般来说都是在逻辑层发送空的包来实现的,比如Netty的IdleStateHandler类实现心跳机制。

心跳机制实现逻辑:每隔几分钟发送一个固定信息给服务端,服务端收到后回复一个固定信息给客户端,如果服务端几分钟内没有收到客户端信息则视客户端断开。

 

三、Netty 心跳协议实现类IdleStateHandler 详解

IdleStateHandler 功能描述:1、主要用来检测远端是否存活,如果不存活或活跃则对空闲Socket连接进行处理避免资源的浪费;

                                           2、IdleStateHandler实现对三种心跳的检测,分别是readerIdleTime、writerIdleTime和allIdleTime,参数说明如下:

                                                 1)readerIdleTime:读超时时间
                                                 2)writerIdleTime:写超时时间
                                                 3)allIdleTime:所有类型的超时时间

IdleStateHandler  在Netty 服务端和客户端的使用

Netty 服务端添加如下代码片段:

# 在服务端通道设置心跳处理类:IdleStateHandler 并设置服务端读取超时5秒 
pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); 

Netty 客户端添加如下代码片段:

# 在客户端设置心跳处理类:IdleStateHandler,并设置客户端写入4秒超时 
pipeline.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));

四、Netty 心跳协议实战

服务端代码:

package com.netty.server.three; 
 
import java.util.concurrent.TimeUnit; 
 
import com.netty.server.three.handler.HeartBeatHandler; 
 
import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.ChannelPipeline; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.handler.codec.string.StringDecoder; 
import io.netty.handler.codec.string.StringEncoder; 
import io.netty.handler.logging.LogLevel; 
import io.netty.handler.logging.LoggingHandler; 
import io.netty.handler.timeout.IdleStateHandler; 
 
public class HeartBeatServer { 
	private int port = 8082; 
 
	private void run() { 
		// 首先,netty通过ServerBootstrap启动服务端 
		ServerBootstrap bootstrap = new ServerBootstrap(); 
		//第1步定义两个线程组,用来处理客户端通道的accept和读写事件 
        //parentGroup用来处理accept事件,childgroup用来处理通道的读写事件 
        //parentGroup获取客户端连接,连接接收到之后再将连接转发给childgroup去处理 
		EventLoopGroup boss = new NioEventLoopGroup(); 
		EventLoopGroup worker = new NioEventLoopGroup(); 
		try { 
			bootstrap.group(boss, worker) 
					.handler(new LoggingHandler(LogLevel.INFO)) 
					 //用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。 
			         //用来初始化服务端可连接队列 
			         //服务端处理客户端连接请求是按顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。 
					.option(ChannelOption.SO_BACKLOG, 128) 
					  //第2步绑定服务端通道 
					.channel(NioServerSocketChannel.class) 
					 //第3步绑定handler,处理读写事件,ChannelInitializer是给通道初始化 
					.childHandler(new ChannelInitializer<SocketChannel>() { 
 
						@Override 
						protected void initChannel(SocketChannel channel) throws Exception { 
							ChannelPipeline pipeline = channel.pipeline(); 
							pipeline.addLast("decoder", new StringDecoder()); 
							pipeline.addLast("encoder", new StringEncoder()); 
							pipeline.addLast(new IdleStateHandler(5, 5, 5, TimeUnit.SECONDS)); 
							pipeline.addLast(new HeartBeatHandler()); 
 
						} 
 
					}); 
			 //第4步绑定8082端口 
			ChannelFuture future = bootstrap.bind(port).sync(); 
			//当通道关闭了,就继续往下走 
			future.channel().closeFuture().sync(); 
		} catch (Exception e) { 
			e.printStackTrace(); 
		} finally { 
			worker.shutdownGracefully(); 
			boss.shutdownGracefully(); 
		} 
 
	} 
 
	public static void main(String[] args) { 
		new HeartBeatServer().run(); 
	} 
} 
package com.netty.server.three.handler; 
 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.handler.timeout.IdleState; 
import io.netty.handler.timeout.IdleStateEvent; 
 
public class HeartBeatHandler extends ChannelInboundHandlerAdapter  { 
	 /** 空闲次数 */ 
    private int idle_count = 1; 
    /** 发送次数 */ 
    private int count = 1; 
 
    /** 
     * 超时处理,如果5秒没有收到客户端的心跳,就触发; 如果超过两次,则直接关闭; 
     */ 
    @Override 
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { 
        if (obj instanceof IdleStateEvent) { 
            IdleStateEvent event = (IdleStateEvent) obj; 
            if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令 
                if (idle_count > 2) { 
                    System.out.println("超过两次无客户端请求,关闭该channel"); 
                    ctx.channel().close(); 
                } 
                 
                System.out.println("已等待5秒还没收到客户端发来的消息"); 
                idle_count++; 
            } 
        } else { 
            super.userEventTriggered(ctx, obj); 
        } 
    } 
 
    /** 
     * 业务逻辑处理 
     */ 
    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
        System.out.println("第" + count + "次" + ",服务端收到的消息:" + msg); 
         
        String message = (String) msg; 
        // 如果是心跳命令,服务端收到命令后回复一个相同的命令给客户端 
        if ("hb_request".equals(message)) {  
            ctx.write("服务端成功收到心跳信息"); 
            ctx.flush(); 
        } 
         
        count++; 
    } 
 
    /** 
     * 异常处理 
     */ 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
        cause.printStackTrace(); 
        ctx.close(); 
    } 
 
} 

客户端代码:

package com.netty.client.three; 
 
import java.util.Random; 
import java.util.concurrent.TimeUnit; 
 
import com.netty.client.three.handler.HeartBeatClientHandler; 
 
import io.netty.bootstrap.Bootstrap; 
import io.netty.channel.Channel; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelPipeline; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.nio.NioSocketChannel; 
import io.netty.handler.codec.string.StringDecoder; 
import io.netty.handler.codec.string.StringEncoder; 
import io.netty.handler.timeout.IdleStateHandler; 
 
public class HeartBeatClient { 
	public static void main(String[] args) { 
		int port = 8082; 
		 
		EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); 
		try { 
			// 首先,netty通过Bootstrap启动客户端 
			Bootstrap bootstrap = new Bootstrap(); 
			// 第1步 定义线程组,处理读写和链接事件,没有了accept事件 
			bootstrap.group(eventLoopGroup) 
			  		// 第2步 绑定客户端通道 
					.channel(NioSocketChannel.class) 
					 // 第3步 给NIoSocketChannel初始化handler, 处理读写事件 
					.handler(new ChannelInitializer<Channel>() { 
						@Override 
						protected void initChannel(Channel channel) throws Exception { 
							ChannelPipeline pipeline = channel.pipeline(); 
							pipeline.addLast("decoder", new StringDecoder()); 
							pipeline.addLast("encoder", new StringEncoder()); 
							pipeline.addLast(new IdleStateHandler(4, 4, 4, TimeUnit.SECONDS)); 
							pipeline.addLast(new HeartBeatClientHandler()); 
						} 
		 
					}); 
 
			 // 连接服务端 
			Channel channel = bootstrap.connect("localhost", port).sync().channel(); 
			String text = "Hello HeartBeatServer"; 
			while (channel.isActive()) { 
				int num = new Random().nextInt(10); 
				Thread.sleep(num * 1000); 
				channel.writeAndFlush(text); 
			} 
		} catch (Exception e) { 
			// do something 
		} finally { 
			eventLoopGroup.shutdownGracefully(); 
		} 
	} 
} 
package com.netty.client.three.handler; 
 
import java.text.SimpleDateFormat; 
import java.util.Date; 
 
import io.netty.buffer.ByteBuf; 
import io.netty.buffer.Unpooled; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.handler.timeout.IdleState; 
import io.netty.handler.timeout.IdleStateEvent; 
import io.netty.util.CharsetUtil; 
 
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter{ 
 
	/** 客户端请求的心跳命令 */ 
    private static final ByteBuf HEARTBEAT_SEQUENCE =  
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("hb_request", CharsetUtil.UTF_8)); 
 
    /** 空闲次数 */ 
    private int idle_count = 1; 
 
    /** 发送次数 */ 
    private int count = 1; 
 
    /** 循环次数 */ 
    private int fcount = 1; 
 
    /** 
     * 建立连接时 
     */ 
    @Override 
    public void channelActive(ChannelHandlerContext ctx) throws Exception { 
        System.out.println("建立连接时:" + date()); 
        ctx.fireChannelActive(); 
    } 
 
    /** 
     * 关闭连接时 
     */ 
    @Override 
    public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
        System.out.println("关闭连接时:" + date()); 
    } 
 
    /** 
     * 心跳请求处理,每4秒发送一次心跳请求; 
     *  
     */ 
    @Override 
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { 
        System.out.println("\r\n循环请求的时间:" + date() + ",次数" + fcount); 
         
        if (obj instanceof IdleStateEvent) { 
            IdleStateEvent event = (IdleStateEvent) obj; 
            if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态就发送心跳命令 
                // 设置发送次数,允许发送3次心跳包 
                if (idle_count <= 3) {  
                    idle_count++; 
                    ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()); 
                } else { 
                    System.out.println("心跳包发送结束,不再发送心跳请求!!!"); 
                } 
            } 
        } 
         
        fcount++; 
    } 
 
    /** 
     * 业务逻辑处理 
     */ 
    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
        System.out.println("第" + count + "次" + ",客户端收到的消息:" + msg); 
        count++; 
    } 
     
    private String date(){ 
         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
         return sdf.format(new Date()); 
    } 
 
} 

重点注意:ChannelInboundHandlerAdapter类中的userEventTriggered方法是Netty 处理心跳超时事件,在IdleStateHandler设置超时时间,如果达到了,就会直接调用该方法。如果没有超时则不调用。我们重写该方法的话,就可以自行进行相关的业务逻辑处理了。

效果截图:

服务端

客户端

声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

发表评论
搜索
排行榜
关注我们

一个IT知识分享的公众号