`
hbxflihua
  • 浏览: 658497 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

基于Google Protobuf的Netty编解码技术

阅读更多

Google的Protobuf在业界非常流行,很多商业项目都选择Protobuf作为编解码框架,以下为Protobuf的一些优点:

    (1)在谷歌内长期使用,产品成熟度高。

    (2)跨语言,支持包括C++、Java、Python在内的多种语言。

    (3)编码后的码流小,便于存储和传输。

    (4)编解码性能高。

    (5)支持不同协议版本的前向兼容。

    (6)支持定义可选和必选字段。

 

一、Protobuf开发环境搭建

    1、下载Protobuf的Windows版,网址如下:https://developers.google.com/protocol-buffers/docs/downloads?hl=zh-cn,本示例基于protoc-2.6.1-win32.zip

    2、下载Protobuf Java语言所需的jar包,网址如下:http://repo2.maven.org/maven2/com/google/protobuf/protobuf-java/2.6.1/,本示例基于protobuf-java-2.6.1.jar

    3、新建请求响应所需的proto文件

    SubscribeReq.proto

// Subscription request schema (proto2 syntax: uses required/repeated labels).
// Protobuf namespace for the messages declared in this file.
package netty;
// Java package the generated code is placed in.
option java_package = "com.serial.java.protobuf";
// Name of the generated outer Java class that wraps the message classes.
option java_outer_classname = "SubscribeReqProto";

// Request sent by the client to subscribe to (order) a product.
message SubscribeReq{
	// Request identifier; the sample server copies it into the response.
	required int32 subReqID = 1;
	// Name of the subscribing user.
	required string userName = 2;
	// Name of the product being ordered.
	required string productName = 3;
	// Zero or more addresses attached to the request.
	repeated string address = 4;
}

    SubscribeResp.proto

// Subscription response schema (proto2 syntax: uses required labels).
// Protobuf namespace for the messages declared in this file.
package netty;
// Java package the generated code is placed in.
option java_package = "com.serial.java.protobuf";
// Name of the generated outer Java class that wraps the message classes.
option java_outer_classname = "SubscribeRespProto";
// Response returned by the server for a SubscribeReq.
message SubscribeResp{
	// Identifier of the request this response answers.
	required int32 subReqID = 1;
	// Result code as a string; the sample server sets "0".
	required string respCode = 2;
	// Human-readable description of the result.
	required string desc = 3;
}

    4、通过Protoc.exe生成所需的Java编解码POJO文件,命令行如下。

C:\Users\Administrator>d:
D:\>cd "Program Files\protoc-2.6.1-win32"
D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\SubscribeReq.proto
D:\Program Files\protoc-2.6.1-win32>protoc.exe --java_out=.\src .\netty\SubscribeResp.proto
D:\Program Files\protoc-2.6.1-win32>

    5、将生成的Java POJO文件拷贝到项目中,注意Protobuf所需的jar包也需包含在项目中,不然会报错。

    6、创建测试类,测试Protobuf的编解码功能。

    TestSubscribeReq.java

package com.serial.java.test;

import java.util.ArrayList;
import java.util.List;

import com.google.protobuf.InvalidProtocolBufferException;
import com.serial.java.protobuf.SubscribeReqProto;

/**
 * Round-trip smoke test for the generated {@code SubscribeReq} protobuf message:
 * builds a request, serializes it to bytes, parses it back and prints both copies.
 */
public class TestSubscribeReq {

	/**
	 * Prints the message before and after the byte round trip, followed by
	 * whether the two copies compare equal.
	 *
	 * @param args unused
	 * @throws Exception if the serialized bytes cannot be parsed back
	 */
	public static void main(String[] args) throws Exception {
		SubscribeReqProto.SubscribeReq original = createSubscribeReq();
		System.out.println("before encode:" + original.toString());

		byte[] wire = encode(original);
		SubscribeReqProto.SubscribeReq restored = decode(wire);
		// The label says "after encode", but this is the copy parsed back from the bytes.
		System.out.println("after encode:" + restored.toString());
		System.out.println("Assert equal: " + restored.equals(original));
	}

	/** Builds the sample request: id 1, user "leeka", three addresses. */
	private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
		List<String> cities = new ArrayList<String>();
		cities.add("Nanjing");
		cities.add("Beijing");
		cities.add("Hangzhou");

		return SubscribeReqProto.SubscribeReq.newBuilder()
				.setSubReqID(1)
				.setUserName("leeka")
				.setProductName("Netty book")
				.addAllAddress(cities)
				.build();
	}

	/** Serializes the request to its protobuf byte representation. */
	private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
		return req.toByteArray();
	}

	/**
	 * Parses a request from its protobuf byte representation.
	 *
	 * @throws InvalidProtocolBufferException if {@code body} is not a valid message
	 */
	private static SubscribeReqProto.SubscribeReq decode(byte[] body)
			throws InvalidProtocolBufferException {
		return SubscribeReqProto.SubscribeReq.parseFrom(body);
	}

}

    7、运行测试类,查看测试结果,控制台输出如下信息:

before encode:subReqID: 1
userName: "leeka"
productName: "Netty book"
address: "Nanjing"
address: "Beijing"
address: "Hangzhou"

after encode:subReqID: 1
userName: "leeka"
productName: "Netty book"
address: "Nanjing"
address: "Beijing"
address: "Hangzhou"

Assert equal: true

 

 二、Netty的Protobuf服务端和客户端开发

     服务端入口

package com.serial.java;

import com.serial.java.protobuf.SubscribeReqProto;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;


/**
 * Netty server that decodes length-prefixed protobuf {@code SubscribeReq} messages
 * and hands them to {@link SubReqServerHandler}.
 */
public class SubReqServer {

	/**
	 * Starts the server from the command line.
	 *
	 * @param args optional; {@code args[0]} is the listen port (8085 when absent)
	 * @throws Exception if binding fails or the waiting thread is interrupted
	 */
	public static void main(String[] args) throws Exception {
		boolean portGiven = args != null && args.length > 0;
		int port = portGiven ? Integer.parseInt(args[0]) : 8085;
		new SubReqServer().bind(port);
	}

	/**
	 * Binds to {@code port} and blocks until the listening channel is closed.
	 *
	 * @param port TCP port to listen on
	 * @throws Exception if binding fails or the waiting thread is interrupted
	 */
	public void bind(int port) throws Exception {
		// Server-side NIO thread groups: one passed as the parent group, one as the child group.
		EventLoopGroup acceptors = new NioEventLoopGroup();
		EventLoopGroup workers = new NioEventLoopGroup();
		try {
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(acceptors, workers);
			bootstrap.channel(NioServerSocketChannel.class);
			bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
			bootstrap.handler(new LoggingHandler(LogLevel.INFO));
			bootstrap.childHandler(newChildInitializer());

			// Bind the port and wait synchronously until the bind succeeds.
			ChannelFuture bound = bootstrap.bind(port).sync();
			// Block until the server's listening channel is closed.
			bound.channel().closeFuture().sync();
		} finally {
			// Release the thread groups on the way out.
			acceptors.shutdownGracefully();
			workers.shutdownGracefully();
		}
	}

	/** Creates the initializer that installs the protobuf codec and business handler on each accepted channel. */
	private static ChannelInitializer<SocketChannel> newChildInitializer() {
		return new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				// Inbound: varint32 frame decoder, then parse frames as SubscribeReq.
				ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
				ch.pipeline().addLast(new ProtobufDecoder(
						SubscribeReqProto.SubscribeReq.getDefaultInstance()));
				// Outbound: varint32 length prepender, then protobuf encoder.
				ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
				ch.pipeline().addLast(new ProtobufEncoder());
				// Business logic goes last.
				ch.pipeline().addLast(new SubReqServerHandler());
			}
		};
	}
}

 

    服务端处理类

package com.serial.java;

import com.serial.java.protobuf.SubscribeReqProto;
import com.serial.java.protobuf.SubscribeRespProto;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Server-side handler: answers every {@code SubscribeReq} whose user name is
 * "leeka" (case-insensitive) with a {@code SubscribeResp}; other requests get no reply.
 */
public class SubReqServerHandler extends ChannelHandlerAdapter {

	/**
	 * Handles one decoded request.
	 *
	 * @param ctx channel context used to write the response
	 * @param msg the inbound message; cast to {@code SubscribeReq}
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		SubscribeReqProto.SubscribeReq request = (SubscribeReqProto.SubscribeReq) msg;
		if (!"leeka".equalsIgnoreCase(request.getUserName())) {
			// Requests from any other user are ignored without a response.
			return;
		}
		System.out.println("service accept client subscribe req:["+ request +"]");
		ctx.writeAndFlush(resp(request.getSubReqID()));
	}

	/** Prints the failure and closes the channel. */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}

	/**
	 * Builds the response for a request.
	 *
	 * @param subReqID id of the request being answered, echoed back to the client
	 * @return response with code "0" and a fixed description
	 */
	private SubscribeRespProto.SubscribeResp resp(int subReqID) {
		return SubscribeRespProto.SubscribeResp.newBuilder()
				.setSubReqID(subReqID)
				.setRespCode("0")
				.setDesc("Netty book order succeed, 3 days later, sent to the designated address")
				.build();
	}

}

    

    客户端入口

package com.serial.java;

import com.serial.java.protobuf.SubscribeRespProto;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * Netty client that connects to the subscription server, sends protobuf
 * requests through {@link SubReqClientHandler} and decodes {@code SubscribeResp} replies.
 */
public class SubReqClient {

	/**
	 * Starts the client against 127.0.0.1.
	 *
	 * @param args optional; {@code args[0]} is the server port (8085 when absent)
	 * @throws Exception if the connection fails or the waiting thread is interrupted
	 */
	public static void main(String[] args) throws Exception {
		boolean portGiven = args != null && args.length > 0;
		int port = portGiven ? Integer.parseInt(args[0]) : 8085;
		new SubReqClient().connect(port, "127.0.0.1");
	}

	/**
	 * Connects to {@code host:port} and blocks until the connection is closed.
	 *
	 * @param port server TCP port
	 * @param host server host name or address
	 * @throws Exception if the connection fails or the waiting thread is interrupted
	 */
	public void connect(int port, String host) throws Exception {
		// Client-side NIO thread group.
		EventLoopGroup workers = new NioEventLoopGroup();
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(workers);
			bootstrap.channel(NioSocketChannel.class);
			bootstrap.option(ChannelOption.TCP_NODELAY, true);
			bootstrap.handler(newInitializer());

			// Start the connect and wait until it completes.
			ChannelFuture connected = bootstrap.connect(host, port).sync();
			// Block until the client channel is closed.
			connected.channel().closeFuture().sync();
		} finally {
			// Release the thread group on the way out.
			workers.shutdownGracefully();
		}
	}

	/** Creates the initializer that installs the protobuf codec and business handler on the client channel. */
	private static ChannelInitializer<SocketChannel> newInitializer() {
		return new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				// Inbound: varint32 frame decoder, then parse frames as SubscribeResp.
				ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
				ch.pipeline().addLast(new ProtobufDecoder(
						SubscribeRespProto.SubscribeResp.getDefaultInstance()));
				// Outbound: varint32 length prepender, then protobuf encoder.
				ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
				ch.pipeline().addLast(new ProtobufEncoder());
				// Business logic goes last.
				ch.pipeline().addLast(new SubReqClientHandler());
			}
		};
	}
}

 

    客户端处理类

package com.serial.java;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.serial.java.protobuf.SubscribeReqProto;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Client-side handler: sends ten {@code SubscribeReq} messages as soon as the
 * channel becomes active and prints every response received from the server.
 */
public class SubReqClientHandler extends ChannelHandlerAdapter {

	private static final Logger logger = Logger.getLogger(SubReqClientHandler.class.getName());

	/**
	 * Queues requests with ids 0..9 and flushes them in a single batch.
	 *
	 * @param ctx channel context used to write the requests
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		for (int i = 0; i < 10; i++) {
			ctx.write(req(i));
		}
		ctx.flush();
	}

	/**
	 * Builds the request with the given id.
	 *
	 * @param i request id; also appended to the product name
	 * @return request for user "leeka" with two addresses
	 */
	private SubscribeReqProto.SubscribeReq req(int i) {
		List<String> cities = new ArrayList<String>();
		cities.add("Nanjing");
		cities.add("Beijing");

		return SubscribeReqProto.SubscribeReq.newBuilder()
				.setSubReqID(i)
				.setProductName("Netty Book" + i)
				.setUserName("leeka")
				.addAllAddress(cities)
				.build();
	}

	/** Flushes the channel context once the current read batch is complete. */
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}

	/**
	 * Prints one response received from the server.
	 *
	 * @param msg the inbound message, printed via its string form
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		System.out.println("receive server response:["+msg+"]");
	}

	/**
	 * Logs the failure and closes the channel.
	 *
	 * @param cause the exception raised in the pipeline
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		// Pass the throwable itself so the exception type and stack trace are kept;
		// getMessage() alone may be null and hides where the failure happened.
		logger.log(Level.WARNING,
				"unexpected exception from downstream:" + cause.getMessage(), cause);
		ctx.close();
	}

}

 

OVER

分享到:
评论
1 楼 quanwsx 2016-09-19  
   

相关推荐

Global site tag (gtag.js) - Google Analytics