导读:“在互联网(尤其是移动互联网)、物联网、云计算、大数据等高速发展的大背景下,数据呈现爆炸式地增长。社会化网络、移动通信、网络视频音频、电子商务、传感器网络、科学实验等各种应用产生的数据,不仅存储容量巨大,而且还具有数据类型繁多、数据大小变化大、流动快等显著特点,往往能够产生千万级、亿级甚至十亿、百亿级的海量小文件,而且更多地是海量大小文件混合存储。由于在元数据管理、访问性能、存储效率等方面面临巨大的挑战性,因此海量小文件(LOSF,lots of small files)问题成为了工业界和学术界公认的难题。”这是从中科院一位博士的小文件综述中找到的概念描述,我要谈的小文件问题算是小文件问题的一个子集,也就是我们在日常的大数据场景中经常会遇到并头疼的问题—HDFS中的小文件问题。
导读:“在互联网(尤其是移动互联网)、物联网、云计算、大数据等高速发展的大背景下,数据呈现爆炸式地增长。社会化网络、移动通信、网络视频音频、电子商务、传感器网络、科学实验等各种应用产生的数据,不仅存储容量巨大,而且还具有数据类型繁多、数据大小变化大、流动快等显著特点,往往能够产生千万级、亿级甚至十亿、百亿级的海量小文件,而且更多地是海量大小文件混合存储。由于在元数据管理、访问性能、存储效率等方面面临巨大的挑战性,因此海量小文件(LOSF,lots of small files)问题成为了工业界和学术界公认的难题。”这是从中科院一位博士的小文件综述中找到的概念描述,我要谈的小文件问题算是小文件问题的一个子集,也就是我们在日常的大数据场景中经常会遇到并头疼的问题—HDFS中的小文件问题。
作者:小舰学长 中国人民大学计算机硕士
来源:DLab数据实验室(ID:rucdlab)
1.HDFS的小文件问题
HDFS的小文件问题说白了就是在HDFS中存储了大量的小于或者远小于块大小的文件。而这些小文件会占用一个块大小,这样会造成一些问题,下面咱们通过例子来分析一下:
在Hadoop2.x中每个block的大小是128M,在Hadoop3.x中每个block的大小是256M。我们以Hadoop2.x来举例,如果我们有一个1G大小的文件,那么存入HDFS中最终会形成8个block,但是如果我们有1024个1M大小的文件总共也是1G大小,这时候如果存入HDFS中会怎样呢,答案是会形成1024个block。从这里我们可以慢慢分析出小文件的一些危害了:
1、浪费磁盘空间,以前明明一个block可以存储128M数据的,现在却只存了1M,造成了极大的浪费,小文件多了造成的浪费就更大了。
2、维护成本高:HDFS的元数据管理会因为小文件过多从而造成维护成本升高,namenode节点负载过重,对集群的管理和扩展造成了制约;
3、处理效率低:如果小文件过多,每个小文件要占用一个slot,可能起的map或reduce task都会增多,每次启动和关闭task的时间都有可能会超过小文件的实际处理时间,最后导致处理大量小文件速度远远小于处理同等大小的大文件的速度,这样其实也会削弱处理效率。
2.小文件问题解决方案
说了这么多,小文件问题如何解决呢?
网上的解决方案很多,其实归类起来无非三种:
2.1通过配置类的参数来解决小文件问题:
对于Hive,可以氛围Map端的合并和Reduce端的合并。
(1)配置Map输入合并
-- 每个Map最大输入大小,决定合并后的文件数 set mapred.max.split.size=256000000; -- 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并 set mapred.min.split.size.per.node=100000000; -- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并 set mapred.min.split.size.per.rack=100000000; -- 执行Map前进行小文件合并 set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
(2)配置Hive结果合并
我们可以通过一些配置项来使Hive在执行结束后对结果文件进行合并
hive.merge.mapfiles 在map-only job后合并文件,默认true
hive.merge.mapredfiles 在map-reduce job后合并文件,默认false
hive.merge.size.per.task 合并后每个文件的大小,默认256000000
hive.merge.smallfiles.avgsize 平均文件大小,是决定是否执行合并操作的阈值,默认16000000
hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)
set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
set hive.exec.reducers.max = 888(每个任务最大的reduce数,默认为999)
(3)Sparksql reducer参数配置
对于Spark任务,也有相应的参数可以设置,就是对于reduce结果后的小文件进行自动合并
set spark.sql.adaptive.enabled=true; 启用 Adaptive Execution ,从而启用自动设置 Shuffle Reducer 特性
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128000000; 设置每个 Reducer 读取的目标数据量,其单位是字节。默认64M,一般改成集群块大小
(4)简单易用的归档
使用Hive Archive合并文件归档,会把分区的数据合并成一个.har的文件,使用方式如下:
# 设置archive参数
set hive.archive.enabled= true;
set hive.archive.har.parentdir.settable= true;
set har.partfile.size=1099511627776;
# 执行archive
alter table table_name archive PARTITION(dt='${DT}');
2.2 异步任务
以上讲了通过参数配置的方法,参数配置就是通过在任务执行的时候根据数据量的大小一并吧小文件合并做了,这样的好处就是一次到位,比较省事,但是有个缺点就是会使得整个任务因为小文件合并而降低效率、增加耗时。所以也就产生了另一种方法,异步独立任务小文件合并,就是通过单独起一个Hadoop任务或者spark任务,读取读取表的数据然后再重新写入,在写回的时候设置合适的并发来减少小文件。
以spark来举例,可以在sparksql初始化的时候就设置shuffle partition数,如下
SparkSession.builder().enableHiveSupport().config(conf). config("spark.sql.shuffle.partitions",5). getOrCreate()
也可以在进行rdd写入时通过repartition或者coalecse来重新规划分区。使用Spark重新读取小文件,修改分区并写入,这里不建议使用repartition(),推荐使用coalesce();repartition()会增加文件的大小,因为要历经shuffle阶段,但coalesce()不会经历shuffle阶段,数据大小也不会增加
异步任务的方式其实跟参数配置在底层来看无本质区别,最大的区别是一个是运行时调整,一个是事后调整,所以主要是看我们的业务需要,如果对表的实效性要求不高可以通过参数配置,如果表的实效性要求较高,可以先把表跑完,然后找个合适的时间定时合并小文件;
3.小结
网上小文件合并文章纷繁复杂,但是终归跑不出上面的两种思路,结合业务场景需要适当选择就行,这个过程总要有权衡取舍~