抱歉,这是一篇迟来的实用操作贴,原付费内容,现在免费送给大家(已付费的用户别急,拿好手中剩余的ORC,等升值)。
鱼跃龙门,一门之隔,就是龙和鱼。同样,技术升职到最后也是有瓶颈,借用阿里职级来说,有些人一辈子就停留在p7了。
技术岗到了最后,除了你要在技术领域有自己的开源作品,还要在业内有一定的影响力。这也是我长期坚持写开源文章的动力之一。同样,如果您也是一位热爱分享的人,欢迎加入。
开始
通过传入开始和结束时间戳,重新消费kafka历史消息。
public static void main(String[] args) {
String startTime = "2018-08-14 21:19:09"; // 开始时间
String endTime = "2018-08-14 21:20:59"; // 结束时间
// endTime = ""; // 不设置结束时间
new ReConsumerByTime().start(startTime, endTime);
}
复制
详细代码:
package com.system.kafka.clients.demo.producer.reconsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* 通过开始时间和结束时间,重新消费kafka消息
*/
public class ReConsumerByTime {
KafkaConsumer<String, String> consumer;
final String topic = "mytopic";
final String groupId = "test";
final int partitionNum = 4;
final String bootstrapServers = "10.211.55.5:9092";
public static void main(String[] args) {
String startTime = "2018-08-14 21:19:09"; // 开始时间
String endTime = "2018-08-14 21:20:59"; // 结束时间
// endTime = ""; 不设置结束时间
new ReConsumerByTime().start(startTime, endTime);
}
public void start(String startTime, String endTime) {
init();
start(timeFormat(startTime), timeFormat(endTime));
}
public void start(long startTime, long endTime) {
Map<TopicPartition, Long> startMap = new HashMap<>();
for (int i = 0; i < partitionNum; i++) startMap.put(new TopicPartition(topic, i), startTime);
Map<TopicPartition, OffsetAndTimestamp> startOffsetMap = consumer.offsetsForTimes(startMap);
List<TopicPartition> topicPartitions = new ArrayList<>();
startMap.forEach((k, v) -> {
topicPartitions.add(k);
});
consumer.assign(topicPartitions);
startMap.forEach((k, v) -> {
consumer.seek(k, startOffsetMap.get(k).offset());
System.out.println(k + ", offsets:" + startOffsetMap.get(k).offset());
});
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
try {
if (endTime == 0 || record.timestamp() <= endTime) {
System.out.printf("offset = %d,p = %d, key = %s, value = %s \r\n", record.offset(), record.partition(), record.key(), record.value());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
// 格式化时间
public long timeFormat(String dateTime) {
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.parse(dateTime).getTime();
} catch (ParseException e) {
e.printStackTrace();
return 0;
}
}
// 初始化消费者连接
public void init() {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
}
}
复制
作者:半兽人
链接:http://orchome.com/1004
来源:OrcHome
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
文章转载自orchome,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
国产数据库需要扩大场景覆盖面才能在竞争中更有优势
白鳝的洞穴
598次阅读
2025-04-14 09:40:20
Hologres x 函数计算 x Qwen3,对接MCP构建企业级数据分析 Agent
阿里云大数据AI技术
573次阅读
2025-05-06 17:24:44
一页概览:Oracle GoldenGate
甲骨文云技术
478次阅读
2025-04-30 12:17:56
GoldenDB数据库v7.2焕新发布,助力全行业数据库平滑替代
GoldenDB分布式数据库
471次阅读
2025-04-30 12:17:50
优炫数据库成功入围新疆维吾尔自治区行政事业单位数据库2025年框架协议采购!
优炫软件
362次阅读
2025-04-18 10:01:22
千万级数据秒级响应!碧桂园基于 EMR Serverless StarRocks 升级存算分离架构实践
阿里云大数据AI技术
298次阅读
2025-04-27 15:28:51
XCOPS广州站:从开源自研之争到AI驱动的下一代数据库架构探索
韩锋频道
284次阅读
2025-04-29 10:35:54
Coco AI 入驻 GitCode:打破数据孤岛,解锁智能协作新可能
极限实验室
248次阅读
2025-05-04 23:53:06
优炫数据库成功应用于晋江市发展和改革局!
优炫软件
198次阅读
2025-04-25 10:10:31
优炫数据库四个案例入选《2024网信自主创新调研报告》
优炫软件
189次阅读
2025-04-22 10:12:23