在 Flink 1.10 的 Table API 和 SQL 中,表支持的格式有四种:
CSV Format JSON Format Apache Avro Format Old CSV Format
官网地址如下:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#table-formats
我用 JSON Format 比较多,也有嵌套的JSON 数据需要解析,大概描述一下。
以下内容来下官网介绍:
JSON格式允许读取和写入与给定格式 schema 相对应的JSON数据。 格式 schema 可以定义为Flink类型,JSON schema 或从所需的表 schema 派生。 Flink类型启用了更类似于SQL的定义并映射到相应的SQL数据类型。 JSON模式允许更复杂和嵌套的结构。 如果格式 schema 等于表 schema,则也可以自动派生该 schema。 这只允许定义一次 schema 信息。 格式的名称,类型和字段的顺序由表的 schema 确定。 如果时间属性的来源不是字段,则将忽略它们。 表 schema 中的from定义被解释为以该格式重命名的字段。
大概意思就是,flink 在解析json的时候,可以自己通过 schema(支持复杂的嵌套json),如果不提供 schema,默认使用 table schema 自动派生 json 的 schema(不支持复杂json)。
官网对应 json format 的表的样例:
CREATE TABLE MyUserTable ( ... ) WITH ( 'format.type' = 'json', -- required: specify the format type 'format.fail-on-missing-field' = 'true' -- optional: flag whether to fail if a field is missing or not, false by default 'format.fields.0.name' = 'lon', -- optional: define the schema explicitly using type information. 'format.fields.0.data-type' = 'FLOAT', -- This overrides default behavior that uses table's schema as format schema. 'format.fields.1.name' = 'rideTime', 'format.fields.1.data-type' = 'TIMESTAMP(3)', 'format.json-schema' = -- or by using a JSON schema which parses to DECIMAL and TIMESTAMP. '{ -- This also overrides the default behavior. "type": "object", "properties": { "lon": { "type": "number" }, "rideTime": { "type": "string", "format": "date-time" } } }' )
注:flink 1.10 字段的名称和类型可以从 table schema 中推断,不用写 format.fields.0.name 和 format.fields.0.data-type 了。
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。