Netty 权威指南之Google protobuf 编解码

无情 阅读:566 2021-03-31 22:28:31 评论:0

本章相关知识点:

Google 的protobuf 在业界非常流行,很多的商业项目选址使用protobuf作为编解码框架,这里一起回顾一下protobuf 框架的优点:

1、结构化数据存储(类似XML和JSON)

2、高效编码性能

3、语言无法、平台无法、拓展性好

4、官方支持Java、C++和Python


本章学习目标:

1、Protobuf 开发入门

2、开发支持Protobuf 的Netty 服务端

3、开发支持Protobuf 的Netty 客户端

4、运行基于Netty 开发的Protobuf 应用


第一节:Protobuf 开发入门

Protobuf 入门

protobuf 是一个灵活、高效、结构化的数据序列化框架。相比XML等传统序列化工具,它更小、更快、更简单.Protobuf支持数据结构化一次编译可以到处使用,甚至是跨语言使用,通过代码生成工具可以自动生成不同语言版本的源代码,甚至可以在不同的版本数据结构进程中进行数据传递,实现数据结构的前向兼容。

Protobuf 开发环境搭建

1、首先先下载protobuf 的windows 版本


2、protoc.exe 主要根据.proto文件生成代码,我们以图书订购流程为例,定义SubscribeReq.proto和SubscribeResp.proto,数据定义格式如下:

SubscribeResp.proto

option java_package="com.nio.serlizable"; 
option java_outer_classname="SubscribeRespProto"; 
 
message SubscribeResp{ 
		required int32 subReqId = 1; 
		required int32 respCode = 2; 
		required string desc = 3;		 
}
SubscribeReq.proto

option java_package="com.nio.serlizable"; 
option java_outer_classname="SubscribeReqProto"; 
 
message SubscribeReq{ 
		required int32 subReqId = 1; 
		required string userName = 2; 
		required string productName = 3; 
		repeated string address = 4; 
}


3、通过protoc.exe命令生成java代码,命令行如图所示:


4、将生成的的POJO代码SubscribeReqProto.java和SubscribeRespProto.java复制到对应的IDEA14 项目当中。

5、到此,我们已经完成对google protobuf 开发环境搭建工作,我们接下来通过一个简单的Demo 来了解Protobuf 类库使用。

package com.nio.protobuf; 
 
import java.util.ArrayList; 
import java.util.List; 
 
/** 
 * Created by vixuan-008 on 2015/6/24. 
 */ 
public class TestSubscribeReq { 
    public static void main(String[] args)throws Exception{ 
        SubscribeReqProto.SubscribeReq req=createSubscribeReq(); 
        System.out.println("Before encode:"+req.toString()); 
 
        SubscribeReqProto.SubscribeReq result=decode(encode(req)); 
        System.out.println("decode cotent is:"+result.toString()); 
    } 
    private static byte[] encode(SubscribeReqProto.SubscribeReq req){ 
            return req.toByteArray(); 
    } 
    private static  SubscribeReqProto.SubscribeReq decode(byte[] body) throws Exception{ 
        return SubscribeReqProto.SubscribeReq.parseFrom(body); 
    } 
 
    private static SubscribeReqProto.SubscribeReq createSubscribeReq(){ 
        SubscribeReqProto.SubscribeReq.Builder builder= SubscribeReqProto.SubscribeReq.newBuilder(); 
        builder.setSubReqId(1); 
        builder.setUserName("zhouzhigang"); 
        builder.setProductName("Netty Book"); 
        List<String> address=new ArrayList<String>(); 
        address.add("湖南长沙"); 
        address.add("湖南株洲"); 
        address.add("湖南湘潭"); 
        builder.addAllAddress(address); 
        return builder.build(); 
 
    } 
} 

首先我们看如何创建 SubscribeReqProto.SubscribeReq的实例,通过SubscribeReqProto.SubscribeReq的静态方法newBuilder创建SubscribeReqProto.SubscribeReq的Builder实例,通过Builder构造器对SubscribeReq的属性进行相关设置,对于集合类型,通过addAllXXX()方法可以将集合对象添加到对象属性当中。

编码通过调用SubscribeReqProto.SubscribeReq实例的toByteArray方法,即可将SubscribeReq对象编码为byte数组,使用非常方便。

解码通过调用SubscribeReqProto.SubscribeReq的静态方法parseFrom将二进制数组解码为原始数据对象。


6、Protobuf 测试程序效果截图:



第二节:开发支持Protobuf 的Netty 服务端

SubRespProServer.java源代码(Handler 存在问题可能需要修改)

package com.nio.server; 
 
import com.nio.handler.SubRespProHandler; 
import com.nio.protobuf.SubscribeReqProto; 
import io.netty.bootstrap.ServerBootstrap; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioServerSocketChannel; 
import io.netty.handler.codec.protobuf.ProtobufDecoder; 
import io.netty.handler.codec.protobuf.ProtobufEncoder; 
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; 
import io.netty.handler.logging.LogLevel; 
import io.netty.handler.logging.LoggingHandler; 
 
/** 
 * Created by vixuan-008 on 2015/6/24. 
 */ 
public class SubRespProServer { 
    public static void main(String[] args)throws  Exception{ 
        int port=15444; 
        new SubRespProServer().bind(port); 
 
    } 
 
    public void bind(int port)throws Exception{ 
        //配置服务端的NIO线程池 
        EventLoopGroup bossGroup=new NioEventLoopGroup(); 
        EventLoopGroup workGroup=new NioEventLoopGroup(); 
        try{ 
            ServerBootstrap b=new ServerBootstrap(); 
            b.group(bossGroup, workGroup); 
            b.channel(NioServerSocketChannel.class); 
            b.option(ChannelOption.SO_BACKLOG, 100); 
            b.handler(new LoggingHandler(LogLevel.INFO)); 
            b.childHandler(new ChannelInitializer<SocketChannel>() { 
                @Override 
                protected void initChannel(SocketChannel socketChannel) throws Exception { 
                   socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder()); 
                    socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance())); 
                    socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); 
                    socketChannel.pipeline().addLast(new ProtobufEncoder()); 
                    socketChannel.pipeline().addLast(new SubRespProHandler()); 
 
                } 
            }); 
            //绑定端口,等待同步成功 
            ChannelFuture f=b.bind(port).sync(); 
            //等待服务端关闭监听端口 
            f.channel().closeFuture().sync(); 
 
        }finally { 
            //释放线程池资源 
            bossGroup.shutdownGracefully(); 
            workGroup.shutdownGracefully(); 
 
        } 
 
 
    } 
} 
package com.nio.handler; 
 
import com.nio.protobuf.SubscribeReqProto; 
import com.nio.protobuf.SubscribeRespProto; 
import io.netty.channel.ChannelHandlerAdapter; 
import io.netty.channel.ChannelHandlerContext; 
 
/** 
 * Created by vixuan-008 on 2015/6/24. 
 */ 
public class SubRespProHandler extends ChannelHandlerAdapter { 
 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
       ctx.close(); 
    } 
 
    @Override 
    public void channelActive(ChannelHandlerContext ctx) throws Exception { 
        super.channelActive(ctx); 
    } 
 
    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
        SubscribeReqProto.SubscribeReq req=(SubscribeReqProto.SubscribeReq)msg; 
        System.out.println("server receiver client message is:"+req.toString()); 
        ctx.writeAndFlush(resp(req.getSubReqId())); 
    } 
 
    private SubscribeRespProto.SubscribeResp resp(int subReqId)throws Exception{ 
        SubscribeRespProto.SubscribeResp.Builder resp= SubscribeRespProto.SubscribeResp.newBuilder(); 
        resp.setSubReqId(subReqId); 
        resp.setRespCode(0); 
        resp.setDesc("Netty Book order succeed 3 day later,sent to the designated adderss"); 
        return resp.build(); 
    } 
 
    @Override 
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
        super.channelReadComplete(ctx); 
    } 
} 

第三节: 开发支持Protobuf 的Netty 客户端

package com.nio.client; 
 
import com.nio.handler.SubReqProHandler; 
import com.nio.handler.SubReqServerHandler; 
import com.nio.protobuf.SubscribeRespProto; 
import io.netty.channel.ChannelFuture; 
import io.netty.channel.ChannelInitializer; 
import io.netty.channel.ChannelOption; 
import io.netty.channel.EventLoopGroup; 
import io.netty.channel.nio.NioEventLoopGroup; 
import io.netty.channel.socket.SocketChannel; 
import io.netty.channel.socket.nio.NioSocketChannel; 
import io.netty.handler.codec.protobuf.ProtobufDecoder; 
import io.netty.handler.codec.protobuf.ProtobufEncoder; 
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; 
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; 
import io.netty.handler.codec.serialization.ClassResolvers; 
import io.netty.handler.codec.serialization.ObjectDecoder; 
import io.netty.handler.codec.serialization.ObjectEncoder; 
 
/** 
 * Created by vixuan-008 on 2015/6/24. 
 */ 
public class SubReqProClient { 
    public static void main(String[] args)throws Exception{ 
        int port=15444; 
        new SubReqProClient().bind(port, "127.0.0.1"); 
    } 
 
    public void bind(int port,String host)throws Exception{ 
        //配置客户端NIO线程池 
        EventLoopGroup workGroup=new NioEventLoopGroup(); 
        try{ 
            io.netty.bootstrap.Bootstrap b=new io.netty.bootstrap.Bootstrap(); 
            b.group(workGroup); 
            b.channel(NioSocketChannel.class); 
            b.option(ChannelOption.TCP_NODELAY,true); 
            b.handler(new ChannelInitializer<SocketChannel>() { 
                @Override 
                protected void initChannel(SocketChannel socketChannel) throws Exception { 
                    socketChannel.pipeline().addLast(new ProtobufVarint32FrameDecoder()); 
                    socketChannel.pipeline().addLast(new ProtobufDecoder(SubscribeRespProto.SubscribeResp.getDefaultInstance())); 
                    socketChannel.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); 
                    socketChannel.pipeline().addLast(new ProtobufEncoder()); 
                    socketChannel.pipeline().addLast(new SubReqProHandler()); 
                } 
            }); 
            //发起异步连接操作 
            ChannelFuture f=b.connect(host,port).sync(); 
            //等待客户端链路关闭 
            f.channel().closeFuture().sync(); 
 
        }finally { 
            //释放NIO 线程组 
            workGroup.shutdownGracefully(); 
 
        } 
 
 
    } 
} 
package com.nio.handler; 
 
import com.nio.protobuf.SubscribeReqProto; 
import com.nio.protobuf.SubscribeRespProto; 
import io.netty.channel.ChannelHandlerAdapter; 
import io.netty.channel.ChannelHandlerContext; 
 
import java.util.ArrayList; 
import java.util.List; 
 
/** 
 * Created by vixuan-008 on 2015/6/24. 
 */ 
public class SubReqProHandler extends ChannelHandlerAdapter { 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
      ctx.close(); 
    } 
 
    @Override 
    public void channelActive(ChannelHandlerContext ctx) throws Exception { 
        for(int i=0;i<10;i++){ 
            ctx.write(subReq(i)); 
        } 
        ctx.flush(); 
    } 
 
    private SubscribeReqProto.SubscribeReq subReq(int i){ 
        SubscribeReqProto.SubscribeReq.Builder req=SubscribeReqProto.SubscribeReq.newBuilder(); 
       req.setProductName("Netty Book"); 
        req.setUserName("zhouzhigang"); 
        req.setSubReqId(i); 
      List<String> address=new ArrayList<String>(); 
        address.add("china"); 
        address.add("usa"); 
        address.add("france"); 
        req.addAllAddress(address); 
        return req.build(); 
 
 
 
 
    } 
 
    @Override 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
       System.out.println("Receiver server message is:"+msg); 
    } 
 
    @Override 
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
        ctx.flush(); 
    } 
} 


第四节: 运行基于Netty 开发的Protobuf 应用

服务端截图:



Protobuf 使用注意事项:

1、ProtobufDecoder仅仅负责解码,它不支持读半包。因此,在ProtobufDecode前面,一定要有能够处理读半包的解码器,有三种方式可以选择.

使用Netty提供的ProtobufVarint32FrameDecoder,它可以处理半包消息:

集成Netty提供的通用半包解码器LengthFieldBasedFrameDecoder;

继承ByteMessageDecoder类,自己处理半包消息。

如果你只使用ProtobufDecoder解码器而忽视对半包消息处理,程序是不能正常运行的。今天的学习就到这里

声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

关注我们

一个IT知识分享的公众号