官网介绍
Converts records from one data format to another using configured Record Reader and Record Write Controller Services.
个人解读
一个用于转换Flwofile的content的处理器。准确来讲这个处理器就是一个壳子,通过动态配置reader单元和writer单元来实现其强大的数据格式转换功能。支持的reader和writer详细见附录
配置详情
Record Reader | 配置继承于RecordReaderFactory的controller service用于读取Flwofile的content数据 GrokReader AvroReader见附录一SyslogReader ScriptedReader Syslog5424Reader XMLReader ParquetReader JsonTreeReader JsonPathReader CSVReader |
Record Writer | 配置继承于RecordSetWriterFactory的controller service用于写出Flwofile的content数据 AvroRecordSetWriter见附录二 ScriptedRecordSetWriterCSVRecordSetWriter FreeFormTextRecordSetWriter JsonRecordSetWriter ParquetRecordSetWriter XMLRecordSetWriter |
Include Zero Record FlowFiles |
指出没有通过reader单元如果拿到数据条数为空的处理策略。false空结果不路由,直接丢弃; true不管结果如果,任然路由到success |
路由关系
success | FlowFiles that are successfully transformed will be routed to this relationship |
failure | If a FlowFile cannot be transformed from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship。 解析异常,转换异常路由到failure |
写出属性
mime.type | 由writer单元配置 |
record.count | 读取数据条数 |
实战:一个json数据写成avro数据再写成json的例子
自定义schema
{
"type":"record",
"name":"achievement_source",
"namespace":"any.data1",
"fields":[
{
"name":"id",
"type":[
"null",
"string"
]
},
{
"name":"idcard",
"type":[
"null",
"string"
]
}
]
}
json数据
{"id":"1","idcard":"253618648491312","name":"Crystal Lee","chinese":"85","math":"81","english":"107","multiple":"256","total":"529","creat_time":"2020-11-04 23:01:01.0","update_time":null}
AvroRecordSetWriter配置
AvroSchemaRegistry
AvroReader
模板自取
链接:https://pan.baidu.com/s/1VgS9zKqn99fgqCslK4IGaQ
提取码:pp11
附录一:AvroReader
数据格式支持:1 schema内置 2 正常的纯avro数据
schema来源:1.数据内置 2.Schema Name模式下从Schema Registry中通过Schema Name提取schema
3.Schema Tex模式下通过Schema Text配置提取schema
Schema Access Strategy |
指明获取schema的策略,默认从数据内置获取 |
Schema Registry | schema的注册集 |
Schema Name | Schema Name' Property模式下从Schema Registry提取schema的配置名称 |
Schema Version | Schema Name' Property模式下从Schema Registry提取schema参数之一,和Schema Branch不能都填写(reader模块无效) |
Schema Branch | Schema Name' Property模式下从Schema Registry提取schema参数之一,和Schema Version不能都填写(reader模块无效) |
Schema Text | Use 'Schema Text' Property 读取schema |
Cache Size | 为了提高效率,schema不是每次都去编译的。这里指明缓存大小,缓存使用过的是LoadingCache,key是schema内容,value是编译后的schema对象 |
源码关于获取schema的一个对象设计
对应着Schema Name,Schema Version,Schema Branch
附录二:AvroRecordSetWriter
写avro数据的controller service
Schema Write Strategy |
指定数据如何添加schema |
Schema Cache | 指定要向其中添加记录模式的模式缓存,以便记录读取器可以快速查找该模式。 |
Schema Access Strategy |
指明获取schema的策略,默认从数据内置获取 |
Schema Registry | schema的注册集 |
Schema Name | Schema Name' Property模式下从Schema Registry提取schema的配置名称 |
Schema Version | Schema Name' Property模式下从Schema Registry提取schema参数之一,和Schema Branch不能都填写 |
Schema Branch | Schema Name' Property模式下从Schema Registry提取schema参数之一,和Schema Version不能都填写 |
Schema Text | Use 'Schema Text' Property 读取schema |
Compression Format | 数据压缩方式 |
Cache Size | 为了提高效率,schema不是每次都去编译的。这里指明缓存大小,缓存使用过的是LoadingCache,key是schema内容,value是编译后的schema对象 |
Encoder Pool Size | avro-writer需要使用一个编码器。编码器的创建是昂贵的,但一旦创建,它们就可以被重用。此属性控制可合用和重用的编码器的最大数量。将该值设置得太小可能会导致性能下降,但将其设置得更高可能会使用更多堆。如果Avro写入器配置了“Embed Avro Schema”的模式写入策略,则该属性将被忽略。 |