PyFlink requires Python 3.6, 3.7, 3.8, or 3.9. Run the following command to confirm your interpreter meets this requirement:
python --version
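The same check can be done programmatically. This is a small sketch (not part of the original tutorial; the helper name `is_supported` is made up for illustration) that compares the running interpreter's (major, minor) version against the 3.6–3.9 range stated above:

```python
import sys

def is_supported(version_info, low=(3, 6), high=(3, 9)):
    """Return True if the (major, minor) version is in PyFlink 1.16's supported range."""
    return low <= tuple(version_info[:2]) <= high

print(is_supported(sys.version_info))   # True on Python 3.6-3.9
print(is_supported((3, 10, 0)))         # False: 3.10 is above the range
```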
1. Create a Python 3.9 virtual environment. For details, see:
②. Configuring multiple Python versions in Jupyter Notebook
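As a minimal sketch of that step (the environment path is hypothetical, and `python3` should point at a 3.9 interpreter, e.g. `python3.9`, per the version requirement above):

```shell
python3 -m venv /tmp/pyflink39          # create the virtual environment
. /tmp/pyflink39/bin/activate           # activate it (use Scripts\activate on Windows)
python --version                        # should report a supported Python version
```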
2. Install PyFlink
python -m pip install apache-flink==1.16.0
If the download is too slow, you can use a mirror inside China:
python -m pip install apache-flink==1.16.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
Note: since Flink 1.11, PyFlink jobs can run locally on Windows, so you can develop and debug PyFlink jobs on Windows as well.
3. Write the code
datastream_api_basic_operations.py
import json
import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment


def show(ds, env):
    ds.print()
    env.execute()


def basic_operations():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # define the source
    ds = env.from_collection(
        collection=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        type_info=Types.ROW_NAMED(["id", "info"], [Types.INT(), Types.STRING()])
    )

    # map
    def update_tel(data):
        # parse the json
        json_data = json.loads(data.info)
        json_data['tel'] += 1
        return data.id, json.dumps(json_data)

    show(ds.map(update_tel), env)
    # (1, '{"name": "Flink", "tel": 124, "addr": {"country": "Germany", "city": "Berlin"}}')
    # (2, '{"name": "hello", "tel": 136, "addr": {"country": "China", "city": "Shanghai"}}')
    # (3, '{"name": "world", "tel": 125, "addr": {"country": "USA", "city": "NewYork"}}')
    # (4, '{"name": "PyFlink", "tel": 33, "addr": {"country": "China", "city": "Hangzhou"}}')

    # filter
    show(ds.filter(lambda data: data.id == 1).map(update_tel), env)
    # (1, '{"name": "Flink", "tel": 124, "addr": {"country": "Germany", "city": "Berlin"}}')

    # key by
    show(ds.map(lambda data: (json.loads(data.info)['addr']['country'],
                              json.loads(data.info)['tel']))
           .key_by(lambda data: data[0])
           .sum(1), env)
    # ('Germany', 123)
    # ('China', 135)
    # ('USA', 124)
    # ('China', 167)


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    basic_operations()
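In the key_by example, sum(1) keeps a running sum per key, which is why 'China' appears twice: first 135, then 135 + 32 = 167. The rolling-aggregation semantics can be sketched in plain Python (this is an illustration of the behavior, not the Flink API itself):

```python
import json
from collections import defaultdict

records = [
    '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}',
    '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}',
    '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}',
    '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}',
]

running = defaultdict(int)           # per-key running sum, like Flink's keyed state
out = []
for info in records:
    data = json.loads(info)
    key = data['addr']['country']    # what key_by partitions on
    running[key] += data['tel']      # what sum(1) accumulates
    out.append((key, running[key]))  # one output per input, as ds.print() shows

print(out)
# [('Germany', 123), ('China', 135), ('USA', 124), ('China', 167)]
```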
4. Run the code
python datastream_api_basic_operations.py
5. Output
(1, '{"name": "Flink", "tel": 124, "addr": {"country": "Germany", "city": "Berlin"}}')(2, '{"name": "hello", "tel": 136, "addr": {"country": "China", "city": "Shanghai"}}')(3, '{"name": "world", "tel": 125, "addr": {"country": "USA", "city": "NewYork"}}')(4, '{"name": "PyFlink", "tel": 33, "addr": {"country": "China", "city": "Hangzhou"}}')
This completes the introductory PyFlink DataStream API example covering basic operators.
This article is reposted from 大数据技能圈.




