PyFlink requires one of Python 3.6, 3.7, 3.8, or 3.9. Run the following command to make sure your Python version meets this requirement:
python --version
1. Create a Python 3.9 virtual environment. For details, see the earlier post:
Configuring multiple Python versions in Jupyter Notebook
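If you prefer not to follow that article, a minimal sketch using Python's built-in venv module works just as well (conda or virtualenv are equally fine; the environment name pyflink-env here is only an illustration):
python3.9 -m venv pyflink-env
source pyflink-env/bin/activate   # on Windows: pyflink-env\Scripts\activate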
2. Install PyFlink
python -m pip install apache-flink==1.16.0
If the download is too slow, you can switch to a mirror in China:
python -m pip install apache-flink==1.16.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
Note: since Flink 1.11, PyFlink jobs can run locally on Windows, so you can develop and debug PyFlink jobs on Windows as well.
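After the installation finishes, you can confirm the installed version with a standard pip command:
python -m pip show apache-flink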
3. Write the code
Create a file named table_api_basic_operations.py with the following content:
import logging
import sys

from pyflink.common import Row
from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.udf import udtf


def basic_operations():
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # define the source
    table = t_env.from_elements(
        elements=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        schema=['id', 'data'])

    right_table = t_env.from_elements(
        elements=[(1, 18), (2, 30), (3, 25), (4, 10)],
        schema=['id', 'age'])

    # extract fields from the JSON string and drop the raw 'data' column
    table = table.add_columns(
        col('data').json_value('$.name', DataTypes.STRING()).alias('name'),
        col('data').json_value('$.tel', DataTypes.STRING()).alias('tel'),
        col('data').json_value('$.addr.country', DataTypes.STRING()).alias('country')) \
        .drop_columns(col('data'))
    table.execute().print()
    # +----+----+---------+-----+---------+
    # | op | id |    name | tel | country |
    # +----+----+---------+-----+---------+
    # | +I |  1 |   Flink | 123 | Germany |
    # | +I |  2 |   hello | 135 |   China |
    # | +I |  3 |   world | 124 |     USA |
    # | +I |  4 | PyFlink |  32 |   China |
    # +----+----+---------+-----+---------+

    # limit the number of outputs
    table.limit(3).execute().print()
    # +----+----+-------+-----+---------+
    # | op | id |  name | tel | country |
    # +----+----+-------+-----+---------+
    # | +I |  1 | Flink | 123 | Germany |
    # | +I |  2 | hello | 135 |   China |
    # | +I |  3 | world | 124 |     USA |
    # +----+----+-------+-----+---------+

    # filter
    table.filter(col('id') != 3).execute().print()
    # +----+----+---------+-----+---------+
    # | op | id |    name | tel | country |
    # +----+----+---------+-----+---------+
    # | +I |  1 |   Flink | 123 | Germany |
    # | +I |  2 |   hello | 135 |   China |
    # | +I |  4 | PyFlink |  32 |   China |
    # +----+----+---------+-----+---------+

    # aggregation
    table.group_by(col('country')) \
        .select(col('country'), col('id').count, col('tel').cast(DataTypes.BIGINT()).max) \
        .execute().print()
    # +----+---------+--------+--------+
    # | op | country | EXPR$0 | EXPR$1 |
    # +----+---------+--------+--------+
    # | +I | Germany |      1 |    123 |
    # | +I |     USA |      1 |    124 |
    # | +I |   China |      1 |    135 |
    # | -U |   China |      1 |    135 |
    # | +U |   China |      2 |    135 |
    # +----+---------+--------+--------+

    # distinct
    table.select(col('country')).distinct() \
        .execute().print()
    # +----+---------+
    # | op | country |
    # +----+---------+
    # | +I | Germany |
    # | +I |   China |
    # | +I |     USA |
    # +----+---------+

    # join
    # Note that it still doesn't support duplicate column names between the joined tables
    table.join(right_table.rename_columns(col('id').alias('r_id')), col('id') == col('r_id')) \
        .execute().print()
    # +----+----+---------+-----+---------+------+-----+
    # | op | id |    name | tel | country | r_id | age |
    # +----+----+---------+-----+---------+------+-----+
    # | +I |  4 | PyFlink |  32 |   China |    4 |  10 |
    # | +I |  1 |   Flink | 123 | Germany |    1 |  18 |
    # | +I |  2 |   hello | 135 |   China |    2 |  30 |
    # | +I |  3 |   world | 124 |     USA |    3 |  25 |
    # +----+----+---------+-----+---------+------+-----+

    # join lateral
    @udtf(result_types=[DataTypes.STRING()])
    def split(r: Row):
        for s in r.name.split("i"):
            yield s

    table.join_lateral(split.alias('a')) \
        .execute().print()
    # +----+----+---------+-----+---------+-------+
    # | op | id |    name | tel | country |     a |
    # +----+----+---------+-----+---------+-------+
    # | +I |  1 |   Flink | 123 | Germany |    Fl |
    # | +I |  1 |   Flink | 123 | Germany |    nk |
    # | +I |  2 |   hello | 135 |   China | hello |
    # | +I |  3 |   world | 124 |     USA | world |
    # | +I |  4 | PyFlink |  32 |   China |  PyFl |
    # | +I |  4 | PyFlink |  32 |   China |    nk |
    # +----+----+---------+-----+---------+-------+

    # show schema
    table.print_schema()
    # (
    #   `id` BIGINT,
    #   `name` STRING,
    #   `tel` STRING,
    #   `country` STRING
    # )

    # show execution plan
    print(table.join_lateral(split.alias('a')).explain())
    # == Abstract Syntax Tree ==
    # LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{}])
    # :- LogicalProject(id=[$0], name=[JSON_VALUE($1, _UTF-16LE'$.name', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))], tel=[JSON_VALUE($1, _UTF-16LE'$.tel', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))], country=[JSON_VALUE($1, _UTF-16LE'$.addr.country', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR))])
    # :  +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_249535355, source: [PythonInputFormatTableSource(id, data)]]])
    # +- LogicalTableFunctionScan(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$1f0568d1f39bef59b4c969a5d620ba46*($0, $1, $2, $3)], rowType=[RecordType(VARCHAR(2147483647) a)], elementType=[class [Ljava.lang.Object;])
    #
    # == Optimized Physical Plan ==
    # PythonCorrelate(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$1f0568d1f39bef59b4c969a5d620ba46*($0, $1, $2, $3)], correlate=[table(split(id,name,tel,country))], select=[id,name,tel,country,a], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) name, VARCHAR(2147483647) tel, VARCHAR(2147483647) country, VARCHAR(2147483647) a)], joinType=[INNER])
    # +- Calc(select=[id, JSON_VALUE(data, _UTF-16LE'$.name', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS name, JSON_VALUE(data, _UTF-16LE'$.tel', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS tel, JSON_VALUE(data, _UTF-16LE'$.addr.country', FLAG(NULL), FLAG(ON EMPTY), FLAG(NULL), FLAG(ON ERROR)) AS country])
    #    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_249535355, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])
    #
    # == Optimized Execution Plan ==
    # PythonCorrelate(invocation=[*org.apache.flink.table.functions.python.PythonTableFunction$1f0568d1f39bef59b4c969a5d620ba46*($0, $1, $2, $3)], correlate=[table(split(id,name,tel,country))], select=[id,name,tel,country,a], rowType=[RecordType(BIGINT id, VARCHAR(2147483647) name, VARCHAR(2147483647) tel, VARCHAR(2147483647) country, VARCHAR(2147483647) a)], joinType=[INNER])
    # +- Calc(select=[id, JSON_VALUE(data, '$.name', NULL, ON EMPTY, NULL, ON ERROR) AS name, JSON_VALUE(data, '$.tel', NULL, ON EMPTY, NULL, ON ERROR) AS tel, JSON_VALUE(data, '$.addr.country', NULL, ON EMPTY, NULL, ON ERROR) AS country])
    #    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_249535355, source: [PythonInputFormatTableSource(id, data)]]], fields=[id, data])


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    basic_operations()
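Besides execute().print(), a bounded result can also be pulled into a pandas DataFrame for local inspection. A minimal, self-contained sketch (this assumes pandas is available in the same environment; Table.to_pandas() collects all rows to the client, so it is only suitable for small tables):

from pyflink.table import TableEnvironment, EnvironmentSettings

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# build a tiny in-memory table and collect it locally as a pandas DataFrame
small_table = t_env.from_elements([(1, 'Flink'), (2, 'PyFlink')], schema=['id', 'name'])
print(small_table.to_pandas())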
4. Run the code
python table_api_basic_operations.py
That completes the introductory PyFlink Table API basic operations example.
