1.4.4、多线程优化
设计思路:分两组选择器
- 单线程配一个选择器,专门处理accpet事件(建立连接) BOSS
- 创建多线程,每个线程一个选择器,专门处理read事件 WORK
服务端
@Slf4j
public class ThreadServer {
public static void main(String[] args) throws IOException, InterruptedException {
Thread.currentThread().setName("boss thread");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector bossSelector = Selector.open();
ssc.register(bossSelector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(50001));
Worker worker = new Worker("work thread");
worker.register();
while (true) {
bossSelector.select();
Iterator<SelectionKey> iter = bossSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
//关联selector
log.debug("before register...{}", sc.getRemoteAddress());
sc.register(worker.workSelector, SelectionKey.OP_READ, null);
log.debug("after register...{}", sc.getRemoteAddress());
}
}
}
}
@Data
static class Worker implements Runnable {
private String name;
private Selector workSelector;
private volatile boolean flag = false;
public Worker(String name) {
this.name = name;
}
//初始化线程和selector
public void register() throws IOException {
if (!flag) {
workSelector = Selector.open();
new Thread(this, name).start();
flag = true;
}
}
@Override
public void run() {
while (true) {
try {
workSelector.select();
Iterator<SelectionKey> iter = workSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel sc = (SocketChannel) key.channel();
log.debug("read...{}", sc.getRemoteAddress());
sc.read(buffer);
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
客户端
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 50001));
SocketAddress address = sc.getLocalAddress();
System.out.println(address.toString());
sc.write(Charset.defaultCharset().encode("123456"));
System.in.read();
}
}
问题:只打印了before信息,没有打印after和read信息,因为worker线程start后,run方法中workSelector.select();会阻塞,在主线程boss中,sc.register(workerSelecotr..)就不会执行.
run方法是在work线程中执行,sc.register是在boss线程中执行,只要run运行在register之前就会阻塞、
解决:把worker.register();放到sc.register之前,这是sc.register就可能发生在work.register之前,selector.select()后执行,就没有问题了。
新问题:再来一个客户端,之前的select()方法已经执行阻塞了,新的就register不了
解决:把worker.register();和sc.register(worker.workSelector, SelectionKey.OP_READ, null);放早同一个线程里,就可以控制顺序
public void register(SocketChannel sc) throws IOException {
if (!flag) {
workSelector = Selector.open();
new Thread(this, name).start();
flag = true;
}
sc.register(workSelector, SelectionKey.OP_READ, null);
}
但是register还在在boss线程中被调用,只有run方法才是在work线程中执行。
- 方案1:采用一个队列,将sc.register(workSelector, SelectionKey.OP_READ, null)放入队列中,然后在run方法中从队列中拿到任务,执行,还要调用warkUp唤醒select的阻塞
@Slf4j
public class ThreadServer {
public static void main(String[] args) throws IOException, InterruptedException {
Thread.currentThread().setName("boss thread");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector bossSelector = Selector.open();
ssc.register(bossSelector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(50001));
Worker worker = new Worker("work thread");
while (true) {
bossSelector.select();
Iterator<SelectionKey> iter = bossSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.debug("connected...{}", sc.getRemoteAddress());
//关联selector
log.debug("before register...{}", sc.getRemoteAddress());
worker.register(sc);
log.debug("after register...{}", sc.getRemoteAddress());
}
}
}
}
@Data
static class Worker implements Runnable {
private String name;
private Selector workSelector;
private volatile boolean flag = false;
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) {
this.name = name;
}
//初始化线程和selector
public void register(SocketChannel sc) throws IOException {
if (!flag) {
workSelector = Selector.open();
new Thread(this, name).start();
flag = true;
}
queue.add(()->{
try {
sc.register(workSelector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
});
workSelector.wakeup();//唤醒select方法
}
@Override
public void run() {
while (true) {
try {
workSelector.select();
//取出任务并执行
Runnable task = queue.poll();
if (task != null) {
task.run();
}
Iterator<SelectionKey> iter = workSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(16);
SocketChannel sc = (SocketChannel) key.channel();
log.debug("read...{}", sc.getRemoteAddress());
sc.read(buffer);
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
- 方案2:不用队列,23步都是在boss线程中执行,顺序固定
分析:wakeup可以在select前或后唤醒,相当于发一张票
- 123顺序,先阻塞,然后warkup唤醒,没问题
- 213,ok
- 231,和第2情况相同