博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Reactor模式
阅读量:4188 次
发布时间:2019-05-26

本文共 5287 字,大约阅读时间需要 17 分钟。

对reactor模型的理解。

reactor设计模式:是基于事件驱动的设计模式。目前主要是针对reactor设计模式运用网络编程中,所以reactor的主要事件分为连接,接受,读取,写入 主要的四个事件类型。当然,reactor除了有事件驱动,还有轮询选择器,就是检查某个文件描述处于什么状态。或者说检查某个channel处于某个状态(或者说某个事件发生了),并将这个状态放入到选择器的队列中。最后reactor分发器将根据具体的事件类型,最后由调度器分发调用具体的执行操作。下面是针对具体的代码做分析。

首先分析reactor类(组件)

class Reactor implements Runnable{    //选择器:        /**        将Selector类贴在者便于理解。        abstract class Selector {         static Selector open() throws IOException;         Set keys();         Set selectedKeys();         int selectNow() throws IOException;         int select(long timeout) throws IOException;         int select() throws IOException;         void wakeup();         void close() throws IOException;  } */      //声明一个选择器    final Selector selector;   //声明了一个服务serverSocket;    final ServerSocketChannel serverSocket;   //通过构造函数传入端口号。    Reactor(int port) throws IOException    { //Reactor初始化         //打开一个选择器器        selector = Selector.open();        //打开服务套接字通道        serverSocket = ServerSocketChannel.open();        //服务套接字绑定端口        serverSocket.socket().bind(new InetSocketAddress(port));        //将服务套接字设置为非阻塞。也就是能够,也就在接受连接的时候,线程不会处于阻塞状态。        serverSocket.configureBlocking(false);        //分步处理,第一步,接收accept事件,事件是通过通道绑定的。就是事件是通过通道和选择器绑定了。目前在服务套接字通道有        //接受事件和选择器。        SelectionKey sk =                serverSocket.register(selector, SelectionKey.OP_ACCEPT);        //attach callback object, Acceptor        //事件发生了,那么应该做什么处理,在这里需要添加监听器(也不叫监听器,因为这是主动通知,或者主动调用,应该说添加回调函数,或者回调类)        sk.attach(new Acceptor());        //以上过程都是类在初始化的过程完成的。    }    public void run()    {    //初始化之后,run()方法先进行执行        try        {            while (!Thread.interrupted())            {               //select在初始化的过程中已经打开了,现在开始手动开始轮询操作                selector.select();                //轮询操作,获取事件类型的集合                Set selected = selector.selectedKeys();                //迭代器遍历集合                Iterator it = selected.iterator();                while (it.hasNext())                {                    //Reactor负责dispatch收到的事件,根据事件源获取具体的handle类处理。                    dispatch((SelectionKey) (it.next()));                }                selected.clear();            }        } catch (IOException ex)        { /* ... */ }    }    void dispatch(SelectionKey k)    {       //根据类型获取具体handler类        Runnable r = (Runnable) (k.attachment());        //调用之前注册的callback对象        if (r != null)        {            r.run();        }    }    // inner class    class Acceptor implements Runnable    {        public void run()        {            try            {              //这里将不会阻塞,因为上面设置了serverSocket为非阻塞式的。                SocketChannel channel = serverSocket.accept();                if (channel != null)                    new Handler(selector, channel);            } catch (IOException ex)            { /* ... */ }        }    }}

//这里现在开始分析Handler类。

class Handler implements Runnable{    final SocketChannel channel;    final SelectionKey sk;    ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);    ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);    static final int READING = 0, SENDING = 1;    int state = READING;    Handler(Selector selector, SocketChannel c) throws IOException    {        channel = c;        c.configureBlocking(false);        // Optionally try first read now        sk = channel.register(selector, 0);        //将Handler作为callback对象        sk.attach(this);        //第二步,注册Read就绪事件        sk.interestOps(SelectionKey.OP_READ);        //导这里唤醒线程,不会跑run方法,从而导致循环再来一遍。从而当线程再次循环跑的时候,OP_READ对应的就是中这个handle的拉。不会再跑Acceptor里面的run()方法拉。        selector.wakeup();    }    boolean inputIsComplete()    {        /* ... */        return false;    }    boolean outputIsComplete()    {        /* ... */        return false;    }    void process()    {        /* ... */        return;    }    public void run()    {        try        {            if (state == READING)            {                read();            }            else if (state == SENDING)            {                send();            }        } catch (IOException ex)        { /* ... */ }    }    void read() throws IOException    {        channel.read(input);        if (inputIsComplete())        {            process();            state = SENDING;            // Normally also do first write now            //第三步,接收write就绪事件            sk.interestOps(SelectionKey.OP_WRITE);        }    }    void send() throws IOException    {        channel.write(output);        //write完就结束了, 关闭select key        if (outputIsComplete())        {            sk.cancel();        }    }}

上面主要是reactor的单线程模式,单线程模式不能应该高并非模式,如果线程在处理数据的时候(process())出现了阻塞,那么这将导致后面的连接都进不了,从而导致后面的连接都将失败。因此,reactor单线程是在高并发的情况下一般不用。

接下来就考虑reactor多线程模型,多线程模型就是在处理数据的使用多线程处理,这样就不会导致线程阻塞了。以下这段代码在单线程的Handle类的基础上做了修改。

synchronized void read() throws IOException    {        // ...        channel.read(input);        if (inputIsComplete())        {            state = PROCESSING;            //使用线程pool异步执行            pool.execute(new Processer());        }    }

多线程的处理只解决了后续Handle处理过程中的阻塞问题,但是Reactor的主线程接受连接还是单连接,如果在高并发,或者接受连接的过程还需要做什么安全认证处理,那么也有可能导致接受连接阻塞,那么这个时候,Reactor也需要开闭多个线程。那么最后Reactor就演变程主从线程组了。

关于main方法执行完,进程还不退出,那是因为,线程池的子线程里面的工作线程在阻塞队列里面拿数据的时候,没有数据可拿,会导致阻塞,一直等待有数据。拿到数据后,工作线程除了,又去拿,一直这样循环下去,导致工作线程不会死掉,从而进程也不回死掉。

关于NGINX的百万连接

转载地址:http://yysoi.baihongyu.com/

你可能感兴趣的文章
中化CIO彭劲松:IT治理让我明明白白做事
查看>>
中国惠普公司企业计算及专业服务集团卫东:IT治理最重要就是保证技术与业务有效结合
查看>>
【MVP】 Wenzhong Huang 北大硕士,微软MVP,微软嵌入式讲师,MCSE
查看>>
解析ERP部署的三角模型
查看>>
百感交集:一个IT人应该如何面对失业?
查看>>
服装经营中关于直销、加盟、代理和联营的区别
查看>>
盯上好男人 服装业B2C暗战
查看>>
局域网内部管理行为应该如何控制?
查看>>
CIO--成,获得认可;败,危及部门生存
查看>>
ERP专家童継龙:ERP从神秘少女变成邻家小妹
查看>>
Palm之祭
查看>>
两种不同的Web应用
查看>>
.Net多线程总结(一)
查看>>
让 ASP.NET MVC 支持 HotSwap
查看>>
Http请求处理流程
查看>>
如何利用客户端缓存对网站进行优化?
查看>>
ASP.NET 应用程序性能优化
查看>>
lr监视的性能计数器
查看>>
优化 SQL Server 查询性能
查看>>
小道消息 sd2.0大会国外大师介绍
查看>>