服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package com.jhj.nio.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class GroupChatServer {

//定义属性
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PROT = 6667;

//构造器
public GroupChatServer() {

//初始化
try {

//得到选择器
selector = Selector.open();
//ServerScoketChannel
listenChannel=ServerSocketChannel.open();
//绑定端口
listenChannel.socket().bind(new InetSocketAddress(PROT));
//设置非阻塞模式
listenChannel.configureBlocking(false);
//将该listenChannel注册到selector
listenChannel.register(selector, SelectionKey.OP_ACCEPT);

} catch (IOException e) {

e.printStackTrace();
}
}

//监听
public void listen() {

try {

//循环处理
while (true) {

int count = selector.select();
//有事件处理
if (count > 0) {

//遍历得到selectionKey集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {

//取出selectionKey
SelectionKey key = iterator.next();
//监听到accept
if (key.isAcceptable()) {

SocketChannel sc = listenChannel.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
System.out.println(sc.getRemoteAddress() + "上线");
}

if (key.isReadable()) {

//通道发送read事件,即通道是可读的状态
//处理读
readDate(key);
}
//删除 方式重复操作
iterator.remove();
}
} else {

System.out.println("等待...");
}
}
} catch (Exception e) {

e.printStackTrace();
} finally {


}

}

//读取客户端消息
private void readDate(SelectionKey key) {


//定义一个SocketChannel

SocketChannel channel = null;
try {

//得到channel
channel = (SocketChannel) key.channel();

//创建buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//获取到该channel关联的buffer
int count = channel.read(byteBuffer);
//根据count的值做处理
if (count > 0) {

//把缓冲区的数据转成字符串
String s = new String(byteBuffer.array());
//输出
System.out.println("from客户端:" + s);

//向其他客户端转发消息
sendInfoToOtherClients(s, channel);
}

} catch (IOException e) {


try {

System.out.println(channel.getRemoteAddress() + "离线了....");
//取消注册
key.cancel();
//关闭通道
channel.close();
} catch (IOException ex) {

ex.printStackTrace();
}

}
}

//转发消息给其他客户端(通道)
private void sendInfoToOtherClients(String s, SocketChannel socketChannel) throws IOException {


System.out.println("服务器转发消息中");
//遍历所有注册到select上的SocketChannel 并排除自己
for (SelectionKey key : selector.keys()) {

//通过key获取 socketChannel
Channel channel = key.channel();
//排除自己
if (channel instanceof SocketChannel && channel != socketChannel) {

//转型
SocketChannel channel1 = (SocketChannel) channel;
//将s存储到buffer
ByteBuffer wrap = ByteBuffer.wrap(s.getBytes());
//将buffer数据写入通道
channel1.write(wrap);
}
}


}


public static void main(String[] args) {

//创建一个服务器对象
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listen();
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.jhj.nio.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

public class GroupChatClient {


//定义相关的属性
private final String HOST="127.0.0.1";

private final int PROT=6667;

private Selector selector;
private SocketChannel socketChannel;

private String username;

//构造器 完成初始化
public GroupChatClient() throws IOException {

selector = Selector.open();

//连接服务器
socketChannel=SocketChannel.open(new InetSocketAddress(HOST,PROT));
//设置非阻塞
socketChannel.configureBlocking(false);
//将channel注册到Selector
socketChannel.register(selector, SelectionKey.OP_READ);
//得到username
username = socketChannel.getLocalAddress().toString().substring(1);

System.out.println(username+"is ok");
}

//向服务器发送消息
public void sendInfo(String info){

info=username+"说:"+info;

try {

socketChannel.write(ByteBuffer.wrap(info.getBytes()));
}catch (IOException e){

e.printStackTrace();
}
}

//读取从服务器端回复的消息
public void readInfo(){

try {

int readChannels=selector.select(2000);
if (readChannels>0){

//有可用的通道

Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){


SelectionKey next = iterator.next();
if (next.isReadable()){

//得到相关的通道
SocketChannel channel = (SocketChannel) next.channel();
//得到一个buffer
ByteBuffer allocate = ByteBuffer.allocate(1024);
//读取
channel.read(allocate);
//把读到的缓冲区的数据转成字符串
String s = new String(allocate.array());
System.out.println(s.trim());
}
//删除当前selectkey 防止重复操作
iterator.remove();
}
}else {

// System.out.println("没有可以用的通道");
}
}catch (IOException e){

e.printStackTrace();
}
}

public static void main(String[] args) throws IOException {

//启动客户端
GroupChatClient groupChatClient = new GroupChatClient();
//启动一个线程 每隔3秒 读取从服务器发送的数据
new Thread(){

public void run(){

while (true){

groupChatClient.readInfo();
try{

Thread.currentThread().sleep(3000);
} catch (InterruptedException e){

e.printStackTrace();
}
}
}
}.start();


//发送数据
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){

String s=scanner.nextLine();
groupChatClient.sendInfo(s);
}
}
}

作者声明

1
如有问题,欢迎指正!