暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

通过开始和结束时间,重新消费kafka

orchome 2019-02-25
954

抱歉,这是一篇迟来的实用操作贴,原付费内容,现在免费送给大家(已付费的用户别急,拿好手中剩余的ORC,等升值)。



鱼跃龙门,一门之隔,就是龙和鱼。同样,技术升职到最后也是有瓶颈,借用阿里职级来说,有些人一辈子就停留在p7了。


技术岗到了最后,除了你要在技术领域有自己的开源作品,还要在业内有一定的影响力。这也是我长期坚持写开源文章的动力之一。同样,如果您也是一位热爱分享的人,欢迎加入。


开始


通过传入开始和结束时间戳,重新消费kafka历史消息。

    public static void main(String[] args) {
        String startTime = "2018-08-14 21:19:09";       // 开始时间
        String endTime = "2018-08-14 21:20:59";         // 结束时间
        //        endTime = "";         // 不设置结束时间
        new ReConsumerByTime().start(startTime, endTime);
    }
    复制


    详细代码:

      package com.system.kafka.clients.demo.producer.reconsumer;


      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
      import org.apache.kafka.common.TopicPartition;


      import java.text.ParseException;
      import java.text.SimpleDateFormat;
      import java.util.*;


      /**
      * 通过开始时间和结束时间,重新消费kafka消息
      */
      public class ReConsumerByTime {


      KafkaConsumer<String, String> consumer;


      final String topic = "mytopic";
      final String groupId = "test";
      final int partitionNum = 4;
      final String bootstrapServers = "10.211.55.5:9092";


      public static void main(String[] args) {
      String startTime = "2018-08-14 21:19:09"; // 开始时间
      String endTime = "2018-08-14 21:20:59"; // 结束时间
      // endTime = ""; 不设置结束时间
      new ReConsumerByTime().start(startTime, endTime);
      }


      public void start(String startTime, String endTime) {
      init();
      start(timeFormat(startTime), timeFormat(endTime));
      }


      public void start(long startTime, long endTime) {
      Map<TopicPartition, Long> startMap = new HashMap<>();
      for (int i = 0; i < partitionNum; i++) startMap.put(new TopicPartition(topic, i), startTime);


      Map<TopicPartition, OffsetAndTimestamp> startOffsetMap = consumer.offsetsForTimes(startMap);


      List<TopicPartition> topicPartitions = new ArrayList<>();
      startMap.forEach((k, v) -> {
      topicPartitions.add(k);
      });
      consumer.assign(topicPartitions);


      startMap.forEach((k, v) -> {
      consumer.seek(k, startOffsetMap.get(k).offset());
      System.out.println(k + ", offsets:" + startOffsetMap.get(k).offset());
      });


      while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
      try {
      if (endTime == 0 || record.timestamp() <= endTime) {
      System.out.printf("offset = %d,p = %d, key = %s, value = %s \r\n", record.offset(), record.partition(), record.key(), record.value());
      }
      } catch (Exception e) {
      e.printStackTrace();
      }
      }
      }
      }


      // 格式化时间
      public long timeFormat(String dateTime) {
      try {
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      return sdf.parse(dateTime).getTime();
      } catch (ParseException e) {
      e.printStackTrace();
      return 0;
      }
      }


      // 初始化消费者连接
      public void init() {
      Properties props = new Properties();
      props.put("bootstrap.servers", bootstrapServers);
      props.put("group.id", groupId);
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      consumer = new KafkaConsumer<>(props);
      }
      }
      复制


      作者:半兽人

      链接:http://orchome.com/1004

      来源:OrcHome

      著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

      文章转载自orchome,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

      评论