
29. Getting Started with PyFlink -- DataStream API Basic Operator Examples

大数据技能圈 2023-01-04

PyFlink requires Python 3.6, 3.7, 3.8, or 3.9. Run the following command to make sure your Python version meets the requirement:

    python --version

1. Create a Python 3.9 virtual environment; for details, see the following posts (a minimal command sketch follows this list):

    ①. Jupyter Notebook: introduction, installation, and usage

    ②. Configuring multiple Python versions in Jupyter Notebook
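If you just need an environment quickly, here is a minimal sketch using Python's built-in venv module (conda or virtualenv work just as well; the environment name pyflink-env is only an example):

    python3.9 -m venv pyflink-env
    source pyflink-env/bin/activate    # on Windows: pyflink-env\Scripts\activate
    python --version                   # should report Python 3.9.x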

2. Install PyFlink

    python -m pip install apache-flink==1.16.0

If the download is too slow, you can use a mirror inside China:

    python -m pip install apache-flink==1.16.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

Note: since Flink 1.11, PyFlink jobs can run locally on Windows, so you can develop and debug PyFlink jobs on Windows as well.
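As an optional sanity check (not part of the original steps), you can confirm that the package is installed:

    python -m pip show apache-flink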

3. Write the code

datastream_api_basic_operations.py

    import json
    import logging
    import sys

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment


    def show(ds, env):
        # print the stream and trigger a separate job execution for each call
        ds.print()
        env.execute()


    def basic_operations():
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)

        # define the source
        ds = env.from_collection(
            collection=[
                (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
                (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
                (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
                (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
            ],
            type_info=Types.ROW_NAMED(["id", "info"], [Types.INT(), Types.STRING()])
        )

        # map
        def update_tel(data):
            # parse the json
            json_data = json.loads(data.info)
            json_data['tel'] += 1
            return data.id, json.dumps(json_data)

        show(ds.map(update_tel), env)
        # (1, '{"name": "Flink", "tel": 124, "addr": {"country": "Germany", "city": "Berlin"}}')
        # (2, '{"name": "hello", "tel": 136, "addr": {"country": "China", "city": "Shanghai"}}')
        # (3, '{"name": "world", "tel": 125, "addr": {"country": "USA", "city": "NewYork"}}')
        # (4, '{"name": "PyFlink", "tel": 33, "addr": {"country": "China", "city": "Hangzhou"}}')

        # filter
        show(ds.filter(lambda data: data.id == 1).map(update_tel), env)
        # (1, '{"name": "Flink", "tel": 124, "addr": {"country": "Germany", "city": "Berlin"}}')

        # key by: sum(1) emits a running total per key, which is why "China"
        # appears twice below (135, then 135 + 32 = 167)
        show(ds.map(lambda data: (json.loads(data.info)['addr']['country'],
                                  json.loads(data.info)['tel']))
               .key_by(lambda data: data[0]).sum(1), env)
        # ('Germany', 123)
        # ('China', 135)
        # ('USA', 124)
        # ('China', 167)


    if __name__ == '__main__':
        logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

        basic_operations()
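Besides map, filter, and key_by, the DataStream API offers further basic operators that follow the same pattern. As an illustration, here is a sketch of flat_map, which can emit zero or more records per input record; it is an addition to the original example and assumes it is appended inside basic_operations(), after the key_by step:

    # flat_map: each input record may yield any number of output records;
    # here we emit one (id, city) pair per record
    def to_city(data):
        json_data = json.loads(data.info)
        yield data.id, json_data['addr']['city']

    show(ds.flat_map(to_city), env)
    # expected output (assumption): (1, 'Berlin'), (2, 'Shanghai'),
    # (3, 'NewYork'), (4, 'Hangzhou')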

4. Run the code

    python datastream_api_basic_operations.py

5. Output

The map step prints the following (the filter and key_by steps print the results shown in the inline comments above):

    (1, '{"name": "Flink", "tel": 124, "addr": {"country": "Germany", "city": "Berlin"}}')
    (2, '{"name": "hello", "tel": 136, "addr": {"country": "China", "city": "Shanghai"}}')
    (3, '{"name": "world", "tel": 125, "addr": {"country": "USA", "city": "NewYork"}}')
    (4, '{"name": "PyFlink", "tel": 33, "addr": {"country": "China", "city": "Hangzhou"}}')

This completes the getting-started example for basic DataStream API operator operations in PyFlink.

For more hands-on content, follow the 字节智传 WeChat public account.

Past highlights

16. FlinkSQL: reading and writing data with the Hive catalog

17. PyFlink: reading and writing data with the Hive catalog

18. PyFlink: building a data warehouse with the Hive catalog

22. Hands-on offline data warehouse project

23. Hands-on real-time data warehouse project

24. Getting Started with PyFlink -- DataStream API word count example

25. Getting Started with PyFlink -- Table API word count example

26. Getting Started with PyFlink -- converting between Table and DataStream

27. Happy New Year everyone, full speed ahead into 2023

28. Getting Started with PyFlink -- Table API JSON processing example

