暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

28. PyFlink入门--Table API 处理json数据操作案例

大数据技能圈 2023-01-03
27

PyFlink 需要 Python 版本(3.6、3.7、3.8 或 3.9)。请运行以下命令以确保它满足要求:

    python --version

    1.创建python3.9虚拟环境,具体请参考:

    ① . Jupyter Notebook介绍、安装及使用

    ②. Jupyter Notebook中配置多版本Python

    2. 安装PyFlink

      python -m pip install apache-flink==1.16.0

      如果速度太慢,可以设置国内源:

        python -m pip install apache-flink==1.16.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

        注意:从 Flink 1.11 开始,支持在 Windows 上本地运行 PyFlink 作业,因此你可以在 Windows 上开发和调试 PyFlink 作业。

        3. 编写代码

        process_json_data_with_udf.py
          import json
          import logging
          import sys


          from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes, TableDescriptor,
          Schema)
          from pyflink.table.expressions import col
          from pyflink.table.udf import udf




          def process_json_data_with_udf():
          t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())


          # define the source
          table = t_env.from_elements(
          elements=[
          (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
          (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
          (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
          (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
          ],
          schema=['id', 'data'])


          # define the sink
          t_env.create_temporary_table(
          'sink',
          TableDescriptor.for_connector('print')
          .schema(Schema.new_builder()
          .column('id', DataTypes.BIGINT())
          .column('data', DataTypes.STRING())
          .build())
          .build())


          # update json columns
          @udf(result_type=DataTypes.STRING())
          def update_tel(data):
          json_data = json.loads(data)
          json_data['tel'] += 1
          return json.dumps(json_data)


          table = table.select(col('id'), update_tel(col('data')))


          # execute
          table.execute_insert('sink') \
          .wait()
          # remove .wait if submitting to a remote cluster, refer to
          # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
          # for more details




          if __name__ == '__main__':
          logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")


          process_json_data_with_udf()

          4. 运行代码

            python process_json_data_with_udf.py

            5. 输出结果

              8> +I[1, {"name": "Flink", "tel": 124, "addr": {"country": "Germany", "city": "Berlin"}}]
              8> +I[2, {"name": "hello", "tel": 136, "addr": {"country": "China", "city": "Shanghai"}}]
              8> +I[3, {"name": "world", "tel": 125, "addr": {"country": "USA", "city": "NewYork"}}]
              8> +I[4, {"name": "PyFlink", "tel": 33, "addr": {"country": "China", "city": "Hangzhou"}}]

              至此,Pyflink入门TableAPI基本操作代码编写完成。

              更多实战详情请关注字节智传公众号

              往期精彩

              16. FlinkSql 集成 hive catalog模式进行读写数据

              17. PyFlink 集成 hive catalog模式读写数据

              18. PyFlink 集成 hive catalog模式建设数仓

              22. 离线数仓实战项目

              23. 实时数仓实战项目

              24. PyFlink入门--DataStream API wordcount案例

              25. PyFlink入门--Table API wordcount案例

              文章转载自大数据技能圈,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论