IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

九、RabbitMQ-客户端源码之Consumer

IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

作者:朱小厮
出自:https://hiddenpps.blog.csdn.net/column/info/14800

[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。在用户使用时可以简单的采用QueueingConsumer或者采用DefaultConsumer来重写某些方法。

这里先来看下消费者客户端的关键代码:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicQos(32);
channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer)

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [X] Received '" + message + "'");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}

可以看到QueueingConsumer作为channel.basicConsume的回调函数,之后再进行处理。

在AMQConnection中有关MainLoop的主线程,专门用来”第一线”的处理Broker发送回客户端从帧。当Basic.Consume/.ConsumeOk开启消费模式之后,Broker主动的向客户端发送Basic.Delivery帧,MainLoop线程一步步的调用,最后到ChannelN的processAsync()方法中有:

if (method instanceof Basic.Deliver) {
    processDelivery(command, (Basic.Deliver) method);
    return true;
} 

之后调用processDelivery方法:

protected void processDelivery(Command command, Basic.Deliver method) {
    Basic.Deliver m = method;

    Consumer callback = _consumers.get(m.getConsumerTag());
    if (callback == null) {
        if (defaultConsumer == null) {
            throw new IllegalStateException("Unsolicited delivery -" + " see Channel.setDefaultConsumer to handle this" + " case.");
        }
        else {
            callback = defaultConsumer;
        }
    }

    Envelope envelope = new Envelope(m.getDeliveryTag(), m.getRedelivered(),m.getExchange(),m.getRoutingKey());
    try {
        this.dispatcher.handleDelivery(callback, m.getConsumerTag(),envelope, (BasicProperties) command.getContentHeader(),command.getContentBody());
    } catch (Throwable ex) {
        getConnection().getExceptionHandler().handleConsumerException(this, ex,callback,m.getConsumerTag(), "handleDelivery");
    }
}

这个方法首先根据consumerTag从ChannelN中的_consumer这个HashMap中获取相应的Consumer回调函数,然后调用这个回调函数的handleDeliver()方法进行处理,这里有些同学会有疑问,明明是调用ConsumerDispatcher dispatcher的handleDeliver()方法,其实这里只是包了一层皮,ConsumerDispatcher的handleDeliver()方法就是调用了Consumer的handleDeliver()方法。

我们接下去看看QueueingConsumer这个实现Consumer接口的类是怎么处理的:

@Override public void handleDelivery(String consumerTag,
                           Envelope envelope,
                           AMQP.BasicProperties properties,
                           byte[] body)
    throws IOException
{
    checkShutdown();
    this._queue.add(new Delivery(envelope, properties, body));
}

这里的queue就是一个LinkedBlockingQueue,客户端程序通过调用nextDelivery()方法来获取数据:

public Delivery nextDelivery()
    throws InterruptedException, ShutdownSignalException, ConsumerCancelledException
{
    return handle(_queue.take());
}

private Delivery handle(Delivery delivery) {
    if (delivery == POISON ||
        delivery == null && (_shutdown != null || _cancelled != null)) {
        if (delivery == POISON) {
            _queue.add(POISON);
            if (_shutdown == null && _cancelled == null) {
                throw new IllegalStateException(
                    "POISON in queue, but null _shutdown and null _cancelled. " +
                    "This should never happen, please report as a BUG");
            }
        }
        if (null != _shutdown)
            throw Utility.fixStackTrace(_shutdown);
        if (null != _cancelled)
            throw Utility.fixStackTrace(_cancelled);
    }
    return delivery;
}

这个nextDelivery方法说白就是一个LinkedBlockingQueue的take()操作,也就是一个可能会阻塞等待的操作。

附录:RabbitMQ-客户端源码系列文章


Warning: A non-numeric value encountered in /data/wangzhan/tech.souyunku.com.wp/wp-content/themes/dux/functions-theme.php on line 1154
赞(68) 打赏



未经允许不得转载:搜云库技术团队 » 九、RabbitMQ-客户端源码之Consumer

IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码
IDEA2023.1.3破解,IDEA破解,IDEA 2023.1破解,最新IDEA激活码

评论 抢沙发

大前端WP主题 更专业 更方便

联系我们联系我们

觉得文章有用就打赏一下文章作者

微信扫一扫打赏

微信扫一扫打赏


Fatal error: Uncaught Exception: Cache directory not writable. Comet Cache needs this directory please: `/data/wangzhan/tech.souyunku.com.wp/wp-content/cache/comet-cache/cache/https/tech-souyunku-com/index.q`. Set permissions to `755` or higher; `777` might be needed in some cases. in /data/wangzhan/tech.souyunku.com.wp/wp-content/plugins/comet-cache/src/includes/traits/Ac/ObUtils.php:367 Stack trace: #0 [internal function]: WebSharks\CometCache\Classes\AdvancedCache->outputBufferCallbackHandler() #1 /data/wangzhan/tech.souyunku.com.wp/wp-includes/functions.php(5109): ob_end_flush() #2 /data/wangzhan/tech.souyunku.com.wp/wp-includes/class-wp-hook.php(303): wp_ob_end_flush_all() #3 /data/wangzhan/tech.souyunku.com.wp/wp-includes/class-wp-hook.php(327): WP_Hook->apply_filters() #4 /data/wangzhan/tech.souyunku.com.wp/wp-includes/plugin.php(470): WP_Hook->do_action() #5 /data/wangzhan/tech.souyunku.com.wp/wp-includes/load.php(1097): do_action() #6 [internal function]: shutdown_action_hook() #7 {main} thrown in /data/wangzhan/tech.souyunku.com.wp/wp-content/plugins/comet-cache/src/includes/traits/Ac/ObUtils.php on line 367