专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

tigase源码分析3:SocketThread

SocketThread 专用于处理客户端SOCKET的读写事件的线程,当服务器端SOCKET接受到客户socket,就会生成一个与对应的IOService,IOService.socketIO指向SocketIO对象,

SocketIO是对java api中SocketChannel的封装,所以拿到IOService也就等于拿到客户端SocketChannel了。SocketThread 是一个私有类,他在第一次加载的时候,就会创建了3类线程,

socketReadThread():负责读socket的数据;

socketWriteThread():负责写入socket数据;

ResultsListener:负责监视CompletionService执行结果IOService完成情况,判断IOService中的socket连接是否关闭,如没有则继续注册入SocketThread 的Selector中进行事件侦听;

SocketThread ::  
private static SocketThread[] socketReadThread = null;  
private static SocketThread[] socketWriteThread = null;  
private static ThreadPoolExecutor executor = null;  
private static CompletionService<IOService<?>> completionService = null;  
//下面是实例属性  
private Selector clientsSel = null;  
private boolean reading = false;  
private boolean writing = false;  

static {  
        if (socketReadThread == null) {  
            int nThreads = (cpus * DEF_MAX_THREADS_PER_CPU) / 2 + 1;  

            executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,  
                    new LinkedBlockingQueue<Runnable>());  
            completionService = new ExecutorCompletionService<IOService<?>>(executor);  
                        //执行任务的线程池  
            socketReadThread = new SocketThread[nThreads]; //一组负责读socket的数据; </span><span style="font-size: small;">  
                         socketWriteThread = new SocketThread[nThreads]; //一组负责写socket的数据;  
            for (int i = 0; i < socketReadThread.length; i++) {  
                socketReadThread[i] = new SocketThread("socketReadThread-" + i);  
                socketReadThread[i].reading = true;  

                Thread thrd = new Thread(socketReadThread[i]);  

                thrd.setName("socketReadThread-" + i);  
                thrd.start();//启动,会执行run()  
            }  

            log.log(Level.WARNING, "{0} socketReadThreads started.", socketReadThread.length);  

            for (int i = 0; i < socketWriteThread.length; i++) {  
                socketWriteThread[i] = new SocketThread("socketWriteThread-" + i);  
                socketWriteThread[i].writing = true;  

                Thread thrd = new Thread(socketWriteThread[i]);  

                thrd.setName("socketWriteThread-" + i);  
                thrd.start();////启动,会执行run()  
            }  

            log.log(Level.WARNING, "{0} socketWriteThreads started.", socketWriteThread.length);  
        }    // end of if (acceptThread == null)  
    }  

       //生成每一个SocketThread都会有一个对应ResultsListener线程</span><span style="font-size: small;"><span style="font-size: small;">  
    private SocketThread(String name) {  
        try {  
            clientsSel = Selector.open();  
        } catch (Exception e) {  
            log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);  
            stopping = true;  
        }    // end of try-catch  

        new ResultsListener("ResultsListener-" + name).start();  
    }  

<span style="font-size: small;">  public void SocketThread.run() {  
        while ( !stopping) {  
            try {  
                clientsSel.select();  

                if (log.isLoggable(Level.FINEST)) {  
                    log.log(Level.FINEST, "Selector AWAKE: {0}", clientsSel);  
                }  
                              //等到已选择的key,证明有数据要处理  
                Set<SelectionKey> selected = clientsSel.selectedKeys();  
                int selectedKeys = selected.size();  

                if ((selectedKeys == 0) && (waiting.size() == 0)) {  
                    if (log.isLoggable(Level.FINEST)) {  
                        log.finest("Selected keys = 0!!! a bug again?");  
                    }  

                    if ((++empty_selections) > MAX_EMPTY_SELECTIONS) {  
                        recreateSelector();  
                    }  
                } else {  
                    empty_selections = 0;  

                    if (selectedKeys > 0) {  

                        for (SelectionKey sk : selected) {  
                                                //得到ConnectionListenerImpl.accept()中绑定的ioservice  
                            IOService s = (IOService) sk.attachment();  

                            try {  

                                .....  
                                                       //下一次socket从selector监听队列中移除  
                                sk.cancel();  

                                forCompletion.add(s);  

                            } catch (CancelledKeyException e) {  
                            ...  
                            }  
                        }  
                    }  

                    // Clean-up cancelled keys...  
                    clientsSel.selectNow();  
                }  
                              //注册新的socket到selector中进行监听       
                addAllWaiting();  

                IOService serv = null;  

                while ((serv = forCompletion.pollFirst()) != null) {  
                                       //放线程沲中执行,调用了IOService.call()进行数据处理  
                    completionService.submit(serv);  
                }  

                // clientsSel.selectNow();  
            } catch (CancelledKeyException brokene) {  

                。。  
            } catch (IOException ioe) {  
                           。。  
            } catch (Exception exe) {  
            ..  
            }  
        }  
    }</span>  

//ResultsListener.run()  
    public void ResultsListener.run() {  
    for (;;) {  
    try {  
//CompletionService的实现是维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法得到的对象其实就是IOService。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。  
      //其实这里的设计非常巧妙,当读到要处理事件进来后,把selector中对应的socket移出,当完成socket数据处理后只要连接还开启,再次加入selector中进行监听,所以客户端可以发送一个空字符串来进行心跳处理,维持客户端和服务器进行长连接。  

    IOService<?> service = completionService.take().get();  
        if (service != null) {  
        if (service.isConnected()) {//只要连接没关闭  
            addSocketService(service);//就再次注册到线程的Selector中  
         ............  
                  }  
           }  
        } 

文章永久链接:https://tech.souyunku.com/29966

未经允许不得转载:搜云库技术团队 » tigase源码分析3:SocketThread

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们