暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

前端埋点数据采集(四)Flume消费Kafka数据到HDFS

畅谈Fintech 2021-06-25
3698


上一期“前端埋点数据采集(三)Flume采集数据”,node01和02服务器节点Flume组件已经启动并且采集app.yyyy-mm-dd.log了。

本期分享的内容是服务器Node01和02,Flume组件采集到的日志,通过Kafka channel到3台Kafka集群上,然后Node03服务器FLume组件来消费3台Kafka集群数据,然后上传到HDFS。


“前端埋点数据的采集系列,一共5次分享来实现这个需求交付解决方案。

一、采集系统架构设计
二、mock应用系统10万条前端埋点数据
三、Flume采集数据
四、Flume消费Kafka数据到HDFS
五. 前端埋点数据采集总结







一、Kafka搭建、配置及生产数据


1、Kafka集群搭建和配置


我们在之前分享文章里面,我们已经搭建过kafka集群和配置,并且对Kafka集群做了简单的基准测试。详见前面几次分享的“kafka集群搭建”。

2、启动kafka集群

    kafka_cluster.sh start
    复制

    使用kafka_cluster.sh脚本,来启动和关闭3台Kafka集群。

      #function:kafka集群启动、关闭
      #!/bin/bash
      case $1 in
      "start"){
      for node in node01 node02 node03
      do
      echo =============== kafka $node 启动 ===============
      ssh $node "/opt/module/kafka/bin/kafka-server-start.sh -daemon opt/module/kafka/config/server.properties"
      done
      };;
      "stop"){
      for node in node01 node02 node03
      do
      echo =============== kafka $node 停止 ===============
      ssh $node "/opt/module/kafka/bin/kafka-server-stop.sh -daemon opt/module/kafka/config/server.properties"
      done
      };;
      esac
      复制

      3、查看Kafka集群topic


      查看topic_log已经启动。这是为什么呢?我们并没有启动topic。

      因为Kafka集群启动,默认会启动一个名字叫做topic_log的topic。

      Flume会通过组件里面的kafka channel写到kafka集群,如果kafka的topic没有创建,就自动创建一个topic_log,待后面的flume来消费。

      4、查看Kafka集群是否消费到数据


        bin/kafka-console-consumer.sh \
        --bootstrap-server node01:9092 --topic topic_log
        复制
        控制台打印信息,发现Kafka集群已经有信息输出了。


        二、消费Flume选型


        我们先看一下日志采集流程,现在数据已经在3台Kafka集群里面存储了,Flume组件即Node03对3台Kafka数据做消费。需要配置消费Flume组件。



        1、消费Flume 的Source选型


        Source源:我们选择的是KafKa Source,因为是从Kafka集群里面消费数据。


        2、消费Flume 的Channel选型


        Channel:我们选择的是FileChannel,因为Channel 数据会到下一级HDFS上,保证了数据的安全性,不会丢失。Agent进程挂掉也可以从失败中恢复数据。

        Channel类型
        特点
        File channel
        基于磁盘,断电数据保存,可靠性较高
        效率低,速度慢
        memory channel
        基于内存,断电数据丢失,可靠性较低
        效率高,速度快
        Kafka channel
        数据存储在Kafka里面,Kafka存储于磁盘,可靠性高
        等价于 memory channel  + Kafka sink


        3、消费Flume 的Sink选型


        Sink:我们选择HDFS Sink,因为我们消费的数据采集最终到HDFS上。




        三、消费Flume拦截器实现


        我们先来看一下Kafka-Flume-HDFS配置架构图:

        1、why需要TimeInterceptor时间拦截器?

        我们从架构图里面看到多了一个TimeInterceptor时间拦截器:因为Flume->HDFS,FLume默认使用的是Linux系统集群的时间,消费Kafka数据后很有可能是第二天了(如果数据是23点59分产生的log,那输出到HDFS很有可能是第二天了),这样HDFS统计这个时间是从第二天才开始,这样不满足需求统计 T + 1的时间。所以这里到FLume组件之前,必须要加上时间拦截器把日志产生的实际时间取出来,然后写入到HDFS。


        2、Maven工程编写TimeStampInterceptor类

        我们查看Flume官网http://flume.apache.org/FlumeUserGuide.html

        找到Flume Sinks、HDFS Sink关于timestamp的内容。

        (1)定义TimeStampInterceptor类实现Interceptor接口

        主要在单event里面Intercept方法处理把时间替换掉。

          publicEvent intercept(Event event)
          {
          Map<String,String> headers = event.getHeaders();
          byte[] body = event.getBody();
            String log  = new String(body, StandardCharsets.UTF_8);
            //把日志转为JSON对象
          JSONObject jsonObject = JSONObject.parseObject(log);
            //获取到key
          String ts = jsonObject.getString("ts");
            //把key,value放到timastamp里面
          headers.put("timestamp",ts);
          return event;
          }
          复制

          在List event数组里面,调用单event清除时间

            public List<Event> intercept(List<Event> list)
            {
            eventArrayList.clear();
            for(Event event:list)
            {
            //调用单个event把时间替换掉
            eventArrayList.add(intercept(event));
                }
            return eventArrayList;
            }
            复制


            (2)定义静态内部类创建TimeStampInterceptor

              public
              static
              class Builder implements Interceptor.Builder
              {
              @Override
              public Interceptor build()
              {
                  return new TimeStampInterceptor();
              }


              @Override
              public void configure(Context context)
              {


              }
              }
              复制
              3、Maven package打出jar包然后上传到HDFS




              四、消费Flume配置


              完成了Flume组件Source、Channel、Sink以及TimeInterceptor拦截器,我们开始配置flume的conf文件。

                #新建kafka-file-hdfs.conf文件
                vim kafka-file-hdfs.conf
                复制
                1、配置Source、TimeStampInterceptor拦截器等
                  #消费flume配置信息
                  a1.sources = r1
                  a1.channels = c1
                  a1.sinks = k1
                  #Source:kafka source
                  a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
                  #一次拉取10000个
                  a1.sources.r1.batchSize = 10000
                  #配置时间是2000
                  a1.sources.r1.batchDurationMillis = 2000
                  #Kafka集群
                  a1.sources.r1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
                  #topic
                  a1.sources.r1.kafka.topics = topic_log
                  #配置时间拦截器
                  a1.sources.r1.interceptors = i1
                  a1.sources.r1.interceptors.i1.type = com.wqs.flume.interceptor.TimeStampInterceptor$Builder
                  复制

                  2、配置Channel

                    #配置channel:file channel
                    a1.channels.c1.type = file
                    #配置磁盘索引,它备份内存索引
                    a1.channels.c1.checkpointDir = opt/module/flume-1.9.0/checkpoint/behavior1
                    #存在存盘的位置
                    a1.channels.c1.dataDirs = opt/module/flume-1.9.0/data/behavior1/
                    a1.channels.c1.maxFileSize = 2146435071
                    #100W个event
                    a1.channels.c1.capacity = 1000000
                    #timeout in seconds for adding or removing event
                    a1.channels.c1.keep-alive = 3
                    a1.channels.c1.parseAsFlumeEvent = false
                    复制

                    3、配置Sink以及HDFS压缩

                      #配置 sink
                      a1.sinks.k1.type = hdfs
                      a1.sinks.k1.hdfs.path = /log/topic_log/%Y-%m-%d
                      a1.sinks.k1.hdfs.filePrefix = log-
                      a1.sinks.k1.hdfs.round = false
                      a1.sinks.k1.hdfs.rollInterval = 10
                      a1.sinks.k1.hdfs.rollSize = 134217728
                      a1.sinks.k1.hdfs.rollCount = 0
                      # 控制输出文件是原生文件,lzop压缩
                      a1.sinks.k1.hdfs.fileType = CompressedStream
                      a1.sinks.k1.hdfs.codeC = lzop
                      #配置source和channel的关系
                      a1.sources.r1.channels = c1
                      a1.sinks.k1.channel = c1
                      复制




                      五、启动Flume观察HDFS


                      1、启动Flume消费节点


                      由于我们Flume消费这台节点是单节点Node03,所以启动的话直接通过命令行完成,当然也能使用脚本封装一下。脚本在上一节“前端埋点数据采集(三)Flume采集数据

                      ”一样。

                        nohup opt/module/flume-1.9.0/bin/flume-ng agent --name a1 --conf-file opt/module/flume-1.9.0/conf/kafka-file-hdfs.conf --name a1 Dflume.root.logger=INFO,LOGFILE 1>/opt/module/flume-1.9.0/log2.txt 2>&1 &
                        复制

                        这个时候已经万事俱备只欠东风了!我们启动整个集群,我们这里使用1个脚本运行启动整个集群。

                        并且观察进程查看Flume是否启动成功?

                        有了Application说明Flume启动成功。

                        注意:其他进程这里也标出来了,可以自检一下。

                        2、执行jar包生成日志
                        执行jar包,查看生成的日志文件


                        3、查看HDFS上文件
                        访问http://node01:9870/
                        • 发现HDFS目录生成和我们配置的一月,并且按天一个日期。

                        • 观察HDFS生成的文件指出lzo压缩





                        总结:
                        1. 3台Kafka集群搭建、配置以及是否能否生产数据;
                        2. 消费Flume选型、拦截器实现、配置;
                        3. 启动FLume、启动整个集群;
                        4. 观察WebUI查看HDFS目录;
                        5. 是否生成日志数据、日志数据时间是否正常、是否支持lzo压缩。




                        >>>>

                        Q&A

                        Q1:为什么启动Kafka集群的时候,没有设置和指定topic,然后查看Kafka --list_consumer_offsets有一个叫topic_log?

                        A1: 因为我们Flume组件配置Kafka Channel的时候,定义了一个叫topic_log的主题,Kafka默认启动会创建这个topic_log,等等后面的Flume来消费。


                        Q2: FLume组件里面 File Channel的dataDirs,checkpointDir以及backupcheckpointDir有什么区别?
                        A2:dataDirs:指向多个路径,每个路径对应不同的硬盘;
                        checkpointDir:保证dataDir挂掉后,可以恢复数据;
                        backupCheckpointDir:保证checkpoint挂掉后,可以恢复数据。




                        Q3:为什么启动启动消费Flume抛出如下异常:

                          ERROR hdfs.HDFSEventSinkprocess failed
                          java.lang.OutOfMemoryError: GC overhead limitexceeded
                          复制

                          A3:在conf/flume-env.sh文件中增加如下配置:

                            export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
                            复制

                            -Xms:表示JVM Heap(堆内存)最小尺寸,初始启动时候的分配;

                            -Xmx :表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc。


                            文章转载自畅谈Fintech,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                            评论