专注于 JetBrains IDEA 全家桶,永久激活,教程
持续更新 PyCharm,IDEA,WebStorm,PhpStorm,DataGrip,RubyMine,CLion,AppCode 永久激活教程

springboot项目中使用Kafka消息队列

1、Kafka消息生产者

1)创建springboot-kafka-producer-demo项目,导入jar包。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.4.5.RELEASE</version>
</dependency>

2)在application.properties添加Kafka producer配置项

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.template.default-topic=topic-test
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.listener.concurrency= 3

spring.kafka.producer.client-id=${spring.application.name}
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#request.required.acks有三个值 0 1 -1
#0:生产者不会等待broker的ack,这个延迟最低但是存储的保证最弱当server挂掉的时候就会丢数据
#1:服务端会等待ack值 leader副本确认接收到消息后发送ack但是如果leader挂掉后他不确保是否复制完成新leader也会导致数据丢失
#-1:同样在1的基础上 服务端会等所有的follower的副本受到数据后才会受到leader发出的ack,这样数据不会丢
spring.kafka.producer.acks=-1

3)Kafka消息生产者启动类中加入@EnableKafka注解

4)编写Kafka消息发送类-Kafka消息生产者

@Slf4j
@Component
public class KafkaSender {

    @Autowired
    private Globals globals;
    @Resource
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息方法
     * @param msg
     */
    public ResponseEntity send(String msg) {
        log.info("发送消息,消息内容 : {}", msg);
        try {
            String uuid = UUID.randomUUID().toString();
            String topic = globals.getTopic();
            Message message = new Message();
            message.setId(uuid);
            message.setMsg(msg);
            message.setSendTime(new Date());

            ListenableFuture listenableFuture = kafkaTemplate.send(topic, uuid, JSON.toJSONString(message));

            //发送成功后回调
            SuccessCallback<SendResult<String,String>> successCallback = new SuccessCallback<SendResult<String,String>>() {
                @Override
                public void onSuccess(SendResult<String,String> result) {
                    log.info("发送消息成功");
                }
            };
            //发送失败回调
            FailureCallback failureCallback = new FailureCallback() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("发送消息失败", ex);
                }
            };

            listenableFuture.addCallback(successCallback,failureCallback);
        }catch (Exception e){
            log.error("发送消息异常", e);
        }
        return new ResponseEntity("", HttpStatus.OK);
    }
}

5)发送消息测试类

public class MsgSenderTest extends KafkaProducerApplicationTest {
    @Autowired
    MsgSender sender;

    @Test
    public void send() {
        sender.send("这是Kafka发送的消息内容" + System.currentTimeMillis());
    }
}

2、Kafka消息消费者

1)创建springboot-kafka-consumer-demo项目,导入jar包。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.4.5.RELEASE</version>
</dependency>

2)在application.properties添加Kafka consumer配置项

#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.template.default-topic=topic-test
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.listener.concurrency=3
spring.cloud.bus.trace.enabled=true

#=============== consumer  =======================
spring.kafka.consumer.client-id=${spring.application.name}
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=test
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smalles
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
# 如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=10
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3)Kafka消息消费者启动类中加入@EnableKafka注解

4)用@KafkaListener创建Kafka消息消费者类

@Component
public class KafkaBatchConsumer {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = {"${spring.kafka.template.default-topic}"},
            containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<ConsumerRecord> records, Acknowledgment ack) {
        try {
            for (ConsumerRecord record : records) {
                logger.info("接收消息: offset = {}, key = {}, value = {} ",
                        record.offset(), record.key(), record.value());
            }
        } catch (Exception e) {
            logger.error("kafka接收消息异常",e);
        } finally {
            //手动提交偏移量
            ack.acknowledge();
        }
    }
}

3、测试Kafka

先启动Kafka消息消费者springboot-kafka-consumer-demo项目

1)在Kafka消息生产者的MsgSenderTest类执行send()方法,如下图

70_1.png

2)切换到Kafka消息消费者查看消费结果

70_2.png

源码示例:https://gitee.com/lion123/springboot-kafka-demo

4、一些问题

1)、kafka在高并发的情况下,如何避免消息丢失和消息重复?
消息丢失解决方案:
首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功
消息重复解决方案:
消息可以使用唯一id标识
生产者(ack=all 代表至少成功发送一次)
消费者 (offset手动提交,业务逻辑成功处理后,提交offset)
落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)

2)、kafka怎么保证数据消费一次且仅消费一次
幂等producer:保证发送单个分区的消息只会发送一次,不会出现重复消息
事务(transaction):保证原子性地写入到多个分区,即写入到多个分区的消息要么全部成功,要么全部回滚流处理EOS:流处理本质上可看成是“读取-处理-写入”的管道。此EOS保证整个过程的操作是原子性。注意,这只适用于Kafka Streams

3)、kafka保证数据一致性和可靠性
数据一致性保证
一致性定义:若某条消息对client可见,那么即使Leader挂了,在新Leader上数据依然可以被读到
HW-HighWaterMark: client可以从Leader读到的最大msg offset,即对外可见的最大offset, HW=max(replica.offset)
对于Leader新收到的msg,client不能立刻消费,Leader会等待该消息被所有ISR中的replica同步后,更新HW,此时该消息才能被client消费,这样就保证了如果Leader fail,该消息仍然可以从新选举的Leader中获取。
对于来自内部Broker的读取请求,没有HW的限制。同时,Follower也会维护一份自己的HW,Folloer.HW = min(Leader.HW, Follower.offset)
数据可靠性保证
当Producer向Leader发送数据时,可以通过acks参数设置数据可靠性的级别
0: 不论写入是否成功,server不需要给Producer发送Response,如果发生异常,server会终止连接,触发Producer更新meta数据;
1: Leader写入成功后即发送Response,此种情况如果Leader fail,会丢失数据
-1: 等待所有ISR接收到消息后再给Producer发送Response,这是最强保证

4)、kafka到spark streaming怎么保证数据完整性,怎么保证数据不重复消费?
保证数据不丢失(at-least)
spark RDD内部机制可以保证数据at-least语义。
Receiver方式
开启WAL(预写日志),将从kafka中接受到的数据写入到日志文件中,所有数据从失败中可恢复。
Direct方式
依靠checkpoint机制来保证。
保证数据不重复(exactly-once)
要保证数据不重复,即Exactly once语义。
- 幂等操作:重复执行不会产生问题,不需要做额外的工作即可保证数据不重复。
- 业务代码添加事务操作
就是说针对每个partition的数据,产生一个uniqueId,只有这个partition的所有数据被完全消费,则算成功,否则算失效,要回滚。下次重复执行这个uniqueId时,如果已经被执行成功,则skip掉。

5)、你用的kafka是批量消费还是一条条消费?为什么这样用?
两种方式都有用到。
对用不重要的数据,如日志这些数据,使用的是批量;对用重要数据则不配置批量消费。
重要数据批量消费失败的话就要批量重新发送和消费。

文章永久链接:https://tech.souyunku.com/24931

未经允许不得转载:搜云库技术团队 » springboot项目中使用Kafka消息队列

JetBrains 全家桶,激活、破解、教程

提供 JetBrains 全家桶激活码、注册码、破解补丁下载及详细激活教程,支持 IntelliJ IDEA、PyCharm、WebStorm 等工具的永久激活。无论是破解教程,还是最新激活码,均可免费获得,帮助开发者解决常见激活问题,确保轻松破解并快速使用 JetBrains 软件。获取免费的破解补丁和激活码,快速解决激活难题,全面覆盖 2024/2025 版本!

联系我们联系我们