一. 前言
最近有很多小伙伴開始找工作,在面試時,面試官經常問到一個題目:RabbitMQ如何防止重復消費?
有很多小伙伴這個時候都在想,消息怎么就會重復消費呢???.......
所以他們在面試后就跑來問小編,針對這個比較高頻的題目,小編就在這里為大家來講講MQ防止重復消費的實現方案吧。
二. 面試題考點
如果面試官是小編的話,那么我想考察的,其實就是候選人除了對技術的基本使用之外,再就是在各種實際應用場景中對可能發生問題的實際處理能力。
所以這道題的考點,最起碼有兩點:
第一是RabbitMQ中消息的重復消費是如何產生的,我們首先要發現問題,知道問題產生原因:
第二是針對這個重復消費問題的處理方案及機制。
三. 解題分析
接下來小編就根據上述考點,帶大家來一起分析這個問題的解題思路。
3.1RabbitMQ消息重復消費的產生原因
根據上圖,給大家梳理總結出了消息重復消費的產生過程,如下:
消費方的業務項目從MQ隊列中接收數據;
接著處理業務;
業務處理成功后,消費方項目給MQ返回ack進行手動確認;
返回回調執行結果的過程中,因為網絡抖動等原因,回調數據時,MQ沒有返回成功,所以MQ隊列中的數據會再次發給業務項目,造成重復消費。
3.2. RabbitMQ消息重復消費的處理方案
針對消息的重復消費問題,根據上圖總結的解決思路如下:
監聽器接收MQ隊列中的數據:
利用redis的setnx命令,以消息唯一id為key,以消息內容為value,超時時間設置為10秒,存入redis中;
如果能夠成功存入,說明沒有重復消費,則處理業務,處理完業務后返回ack或者nack確認;
如果存不進去,則說明重復消費,直接返回ack確認的回調信息就可以了。
3.3解決重復消費的案例代碼
發送方測試代碼
/**
* 測試發送
* @author 千鋒
*/
@SpringBootTest(classes = ProducerApplication.class)
@RunWith(SpringRunner.class)
public class TestProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() throws IOException {
//給消息封裝一個唯一id對象
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
//第四個參數: 設置消息唯一id
rabbitTemplate.convertAndSend("交換器名字","路由鍵","千鋒測試MQ重復消費處理!!",messageId);
}
}
接收方測試代碼
package com.qf.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 千鋒
*/
@Component
public class Consumer {
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "隊列名字")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 獲取MessageId, 消息唯一id
String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
//1. 設置key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
//2. 消費消息
System.out.println("接收到消息:" + msg);
//3. 設置key的value為1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 獲取Redis中的value即可 如果是1,手動ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
}
四. 總結
經過上面的分析,最后健哥再給大家總結一下這個問題的完整答案。
問題產生原因:
因為消費方和MQ服務器網絡閃斷等原因,造成了接收方消費后,返回給MQ服務器一個ack確認消息,結果MQ沒有接收到,造成了重復消費。
解決過程:
利用redis的setnx命令,將消費的消息id存入到redis,超時時間設置為10秒,然后再給mq返回ack。消費前要判斷redis中是否存在這個消息id,如果不存在說明沒有消費過,則正常消費;如果redis中存在這個消息id,則說明重復消費,直接返回ack,不重復執行業務。
以上就是MQ中消息重復消費的產生原因及解決思路和對應案例,現在你知道該怎么解決了嗎?更多關于“Java培訓”的問題,歡迎咨詢千鋒教育在線名師。千鋒已有十余年的培訓經驗,課程大綱更科學更專業,有針對零基礎的就業班,有針對想提升技術的好程序員班,高品質課程助力你實現java程序員夢想。