一、groupid的定義
在使用Kafka的時候,我們經常會看到group.id這個配置項,它是一個字符串類型的配置項。具體來說,每個消費者都有一個group id,一般情況下我們可以將同樣處理某個數據源的消費者放置在一組中,使用group id進行標識。
舉個例子,如果你有一個在多個地方運行的日志處理程序,每個程序都會處理某個topic的消息,那么你可以用相同的group id來標識這個處理組,以確保傳遞給組中的每個處理程序的消息是唯一的。
二、groupid的作用
Kafka通過group id分配消費者之間的消息,確保一個組內的消費者不會接收到相同的消息。當同一個group id下的多個消費者訂閱了同一個topic時,每個消息將只能被一個消費者消費。
在多個消費者共同消費一個topic的場景下,可以通過groupid來做load balance,即通過groupid的設置,部署多個消費者實例來對消息進行消費。
三、groupid的注意事項
1、group id需要唯一
在同一個Kafka集群中,group id需要唯一,如果兩個group使用了相同的groupid,它們就會消費相同的消息,造成消息的重復消費。
2、重新啟動后,groupid也需要唯一
如果在同一個group中,消費者重啟或新加入消費者組,那么每次加入新消費者之前,需要確保添加的消費者的group id在之前沒有被使用過。
3、group id的更改會導致消費者重新從頭開始消費
Kafka集群會為group id下的每個消費者保存消費的偏移量,如果group id被更改,消費者將會從頭開始消費。
四、實例代碼
// 配置項
properties.put("group.id", "test-group");
// 創建消費者
KafkaConsumer consumer = new KafkaConsumer<>(properties);
// 訂閱topic
consumer.subscribe(Arrays.asList("test-topic"));
// 消費消息
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
五、小結
Kafka是一個分布式的消息隊列,通過group id來保證消費者組內的消息處理具有唯一性,可以做到消息的負載均衡和處理組內消息的互斥性。在使用時需要注意group id的唯一性以及更改group id的影響等問題。