01
分区(Partition)机制详解
// Example: consuming from manually assigned partitions.
/**
 * Reads partitions 0 and 1 of {@code partition_demo_topic} from the beginning
 * using manual partition assignment, printing every record to standard output.
 *
 * <p>The poll loop never terminates on its own; the process has to be stopped
 * externally.
 */
public class KafkaPartitionConsumerDemo {
    private static final String BOOTSTRAP_SERVERS = "192.168.241.128:9092";
    private static final String TOPIC_NAME = "partition_demo_topic";

    public static void main(String[] args) {
        Properties config = consumerConfig();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
            TopicPartition first = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition second = new TopicPartition(TOPIC_NAME, 1);

            // Assign the partitions explicitly instead of subscribing to the topic.
            consumer.assign(Arrays.asList(first, second));

            // Rewind both partitions to their first available offset.
            consumer.seekToBeginning(Arrays.asList(first, second));

            // Poll forever and print whatever arrives.
            for (;;) {
                consumer.poll(Duration.ofMillis(100))
                        .forEach(KafkaPartitionConsumerDemo::printRecord);
            }
        }
    }

    /** Builds the consumer settings: brokers, String deserializers and group id. */
    private static Properties consumerConfig() {
        Properties config = new Properties();
        String stringDeserializer = StringDeserializer.class.getName();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "partition_demo_group");
        return config;
    }

    /** Prints the partition, offset, key and value of one record. */
    private static void printRecord(ConsumerRecord<String, String> message) {
        System.out.printf("Partition: %d, Offset: %d, Key: %s, Value: %s%n",
                message.partition(), message.offset(), message.key(), message.value());
    }
}
复制
02
消费者组(Consumer Group)深入解析
消费者组是 Kafka 实现消息消费的核心机制,它提供了消息消费的可扩展性和故障容错能力。一个消费者组由一个或多个消费者实例组成,这些消费者共同消费订阅主题的消息。消费者组的设计遵循一个基本原则:一个分区只能被同一个消费者组中的一个消费者消费,但一个消费者可以同时消费多个分区。这种设计既保证了消费的负载均衡,又避免了消息重复消费的问题。
/**
 * Starts several consumers in the same consumer group, each on its own thread,
 * and logs the partitions that are assigned to or revoked from every consumer.
 */
public class KafkaConsumerGroupDemo {
    private static final String BOOTSTRAP_SERVERS = "192.168.241.128:9092";
    private static final String TOPIC_NAME = "group_demo_topic";
    private static final String GROUP_ID = "group_demo";
    // Deserializer given by class name so no additional import is required;
    // the consumer configuration accepts a class name string for these settings.
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /**
     * Launches the consumer threads and blocks until all of them have finished.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Start several consumer instances in the same group.
        int consumerCount = 3;
        CountDownLatch latch = new CountDownLatch(consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            final int consumerId = i;
            new Thread(() -> {
                try {
                    runConsumer(consumerId);
                } finally {
                    latch.countDown();
                }
            }).start();
        }
        // The latch was previously counted down but never awaited.
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs one group member: subscribes to the topic and polls forever.
     *
     * @param consumerId index used in the client id and in log output
     */
    private static void runConsumer(int consumerId) {
        Properties props = new Properties();
        // Consumer group settings.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-" + consumerId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Deserializers are mandatory; without them the KafkaConsumer constructor fails.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe with a listener that reports assignment changes.
            consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.printf("Consumer-%d: Partitions revoked: %s%n", consumerId, partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.printf("Consumer-%d: Partitions assigned: %s%n", consumerId, partitions);
                }
            });
            // Consume messages.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(consumerId, record);
                }
            }
        }
    }

    /**
     * Handles one record; this demo simply prints it.
     *
     * @param consumerId index of the consumer that received the record
     * @param record     the record to handle
     */
    private static void processRecord(int consumerId, ConsumerRecord<String, String> record) {
        System.out.printf("Consumer-%d: Partition=%d, Offset=%d, Value=%s%n",
                consumerId, record.partition(), record.offset(), record.value());
    }
}
复制
这个示例展示了如何创建一个消费者组,并启动多个消费者实例共同消费消息。通过实现 ConsumerRebalanceListener 接口,我们可以监控分区的分配和撤销过程;当需要在再平衡前后执行特定操作(如保存位移)时,这一点非常有用。在实际应用中,消费者组的配置需要根据业务场景和性能需求来调整,比如会话超时时间、心跳间隔等参数都会影响消费者组的行为和性能。
03
位移管理(Offset Management)详解
/**
 * Demonstrates manual offset management: auto-commit is disabled and, after the
 * records of each partition in a poll have been processed, the offset for that
 * partition is committed synchronously.
 */
public class KafkaOffsetManagementDemo {
    private static final String BOOTSTRAP_SERVERS = "192.168.241.128:9092";
    private static final String TOPIC_NAME = "offset_demo_topic";
    private static final String GROUP_ID = "offset_demo_group";
    // Deserializer given by class name so no additional import is required;
    // the consumer configuration accepts a class name string for these settings.
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /**
     * Subscribes to the topic and runs the poll/process/commit loop forever.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Commit offsets manually.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Deserializers are mandatory; without them the KafkaConsumer constructor fails.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // records.partitions() only lists partitions that returned data,
                // so partitionRecords below is never empty.
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        processRecord(record);
                    }
                    // Offset of the last record handled for this partition.
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    // Commit lastOffset + 1, i.e. the position of the next record to read.
                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                            Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1));
                    try {
                        consumer.commitSync(offsetsToCommit);
                        System.out.printf("Committed offset %d for partition %s%n",
                                lastOffset + 1, partition);
                    } catch (CommitFailedException e) {
                        // Report the failed commit and keep consuming.
                        handleCommitFailure(partition, lastOffset, e);
                    }
                }
            }
        }
    }

    /**
     * Reports a failed commit on standard error.
     *
     * <p>NOTE(review): per the Kafka client Javadoc a CommitFailedException
     * generally signals an unrecoverable commit (for example after a rebalance),
     * so a blind retry may not succeed - confirm before adding retry logic here.
     *
     * @param partition partition whose commit failed
     * @param offset    offset of the last processed record for that partition
     * @param e         the failure reported by the consumer
     */
    private static void handleCommitFailure(TopicPartition partition, long offset, Exception e) {
        System.err.printf("Failed to commit offset %d for partition %s: %s%n",
                offset, partition, e.getMessage());
        // A retry or an error record could be added here.
    }

    /**
     * Business logic for one record; this demo simply prints it.
     *
     * @param record the record to handle
     */
    private static void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("Processing record: Partition=%d, Offset=%d, Value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}
复制
这个示例展示了如何实现手动位移提交,包括同步提交和异常处理。在实际应用中,位移管理策略需要根据业务的可靠性要求来选择。对于要求高可靠性的场景,建议使用手动提交,并实现适当的重试机制和错误处理逻辑。
04
再平衡(Rebalance)机制详解
/**
 * Demonstrates rebalance handling: consumer positions are saved when partitions
 * are revoked, restored when partitions are assigned, and every polled batch is
 * processed and committed with a bounded number of retries.
 *
 * <p>{@code saveOffsets}, {@code getSavedOffset}, {@code processRecord},
 * {@code handleProcessingFailure} and {@code RebalanceMonitor} are referenced
 * here but defined elsewhere.
 */
public class KafkaRebalanceDemo {
    private static final String BOOTSTRAP_SERVERS = "192.168.241.128:9092";
    private static final String TOPIC_NAME = "rebalance_demo_topic";
    private static final String GROUP_ID = "rebalance_demo_group";
    // Deserializer given by class name so no additional import is required;
    // the consumer configuration accepts a class name string for these settings.
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";
    private static final Logger logger = LoggerFactory.getLogger(KafkaRebalanceDemo.class);

    /**
     * Configures the consumer, subscribes with a rebalance listener and polls forever.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Basic settings.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Deserializers are mandatory; without them the KafkaConsumer constructor fails.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, STRING_DESERIALIZER);
        // Rebalance-related settings.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                CooperativeStickyAssignor.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    logger.info("Rebalance started - Partitions revoked: {}", partitions);
                    RebalanceMonitor.onRebalanceStart();
                    // Save the current position of every partition being given up.
                    for (TopicPartition partition : partitions) {
                        long position = consumer.position(partition);
                        saveOffsets(partition, position);
                    }
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    logger.info("Rebalance completed - Partitions assigned: {}", partitions);
                    RebalanceMonitor.onRebalanceComplete();
                    // Resume from the saved position; a negative value is treated as "nothing saved".
                    for (TopicPartition partition : partitions) {
                        long savedOffset = getSavedOffset(partition);
                        if (savedOffset >= 0) {
                            consumer.seek(partition, savedOffset);
                        }
                    }
                }
            });
            // Consume messages.
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                processRecordsWithRetry(records, consumer);
            }
        }
    }

    /**
     * Processes a batch and commits it, retrying the whole batch up to three times.
     *
     * <p>A retry starts again from the first record of the batch, so records that
     * were already processed in a failed attempt are processed again. After the
     * last failed attempt the batch is handed to {@code handleProcessingFailure}.
     *
     * @param records  the batch returned by the last poll
     * @param consumer the consumer used to commit offsets
     */
    private static void processRecordsWithRetry(ConsumerRecords<String, String> records,
                                                KafkaConsumer<String, String> consumer) {
        int retries = 3;
        boolean processed = false;
        while (!processed && retries > 0) {
            try {
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                }
                consumer.commitSync();
                processed = true;
            } catch (Exception e) {
                // Decrement on its own line rather than inside the logging call.
                retries--;
                logger.error("Error processing records. Retries left: {}", retries, e);
                if (retries == 0) {
                    // All attempts are used up.
                    handleProcessingFailure(records);
                }
            }
        }
    }
}
复制
// 1. Static membership.
// group.instance.id has to stay the same across restarts of an instance, otherwise
// the broker sees every restart as a brand-new member. A random UUID defeats that,
// so the id is taken from a deployment-supplied system property instead
// (e.g. -Dconsumer.instance.id=1). The value must be unique per running instance;
// the "0" fallback is only suitable for a single-instance setup.
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
        "consumer-" + System.getProperty("consumer.instance.id", "0"));
// 2. Incremental (cooperative) rebalancing.
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());
// 3. 再平衡监控实现
public class RebalanceMonitor {
private static final AtomicLong rebalanceCount = new AtomicLong(0);
private static final AtomicLong totalRebalanceDuration = new AtomicLong(0);
private static long lastRebalanceTime = 0;
public static void onRebalanceStart() {
lastRebalanceTime = System.currentTimeMillis();
rebalanceCount.incrementAndGet();
logger.info("Rebalance started. Total count: {}", rebalanceCount.get());
}
public static void onRebalanceComplete() {
long duration = System.currentTimeMillis() - lastRebalanceTime;
totalRebalanceDuration.addAndGet(duration);
double avgDuration = (double) totalRebalanceDuration.get() rebalanceCount.get();
logger.info("Rebalance completed in {} ms. Average duration: {} ms",
duration, avgDuration);
// 检查再平衡健康状况
checkRebalanceHealth(duration, avgDuration);
}
private static void checkRebalanceHealth(long duration, double avgDuration) {
if (duration > 10000 || avgDuration > 5000) {
logger.warn("Rebalance performance degradation detected!");
// 触发告警
alertRebalancePerformance(duration, avgDuration);
}
}
}
复制
06
最佳实践总结
Properties props = new Properties();
// Basic connection settings.
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-" + UUID.randomUUID());
// Deserializers are mandatory; a consumer cannot be created from props without them.
// They are given by class name so no additional import is required.
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
// Performance settings.
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024"); // minimum fetch size in bytes
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500"); // maximum fetch wait in ms
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500"); // maximum records per poll
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // per-partition fetch size in bytes
// Reliability settings.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // maximum interval between polls in ms
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); // session timeout in ms
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // heartbeat interval in ms
复制
/**
 * Consumer loop with layered error handling: per-record failures, commit
 * failures, wakeup-driven shutdown and a catch-all for anything else.
 *
 * <p>{@code props}, {@code topics}, {@code processRecord}, {@code handleWakeup},
 * {@code handleConsumerError}, {@code sendToDeadLetterQueue} and
 * {@code retryCommit} are referenced here but defined elsewhere.
 */
public class KafkaConsumerWithErrorHandling {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWithErrorHandling.class);

    /** Subscribes to the topics and polls until a WakeupException ends the loop. */
    public void consume() {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(topics);
            boolean keepRunning = true;
            while (keepRunning) {
                try {
                    pollProcessCommit(consumer);
                } catch (WakeupException wakeup) {
                    // Graceful shutdown was requested.
                    handleWakeup();
                    keepRunning = false;
                } catch (Exception failure) {
                    // Anything else: report it and keep polling.
                    handleConsumerError(failure);
                }
            }
        }
    }

    /** Runs one iteration: poll a batch, process each record, then commit. */
    private void pollProcessCommit(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> message : batch) {
            processSafely(message);
        }
        commitBatch(consumer);
    }

    /** Processes one record; a failure is handed to the single-record error handler. */
    private void processSafely(ConsumerRecord<String, String> message) {
        try {
            processRecord(message);
        } catch (Exception failure) {
            handleSingleRecordError(message, failure);
        }
    }

    /** Commits synchronously; a CommitFailedException goes to the commit error handler. */
    private void commitBatch(KafkaConsumer<String, String> consumer) {
        try {
            consumer.commitSync();
        } catch (CommitFailedException failure) {
            handleCommitError(failure);
        }
    }

    /** Logs a record that could not be processed and forwards it to the dead letter queue. */
    private void handleSingleRecordError(ConsumerRecord<String, String> record, Exception e) {
        logger.error("Error processing record: {}", record, e);
        sendToDeadLetterQueue(record);
    }

    /** Logs a failed offset commit and triggers the retry routine. */
    private void handleCommitError(CommitFailedException e) {
        logger.error("Failed to commit offsets", e);
        retryCommit();
    }
}
复制
监控关键指标:消费延迟、处理时间、错误率等;实现优雅关闭机制;使用死信队列处理失败消息;实现适当的重试策略;保持完善的日志记录。
07
加群请添加作者

08
获取文档及视频资料
推荐阅读系列文章
建议收藏 | Dinky系列总结篇 建议收藏 | Flink系列总结篇 建议收藏 | Flink CDC 系列总结篇 建议收藏 | Doris实战文章合集 建议收藏 | Seatunnel 实战文章系列合集 建议收藏 | 实时离线输数仓(数据湖)总结篇 建议收藏 | 实时离线数仓实战第一阶段总结
如果喜欢 请点个在看分享给身边的朋友
文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
练习题一:窗口函数
小伙
132次阅读
2025-03-27 22:57:54
缓存监控治理在游戏业务的实践和探索
vivo互联网技术
42次阅读
2025-03-20 09:51:10
如何使用 RisingWave 和 PuppyGraph 构建高性能实时图分析框架
RisingWave中文开源社区
37次阅读
2025-03-18 10:49:54
Oracle函数
芃芃
31次阅读
2025-03-28 18:37:13
C#操作SQLite数据库
淡定
31次阅读
2025-03-25 23:27:25
Kafka删除Topic,弄不好会失败或发生故障
虞大胆的叽叽喳喳
27次阅读
2025-03-17 16:55:10
字符串替换研究
京东云开发者
24次阅读
2025-04-02 14:27:40
oracle分区表
芃芃
20次阅读
2025-04-05 16:37:38
《泛型:代码界的“套娃大师”》
让天下没有难学的编程
19次阅读
2025-03-27 23:33:39
高并发场景下的库存管理,理论与实战能否兼得?
京东云开发者
17次阅读
2025-03-24 16:54:56