专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

RabbitMQ消息确认机制—消息发送确认和消息接收确认

/**
* RabbitMQ消息确认机制
* 关于rabbit的生产和消费方的一些实用的操作;
* producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失

*/

/**
     * producer的confirm模式
     * 业务场景描述:
     * 促销系统在做活动前,需要给用户的手机发送一条活动内容短信希望用户来参加,
     * 因为用户量有点大,所以通过往短信mq中插入数据方式,让短信服务来消费mq发短信;
     * 此时插入mq消息的服务为了保证给所有用户发消息,并且要在短时间内插入完成(因此用到了异步插入方式(快速)),
     * 我们就需要知道每次插入mq是否成功,如果不成功那我们可以收集失败的信息后补发(因此confirm模式排上了用场);
     * 开启confirm模式后,返回send结果(成功或失败)
     */

    public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback){
        CachingConnectionFactory connectionFactory = null;
        return getRabbitTemplate(connectionFactory, confirmCallback);

    }

    //producer生产 - confirm模式
    public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory,RabbitTemplate.ConfirmCallback confirmCallback){
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        //product开启confirm模式
        connectionFactory.setPublisherConfirms(true);
        //设置confirm回调处理
        template.setConfirmCallback(confirmCallback);
        return template;

    }
/**
     * consumer的ack模式
     * 场景描述:短信服务去消费mq队列信息时,倘若服务调用的运营商发送短信接口异常了(短信运营商接口欠费),
     * 我们此时的短信是发送失败的,用户也收不到短信,但是在默认(默认开启ack)前提下mq消息已经被消费了rabbit中没有记录了(kafka例外);
     * 想要mq消息在业务逻辑异常时还存在,那么可以使用ack方式;
     * 业务无异常,发送ack标识,mq消息释放
     * 
     * 在springboot中可以使用基于amqp封装的工厂类关闭自动ack模式,改为手动ack方式;
     * 只有当业务代码流程走完后,最后通过代码设置ack标识,来通知rabbit消息可以丢弃了;
     * 如果设置了手动模式后,又没有提交ack标识,那么mq中的消息一直存在无法释放(每次consumer消费后,rabbit会把noack的消息重复放入队列中):
     */

    public SimpleRabbitListenerContainerFactory listenerContainerFactory(){
        ConnectionFactory connectionFactory = null;
        return listenerContainerFactory(connectionFactory);
    }

    public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //代码手动ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //开启消费者数量
        factory.setConcurrentConsumers(2);
        //手动确认模式可以使用 prefetch,限制通道上未完成的(“正在进行中的”)发送的数量
        //每次接受数据量,默认250
        factory.setPrefetchCount(300);
        return factory;
    }

    /**
     * 消息确认–ACK
     * 通过连接工厂设置手动ack方式,然后获取mq消息后,走完正常业务逻辑,最后再手动通知ack释放消息,如下:
     */
    private void firstNodeListener(String msg,Channel channel,Message message){
        try {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("firstNodeListener - 消费消息 [" + deliveryTag + "] - " + msg);
            //这里ack主要根据mq消息的唯一编号(deliverTag)来通知;
            //如果我们不设置ack确认,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中
            //忘记通过basicAck返回确认信息,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息
            channel.basicAck(deliveryTag, true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

本文源自:https://tech.souyunku.com/wangrudong003/p/11436990.html

文章永久链接:https://tech.souyunku.com/48474

未经允许不得转载:搜云库技术团队 » RabbitMQ消息确认机制—消息发送确认和消息接收确认

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们