基本说明

  1. netty的组件设计:Netty的主要组件有Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe等
  2. ChannelHandler充当了处理入站和出站数据的应用程序逻辑的容器。例如,实现ChannelInboundHandler接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发送响应时,也可以从ChannelInboundHandler冲刷数据。业务逻辑通常写在一个或者多个ChannelInboundHandler中。ChannelOutboundHandler原理一样,只不过它是用来处理出站数据的
  3. ChannelPipeline提供了ChannelHandler链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的,那么我们称这些事件为出站的,即客户端发送给服务端的数据会通过pipeline中的一系列ChannelOutboundHandler,并被这些Handler处理,反之则称为入站的
    在这里插入图片描述

编码解码器

  1. 当Netty发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如java对象);如果是出站消息,它会被编码成字节。
  2. Netty提供一系列实用的编解码器,他们都实现了ChannelInboundHadnler或者ChannelOutboundHandler接口。在这些类中,channelRead方法已经被重写了。以入站为例,对于每个从入站Channel读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的decode()方法进行解码,并将已经解码的字节转发给ChannelPipeline中的下一个ChannelInboundHandler。

解码器-ByteToMessageDecoder

1.
关系继承图
在这里插入图片描述

1.
由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理.

1.
一个关于ByteToMessageDecoder实例分析

1
2
3
4
5
6
7
8
9
10
11
12
public class ToIntegerDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
{

if (in.readableBytes() >= 4) {

out.add(in.readInt());
}
}
}

说明:

  1. 这个例子,每次入站从ByteBuf中读取4字节,将其解码为一个int,然后将它添加到下一个List中。当没有更多元素可以被添加到该List中时,它的内容将会被发送给下一个ChannelInboundHandler。int在被添加到List中时,会被自动装箱为Integer。在调用readInt()方法前必须验证所输入的ByteBuf是 否具有足够的数据

案例

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.jhj.netty.inboundhandlerandoutboundhandler;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MyClient {


public static void main(String[] args) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();

try{

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer());//自定义一个初始化类

ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {

group.shutdownGracefully();
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.jhj.netty.inboundhandlerandoutboundhandler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyClientHandler extends SimpleChannelInboundHandler<Long> {


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println("MyClientHandler 发送数据");

ctx.writeAndFlush(123456L);
}


@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {

System.out.println(msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.jhj.netty.inboundhandlerandoutboundhandler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {


@Override
protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();
//加入出站的handler 对数据进行一个编码
pipeline.addLast(new MyLongToByteEncoder());
pipeline.addLast(new MyByteToLongDecoder());
//加入一个自定义的handler,处理业务
pipeline.addLast(new MyClientHandler());
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.jhj.netty.inboundhandlerandoutboundhandler;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class Myserver {


public static void main(String[] args) throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try{

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());//自定义一个初始化类

ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {

bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.jhj.netty.inboundhandlerandoutboundhandler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class MyServerHandler extends SimpleChannelInboundHandler<Long> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {

System.out.println("读取到从客户端"+msg);

//给客户端发送一个long
ctx.writeAndFlush(98765l);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

cause.printStackTrace();
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.jhj.netty.inboundhandlerandoutboundhandler;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {


@Override
protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();
//入站的Handler进行解码 MyByteToLongDecoder
//MessageToByteEncoder 和 ByteToMessageDecoder不冲突
pipeline.addLast(new MyByteToLongDecoder());
pipeline.addLast(new MyLongToByteEncoder());
//自定义的handler 处理业务罗技
pipeline.addLast(new MyServerHandler());
}
}

编解码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.jhj.netty.inboundhandlerandoutboundhandler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;

import java.util.List;

public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {


@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {

System.out.println("encode 被调用");
System.out.println("msg="+msg);
out.writeLong(msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.jhj.netty.inboundhandlerandoutboundhandler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class MyByteToLongDecoder extends ByteToMessageDecoder {

/**
*decode 会根据接收的数据,被调用多次,知道确定没有新的元素被添加到list,或者是ByteBuf 没有更多的可读字节为止
* 如果list out不为空 就会将list的内容传递给下一个channelinboundhandler处理,该处理器的方法也会调用多次
*
* @param ctx 上下文 the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in 入站的ByteBuf the {@link ByteBuf} from which to read data
* @param out List集合,将解码后的数据传给下一个handler the {@link List} to which decoded messages should be added
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {


System.out.println("decoder被调用");
//因为long8个字节,需要判断有8个字节才能读取一个long
if (in.readableBytes()>=8){

out.add(in.readLong());
}
}
}

总结

不论解码器handler 还是 编码器handler 即接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行
在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果和期望结果可能不一致

其他解码器

解码器-ReplayingDecoder

  1. public abstract class ReplayingDecoder extends ByteToMessageDecoder

  2. ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,我们不必调用readableBytes()方法。参数S指定了用户状态管理的类型,其中Void代表不需要状态管理

  3. 应用实例:使用ReplayingDecoder 编写解码器,对前面的案例进行简化 [案例演示]

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    package com.jhj.netty.inboundhandlerandoutboundhandler;

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import io.netty.handler.codec.ReplayingDecoder;

    import java.util.List;

    public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {

    /**
    *decode 会根据接收的数据,被调用多次,知道确定没有新的元素被添加到list,或者是ByteBuf 没有更多的可读字节为止
    * 如果list out不为空 就会将list的内容传递给下一个channelinboundhandler处理,该处理器的方法也会调用多次
    *
    * @param ctx 上下文 the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
    * @param in 入站的ByteBuf the {@link ByteBuf} from which to read data
    * @param out List集合,将解码后的数据传给下一个handler the {@link List} to which decoded messages should be added
    * @throws Exception
    */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {


    System.out.println("decoder被调用");

    //不需要判断数据是否足够读取内部会进行哦按段
    out.add(in.readLong());
    }
    }
  4. ReplayingDecoder使用方便,但它也有一些局限性:
    • 并不是所有的 ByteBuf 操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException。
    • ReplayingDecoder 在某些情况下可能稍慢于 ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢

其它解码器

  1. LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
  2. DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
  3. HttpObjectDecoder:一个HTTP数据的解码器
  4. LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。

其它编码器

在这里插入图片描述

作者声明

1
如有问题,欢迎指正!