暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

奇葩现象!零负载场景电力行业线上Flink作业必现TaskManager内存溢出超分被杀原生bug问题排查

大数据从业者 2025-03-18
11

问题描述    

根据客户现场运维人员反馈新上的某业务对应的Flink作业经常定期异常退出,已有的历史Flink作业并没有这种现象。排查过JobManager日志提示心跳超时,现场人员曾经多次尝试调大过超时时间。但是,问题仍旧存在,日志如下:

    java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_**** timed out.
           at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
    复制

    详细查看本次故障任务对应的日志,发现除了上述心跳超时记录之外,日志还提示物理内存超分、被强制kill,记录如下:

      beyond the 'PHYSICAL' memory limit. Current usage: 3.0 GB of 3 GB physical memory used; 6.1 GB of 6.3 GB virtual memory used. Killing container.
      Container killed on request. Exit code is 143
      Container exited with a non-zero exit code 143.  
      复制

      根据笔者经验,这种异常无外乎几种情况导致,细分如下:

      1. TaskManager内存设置的不合理(过小),也就是不够用。

      2. 业务逻辑存在内存泄漏的Bug,导致TaskManager内存溢出。

      3. Flink框架自身存在内存泄漏的Bug,导致TaskManager内存溢出。

      本文结合现场反馈,针对该作业逐步对上述情况展开排查,遂有此文!欢迎关注微信公众号:大数据从业者          

      问题分析    

      客户业务开发人员反馈该作业还在灰度阶段,入口数据未完全割接,目前只有极少数据进行灰度发布。并且,客户称业务逻辑很简单,不可能有问题。从发过来的业务代码对应的FlinkUI界面来看,确实很简单:

      综合考虑现场情况:业务作业数据量小、逻辑简单、未引入任何第三方的jar,那么问题直指Flink框架自身了。既然怀疑Flink存在问题,那么直接开发启动一个Flink作业测试用例,保持运行状态过程时,通过命令生成堆转储,分析内存泄漏情况:

        jmap -dump:live,format=b,file=heaplive.bin <pid>
        复制

        测试用例见:

          https://github.com/felixzh2020/felixzh-flink/tree/master/Kafka2Kafka
          复制

          上述dump文件分析发现,即使源端Kafka没有业务数据,Flink作业KafkaSourceReader类中的SynchronizedSortedMap仍然存在大量数据,详见如下:

          从相关源码来看,每次checkpoint触发时调用snapshotState方法会使用offsetsToCommit存储当前批次checkpoint期间已经完成读取Kafka的分区及对应offsets。

          然后,在每次checkpoint成功时调用notifyCheckpointComplete方法、commitOffsets方法将offsets信息提交到Kafka。同时会清理上述offsetsToCommit中小于当前checkpointID的记录。    

          深入看下commitOffsets方法内容:enqueueOffsetsCommitTask方法就是实现提交offsets到Kafka,而callback就是实现清理上述offsetsToCommit(注意并不是下图中的offsetsToCommit)。关键问题是:下图中的入参committedPartitions(也就是下图中的offsetsToCommit)如果为空,则不会触发提交offset,这个没毛病。但是也不会触发清理上述offsetsToCommit,这个就有毛病了。

          定位根因:数据量很小或者压根没有数据时,每次checkpoint都写入信息到offsetsToCommit,但是并不会删除历史,最终导致offsetsToCommit数据量越来越大!

          源码修改    

          在notifyCheckpointComplete方法中,提前判断待提交的offsets是否为空?如果为空,说明没有读取到数据,那就不需要提交offset,那么可以复用已有的代码逻辑清理offsetsToCommit,不再执行commitOffsets方法即可。

          完整代码修改PR:

            https://github.com/felixzh2020/flink/commit/6da4b59d1d371cd6e18504bee5df2a80c9f82833    
            复制

            效果验证    

            源码修改之后,进行源码编译,替换flink-connector-kafka相关jar包。

              mvn clean install -DskipTests -Dfast -Dhadoop.version=3.2.3 -T 1
              复制

              针对该Flink作业定制配置prometheus监控,方便验证效果如下:          

                metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
                metrics.reporter.promgateway.hostUrl: http://felixzh:9091
                metrics.reporter.promgateway.jobName: felixzhJob
                metrics.reporter.promgateway.randomJobNameSuffix: true
                metrics.reporter.promgateway.deleteOnShutdown: false
                复制
                通过观察TaskManager对应采集Metric指标信息,可以看出heap内存使用率在正常范围内波动。Metric指标:
                  flink_taskmanager_Status_JVM_Memory_Heap_Used
                  复制
                    

                  至此,Flink作业出现TaskManager内存溢出超分被杀原生bug问题定位解决完成!    

                  文章转载自大数据从业者,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                  评论