基本说明
netty的组件设计:Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等
ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的
ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的
编码解码器
当Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节。
Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。
解码器-ByteToMessageDecoder 1. 关系继承图
1. 由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理.
1. 一个关于ByteToMessageDecoder实例分析
1 2 3 4 5 6 7 8 9 10 11 12 public class ToIntegerDecoder extends ByteToMessageDecoder { @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in , List <Object > out) throws Exception { if (in .readableBytes () >= 4 ) { out.add (in .readInt ()); } } }
说明:
这个例子,每次入站从ByteBuf中读取4字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是 否具有足够的数据
案例 客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.jhj .netty .inboundhandlerandoutboundhandler ; import io.netty .bootstrap .Bootstrap ;import io.netty .bootstrap .ServerBootstrap ;import io.netty .channel .ChannelFuture ;import io.netty .channel .EventLoopGroup ;import io.netty .channel .nio .NioEventLoopGroup ;import io.netty .channel .socket .nio .NioServerSocketChannel ;import io.netty .channel .socket .nio .NioSocketChannel ;public class MyClient { public static void main (String [] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup (); try { Bootstrap bootstrap = new Bootstrap (); bootstrap.group (group).channel (NioSocketChannel .class ) .handler (new MyClientInitializer ()); ChannelFuture channelFuture = bootstrap.connect ("localhost" , 7000 ).sync (); channelFuture.channel ().closeFuture ().sync (); }finally { group.shutdownGracefully (); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package com.jhj .netty .inboundhandlerandoutboundhandler ; import io.netty .channel .ChannelHandlerContext ;import io.netty .channel .SimpleChannelInboundHandler ;public class MyClientHandler extends SimpleChannelInboundHandler <Long > { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { System .out .println ("MyClientHandler 发送数据" ); ctx.writeAndFlush (123456L); } @Override protected void channelRead0 (ChannelHandlerContext ctx, Long msg) throws Exception { System .out .println (msg); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package com.jhj .netty .inboundhandlerandoutboundhandler ; import io.netty .channel .ChannelInitializer ;import io.netty .channel .ChannelPipeline ;import io.netty .channel .socket .SocketChannel ;public class MyClientInitializer extends ChannelInitializer <SocketChannel > { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline (); pipeline.addLast (new MyLongToByteEncoder ()); pipeline.addLast (new MyByteToLongDecoder ()); pipeline.addLast (new MyClientHandler ()); } }
服务端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.jhj .netty .inboundhandlerandoutboundhandler ; import io.netty .bootstrap .ServerBootstrap ;import io.netty .channel .ChannelFuture ;import io.netty .channel .EventLoopGroup ;import io.netty .channel .nio .NioEventLoopGroup ;import io.netty .channel .socket .nio .NioServerSocketChannel ;public class Myserver { public static void main (String [] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup (1 ); EventLoopGroup workerGroup = new NioEventLoopGroup (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap.group (bossGroup,workerGroup).channel (NioServerSocketChannel .class ) .childHandler (new MyServerInitializer ()); ChannelFuture channelFuture = serverBootstrap.bind (7000 ).sync (); channelFuture.channel ().closeFuture ().sync (); }finally { bossGroup.shutdownGracefully (); workerGroup.shutdownGracefully (); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package com.jhj .netty .inboundhandlerandoutboundhandler ; import io.netty .channel .ChannelHandlerContext ;import io.netty .channel .SimpleChannelInboundHandler ;public class MyServerHandler extends SimpleChannelInboundHandler <Long > { @Override protected void channelRead0 (ChannelHandlerContext ctx, Long msg) throws Exception { System .out .println ("读取到从客户端" +msg); ctx.writeAndFlush (98765l); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace (); ctx.close (); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.jhj .netty .inboundhandlerandoutboundhandler ; import io.netty .channel .ChannelInitializer ;import io.netty .channel .ChannelPipeline ;import io.netty .channel .socket .SocketChannel ;public class MyServerInitializer extends ChannelInitializer <SocketChannel > { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline (); pipeline.addLast (new MyByteToLongDecoder ()); pipeline.addLast (new MyLongToByteEncoder ()); pipeline.addLast (new MyServerHandler ()); } }
编解码器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package com.jhj .netty .inboundhandlerandoutboundhandler ; import io.netty .buffer .ByteBuf ;import io.netty .channel .ChannelHandlerContext ;import io.netty .handler .codec .ByteToMessageDecoder ;import io.netty .handler .codec .MessageToByteEncoder ;import java.util .List ;public class MyLongToByteEncoder extends MessageToByteEncoder <Long > { @Override protected void encode (ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception { System .out .println ("encode 被调用" ); System .out .println ("msg=" +msg); out.writeLong (msg); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.jhj .netty .inboundhandlerandoutboundhandler ; import io.netty .buffer .ByteBuf ;import io.netty .channel .ChannelHandlerContext ;import io.netty .handler .codec .ByteToMessageDecoder ;import java.util .List ;public class MyByteToLongDecoder extends ByteToMessageDecoder { @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in , List <Object > out) throws Exception { System .out .println ("decoder被调用" ); if (in .readableBytes ()>=8 ){ out.add (in .readLong ()); } } }
总结 不论解码器handler 还是 编码器handler 即接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行 在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果和期望结果可能不一致
其他解码器 解码器-ReplayingDecoder
public abstract class ReplayingDecoder extends ByteToMessageDecoder
ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,我们不必调用readableBytes()方法。参数S指定了用户状态管理的类型,其中Void代表不需要状态管理
应用实例:使用ReplayingDecoder 编写解码器,对前面的案例进行简化 [案例演示]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.jhj .netty .inboundhandlerandoutboundhandler ; import io.netty .buffer .ByteBuf ;import io.netty .channel .ChannelHandlerContext ;import io.netty .handler .codec .ByteToMessageDecoder ;import io.netty .handler .codec .ReplayingDecoder ;import java.util .List ;public class MyByteToLongDecoder2 extends ReplayingDecoder <Void > { @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in , List <Object > out) throws Exception { System .out .println ("decoder被调用" ); out.add (in .readLong ()); } }
ReplayingDecoder使用方便,但它也有一些局限性: • 并不是所有的 ByteBuf 操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException。 • ReplayingDecoder 在某些情况下可能稍慢于 ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢
其它解码器
LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
HttpObjectDecoder:一个HTTP数据的解码器
LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。
其它编码器
作者声明