本文共 5287 字,大约阅读时间需要 17 分钟。
class Reactor implements Runnable{ //选择器: /** 将Selector类贴在者便于理解。 abstract class Selector { static Selector open() throws IOException; Set keys(); Set selectedKeys(); int selectNow() throws IOException; int select(long timeout) throws IOException; int select() throws IOException; void wakeup(); void close() throws IOException; } */ //声明一个选择器 final Selector selector; //声明了一个服务serverSocket; final ServerSocketChannel serverSocket; //通过构造函数传入端口号。 Reactor(int port) throws IOException { //Reactor初始化 //打开一个选择器器 selector = Selector.open(); //打开服务套接字通道 serverSocket = ServerSocketChannel.open(); //服务套接字绑定端口 serverSocket.socket().bind(new InetSocketAddress(port)); //将服务套接字设置为非阻塞。也就是能够,也就在接受连接的时候,线程不会处于阻塞状态。 serverSocket.configureBlocking(false); //分步处理,第一步,接收accept事件,事件是通过通道绑定的。就是事件是通过通道和选择器绑定了。目前在服务套接字通道有 //接受事件和选择器。 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //attach callback object, Acceptor //事件发生了,那么应该做什么处理,在这里需要添加监听器(也不叫监听器,因为这是主动通知,或者主动调用,应该说添加回调函数,或者回调类) sk.attach(new Acceptor()); //以上过程都是类在初始化的过程完成的。 } public void run() { //初始化之后,run()方法先进行执行 try { while (!Thread.interrupted()) { //select在初始化的过程中已经打开了,现在开始手动开始轮询操作 selector.select(); //轮询操作,获取事件类型的集合 Set selected = selector.selectedKeys(); //迭代器遍历集合 Iterator it = selected.iterator(); while (it.hasNext()) { //Reactor负责dispatch收到的事件,根据事件源获取具体的handle类处理。 dispatch((SelectionKey) (it.next())); } selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { //根据类型获取具体handler类 Runnable r = (Runnable) (k.attachment()); //调用之前注册的callback对象 if (r != null) { r.run(); } } // inner class class Acceptor implements Runnable { public void run() { try { //这里将不会阻塞,因为上面设置了serverSocket为非阻塞式的。 SocketChannel channel = serverSocket.accept(); if (channel != null) new Handler(selector, channel); } catch (IOException ex) { /* ... */ } } }}
//这里现在开始分析Handler类。
class Handler implements Runnable{ final SocketChannel channel; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE); ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); // Optionally try first read now sk = channel.register(selector, 0); //将Handler作为callback对象 sk.attach(this); //第二步,注册Read就绪事件 sk.interestOps(SelectionKey.OP_READ); //导这里唤醒线程,不会跑run方法,从而导致循环再来一遍。从而当线程再次循环跑的时候,OP_READ对应的就是中这个handle的拉。不会再跑Acceptor里面的run()方法拉。 selector.wakeup(); } boolean inputIsComplete() { /* ... */ return false; } boolean outputIsComplete() { /* ... */ return false; } void process() { /* ... */ return; } public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { /* ... */ } } void read() throws IOException { channel.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now //第三步,接收write就绪事件 sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { channel.write(output); //write完就结束了, 关闭select key if (outputIsComplete()) { sk.cancel(); } }}
synchronized void read() throws IOException { // ... channel.read(input); if (inputIsComplete()) { state = PROCESSING; //使用线程pool异步执行 pool.execute(new Processer()); } }
转载地址:http://yysoi.baihongyu.com/