00
引言
01
Kafka消费滞后现象解析
分区滞后量 = 该分区最新消息位置(Log End Offset) - 消费者在该分区的消费位置(Consumer Offset)
复制
02
Kafka消费滞后的常见原因分析
03
监控Kafka消费滞后的方法与工具
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
 * Command-line utility that prints the per-partition lag of every consumer group.
 *
 * <p>For each group it reads the committed offsets, looks up the latest (log end) offset of the
 * same partitions and prints {@code endOffset - committedOffset} for each partition.
 */
public class KafkaLagMonitor {
    /**
     * Connects to the broker at {@code localhost:9092} and prints the lag of all consumer groups.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient adminClient = AdminClient.create(props)) {
            // Fetch every consumer group known to the cluster.
            Collection<ConsumerGroupListing> groups = adminClient.listConsumerGroups().all().get();
            for (ConsumerGroupListing group : groups) {
                printGroupLag(adminClient, group.groupId());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    /** Prints the lag of every partition for which {@code groupId} has a committed offset. */
    private static void printGroupLag(AdminClient adminClient, String groupId)
            throws InterruptedException, ExecutionException {
        System.out.println("消费者组: " + groupId);
        // Committed offsets of the group.
        Map<TopicPartition, OffsetAndMetadata> offsets =
                adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
        // The map value is null for a partition without a committed offset; such partitions
        // have no defined lag and are skipped instead of triggering a NullPointerException.
        Map<TopicPartition, OffsetSpec> latestRequest = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            if (entry.getValue() != null) {
                latestRequest.put(entry.getKey(), OffsetSpec.latest());
            }
        }
        if (!latestRequest.isEmpty()) {
            // Latest (log end) offsets of the same partitions.
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                    adminClient.listOffsets(latestRequest).all().get();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                if (entry.getValue() == null) {
                    continue;
                }
                TopicPartition partition = entry.getKey();
                long consumerOffset = entry.getValue().offset();
                long endOffset = endOffsets.get(partition).offset();
                long lag = endOffset - consumerOffset;
                System.out.printf("主题: %s, 分区: %d, 消费位置: %d, 最新位置: %d, 滞后量: %d%n",
                        partition.topic(), partition.partition(), consumerOffset, endOffset, lag);
            }
        }
        System.out.println("-----------------------------------");
    }
}
复制
- kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}" 下的 records-lag-max 和 records-lag-avg:分别表示最大滞后量和平均滞后量。
- kafka.consumer:type=consumer-coordinator-metrics,client-id="{client-id}" 下的 commit-latency-avg 和 commit-latency-max:分别表示提交消费位置的平均延迟和最大延迟。
import javax.management.*;
import java.lang.management.ManagementFactory;
import java.util.Set;
/**
 * Reads consumer lag metrics from the JMX MBeans registered in the current JVM.
 *
 * <p>It only sees consumers running in the same process, because it queries the platform
 * MBean server.
 */
public class KafkaJmxLagMonitor {
    /** Placeholder printed for an attribute that a given MBean does not expose. */
    private static final String NOT_AVAILABLE = "N/A";

    /**
     * Prints {@code records-lag-max} and {@code records-lag-avg} for every fetch-manager MBean.
     *
     * @param args unused
     * @throws Exception if the object-name pattern is malformed or an attribute read fails for a
     *     reason other than the attribute or the MBean being absent
     */
    public static void main(String[] args) throws Exception {
        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        // Match every fetch-manager MBean regardless of its remaining key properties.
        ObjectName pattern = new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics,*");
        Set<ObjectName> objectNames = mBeanServer.queryNames(pattern, null);
        for (ObjectName name : objectNames) {
            String clientId = name.getKeyProperty("client-id");
            Object maxLag = readAttribute(mBeanServer, name, "records-lag-max");
            Object avgLag = readAttribute(mBeanServer, name, "records-lag-avg");
            // The wildcard also matches MBeans that carry neither lag attribute; skip those
            // instead of aborting the whole run on the first missing attribute.
            if (maxLag == null && avgLag == null) {
                continue;
            }
            System.out.printf("消费者: %s, 最大滞后量: %s, 平均滞后量: %s%n",
                    clientId,
                    maxLag == null ? NOT_AVAILABLE : maxLag,
                    avgLag == null ? NOT_AVAILABLE : avgLag);
        }
    }

    /**
     * Returns the attribute value, or {@code null} when the MBean does not expose the attribute
     * or was unregistered between the query and the read.
     */
    private static Object readAttribute(MBeanServer server, ObjectName name, String attribute)
            throws MBeanException, ReflectionException {
        try {
            return server.getAttribute(name, attribute);
        } catch (AttributeNotFoundException | InstanceNotFoundException e) {
            return null;
        }
    }
}
复制
# 以下不是Java代码,而是配置示例
# Prometheus配置文件 prometheus.yml
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-exporter:9308']
# Kafka Exporter启动命令
kafka-exporter --kafka.server=kafka:9092 --group.filter=.*
复制
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * Periodically computes the total lag of one consumer group on one topic and raises an alert
 * when the total exceeds a threshold.
 *
 * <p>Call {@link #close()} to stop the schedule and release the admin client.
 */
public class CustomLagMonitor implements AutoCloseable {
    private final AdminClient adminClient;
    private final String groupId;
    private final String topic;
    private final long lagThreshold;
    // Owned by the monitor so that close() can stop the periodic task.
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /**
     * @param bootstrapServers Kafka bootstrap servers, e.g. {@code host:9092}
     * @param groupId consumer group whose committed offsets are inspected
     * @param topic topic whose partitions are included in the total
     * @param lagThreshold total lag above which {@code sendAlert} is invoked
     */
    public CustomLagMonitor(String bootstrapServers, String groupId, String topic, long lagThreshold) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        this.adminClient = AdminClient.create(props);
        this.groupId = groupId;
        this.topic = topic;
        this.lagThreshold = lagThreshold;
    }

    /**
     * Starts checking the lag immediately and then every {@code intervalSeconds} seconds.
     *
     * @param intervalSeconds period between two checks, in seconds
     */
    public void startMonitoring(long intervalSeconds) {
        executor.scheduleAtFixedRate(this::checkLag, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    /** Stops the periodic check and closes the underlying admin client. */
    @Override
    public void close() {
        executor.shutdownNow();
        adminClient.close();
    }

    /** Runs one lag check; never throws, so the periodic schedule is not cancelled. */
    private void checkLag() {
        try {
            // Committed offsets of the group.
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
            // Keep only partitions of the monitored topic that have a committed offset; the
            // map value is null when the group has never committed for a partition.
            Map<TopicPartition, OffsetSpec> latestRequest = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                if (entry.getKey().topic().equals(topic) && entry.getValue() != null) {
                    latestRequest.put(entry.getKey(), OffsetSpec.latest());
                }
            }
            long totalLag = 0;
            if (!latestRequest.isEmpty()) {
                // Latest (log end) offsets of the selected partitions.
                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                        adminClient.listOffsets(latestRequest).all().get();
                for (TopicPartition partition : latestRequest.keySet()) {
                    long consumerOffset = offsets.get(partition).offset();
                    long endOffset = endOffsets.get(partition).offset();
                    long lag = endOffset - consumerOffset;
                    totalLag += lag;
                    System.out.printf("分区: %d, 滞后量: %d%n", partition.partition(), lag);
                }
            }
            System.out.printf("主题: %s, 消费者组: %s, 总滞后量: %d%n", topic, groupId, totalLag);
            // Alert when the total exceeds the configured threshold.
            if (totalLag > lagThreshold) {
                sendAlert(topic, groupId, totalLag);
            }
        } catch (InterruptedException e) {
            // Raised when close() interrupts an in-flight check; keep the flag set.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // An exception escaping a scheduled task would cancel all future runs.
            e.printStackTrace();
        }
    }

    /** Alert hook; replace the body with e-mail, SMS or chat-webhook delivery as needed. */
    private void sendAlert(String topic, String groupId, long lag) {
        System.out.printf("告警: 主题 %s 的消费者组 %s 滞后量达到 %d,超过阈值 %d%n",
                topic, groupId, lag, lagThreshold);
    }

    public static void main(String[] args) {
        CustomLagMonitor monitor = new CustomLagMonitor(
                "localhost:9092",      // Kafka bootstrap servers
                "my-consumer-group",   // consumer group id
                "my-topic",            // topic name
                1000                   // lag threshold
        );
        // Release the admin client and the scheduler thread when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new Thread(monitor::close));
        monitor.startMonitoring(60); // check every 60 seconds
    }
}
复制
04
解决Kafka消费滞后的策略与最佳实践
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
/**
 * Consumer that polls on one thread and processes each partition's records on a worker pool.
 *
 * <p>{@code KafkaConsumer} is not safe for multi-threaded access, so workers only process
 * records and report the offset to commit; every consumer call, including the commit, is made
 * on the polling thread.
 */
public class MultiThreadedConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService executorService;
    private final int numWorkers;
    private final String topic;
    private volatile boolean running = true;

    /**
     * @param bootstrapServers Kafka bootstrap servers
     * @param groupId consumer group id
     * @param topic topic to subscribe to
     * @param numWorkers size of the worker thread pool
     */
    public MultiThreadedConsumer(String bootstrapServers, String groupId, String topic, int numWorkers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        this.consumer = new KafkaConsumer<>(props);
        this.executorService = Executors.newFixedThreadPool(numWorkers);
        this.numWorkers = numWorkers;
        this.topic = topic;
    }

    /**
     * Runs the poll loop on the calling thread until {@link #stop()} is called or the thread is
     * interrupted, then closes the consumer and shuts the worker pool down.
     */
    public void start() {
        try {
            consumer.subscribe(Collections.singletonList(topic));
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                Set<TopicPartition> polledPartitions = records.partitions();
                // Filled by the workers, read by the polling thread once all of them are done.
                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new ConcurrentHashMap<>();
                CountDownLatch latch = new CountDownLatch(polledPartitions.size());
                // One task per partition keeps the per-partition ordering intact.
                for (TopicPartition partition : polledPartitions) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    executorService.submit(() -> {
                        try {
                            processRecords(partitionRecords);
                            // Commit position is the offset after the last processed record.
                            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                            offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
                        } catch (Exception e) {
                            // A failed partition reports no offset, so nothing is committed for it.
                            e.printStackTrace();
                        } finally {
                            latch.countDown();
                        }
                    });
                }
                try {
                    // Wait until every partition of this poll has been handled.
                    latch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
                // Commit on the polling thread: the consumer must not be touched by the workers.
                if (!offsetsToCommit.isEmpty()) {
                    consumer.commitSync(offsetsToCommit);
                }
            }
        } finally {
            consumer.close();
            executorService.shutdown();
        }
    }

    /** Processes the records of one partition in order. */
    private void processRecords(List<ConsumerRecord<String, String>> records) {
        // Replace with the real message handling.
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("处理消息: 主题=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s%n",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value());
            // Simulated processing time.
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /** Asks the poll loop to exit after the current iteration. */
    public void stop() {
        running = false;
    }

    public static void main(String[] args) {
        MultiThreadedConsumer consumer = new MultiThreadedConsumer(
                "localhost:9092",        // Kafka bootstrap servers
                "multi-threaded-group",  // consumer group id
                "my-topic",              // topic name
                10                       // number of worker threads
        );
        consumer.start();
    }
}
复制
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
/**
 * Single-threaded consumer that handles each polled partition as one batch and commits offsets
 * manually after all batches of a poll have been processed.
 */
public class BatchProcessingConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private final int batchSize;
    private volatile boolean running = true;

    /**
     * @param bootstrapServers Kafka bootstrap servers
     * @param groupId consumer group id
     * @param topic topic to subscribe to
     * @param batchSize value used for {@code max.poll.records}
     */
    public BatchProcessingConsumer(String bootstrapServers, String groupId, String topic, int batchSize) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(batchSize));
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        this.batchSize = batchSize;
    }

    /**
     * Runs the poll loop on the calling thread until {@link #stop()} is called or the thread is
     * interrupted; the consumer is closed on every exit path.
     */
    public void start() {
        try {
            consumer.subscribe(Collections.singletonList(topic));
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
                boolean interrupted = false;
                // Handle the records partition by partition.
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    if (!processBatch(partitionRecords)) {
                        interrupted = true;
                        break;
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    currentOffsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                }
                if (interrupted) {
                    // Leave without committing so the unfinished batch is delivered again.
                    break;
                }
                // Commit the offsets of all partitions handled in this poll.
                consumer.commitSync(currentOffsets);
            }
        } finally {
            consumer.close();
        }
    }

    /**
     * Processes one partition's records in sub-batches of up to 100 messages.
     *
     * @return {@code true} if the whole batch was processed, {@code false} if the thread was
     *     interrupted before the batch finished
     */
    private boolean processBatch(List<ConsumerRecord<String, String>> records) {
        // Collect the payloads of the batch.
        List<String> messages = new ArrayList<>(records.size());
        for (ConsumerRecord<String, String> record : records) {
            messages.add(record.value());
        }
        System.out.printf("批量处理 %d 条消息%n", messages.size());
        // Real batch work (bulk insert, bulk API call, ...) belongs here; this example only
        // prints the index range of each sub-batch.
        for (int i = 0; i < messages.size(); i += 100) {
            int end = Math.min(i + 100, messages.size());
            System.out.printf("处理子批次: %d-%d%n", i, end - 1);
            // Simulated processing time.
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /** Asks the poll loop to exit after the current iteration. */
    public void stop() {
        running = false;
    }

    public static void main(String[] args) {
        BatchProcessingConsumer consumer = new BatchProcessingConsumer(
                "localhost:9092",          // Kafka bootstrap servers
                "batch-processing-group",  // consumer group id
                "my-topic",                // topic name
                1000                       // batch size
        );
        consumer.start();
    }
}
复制
// Example consumer configuration tuned for throughput with manual offset commits.
Properties props = new Properties();
// Basic settings
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "optimized-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Performance-related settings
// max.poll.records: upper bound on the records returned by one poll(); size it to the
// message size and to how fast the application can process a batch.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
// max.partition.fetch.bytes: maximum bytes returned per partition in one fetch (here 1 MiB);
// size it to the available bandwidth and memory.
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576");
// fetch.max.wait.ms: longest time the broker holds a fetch request while it waits for
// fetch.min.bytes of data to accumulate.
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
// session.timeout.ms: the consumer is considered dead if no heartbeat arrives within this
// time; adjust to the stability of the environment.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
// heartbeat.interval.ms: usually set to no more than one third of the session timeout.
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
// enable.auto.commit: disabled here because offsets are committed manually.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// max.poll.interval.ms: maximum delay between two poll() calls; exceeding it triggers a
// rebalance.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
复制
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Runs a variable number of consumers of one group inside a single JVM and adjusts that number
 * according to the observed lag.
 *
 * <p>Every consumer has its own stop flag, so scaling down really stops a consumer and
 * {@code currentConsumers} always equals the number of running consumers.
 */
public class ScalableConsumerGroup {
    private final String bootstrapServers;
    private final String groupId;
    private final String topic;
    private final ExecutorService executorService;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final int maxConsumers;
    private int currentConsumers;
    // One "keep running" flag per live consumer; the most recently started one is stopped first.
    private final java.util.Deque<AtomicBoolean> consumerFlags =
            new java.util.concurrent.ConcurrentLinkedDeque<>();
    // Handle of the monitor task so that stop() can wake it from its sleep.
    private volatile java.util.concurrent.Future<?> monitorTask;

    /**
     * @param bootstrapServers Kafka bootstrap servers
     * @param groupId consumer group id shared by all consumers
     * @param topic topic to subscribe to
     * @param initialConsumers number of consumers started by {@link #start()}
     * @param maxConsumers upper bound for the number of consumers
     */
    public ScalableConsumerGroup(String bootstrapServers, String groupId, String topic, int initialConsumers, int maxConsumers) {
        this.bootstrapServers = bootstrapServers;
        this.groupId = groupId;
        this.topic = topic;
        this.maxConsumers = maxConsumers;
        this.currentConsumers = initialConsumers;
        this.executorService = Executors.newCachedThreadPool();
    }

    /** Starts the initial consumers and the monitor task that scales them. */
    public void start() {
        for (int i = 0; i < currentConsumers; i++) {
            startConsumer();
        }
        // The monitor adjusts the number of consumers according to the lag.
        monitorTask = executorService.submit(this::monitorAndScale);
    }

    /** Starts one more consumer on the pool and registers its stop flag. */
    private void startConsumer() {
        AtomicBoolean active = new AtomicBoolean(true);
        consumerFlags.push(active);
        executorService.submit(() -> {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(java.util.Collections.singletonList(topic));
                // Exit on a global stop or when this consumer was selected for scale-down.
                while (running.get() && active.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(java.time.Duration.ofMillis(100));
                    for (ConsumerRecord<String, String> record : records) {
                        processRecord(record);
                    }
                }
            }
        });
    }

    /** Handles one record; replace the body with the real message handling. */
    private void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("处理消息: 主题=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        // Simulated processing time.
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Checks the lag once a minute and adds or removes consumers accordingly. */
    private void monitorAndScale() {
        while (running.get()) {
            try {
                long currentLag = getCurrentLag();
                System.out.printf("当前滞后量: %d, 当前消费者数量: %d%n", currentLag, currentConsumers);
                if (currentLag > 10000 && currentConsumers < maxConsumers) {
                    // High lag: add up to two consumers without exceeding the maximum.
                    int newConsumers = Math.min(currentConsumers + 2, maxConsumers);
                    int consumersToAdd = newConsumers - currentConsumers;
                    System.out.printf("滞后量过大,增加 %d 个消费者%n", consumersToAdd);
                    for (int i = 0; i < consumersToAdd; i++) {
                        startConsumer();
                    }
                    currentConsumers = newConsumers;
                } else if (currentLag < 1000 && currentConsumers > 1) {
                    // Low lag: stop one consumer, always keeping at least one alive.
                    AtomicBoolean flag = consumerFlags.poll();
                    if (flag != null) {
                        flag.set(false);
                        currentConsumers--;
                    }
                    System.out.println("滞后量较小,减少消费者数量至: " + currentConsumers);
                }
                // Check once a minute.
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                // stop() cancels this task; leave the loop instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Returns the current lag of the group.
     *
     * <p>Placeholder: returns a random value; plug in a real lag measurement here.
     */
    private long getCurrentLag() {
        return (long) (Math.random() * 20000);
    }

    /** Stops all consumers and the monitor task. */
    public void stop() {
        running.set(false);
        java.util.concurrent.Future<?> monitor = monitorTask;
        if (monitor != null) {
            // Wake the monitor from its one-minute sleep so the pool can terminate promptly.
            monitor.cancel(true);
        }
        executorService.shutdown();
    }

    public static void main(String[] args) {
        ScalableConsumerGroup group = new ScalableConsumerGroup(
                "localhost:9092",   // Kafka bootstrap servers
                "scalable-group",   // consumer group id
                "my-topic",         // topic name
                2,                  // initial number of consumers
                10                  // maximum number of consumers
        );
        group.start();
    }
}
复制
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Consume-transform-produce pipeline that throttles consumption while the measured lag on the
 * source topic is above {@code maxLag}.
 *
 * <p>One task measures the lag every five seconds; the other consumes from the source topic,
 * transforms each record and forwards it to the target topic.
 */
public class BackpressureExample {
    private final String bootstrapServers;
    private final String sourceTopic;
    private final String targetTopic;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ExecutorService executorService = Executors.newFixedThreadPool(2);
    private final AtomicLong currentLag = new AtomicLong(0);
    private final long maxLag;

    /**
     * @param bootstrapServers Kafka bootstrap servers
     * @param sourceTopic topic to read from
     * @param targetTopic topic the processed records are written to
     * @param maxLag lag above which consumption is paused
     */
    public BackpressureExample(String bootstrapServers, String sourceTopic, String targetTopic, long maxLag) {
        this.bootstrapServers = bootstrapServers;
        this.sourceTopic = sourceTopic;
        this.targetTopic = targetTopic;
        this.maxLag = maxLag;
    }

    /** Starts the lag monitor and the consume-produce loop on the internal pool. */
    public void start() {
        executorService.submit(this::monitorLag);
        executorService.submit(this::consumeAndProduce);
    }

    /**
     * Publishes the total lag over all assigned partitions to {@code currentLag} every 5 seconds.
     *
     * <p>NOTE(review): the lag is measured against the position of this monitor's own consumer
     * (group {@code lag-monitor}), not against the offsets of {@code backpressure-consumer};
     * confirm that this is the signal the back-pressure decision is meant to use.
     */
    private void monitorLag() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-monitor");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(sourceTopic));
            while (running.get()) {
                // poll() drives the group protocol so that partitions get assigned.
                consumer.poll(Duration.ofMillis(0));
                // Sum over all partitions; storing each partition's lag directly would leave
                // only the value of whichever partition happened to be visited last.
                AtomicLong totalLag = new AtomicLong(0);
                consumer.assignment().forEach(partition -> {
                    long endOffset = consumer.endOffsets(Collections.singleton(partition))
                            .get(partition);
                    long currentOffset = consumer.position(partition);
                    long lag = endOffset - currentOffset;
                    totalLag.addAndGet(lag);
                    System.out.printf("当前滞后量: %d%n", lag);
                });
                currentLag.set(totalLag.get());
                Thread.sleep(5000); // check every 5 seconds
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Consumes from the source topic, transforms each record and forwards it to the target
     * topic, pausing the assigned partitions while the measured lag exceeds {@code maxLag}.
     */
    private void consumeAndProduce() {
        // Consumer configuration.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "backpressure-consumer");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Producer configuration.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList(sourceTopic));
            while (running.get()) {
                boolean throttled = currentLag.get() > maxLag;
                if (throttled) {
                    System.out.println("滞后量过大,暂停消费");
                    // Pause instead of skipping poll(): the consumer keeps calling poll() and so
                    // stays within max.poll.interval.ms and keeps its partitions.
                    consumer.pause(consumer.assignment());
                } else {
                    consumer.resume(consumer.paused());
                }
                // While throttled the poll doubles as the one-second back-off.
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(throttled ? 1000 : 100));
                if (records.isEmpty()) {
                    continue;
                }
                for (ConsumerRecord<String, String> record : records) {
                    String processedValue = processRecord(record);
                    // Forward the result to the target topic.
                    producer.send(new ProducerRecord<>(targetTopic, record.key(), processedValue),
                            (metadata, exception) -> {
                                if (exception != null) {
                                    exception.printStackTrace();
                                }
                            });
                }
                // Wait until the buffered sends have completed before committing, so offsets
                // are not committed for records that are still only in the producer's buffer.
                producer.flush();
                consumer.commitSync();
            }
        }
    }

    /** Transforms one record; replace the body with the real message handling. */
    private String processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("处理消息: 主题=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
        // Simulated processing time.
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "processed-" + record.value();
    }

    /** Asks both tasks to finish and stops accepting new work on the pool. */
    public void stop() {
        running.set(false);
        executorService.shutdown();
    }

    public static void main(String[] args) {
        BackpressureExample example = new BackpressureExample(
                "localhost:9092",  // Kafka bootstrap servers
                "source-topic",    // source topic
                "target-topic",    // target topic
                5000               // maximum lag
        );
        example.start();
    }
}
复制
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Consumer that writes every polled batch to a relational database in one transaction and
 * commits the Kafka offsets only after the database transaction has committed.
 */
public class OptimizedDatabaseConsumer {
    private final String bootstrapServers;
    private final String topic;
    private final String jdbcUrl;
    private final String dbUser;
    private final String dbPassword;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private HikariDataSource dataSource;

    /**
     * @param bootstrapServers Kafka bootstrap servers
     * @param topic topic to subscribe to
     * @param jdbcUrl JDBC URL of the target database
     * @param dbUser database user
     * @param dbPassword database password
     */
    public OptimizedDatabaseConsumer(String bootstrapServers, String topic,
                                     String jdbcUrl, String dbUser, String dbPassword) {
        this.bootstrapServers = bootstrapServers;
        this.topic = topic;
        this.jdbcUrl = jdbcUrl;
        this.dbUser = dbUser;
        this.dbPassword = dbPassword;
    }

    /**
     * Runs the poll loop on the calling thread until {@link #stop()} is called or the thread is
     * interrupted; the connection pool is closed when the loop ends.
     */
    public void start() {
        initDatabaseConnectionPool();
        // Consumer configuration.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "db-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                if (batchInsertToDatabase(records)) {
                    // Offsets are committed only after the rows are durably stored.
                    consumer.commitSync();
                } else {
                    // The insert was rolled back: rewind every partition to the first record of
                    // this batch so the same records are polled again rather than skipped.
                    records.partitions().forEach(
                            tp -> consumer.seek(tp, records.records(tp).get(0).offset()));
                    try {
                        Thread.sleep(1000); // back off before retrying against a failing database
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        } finally {
            // Closed here, on the consuming thread, so the pool cannot disappear mid-batch.
            dataSource.close();
        }
    }

    /** Creates the HikariCP pool used for the batch inserts. */
    private void initDatabaseConnectionPool() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(jdbcUrl);
        config.setUsername(dbUser);
        config.setPassword(dbPassword);
        config.setMaximumPoolSize(20); // adjust the pool size as needed
        config.setMinimumIdle(5);
        config.setIdleTimeout(30000);
        config.setConnectionTimeout(10000);
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        dataSource = new HikariDataSource(config);
    }

    /**
     * Inserts all records in a single transaction, sending them in JDBC batches of 500 rows.
     *
     * @return {@code true} if the transaction committed, {@code false} if it failed and was
     *     rolled back
     */
    private boolean batchInsertToDatabase(ConsumerRecords<String, String> records) {
        // NOTE(review): `partition` is a reserved word in some SQL dialects (MySQL among them);
        // confirm the column names work unquoted on the target database.
        String sql = "INSERT INTO messages (message_key, message_value, topic, partition, offset) VALUES (?, ?, ?, ?, ?)";
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
                int batchSize = 0;
                for (ConsumerRecord<String, String> record : records) {
                    pstmt.setString(1, record.key());
                    pstmt.setString(2, record.value());
                    pstmt.setString(3, record.topic());
                    pstmt.setInt(4, record.partition());
                    pstmt.setLong(5, record.offset());
                    pstmt.addBatch();
                    batchSize++;
                    // Send to the database every 500 rows.
                    if (batchSize >= 500) {
                        pstmt.executeBatch();
                        batchSize = 0;
                    }
                }
                // Send the remaining rows.
                if (batchSize > 0) {
                    pstmt.executeBatch();
                }
                conn.commit();
            } catch (SQLException e) {
                // Discard partially written rows before the connection returns to the pool.
                conn.rollback();
                throw e;
            }
            System.out.printf("成功写入 %d 条消息到数据库%n", records.count());
            return true;
        } catch (SQLException e) {
            e.printStackTrace();
            return false;
        }
    }

    /** Asks the poll loop to exit; the pool is closed by {@link #start()} once the loop ends. */
    public void stop() {
        running.set(false);
    }

    public static void main(String[] args) {
        OptimizedDatabaseConsumer consumer = new OptimizedDatabaseConsumer(
                "localhost:9092",                              // Kafka bootstrap servers
                "db-topic",                                    // topic name
                "jdbc:mysql://localhost:3306/kafka_messages",  // database URL
                "user",                                        // database user
                "password"                                     // database password
        );
        consumer.start();
    }
}
复制
05
Kafka消费滞后的预防与长期维护策略
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
public class KafkaPerformanceTest {
private final String bootstrapServers;
private final String topic;
private final int messageCount;
private final int messageSize;
private final int consumerCount;
private final ExecutorService executorService;
public KafkaPerformanceTest(String bootstrapServers, String topic,
int messageCount, int messageSize, int consumerCount) {
this.bootstrapServers = bootstrapServers;
this.topic = topic;
this.messageCount = messageCount;
this.messageSize = messageSize;
this.consumerCount = consumerCount;
this.executorService = Executors.newFixedThreadPool(consumerCount + 1);
}
public void runTest() throws InterruptedException {
// 创建测试主题(如果不存在)
createTestTopic();
// 启动消费者
CountDownLatch consumerLatch = new CountDownLatch(consumerCount);
AtomicLong totalConsumed = new AtomicLong(0);
long startTime = System.currentTimeMillis();
for (int i = 0; i < consumerCount; i++) {
executorService.submit(() -> {
try {
long consumed = runConsumer();
totalConsumed.addAndGet(consumed);
} finally {
consumerLatch.countDown();
}
});
}
// 启动生产者
executorService.submit(() -> runProducer());
// 等待所有消费者完成
consumerLatch.await();
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
// 计算性能指标
double messagesPerSec = 1000.0 * totalConsumed.get() duration;
double mbPerSec = messagesPerSec * messageSize (1024.0 * 1024.0);
System.out.printf("性能测试结果:%n");
System.out.printf("总消息数: %d%n", totalConsumed.get());
System.out.printf("总耗时: %.2f 秒%n", duration 1000.0);
System.out.printf("吞吐量: %.2f 消息/秒%n", messagesPerSec);
System.out.printf("吞吐量: %.2f MB/秒%n", mbPerSec);
executorService.shutdown();
}
private void createTestTopic() {
// 实际应用中应使用AdminClient创建主题
// 这里简化处理,假设主题已存在
}
private void runProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
// 生成测试消息
StringBuilder messageBuilder = new StringBuilder();
for (int i = 0; i < messageSize; i++) {
messageBuilder.append('a');
}
String message = messageBuilder.toString();
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
for (int i = 0; i < messageCount; i++) {
String key = "key-" + i;
producer.send(new ProducerRecord<>(topic, key, message),
(metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
}
});
if (i % 10000 == 0) {
System.out.printf("已生产 %d 条消息%n", i);
}
}
}
System.out.println("生产者完成");
}
private long runConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "perf-test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
long count = 0;
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList(topic));
while (count < messageCount) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 模拟消息处理
count++;
if (count % 10000 == 0) {
System.out.printf("消费者已处理 %d 条消息%n", count);
}
if (count >= messageCount) {
break;
}
}
}
}
return count;
}
public static void main(String[] args) throws InterruptedException {
KafkaPerformanceTest test = new KafkaPerformanceTest(
"localhost:9092", // Kafka服务器地址
"perf-test-topic", // 测试主题
1000000, // 消息数量
1024, // 消息大小(字节)
3 // 消费者数量
);
test.runTest();
}
}
复制
这个性能测试示例可以帮助评估Kafka生产者和消费者的处理能力。通过调整消息数量、消息大小和消费者数量等参数,可以模拟不同的负载场景,找出系统的性能瓶颈和最大处理能力。在实际应用中,应当根据性能测试结果,预留足够的处理能力冗余,以应对流量波动和突发情况。
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.HTTPServer;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class KafkaLagMonitoringSystem {
private final AdminClient adminClient;
private final Set<String> monitoredGroups;
private final Set<String> monitoredTopics;
private final ScheduledExecutorService scheduler;
private HTTPServer server;
// Prometheus指标
private final Gauge lagGauge = Gauge.build()
.name("kafka_consumer_group_lag")
.help("Kafka消费者组滞后量")
.labelNames("group", "topic", "partition")
.register();
private final Counter errorCounter = Counter.build()
.name("kafka_lag_monitor_errors")
.help("监控过程中的错误数")
.register();
public KafkaLagMonitoringSystem(String bootstrapServers, Set<String> groups, Set<String> topics) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
this.adminClient = AdminClient.create(props);
this.monitoredGroups = groups;
this.monitoredTopics = topics;
this.scheduler = Executors.newScheduledThreadPool(1);
}
public void start(int intervalSeconds, int prometheusPort) throws IOException {
// 启动Prometheus HTTP服务器
server = new HTTPServer(prometheusPort);
// 定期执行监控任务
scheduler.scheduleAtFixedRate(this::monitorLag, 0, intervalSeconds, TimeUnit.SECONDS);
System.out.printf("监控系统已启动,Prometheus指标暴露在端口 %d%n", prometheusPort);
}
/**
 * Runs one monitoring pass: for every selected consumer group, computes the per-partition
 * lag (log end offset minus committed offset) and publishes it on {@code lagGauge}.
 *
 * <p>A failure while processing one group is counted and logged, and the pass continues
 * with the next group. Nothing is rethrown, so the periodic schedule keeps running.
 */
private void monitorLag() {
    try {
        for (String groupId : resolveGroupsToMonitor()) {
            try {
                recordLagForGroup(groupId);
            } catch (InterruptedException e) {
                // Interruption is not a per-group problem; abort the whole pass.
                throw e;
            } catch (Exception e) {
                errorCounter.inc();
                e.printStackTrace();
            }
        }
    } catch (InterruptedException e) {
        // Restore the flag so the executor can observe the interruption.
        Thread.currentThread().interrupt();
        errorCounter.inc();
        e.printStackTrace();
    } catch (Exception e) {
        errorCounter.inc();
        e.printStackTrace();
    }
}

/** Returns the configured groups, or every group listed by the cluster when none are configured. */
private Set<String> resolveGroupsToMonitor() throws Exception {
    if (!monitoredGroups.isEmpty()) {
        return monitoredGroups;
    }
    Set<String> allGroups = new HashSet<>();
    for (ConsumerGroupListing group : adminClient.listConsumerGroups().all().get()) {
        allGroups.add(group.groupId());
    }
    return allGroups;
}

/** Computes and publishes the lag of every monitored partition of a single consumer group. */
private void recordLagForGroup(String groupId) throws Exception {
    // Committed offsets of the group.
    Map<TopicPartition, OffsetAndMetadata> offsets =
        adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

    // Keep only monitored topics; partitions without a committed offset have no lag to report.
    Map<TopicPartition, OffsetSpec> endOffsetRequest = new HashMap<>();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        TopicPartition partition = entry.getKey();
        boolean topicSelected = monitoredTopics.isEmpty() || monitoredTopics.contains(partition.topic());
        if (topicSelected && entry.getValue() != null) {
            endOffsetRequest.put(partition, OffsetSpec.latest());
        }
    }
    if (endOffsetRequest.isEmpty()) {
        return;
    }

    // Latest (log end) offsets of those partitions.
    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
        adminClient.listOffsets(endOffsetRequest).all().get();

    for (TopicPartition partition : endOffsetRequest.keySet()) {
        long consumerOffset = offsets.get(partition).offset();
        long endOffset = endOffsets.get(partition).offset();
        long lag = endOffset - consumerOffset;
        // Update the Prometheus gauge for this group/topic/partition.
        lagGauge.labels(groupId, partition.topic(), String.valueOf(partition.partition()))
            .set(lag);
        System.out.printf("组: %s, 主题: %s, 分区: %d, 滞后量: %d%n",
            groupId, partition.topic(), partition.partition(), lag);
    }
}
/**
 * Stops the periodic monitoring, the Prometheus HTTP server (if it was started) and the
 * Kafka AdminClient.
 *
 * <p>The scheduler is given a short grace period so a monitoring pass that is already
 * running is not left using an AdminClient that has been closed underneath it.
 */
public void stop() {
    scheduler.shutdown();
    try {
        if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
            scheduler.shutdownNow();
        }
    } catch (InterruptedException e) {
        scheduler.shutdownNow();
        Thread.currentThread().interrupt();
    }
    try {
        if (server != null) {
            server.stop();
        }
    } finally {
        // Close the AdminClient even if stopping the HTTP server fails.
        adminClient.close();
    }
}
/**
 * Demo entry point: monitors two sample groups on two sample topics against a local broker,
 * checking every 30 seconds and exposing Prometheus metrics on port 8080.
 */
public static void main(String[] args) throws IOException {
    final int checkIntervalSeconds = 30;
    final int metricsPort = 8080;

    // Consumer groups to watch.
    Set<String> watchedGroups = new HashSet<>();
    watchedGroups.add("group1");
    watchedGroups.add("group2");

    // Topics to watch.
    Set<String> watchedTopics = new HashSet<>();
    watchedTopics.add("topic1");
    watchedTopics.add("topic2");

    KafkaLagMonitoringSystem lagMonitor =
        new KafkaLagMonitoringSystem("localhost:9092", watchedGroups, watchedTopics);
    lagMonitor.start(checkIntervalSeconds, metricsPort);
}
}
复制
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1DeploymentSpec;
import io.kubernetes.client.openapi.models.V1Scale;
import io.kubernetes.client.openapi.models.V1ScaleSpec;
import io.kubernetes.client.util.Config;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class KafkaAutoScaler {
private final AdminClient adminClient;
private final String groupId;
private final String topic;
private final String namespace;
private final String deploymentName;
private final int minReplicas;
private final int maxReplicas;
private final long lagThresholdPerReplica;
private final ScheduledExecutorService scheduler;
private final AppsV1Api appsV1Api;
public KafkaAutoScaler(String bootstrapServers, String groupId, String topic,
String namespace, String deploymentName,
int minReplicas, int maxReplicas, long lagThresholdPerReplica) throws IOException {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
this.adminClient = AdminClient.create(props);
this.groupId = groupId;
this.topic = topic;
this.namespace = namespace;
this.deploymentName = deploymentName;
this.minReplicas = minReplicas;
this.maxReplicas = maxReplicas;
this.lagThresholdPerReplica = lagThresholdPerReplica;
this.scheduler = Executors.newScheduledThreadPool(1);
// 初始化Kubernetes客户端
ApiClient client = Config.defaultClient();
Configuration.setDefaultApiClient(client);
this.appsV1Api = new AppsV1Api();
}
public void start(int intervalSeconds) {
// 定期执行自动扩缩容任务
scheduler.scheduleAtFixedRate(this::autoScale, 0, intervalSeconds, TimeUnit.SECONDS);
System.out.printf("自动扩缩容系统已启动,每 %d 秒检查一次%n", intervalSeconds);
}
private void autoScale() {
try {
// 获取当前滞后量
long totalLag = getTotalLag();
System.out.printf("当前总滞后量: %d%n", totalLag);
// 获取当前副本数
int currentReplicas = getCurrentReplicas();
System.out.printf("当前副本数: %d%n", currentReplicas);
// 计算所需副本数
int desiredReplicas = calculateDesiredReplicas(totalLag, currentReplicas);
System.out.printf("期望副本数: %d%n", desiredReplicas);
// 如果需要调整副本数,则更新Deployment
if (desiredReplicas != currentReplicas) {
updateReplicas(desiredReplicas);
System.out.printf("已将副本数从 %d 调整为 %d%n", currentReplicas, desiredReplicas);
}
} catch (Exception e) {
e.printStackTrace();
}
}
private long getTotalLag() throws Exception {
// 获取消费者组的消费位置
Map<TopicPartition, OffsetAndMetadata> offsets =
adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
// 过滤出指定主题的分区
Set<TopicPartition> partitionsToCheck = new HashSet<>();
for (TopicPartition partition : offsets.keySet()) {
if (partition.topic().equals(topic)) {
partitionsToCheck.add(partition);
}
}
if (partitionsToCheck.isEmpty()) {
return 0;
}
// 获取主题分区的最新位置
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
adminClient.listOffsets(
partitionsToCheck.stream().collect(
HashMap::new,
(m, tp) -> m.put(tp, OffsetSpec.latest()),
HashMap::putAll
)
).all().get();
// 计算总滞后量
long totalLag = 0;
for (TopicPartition partition : partitionsToCheck) {
OffsetAndMetadata metadata = offsets.get(partition);
if (metadata != null) {
long consumerOffset = metadata.offset();
long endOffset = endOffsets.get(partition).offset();
long lag = endOffset - consumerOffset;
totalLag += lag;
}
}
return totalLag;
}
private int getCurrentReplicas() throws ApiException {
V1Scale scale = appsV1Api.readNamespacedDeploymentScale(deploymentName, namespace, null);
return scale.getSpec().getReplicas();
}
private int calculateDesiredReplicas(long totalLag, int currentReplicas) {
// 根据滞后量计算所需副本数
int desiredReplicas = (int) Math.ceil((double) totalLag lagThresholdPerReplica);
// 确保副本数在最小值和最大值之间
desiredReplicas = Math.max(desiredReplicas, minReplicas);
desiredReplicas = Math.min(desiredReplicas, maxReplicas);
// 避免频繁扩缩容,只有当需要增加或减少超过1个副本时才调整
if (Math.abs(desiredReplicas - currentReplicas) <= 1) {
return currentReplicas;
}
return desiredReplicas;
}
private void updateReplicas(int replicas) throws ApiException {
V1Scale scale = new V1Scale();
V1ScaleSpec spec = new V1ScaleSpec();
spec.setReplicas(replicas);
scale.setSpec(spec);
appsV1Api.replaceNamespacedDeploymentScale(deploymentName, namespace, scale, null, null, null, null);
}
public void stop() {
scheduler.shutdown();
adminClient.close();
}
public static void main(String[] args) throws IOException {
KafkaAutoScaler autoScaler = new KafkaAutoScaler(
"localhost:9092", // Kafka服务器地址
"my-consumer-group", // 消费者组ID
"my-topic", // 主题名称
"default", // Kubernetes命名空间
"kafka-consumer", // Deployment名称
2, // 最小副本数
10, // 最大副本数
10000 // 每个副本的滞后阈值
);
autoScaler.start(60); // 每60秒检查一次
}
}
复制
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Runs a stand-by ("backup") consumer that only processes messages while the primary
 * consumer group is considered unhealthy.
 *
 * <p>A monitoring thread measures the primary group's total lag on {@code topic} every
 * {@code checkIntervalMs}; the backup is active while that lag exceeds
 * {@code maxLagThreshold}, while the primary has no committed offsets on the topic, or
 * while monitoring itself fails.
 *
 * <p>NOTE(review): the backup consumes under its own group id, so it keeps its own committed
 * offsets, separate from the primary's — confirm this is the intended hand-over semantics.
 */
public class DisasterRecoveryConsumer {
    private final String bootstrapServers;
    private final String primaryGroupId;
    private final String backupGroupId;
    private final String topic;
    private final long maxLagThreshold;
    private final long checkIntervalMs;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ExecutorService executorService = Executors.newFixedThreadPool(2);
    private final AtomicBoolean backupActive = new AtomicBoolean(false);

    /**
     * @param bootstrapServers Kafka bootstrap server list
     * @param primaryGroupId   consumer group whose lag is monitored
     * @param backupGroupId    consumer group used by the backup consumer
     * @param topic            topic to monitor and consume
     * @param maxLagThreshold  total lag above which the backup is activated
     * @param checkIntervalMs  pause between two lag checks, in milliseconds
     */
    public DisasterRecoveryConsumer(String bootstrapServers, String primaryGroupId, String backupGroupId,
                                    String topic, long maxLagThreshold, long checkIntervalMs) {
        this.bootstrapServers = bootstrapServers;
        this.primaryGroupId = primaryGroupId;
        this.backupGroupId = backupGroupId;
        this.topic = topic;
        this.maxLagThreshold = maxLagThreshold;
        this.checkIntervalMs = checkIntervalMs;
    }

    /** Starts the monitoring thread and the backup consumer thread. */
    public void start() {
        // Monitoring thread.
        executorService.submit(this::monitorPrimaryConsumer);
        // Backup consumer thread.
        executorService.submit(this::runBackupConsumer);
        System.out.println("灾备消费系统已启动");
    }

    /**
     * Monitoring loop: re-evaluates {@code backupActive} every {@code checkIntervalMs} until
     * {@link #stop()} is called or the thread is interrupted.
     */
    private void monitorPrimaryConsumer() {
        Properties props = new Properties();
        // The admin client only needs the bootstrap servers; deserializers are consumer settings.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (AdminClient adminClient = AdminClient.create(props)) {
            while (running.get()) {
                try {
                    backupActive.set(shouldActivateBackup(adminClient));
                } catch (InterruptedException e) {
                    // An interrupt is a request to stop, not a monitoring failure.
                    throw e;
                } catch (Exception e) {
                    System.out.println("监控主消费者时出错,激活备用消费者");
                    backupActive.set(true);
                    e.printStackTrace();
                }
                Thread.sleep(checkIntervalMs);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // The task's Future is discarded, so an uncaught failure would vanish silently.
            // Fail safe in the same way as a failed check: log it and activate the backup.
            backupActive.set(true);
            e.printStackTrace();
        }
    }

    /**
     * Performs one lag check of the primary group.
     *
     * @return {@code true} when the backup consumer should be active
     */
    private boolean shouldActivateBackup(AdminClient adminClient) throws Exception {
        // Committed offsets of the primary group.
        Map<TopicPartition, OffsetAndMetadata> offsets =
            adminClient.listConsumerGroupOffsets(primaryGroupId).partitionsToOffsetAndMetadata().get();

        // Keep the partitions of the configured topic that have a committed offset.
        Map<TopicPartition, OffsetSpec> endOffsetRequest = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            if (entry.getKey().topic().equals(topic) && entry.getValue() != null) {
                endOffsetRequest.put(entry.getKey(), OffsetSpec.latest());
            }
        }
        if (endOffsetRequest.isEmpty()) {
            System.out.println("主消费者组未消费指定主题,激活备用消费者");
            return true;
        }

        // Latest (log end) offsets of those partitions.
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
            adminClient.listOffsets(endOffsetRequest).all().get();

        long totalLag = 0;
        for (TopicPartition partition : endOffsetRequest.keySet()) {
            totalLag += endOffsets.get(partition).offset() - offsets.get(partition).offset();
        }
        System.out.printf("主消费者组滞后量: %d%n", totalLag);

        if (totalLag > maxLagThreshold) {
            System.out.println("主消费者组滞后量超过阈值,激活备用消费者");
            return true;
        }
        return false;
    }

    /**
     * Backup consumer loop: polls, processes and synchronously commits while the backup is
     * active; otherwise sleeps for one second between checks.
     */
    private void runBackupConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, backupGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (running.get()) {
                if (backupActive.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        for (ConsumerRecord<String, String> record : records) {
                            processRecord(record);
                        }
                        // Commit only after the whole batch has been processed.
                        consumer.commitSync();
                        System.out.printf("备用消费者处理了 %d 条消息%n", records.count());
                    }
                } else {
                    // Backup not active: idle briefly before checking again.
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // The task's Future is discarded, so log the failure instead of losing it.
            e.printStackTrace();
        }
    }

    /** Handles one record; this sample implementation only prints it. */
    private void processRecord(ConsumerRecord<String, String> record) {
        System.out.printf("备用消费者处理消息: 主题=%s, 分区=%d, 偏移量=%d, 键=%s, 值=%s%n",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }

    /**
     * Asks both worker loops to finish; each exits the next time it checks the running flag.
     */
    public void stop() {
        running.set(false);
        executorService.shutdown();
    }

    public static void main(String[] args) {
        DisasterRecoveryConsumer consumer = new DisasterRecoveryConsumer(
            "localhost:9092",  // Kafka bootstrap servers
            "primary-group",   // primary consumer group id
            "backup-group",    // backup consumer group id
            "important-topic", // topic name
            10000,             // maximum lag threshold
            30000              // check interval in milliseconds
        );
        consumer.start();
    }
}
复制
06
总结
07
加群请添加作者

08
获取文档资料
推荐阅读系列文章
建议收藏 | Dinky系列总结篇 建议收藏 | Flink系列总结篇 建议收藏 | Flink CDC 系列总结篇 建议收藏 | Doris实战文章合集 建议收藏 | Paimon 实战文章总结 建议收藏 | Fluss 实战文章总结 建议收藏 | Seatunnel 实战文章系列合集 建议收藏 | 实时离线输数仓(数据湖)总结篇 建议收藏 | 实时离线数仓实战第一阶段总结 超700star!电商项目数据湖建设实战代码 ,拿来即用! 从0到1建设电商项目数据湖实战教程 推荐一套开源电商项目数据湖建设实战代码
如果喜欢 请点个在看分享给身边的朋友
文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
kafka集群部署
easonhyj
37次阅读
2025-03-04 19:54:11
谈谈 ES 6.8 到 7.10 的功能变迁(6)- 其他
极限实验室
36次阅读
2025-03-01 23:59:20
缓存监控治理在游戏业务的实践和探索
vivo互联网技术
35次阅读
2025-03-20 09:51:10
如何使用 RisingWave、Kafka 和 Redis 构建实时推荐引擎
RisingWave中文开源社区
31次阅读
2025-03-10 10:30:31
Worker模块源码实战:万字长文解析DolphinScheduler如何实现亿级任务调度
海豚调度
30次阅读
2025-03-04 09:47:24
如何使用 RisingWave 和 PuppyGraph 构建高性能实时图分析框架
RisingWave中文开源社区
29次阅读
2025-03-18 10:49:54
Java的SPI机制详解
京东云开发者
27次阅读
2025-03-05 11:47:12
数仓建模:基于OTD流程的订单履约分析
会飞的一十六
27次阅读
2025-03-03 09:53:12
C#操作SQLite数据库
淡定
24次阅读
2025-03-25 23:27:25
Kafka删除Topic,弄不好会失败或发生故障
虞大胆的叽叽喳喳
21次阅读
2025-03-17 16:55:10