实时数据湖业务场景
实时数据湖解决方案对比
实时数据流管道
DFS上实时数据流解决方案
Hudi表存储类型对比
一旦首次数据写入时确定了Hudi存储格式,不能再修改。COW存储格式不需要压缩:
ERROR HoodieCompactor: org.apache.hudi.HoodieNotSupportedException: Compaction is not supported on a CopyOnWrite table
复制
Hudi操作类型使用场景
并发:
hoodie.bulkinsert.shuffle.parallelism
hoodie.insert.shuffle.parallelism
hoodie.upsert.shuffle.parallelism
hoodie.delete.shuffle.parallelism
设置并发的数量,spark会用coalesce或reduceByKey对RDD进行重新分区,分区数就是并发数。
在bulk_insert里有个设置排序的参数,默认是全局排序,即对读取到的所有数据根据排序字段进行排序,全局排序是用rdd.sortBy实现的:
如果时分区排序,就先用coalesce(outputSparkPartitions)重分区,然后在mapPartition转换内对每个分区内的数据排序:
很明显第二种方式效率会更好高。
Hudi查询类型
代码--批量同步Hive表到Hudi
import spark.sql
val df = sql(sparkSql)
val originCount = sql(s"select count(1) from ${cliConfig.sourceTableName}").take(1)(0)
LOG.info(s"Origin Count:$originCount")
val writerClientOpts = initWriterClientOpts(cliConfig)
val dfWriter = df.write
.format("hudi") // 指定Spark写入类型
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,cliConfig.tableType)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,cliConfig.operation)
.option(HoodieWriteConfig.TABLE_NAME, cliConfig.targetTableName)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, cliConfig.recordkeyField) // 主键
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, cliConfig.precombineField) // 记录更新时间:Hudi表Merge时的依据
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, cliConfig.partitionField)
.option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY,true)
.options(writerClientOpts)
val sparkSaveMode=
if(cliConfig.operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)) {
SaveMode.Overwrite
}else {
SaveMode.Append
}
dfWriter.mode(sparkSaveMode)
.save(cliConfig.targetBasePath)
spark.stop()
复制
参数说明:
hoodie.datasource.write.partitionpath.field:在分区表的情况下,起作用;指定分区表的partition字段,如果没有配置,那partition默认字段"partitionpath";如果分区字段在查询的表里不存在,那分区的默认值是"default"。也就说,hudi表是分区表,默认分区字段partitionpath,并且只有一个“default”分区
hoodie.table.name:指定hudi 表名,hudi表名在hudi元数据文件里存储
如果目标路径已经存在,Hudi会认为表元数据已经有了,否则会报org.apache.hudi.exception.TableNotFoundException: Hoodie table not found in path
脚本--实时同步Kafka到Hudi表
用HoodieDeltaStreamer 工具,可以实时同步Kafka数据到Hudi表:
spark-submit --master yarn \
--deploy-mode client \
--queue root.kp.swift \
--name DeltaStreamer_ods \
--driver-memory 2G \
--executor-cores 2 \
--num-executors 4 \
--executor-memory 4G \
--jars /opt/hudi/libs/hudi-spark-bundle_2.11-0.6.0.jar,/opt/hudi/libs/spark-avro_2.11-2.4.4.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/hudi/tools/hudi-utilities-bundle_2.11-0.6.0.jar \
--continuous \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--hoodie-conf hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs:///apps/hive/warehouse/ods.db/ods_product/kafka-schema.avro \
--hoodie-conf hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs:///apps/hive/warehouse/ods.db/ods_product/kafka-schema.avro \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--props hdfs:///apps/hive/warehouse/ods.db/ods_product/kafka-source.properties \
--target-base-path /apps/hive/warehouse/ods.db/ods_product/ \
--op UPSERT \
--commit-on-errors \
--table-type COPY_ON_WRITE \
--target-table ods.ods_product\
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--source-ordering-field last_updated_stamp \
--checkpoint ods_product,0:15523,1:15562,2:15815,3:15601
复制
参数说明:
checkpoint可以指定初始化消费Kafka的offset。当首次用HoodieDeltaStreamer写Hudi表,此时Hudi表元数据中还没有checkpoint信息,如果没有指定checkpoint参数,会使用auto.offset.reset的配置项,默认LATEST,即从最新的offset开始消费;如果配置了checkpoint,不管hudi表有没有checkpoint信息,都根据参数指定的checkpoint开始消费。
target-table指定的是在hudi元数据中的hudi表名称
schemaprovider-clas指定schema解析器
source/target.schema.file,如果用的文件schema解析,该参数指定avro格式的schema配置文件
source-ordering-field排序列,用于merge数据
transformer-class 数据转换类,定义一个spark sql,加工源表数据
props:指定一个配置文件,配置参数统一放到这个文件里配置;props配置文件里的参数,会被--hoodie-conf指定的参数替换,如果不指定,可能会报默认加载的配置文件找不到得异常
HoodieDeltaStreamer有checkpoint功能,会把当前消费的Kafka偏移量放到Hudi Table元数据(deltastreamer.checkpoint.key)。
提交的spark应用名规则:delta-streamer-$targetTableName
kafka-source.properties配置文件样例:
hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=product_id
hoodie.datasource.write.partitionpath.field=""
hoodie.deltastreamer.source.kafka.topic=product_price
bootstrap.servers=ip1:9092,ip2:9092
auto.offset.reset=earliest
group.id=hudi_ods
复制
代码--增量查询
val beginInstantTime = "20201213200000"
spark.read.format("hudi").
option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginInstantTime).
load(s"$hudiTablePath").
createOrReplaceTempView(hudiTableName)
spark.sql(s"select count(1) from ${hudiTableName} limit 10").show()
复制
上面代码用的SparkDataSource的方法,这种方法不支持对Merge On Read表的增量查询;可以把Hudi表同步到Hive,然后用Spark SQL查hive里的数据,Spark Sql支持Copy On Write和Merge On Read的增量查询;
BEGIN_INSTANTTIME_OPT_KEY指定增量查询时间。
「在一起看」👇