Netty模型
工作原理示意图1-简单版
Netty 主要基于主从 Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor
- BossGroup 线程维护Selector , 只关注Accecpt
- 当接收到Accept事件,获取到对应的SocketChannel, 封装成 NIOScoketChannel并注册到Worker 线程(事件循环), 并进行维护
- 当Worker线程监听到selector 中通道发生自己感兴趣的事件后,就进行处理(就由handler), 注意handler 已经加入到通道
工作原理示意图2-进阶版
Netty 主要基于主从Reactors 多线程模型(如图)做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor
工作原理示意图-详细版
Netty抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写
BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是NioEventLoop
NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个NioEventLoop 都有一个selector , 用于监听绑定在其上的socket的网络通讯
NioEventLoopGroup 可以有多个线程, 即可以含有多个NioEventLoop
每个Boss NioEventLoop 循环执行的步骤有3步
轮询accept 事件
处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到某个worker NIOEventLoop 上的 selector
处理任务队列的任务 , 即 runAllTasks
每个 Worker NIOEventLoop 循环执行的步骤
轮询read, write 事件
处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理
处理任务队列的任务 , 即 runAllTasks
每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline
Netty快速入门实例-TCP服务
导包
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package com.jhj.netty.simple;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup();
try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); } });
System.out.println("客户端 ok..");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { group.shutdownGracefully(); } } }
|
handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package com.jhj.netty.simple;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client"+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,server",CharsetUtil.UTF_8)); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址"+ctx.channel().remoteAddress()); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
|
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| package com.jhj.netty.simple;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer { public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
try { ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,128) .childOption(ChannelOption.SO_KEEPALIVE,true) .childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("...服务器 is ready...");
ChannelFuture cf = bootstrap.bind(6668).sync();
cf.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
|
handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| package com.jhj.netty.simple;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx=" + ctx);
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址" + ctx.channel().remoteAddress()); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~", CharsetUtil.UTF_8));
}
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
|
任务队列中的 Task 有 3 种典型使用场景
用户程序自定义的普通任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(10*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello",CharsetUtil.UTF_8)); } catch (InterruptedException e) { throw new RuntimeException(e); } } });
ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(20*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello",CharsetUtil.UTF_8)); } catch (InterruptedException e) { throw new RuntimeException(e); } } });
System.out.println("server ctx=" + ctx);
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址" + ctx.channel().remoteAddress()); }
|
用户自定义定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { try { Thread.sleep(20*1000); ctx.writeAndFlush(Unpooled.copiedBuffer("hello",CharsetUtil.UTF_8)); } catch (InterruptedException e) { throw new RuntimeException(e); } } },5, TimeUnit.SECONDS);
|
非当前 Reactor 线程调用 Channel 的各种方法例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费
方案再说明
- Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
- NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个selector,用于监听绑定在其上的 socket 网络通道。
- NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO 线程 NioEventLoop 负责
• NioEventLoopGroup 下包含多个 NioEventLoop
• 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
• 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
• 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
• 每个 NioChannel 都绑定有一个自己的 ChannelPipelin
作者声明