PyFlink requires Python 3.6, 3.7, 3.8, or 3.9. Run the following command to make sure your interpreter meets this requirement:
python --version
1. Create a Python 3.9 virtual environment. For details, see:
② Configuring multiple Python versions in Jupyter Notebook
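If you are not working in Jupyter, a plain virtual environment works just as well. A minimal sketch, assuming python3.9 is already on your PATH (the environment name pyflink-env is an arbitrary choice):

python3.9 -m venv pyflink-env
source pyflink-env/bin/activate    # on Windows: pyflink-env\Scripts\activate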
2. Install PyFlink
python -m pip install apache-flink==1.16.0
If the download is too slow, you can point pip at a mirror inside China:
python -m pip install apache-flink==1.16.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
Note: since Flink 1.11, PyFlink jobs can run locally on Windows, so you can develop and debug PyFlink jobs on Windows as well.
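To verify that the installation succeeded, you can inspect the installed package with pip; if the pinned version was installed, this reports Version: 1.16.0:

python -m pip show apache-flink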
3. Write the code
process_json_data_with_udf.py:

import json
import logging
import sys

from pyflink.table import (EnvironmentSettings, TableEnvironment, DataTypes,
                           TableDescriptor, Schema)
from pyflink.table.expressions import col
from pyflink.table.udf import udf


def process_json_data_with_udf():
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # define the source
    table = t_env.from_elements(
        elements=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        schema=['id', 'data'])

    # define the sink
    t_env.create_temporary_table(
        'sink',
        TableDescriptor.for_connector('print')
                       .schema(Schema.new_builder()
                               .column('id', DataTypes.BIGINT())
                               .column('data', DataTypes.STRING())
                               .build())
                       .build())

    # update json columns
    @udf(result_type=DataTypes.STRING())
    def update_tel(data):
        json_data = json.loads(data)
        json_data['tel'] += 1
        return json.dumps(json_data)

    table = table.select(col('id'), update_tel(col('data')))

    # execute
    table.execute_insert('sink') \
         .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    process_json_data_with_udf()
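For comparison, the same transformation can also be expressed in SQL once the UDF is registered under a name. The following is a minimal, self-contained sketch, not part of the script above; the view name src and the single sample row are illustrative choices:

import json

from pyflink.table import EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.udf import udf

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

@udf(result_type=DataTypes.STRING())
def update_tel(data):
    # same logic as above: bump the tel field inside the JSON payload
    json_data = json.loads(data)
    json_data['tel'] += 1
    return json.dumps(json_data)

# register the UDF by name so SQL queries can call it
t_env.create_temporary_function('update_tel', update_tel)

# expose an in-memory table to SQL as a temporary view
src = t_env.from_elements(
    elements=[(1, '{"name": "Flink", "tel": 123}')],
    schema=['id', 'data'])
t_env.create_temporary_view('src', src)

# equivalent of table.select(col('id'), update_tel(col('data'))), in SQL
t_env.sql_query("SELECT id, update_tel(data) AS data FROM src") \
    .execute() \
    .print()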
4. Run the code
python process_json_data_with_udf.py
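As the comment in the script notes, when submitting the job to a remote cluster rather than running it on the local mini-cluster, remove the .wait() call. For example, assuming a running cluster and the flink CLI on your PATH:

flink run -py process_json_data_with_udf.py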
5. Output
8> +I[1, {"name": "Flink", "tel": 124, "addr": {"country": "Germany", "city": "Berlin"}}]
8> +I[2, {"name": "hello", "tel": 136, "addr": {"country": "China", "city": "Shanghai"}}]
8> +I[3, {"name": "world", "tel": 125, "addr": {"country": "USA", "city": "NewYork"}}]
8> +I[4, {"name": "PyFlink", "tel": 33, "addr": {"country": "China", "city": "Hangzhou"}}]

The 8> prefix is the index of the print sink subtask that emitted the row, and +I marks an insert-only changelog row. Note that every tel value has been incremented by 1, confirming that the UDF was applied.
This completes the introductory PyFlink Table API example covering basic operations.