Kafka 的消費者可以使用兩種方式來提交消費位移(offset):自動提交和手動提交。自動提交是由 Kafka 客戶端自動定期提交位移,而手動提交則需要應用程序顯式地調用 API 來提交位移。手動提交位移的方式可以更精細地控制消費位移,以及避免因自動提交位移而產生的數據丟失或重復消費等問題。
下面是使用 Kafka Java API 手動提交位移的一些示例代碼:
1.啟用手動提交位移:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交位移
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
在創建 KafkaConsumer 對象時,將 enable.auto.commit 屬性設置為 false,以關閉自動提交位移的功能。
2.手動提交位移:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 處理消息
}
consumer.commitSync(); // 手動提交位移
}
在消費消息后,調用 commitSync() 方法來手動提交位移。如果需要批量提交位移,可以使用 commitSync(Map<topicpartition, offsetandmetadata=""> offsets) 方法來提交指定的分區和位移信息。
需要注意的是,手動提交位移需要在適當的時機進行提交,以確保數據不會丟失或重復消費。一般來說,可以在消費一批消息后,或者在處理完一段業務邏輯后,再進行位移提交。同時,還需要注意位移的提交順序,以保證數據的一致性。