1.4.2、selector
单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用
- 只能用于网络IO,文件IO不可用,因为FileChannel没有阻塞模式
- 如果不用selector多路复用,线程一直在做无用功。selector能够
- 有可连接事件才去连接
- 有可读事件才去读取
- 有可写事件才去写入
- 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件
- 有可写事件才去写入
常用方法
//1.获取selector
Selector selector = Selector.open();
//2.注册事件,前提channel必须配置为非阻塞毛事
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
channel可绑定的事件类型
- connect - 客户端连接成功时触发
- accept - 服务器端成功接受连接时触发
- read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
- write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
//1.监听channel事件,
int count = selector.select();//阻塞直到事件发生
int count = selector.select(long timeout);//阻塞直到事件发生获取超时
int count = selector.selectNow();//非阻塞,不管有无事件发生,立即返回,自己根据返回值查看知否有事件
@Slf4j
//selector
public class SocketSelectorServerTest {
public static void main(String[] args) throws IOException {
//1.获取selector
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(5888));
//2.将channel注册到selector中
SelectionKey key = ssc.register(selector, 0, null);
//key只关注accept事件
key.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key...{}", key);
while (true) {
//在事件未处理时,不阻塞
//3.select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行
//select在事件未处理时,他不会阻塞,事件发生要么处理,要么取消,不能置之不理
selector.select();
//4.处理事件,SelectionKeys里包含了所有发生的事件
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
iterator.remove();//问题1解决
log.debug("key...{}", next);
//5.区分事件类型
if (next.isAcceptable()) {//如果是accept
//1.处理事件
ServerSocketChannel channel = (ServerSocketChannel) next.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
log.debug("{}", sc);
scKey.interestOps(SelectionKey.OP_READ);
} else if (next.isReadable()) {//如果是read
try {
SocketChannel channel = (SocketChannel) next.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = channel.read(buffer);
if (-1 == read) {//问题3解决
next.cancel();
}else {
buffer.flip();
ByteBufferUtil.debugRead(buffer);
}
} catch (IOException e) {//问题2解决
e.printStackTrace();
next.cancel();//客户端关闭了,需要取消事件
}
}
}
}
}
}
事件发生后,要么处理,要么取消(cancel),不能置之不理
问题1:启动服务后,启动客户端,debug模式发送消息后,空指针异常
原因:selector会在事件发生后,向集合中存入key,但不会删除,等到发送消息的时间后,循环获取到的第一个key已经处理过的,这个key不带时间的,所以47行accep方法返回的是null
解决:迭代器中调用remove移除
问题2:客户异常关闭后,服务器报客户单关闭异常,服务器就停止了。
解决:try catch捕获异常,有异常是,调用cancel()
问题3:客户端正常关闭后(channel.close()),服务端还有问题,因为正常断开,会产生一个读事件,不在异常里面,捕获不了
解决:通过channel.read(buffer);的返回值判断,如果-1,表示断开,调用cancel
消息边界问题
如果bytebuffer长度超过了客户端数据的长度,就有消息边界问题。解决方法主要有三种:
- 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
- 另一种思路是按分隔符拆分,缺点是效率低
- TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
服务端
@Slf4j
//处理消息边界问题
public class SelectorMsgServerTest {
public static void main(String[] args) throws IOException {
//1.获取selector
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(5888));
//2.将channel注册到selector中
SelectionKey key = ssc.register(selector, 0, null);
//key只关注accept事件
key.interestOps(SelectionKey.OP_ACCEPT);
//log.debug("register key...{}", key);
while (true) {
//在事件未处理时,不阻塞
//3.select方法,没有事件发生,线程阻塞,有事件,线程才会恢复运行
//select在事件未处理时,他不会阻塞,事件发生要么处理,要么取消,不能置之不理
selector.select();
//4.处理事件,SelectionKeys里包含了所有发生的事件
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
iterator.remove();//问题1解决
//log.debug("key...{}", next);
//5.区分事件类型
if (next.isAcceptable()) {//如果是accept
//1.处理事件
ServerSocketChannel channel = (ServerSocketChannel) next.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
//att 就是附件的意思,可以为每个channel添加一个buffer作为附件,在扩容中间共享buffer
ByteBuffer buffer = ByteBuffer.allocate(16); // attachment附件
SelectionKey scKey = sc.register(selector, 0, buffer);
//log.debug("{}", sc);
scKey.interestOps(SelectionKey.OP_READ);
} else if (next.isReadable()) {//如果是read
try {
SocketChannel channel = (SocketChannel) next.channel();
ByteBuffer buffer = (ByteBuffer) next.attachment();
int read = channel.read(buffer);
if (-1 == read) {//问题3解决
next.cancel();
} else {
split(buffer);
//需要扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer); // 0123456789abcdef3333\n
next.attach(newBuffer);
}
}
} catch (IOException e) {//问题2解决
e.printStackTrace();
next.cancel();//客户端关闭了,需要取消事件
}
}
}
}
}
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一条完整消息
if (source.get(i) == '\n') {
int length = i + 1 - source.position();
// 把这条完整消息存入新的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source 读,向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
ByteBufferUtil.debugAll(target);
}
}
source.compact(); // 0123456789abcdef position 16 limit 16
}
}
客户端
public class MsgClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 5888));
SocketAddress address = sc.getLocalAddress();
sc.write(Charset.defaultCharset().encode("0123\n456789abcdef"));
sc.write(Charset.defaultCharset().encode("-xxx0123456789abcdef3333\n"));
System.in.read();
}
}
结果:
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 0a |0123. |
+--------+-------------------------------------------------+----------------+
+--------+-------------------- all ------------------------+----------------+
position: [37], limit: [37]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 34 35 36 37 38 39 61 62 63 64 65 66 2d 78 78 78 |456789abcdef-xxx|
|00000010| 30 31 32 33 34 35 36 37 38 39 61 62 63 64 65 66 |0123456789abcdef|
|00000020| 33 33 33 33 0a |3333. |
+--------+-------------------------------------------------+----------------+
ByteBuffer 大小分配
- 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
- ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
- 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
- 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗