专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

tigase源码分析4:packet处理

这节主要讲数据包packet 的流转过程,如图大概说明packet被处理的流程,但实际上packet最终的处理者是插件,这个过程是在packet流转到SM中被分发到对它感兴趣的processor中处理的,下节将会详细说明packet被SM处理情况:

94_1.png

被处理的包packet,一旦被会话管理器和处理器插件(session manager and processor plugins)处理完成,数据包会被摧毁。因此一个处理器将数据包转发到目的地前必须创建一个包的副本,并设置所有属性才返回处理结果。当然处理器可以生成任意数量的数据包。

所以你会看到上图显示2个用户userA 和userB之间的通信的数据包送到最终目的地前被复制两次,messagerouter可以看作为一个包的路由器,他根据packet中的to属性值选择相应的处理组件进行投递到对方的处理队列里,这个包将被下一个组件执行processPacket(packet)进行处理

在启动章分析到XMPPServer中执行 MessageRouter.start();开启多线程处理in out packet.

94_2.png

public void MessageRouter.start() {  
    super.start();  
}

从以上的继承图得知道,将执行AbstractMessageReceiver.start()

@Override  
    public void AbstractMessageReceiver.start() {  
        startThreads();  
    }  

private ArrayDeque<QueueListener>  threadsQueue=null;  
private void startThreads() {  
  if (threadsQueue == null) {  
    threadsQueue = new ArrayDeque<QueueListener>(8);  
    for (int i = 0; i < in_queues_size; i++) {  
    QueueListener in_thread = new QueueListener(in_queues.get(i), QueueType.IN_QUEUE);  

        in_thread.setName("in_" + i + "-" + getName());  
        in_thread.start();  
        threadsQueue.add(in_thread);  
    }  
  for (int i = 0; i < out_queues_size; i++) {  
    QueueListener out_thread = new QueueListener(out_queues.get(i), QueueType  
                        .OUT_QUEUE);  

            out_thread.setName("out_" + i + "-" + getName());  
        out_thread.start();  
        threadsQueue.add(out_thread);  
    }  
    }    // end of if (thread == null || ! thread.isAlive())  
......  
}

每个MessageReceiver组件都 有多个线程分别处理各自packet,来一个简单的模型图表明

94_3.png

QueueListener是AbstractMessageReceiver的内部类,所以QueueListener内部能直接访问到AbstractMessageReceiver对象的方法。由此可见,象ClientConnectionManager,S2SConnectionManager

,MessageRouter,SessionManager 这些子类都有各自的in out线程负责处理投递到他们节点上的packet,

private QueueType    type          = null;  
private boolean      threadStopped = false;  
private PriorityQueueAbstract<Packet> queue;  
private QueueListener(PriorityQueueAbstract<Packet> q, QueueType type) {  
    this.queue = q;  
    this.type  = type;  
    compName   = AbstractMessageReceiver.this.getName();  
}  

@Override  
public void QueueListener.run() {  

    Packet        packet  = null;  
    Queue<Packet> results = new ArrayDeque<Packet>(2);  

    while (!threadStopped) {  
        try {  

        packet = queue.take();//阻塞方法  
        ++packetCounter;  
        switch (type) {  
         case IN_QUEUE :  
               long startPPT = System.currentTimeMillis();  

            PacketReceiverTask task = null;  

            if (packet.getTo() != null) {  
            String id = packet.getTo().toString() + packet.getStanzaId();  

               task = waitingTasks.remove(id);  
                        }  
            if (task != null) {  
             task.handleResponse(packet);  
            } else {  
        boolean processed = false;  
         if (packet.isCommand() && (packet.getStanzaTo() != null) && compName.equals(  
                  packet.getStanzaTo().getLocalpart()) && isLocalDomain(packet  
                .getStanzaTo().getDomain())) {  
                processed = processScriptCommand(packet, results);  
        if (processed) {  
         Packet result = null;  
          while ((result = results.poll()) != null) {  
            addOutPacket(result);  
        }  
     }  
       }  
       if (!processed && ((packet = filterPacket(packet, incoming_filters)) !=null)) {  
        processPacket(packet);//执行具体实现类的处理方法  
    }  

    int idx = pptIdx;  

    pptIdx = (pptIdx + 1) % processPacketTimings.length;  

    long timing = System.currentTimeMillis() - startPPT;  

    processPacketTimings[idx] = timing;  
    }  

    break;  

     case OUT_QUEUE :  

             if ((packet = filterPacket(packet, outgoing_filters)) != null) {  
            processOutPacket(packet);//执行具体的实现类的处理方法  
          }  

            break;  
        default :  
            break;  
        }    // end of switch (qel.type)  
        } catch (InterruptedException e) {  
                                     .....  
        }      // end of while (! threadStopped)  
        }

private MessageReceiver     parent  = null;  
//在初始化时,parent被赋值为MessageRouter对象。  
public void AbstractMessageReceiver.processOutPacket(Packet packet) {  
        if (parent != null) {  
          parent.addPacket(packet);//过渡到MessageRouter对象进行处理  
        } else {  
         addPacketNB(packet);  
        }    // end of else  
    }

父类实现默认addPacketNB()添加packet到队列的方法,如果子类没有重写该方法则使用父类的这个方法加入packet到他们各自的in 或out的处理队列里

public boolean AbstractMessageReceiver.addPacketNB(Packet packet) {  
   int queueIdx = Math.abs(hashCodeForPacket(packet) % in_queues_size);//得到一个hash值  
   //根据那个hash值加入到对应的in队列里,run()里会监听到阻塞队列有处理packet,则处理之  
   boolean result = in_queues.get(queueIdx).offer(packet, packet.getPriority().ordinal());  

        if (result) {  
            ++statReceivedPacketsOk;  
        } else {  

            // Queue overflow!  
            ++statReceivedPacketsEr;  
        }  

        return result;  
    } 

同上,只不过这个是放入out处理队列,由out线程监听处理之

protected boolean AbstractMessageReceiver.addOutPacketNB(Packet packet) {  
    int queueIdx = Math.abs(hashCodeForPacket(packet) % out_queues_size);  
    boolean result = false;  
       //放到相应的out队列里等待处理  
result = out_queues.get(queueIdx).offer(packet, packet.getPriority().ordinal());  
    if (result) {  
        ++statSentPacketsOk;  
    } else {  

        ++statSentPacketsEr;  

    }  

    return result;  
}

当我们知道如果要用哪一个AbstractMessageReceiver的实现类来处理packet的时候,我只需要把packet投递到对应的实现类(如SessionManager)里的in_queue或out_queue里就可以了,因为这些实现类开启了多条in 和out线程在等待处理in out队列里的packet,所以当客户端和服务器端建立连接后,发来有效的packet,而这个packet被投递的入口要从下面分析说起了。

public void ConnectionManager.accept(SocketChannel sc) {  
IO serv = getXMPPIOServiceInstance(); //每一个接受到的新socket都有一个与之对应的ioservice  
//这个ioservice设置监听器ConnectionManager.this可能是(ClientConnectionManager,  
//S2SConnectionManager,BoshConnectionManager,WebSocketClientConnectionManager)的对象,主要是看是这些客户端socket是从哪个服务端socket对象监听的端口进来的  
serv.setIOServiceListener(ConnectionManager.this);  
}  

//前面ConnectionOpenThread分析的章节已经详细分析过了,当有数据包来的时间,客户端socket是由服务端创建的一个对应的IoService作为处理类的。在SocketThread中监听到的socket有可以处理的数据时completionService.submit(serv);用线程池里的线程来执行IOService.call()方法,开始进入数据处理  
private IOServiceListener<IOService<RefObject>> serviceListener  = null;  
public IOService<?> IOService.call() throws IOException {  
        writeData(null);  

        boolean readLock = true;  

        if (stopping) {  
            stop();  
        } else {  
            readLock = readInProgress.tryLock();  
            if (readLock) {  
                try {  
                    processSocketData();//执行具体的子类的处理数据方法  
                                                           该方法包括解析的数据封装成packet  
                if ((receivedPackets() > 0) && (serviceListener != null)) {  
                       //由前面分析可知道执行ConnectionManager子类的方法开始处                                     理数据包packet  
                                    serviceListener.packetsReady(this);  
                    }    // end of if (receivedPackets.size() > 0)  
                } finally {  
                    readInProgress.unlock();  
                    if (!isConnected()) {  

                        forceStop();  
                    }  
                }  
            }  
        }  

        return readLock  
                ? this  
                : null;  
    }

public void ConnectionManager.packetsReady(IO serv) throws IOException {  

        if (checkTrafficLimits(serv)) {  
                //processSocketData(serv)是读入方向的packet处理入口  
                //writePacketsToSocket 是写出方向的packet处理方法  
        writePacketsToSocket(serv, processSocketData(serv));  
        }  
    }

processSocketData可能的继承实现结构,由图可见,执行processSocketData(IO serv)由继承ConnectionManager的子类的重写方法。

94_4.png

//拿实现类ClientConnectionManager来分析
public Queue<Packet> ClientConnectionManager.processSocketData(XMPPIOService<Object> serv) {

        JID id = serv.getConnectionId();

                 Packet p = null;

        while ((p = serv.getReceivedPackets().poll()) != null) {
               if (p.getAttributeStaticStr(Packet.XMLNS_ATT) == null) {
                p.setXMLNS(XMLNS);
            }

            if (p.getStanzaFrom() != null) {
                p.initVars(null, p.getStanzaTo());
            }
                        //设置包的from值
            p.setPacketFrom(id);

            JID receiver = serv.getDataReceiver();
                        //设置包的to值
            if (receiver != null) {
                   p.setPacketTo(serv.getDataReceiver());
                        //投递到(如ClientConnectionManager类)中的out_queue队列中,
                          由run()处理,这个包直到处理完成经过多次投递,
               addOutPacket(p);
            } else {

            }

        }    // end of while ()

        return null;
    }

      //投递packet到相应的out_queue列队中
      protected boolean addOutPacket(Packet packet) {
    int queueIdx = Math.abs(hashCodeForPacket(packet) % out_queues_size);

    try {
        out_queues.get(queueIdx).put(packet, packet.getPriority().ordinal());
        ++statSentPacketsOk;
     } catch (InterruptedException e) {
        ..
     }    // end of try-catch

        return true;
    }

文章永久链接:https://tech.souyunku.com/29964

未经允许不得转载:搜云库技术团队 » tigase源码分析4:packet处理

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们