服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.jhj.netty.groupchat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Group chat server built on Netty.
 *
 * <p>Binds a {@link NioServerSocketChannel} to the configured port and installs a
 * string decoder, a string encoder and a {@link GroupChatServerHandler} on every
 * accepted connection.
 */
public class GroupChatServer {

    /** TCP port the server listens on; fixed at construction time. */
    private final int port;

    /**
     * Creates a server that will listen on the given port once {@link #run()} is called.
     *
     * @param port TCP port to bind
     */
    public GroupChatServer(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks until the server channel is closed.
     *
     * <p>Both event loop groups are shut down on exit, whether the method returns
     * normally or throws.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void run() throws Exception {
        // Two event loop groups: a single-threaded one passed as the parent (acceptor)
        // group and a default-sized one passed as the child (per-connection) group.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Inbound: bytes -> String
                            pipeline.addLast("decoder", new StringDecoder());
                            // Outbound: String -> bytes
                            pipeline.addLast("encoder", new StringEncoder());
                            // Application logic: broadcast chat messages
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });

            // sync() rethrows a bind failure, so the banner below is only printed
            // once the port is actually bound.
            ChannelFuture channelFuture = b.bind(port).sync();
            System.out.println("netty服务器启动");

            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Launches the chat server on port 7000.
     *
     * @param args ignored
     * @throws Exception if the server fails to start
     */
    public static void main(String[] args) throws Exception {
        new GroupChatServer(7000).run();
    }
}

服务端handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.jhj.netty.groupchat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Server-side handler for the group chat.
 *
 * <p>Keeps every handled channel in a shared {@link ChannelGroup}, announces joins
 * and departures to the group, and relays each received message to all members.
 */
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * All channels currently managed by handlers of this class. Static, so it is
     * shared by every handler instance; backed by the global singleton executor.
     */
    private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Timestamp format used in the "left the chat" notification. */
    private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Announces the new client to the existing members and then registers its channel.
     *
     * <p>The broadcast happens before {@code add}, so the newcomer does not receive
     * its own join notice.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // ChannelGroup.writeAndFlush writes to every member; no manual iteration needed.
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "加入聊天\n");
        channelGroup.add(channel);
    }

    /** Logs on the server console that the client's channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "上线了");
    }

    /** Logs on the server console that the client's channel became inactive. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "离线了");
    }

    /**
     * Tells the group, with a timestamp, that the client has left.
     *
     * <p>No explicit removal from the group is done here; per the Netty
     * {@code ChannelGroup} documentation a closed channel is removed automatically.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "离开聊天" + sdf.format(new Date()) + "\n");
    }

    /**
     * Reports the failure on stderr and closes the connection.
     *
     * <p>The cause is logged first so that pipeline errors are not silently lost.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println(ctx.channel().remoteAddress() + "发生异常,关闭连接: " + cause);
        ctx.close();
    }

    /**
     * Relays a received message to every channel in the group.
     *
     * <p>Other members see the sender's remote address; the sender gets an echo
     * marked as its own message.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();

        channelGroup.forEach(ch -> {
            if (channel != ch) {
                ch.writeAndFlush("[客户]" + channel.remoteAddress() + "发送了消息" + msg + "\n");
            } else {
                ch.writeAndFlush("[自己]发送了消息" + msg + "\n");
            }
        });
    }
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.jhj.netty.groupchat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;

/**
 * Console client for the group chat.
 *
 * <p>Connects to the given host and port, then sends every line typed on standard
 * input to the server, terminated by CRLF.
 */
public class GroupChatClient {

    /** Server host name or address. */
    private final String host;

    /** Server TCP port. */
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and forwards standard input line by line until input
     * is exhausted. The event loop group is shut down on exit.
     *
     * @throws Exception if the connection attempt fails or the thread is interrupted
     */
    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(newPipelineInitializer());

            // Wait for the connection to be established, then grab its channel.
            Channel channel = bootstrap.connect(host, port).sync().channel();
            System.out.println("------" + channel.remoteAddress() + "------");

            // Read user input and send each line to the server.
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                channel.writeAndFlush(scanner.nextLine() + "\r\n");
            }
        } finally {
            group.shutdownGracefully();
        }
    }

    /** Builds the initializer that installs the string codecs and the client handler. */
    private static ChannelInitializer<SocketChannel> newPipelineInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast("decoder", new StringDecoder())
                        .addLast("encoder", new StringEncoder())
                        .addLast(new GroupChatClientHandler());
            }
        };
    }

    /**
     * Starts a client against 127.0.0.1:7000.
     *
     * @param args ignored
     * @throws Exception if the client fails to connect
     */
    public static void main(String[] args) throws Exception {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}

客户端handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.jhj.netty.groupchat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side inbound handler: prints every text message received from the
 * server to standard output.
 */
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the received message with leading and trailing whitespace removed.
     *
     * @param ctx context of the channel the message arrived on (unused)
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String text = msg.trim();
        System.out.println(text);
    }
}

作者声明

1
如有问题,欢迎指正!