Netty心跳协议添加重连机制

符号 阅读:669 2021-03-31 12:53:23 评论:0

Netty Client有两种情况下需要重连:

  1. Netty Client启动的时候需要重连

  2. Netty Client在程序运行中连接断掉需要重连。

针对 Netty Client 启动时需要重连解决办法: Netty Client 的ChannelFuture实现ChannelFutureListener 用来启动时监测是否连接成功,不成功的话重试

Netty Client 源码:

package com.netty.client.four; 
 
import java.util.concurrent.TimeUnit; 
import com.netty.client.four.handler.CommonHandler; 
import com.netty.client.four.handler.ReconnectionHeartBeatHandler; 
import io.netty.bootstrap.Bootstrap; 
import io.netty.channel.Channel; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelFutureListener; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.ChannelPipeline; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.nio.NioSocketChannel; 
import io.netty.handler.codec.string.StringDecoder; 
import io.netty.handler.codec.string.StringEncoder; 
import io.netty.handler.timeout.IdleStateHandler; 
 
public class ReconnectionHeartBeatClient { 
	int port = 8083; 
	public void run() { 
		EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); 
		try { 
			// 首先,netty通过Bootstrap启动客户端 
			Bootstrap bootstrap = new Bootstrap(); 
			// 第1步 定义线程组,处理读写和链接事件,没有了accept事件 
			bootstrap.group(eventLoopGroup) 
			  		// 第2步 绑定客户端通道 
					.channel(NioSocketChannel.class) 
					.option(ChannelOption.SO_KEEPALIVE, true) 
		            .option(ChannelOption.TCP_NODELAY, true) 
					 // 第3步 给NIoSocketChannel初始化handler, 处理读写事件 
					.handler(new ChannelInitializer<Channel>() { 
						@Override 
						protected void initChannel(Channel channel) throws Exception { 
							ChannelPipeline pipeline = channel.pipeline(); 
							pipeline.addLast("decoder", new StringDecoder()); 
							pipeline.addLast("encoder", new StringEncoder()); 
							// 心跳协议发送 
							pipeline.addLast(new IdleStateHandler(0, 6, 0, TimeUnit.SECONDS)); 
							// 心跳协议自定义处理逻辑 
							pipeline.addLast(new ReconnectionHeartBeatHandler()); 
							// 普通信息处理 
							pipeline.addLast(new CommonHandler()); 
						} 
		 
					}); 
 
			 // 连接服务端 
			ChannelFuture channelFuture = bootstrap.connect("localhost", port).sync(); 
			 //客户端断线重连逻辑 
			channelFuture.addListener((ChannelFutureListener) future -> { 
	            if (future.isSuccess()) { 
	                System.out.println("连接Netty服务端成功"); 
	            } else { 
	                System.out.println("连接失败,进行断线重连"); 
	                future.channel().eventLoop().schedule(() -> run(), 20, TimeUnit.SECONDS); 
	            } 
	        }); 
			// 客户端连接服务端通道,可以进行消息发送 
			channelFuture.channel().writeAndFlush("Hello Server, I'm online"); 
			channelFuture.channel().closeFuture().sync(); 
		} catch (Exception e) { 
			// do something 
		} finally { 
			eventLoopGroup.shutdownGracefully(); 
		} 
		 
	} 
	public static void main(String[] args) { 
		new ReconnectionHeartBeatClient().run(); 
	} 
} 

针对 Netty Client 在程序运行中连接断掉需要重连解决办法: 在Netty Client 的 ChannelHandler类中重写channelInactive方法,监测连接是否断掉,断掉的话也要重连。

package com.netty.client.four.handler; 
 
import java.util.concurrent.TimeUnit; 
import com.netty.client.four.ReconnectionHeartBeatClient; 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.channel.EventLoop; 
import io.netty.handler.timeout.IdleState; 
import io.netty.handler.timeout.IdleStateEvent; 
 
public class ReconnectionHeartBeatHandler extends ChannelInboundHandlerAdapter { 
 
 
	/** 
	 * 如果4s没有收到写请求,则向服务端发送心跳请求 
	 */ 
	@Override 
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 
		if (evt instanceof IdleStateEvent) { 
			IdleStateEvent idleStateEvent = (IdleStateEvent) evt; 
			// 客户端写入空闲,向服务器发送心跳包 
			if (idleStateEvent.state() == IdleState.WRITER_IDLE) { 
				// 向服务端送心跳包 
				String message = "heartbeat"; 
				// 发送心跳消息,并在发送失败时关闭该连接 
				ctx.writeAndFlush(message); 
			} 
		}  
		super.userEventTriggered(ctx, evt); 
		 
	} 
 
	@Override 
	public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
		System.out.println("Client is close"); 
		// 如果运行过程中服务端挂了,执行重连机制 
		EventLoop eventLoop = ctx.channel().eventLoop(); 
		eventLoop.schedule(() -> new ReconnectionHeartBeatClient().run(), 10L, TimeUnit.SECONDS); 
		super.channelInactive(ctx); 
	} 
 
} 

Netty Client 客户端涉及其他代码片段:

package com.netty.client.four.handler; 
 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.SimpleChannelInboundHandler; 
 
public class CommonHandler extends SimpleChannelInboundHandler<String> { 
	  /** 
     * 如果服务端发生消息给客户端,下面方法进行接收消息 
     * 
     * @param ctx 
     * @param msg 
     * @throws Exception 
     */ 
    @Override 
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { 
    	 if("heartbeat".equals(msg)) { 
             System.out.println(ctx.channel().remoteAddress() + "===>client: " + msg); 
         } else { 
        	 System.out.println(ctx.channel().remoteAddress() + "===>message: " + msg); 
         } 
    } 
 
    /** 
     * 处理异常, 一般将实现异常处理逻辑的Handler放在ChannelPipeline的最后 
     * 这样确保所有入站消息都总是被处理,无论它们发生在什么位置,下面只是简单的关闭Channel并打印异常信息 
     * 
     * @param ctx 
     * @param cause 
     * @throws Exception 
     */ 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
        cause.printStackTrace(); 
        ctx.close(); 
    } 
} 

Netty Server源码:

package com.netty.server.four; 
 
import java.util.concurrent.TimeUnit; 
 
import com.netty.server.four.handler.ReConnectionHeartBeatHandler; 
import com.netty.server.four.handler.ServerIdleStateTrigger; 
import com.netty.server.three.handler.HeartBeatHandler; 
 
import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.ChannelPipeline; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.handler.codec.string.StringDecoder; 
import io.netty.handler.codec.string.StringEncoder; 
import io.netty.handler.logging.LogLevel; 
import io.netty.handler.logging.LoggingHandler; 
import io.netty.handler.timeout.IdleStateHandler; 
 
public class ReConnectionHeartBeatServer { 
	private int port = 8083; 
	 /** 
     * 设置空闲检测时间为 30s 
     */ 
    private static final int READER_IDLE_TIME = 5; 
     
	private void run() { 
		// 首先,netty通过ServerBootstrap启动服务端 
				ServerBootstrap bootstrap = new ServerBootstrap(); 
				//第1步定义两个线程组,用来处理客户端通道的accept和读写事件 
		        //parentGroup用来处理accept事件,childgroup用来处理通道的读写事件 
		        //parentGroup获取客户端连接,连接接收到之后再将连接转发给childgroup去处理 
				EventLoopGroup boss = new NioEventLoopGroup(); 
				EventLoopGroup worker = new NioEventLoopGroup(); 
				try { 
					bootstrap.group(boss, worker) 
							.handler(new LoggingHandler(LogLevel.INFO)) 
							 //用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。 
					         //用来初始化服务端可连接队列 
					         //服务端处理客户端连接请求是按顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。 
							.option(ChannelOption.SO_BACKLOG, 128) 
							//设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 
			                .childOption(ChannelOption.SO_KEEPALIVE, true) 
			                //将小的数据包包装成更大的帧进行传送,提高网络的负载 
			                .childOption(ChannelOption.TCP_NODELAY, true) 
							  //第2步绑定服务端通道 
							.channel(NioServerSocketChannel.class) 
							 //第3步绑定handler,处理读写事件,ChannelInitializer是给通道初始化 
							.childHandler(new ChannelInitializer<SocketChannel>() { 
 
								@Override 
								protected void initChannel(SocketChannel channel) throws Exception { 
									ChannelPipeline pipeline = channel.pipeline(); 
									pipeline.addLast("decoder", new StringDecoder()); 
									pipeline.addLast("encoder", new StringEncoder()); 
									pipeline.addLast(new IdleStateHandler(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS)); 
									pipeline.addLast(new ServerIdleStateTrigger()); 
									pipeline.addLast(new ReConnectionHeartBeatHandler()); 
 
								} 
 
							}); 
					 //第4步绑定8083端口 
					ChannelFuture future = bootstrap.bind(port).sync(); 
					//当通道关闭了,就继续往下走 
					future.channel().closeFuture().sync(); 
				} catch (Exception e) { 
					e.printStackTrace(); 
				} finally { 
					worker.shutdownGracefully(); 
					boss.shutdownGracefully(); 
				} 
 
	} 
	public static void main(String[] args) { 
		new ReConnectionHeartBeatServer().run(); 
	} 
} 
package com.netty.server.four.handler; 
 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.SimpleChannelInboundHandler; 
 
public class ReConnectionHeartBeatHandler  extends SimpleChannelInboundHandler<String> { 
 
	@Override 
	protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { 
		/** 
		 * msg 模拟:如何模拟心跳协议和消息协议,在msg 内容中包含逗号标识心跳协议,不包含内容标识普通消息 
		 */ 
		if(msg.equalsIgnoreCase("heartbeat")){ 
			System.out.println("server 接收到心跳协议标识" + msg); 
			 //服务端响应客户端心跳协议,返回"server ping" 标识服务器与客户端正常联通 
            ctx.writeAndFlush("heartbeat"); 
		} else { 
			System.out.print("server 接收信息:" + msg); 
		} 
	} 
 
	@Override 
	public void channelActive(ChannelHandlerContext ctx) throws Exception { 
		// TODO Auto-generated method stub 
		System.out.println("Established connection with the remote client."); 
	} 
 
	@Override 
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
		// TODO Auto-generated method stub 
		 cause.printStackTrace(); 
	     ctx.close(); 
	} 
	 
	 
 
} 
package com.netty.server.four.handler; 
 
import io.netty.channel.ChannelHandlerContext; 
import io.netty.channel.ChannelInboundHandlerAdapter; 
import io.netty.handler.timeout.IdleState; 
import io.netty.handler.timeout.IdleStateEvent; 
 
public class ServerIdleStateTrigger extends ChannelInboundHandlerAdapter  { 
 
	/** 
	 * 如果5s没有读请求,则向客户端发送心跳 
	 */ 
	@Override 
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { 
		// TODO Auto-generated method stub 
		if (evt instanceof IdleStateEvent) { 
            IdleState state = ((IdleStateEvent) evt).state(); 
            if (state == IdleState.READER_IDLE) { 
            	ctx.writeAndFlush("heartbeat"); 
            } 
        }  
        super.userEventTriggered(ctx, evt); 
	} 
	 
} 

 

声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

关注我们

一个IT知识分享的公众号