PyFlink requires Python 3.6, 3.7, 3.8, or 3.9. Run the following command to make sure your interpreter meets the requirement:
python --version
1. Create a Python 3.9 virtual environment. For details, see:
②. Configuring multiple Python versions in Jupyter Notebook
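If you don't have that post handy, a minimal sketch with the standard venv module works too (this assumes a python3.9 binary is already on your PATH; conda is an equally valid choice, and the environment name pyflink_env is just an example):
python3.9 -m venv pyflink_env
source pyflink_env/bin/activate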
2. Install PyFlink
python -m pip install apache-flink==1.16.0
If the download is too slow, you can switch to a mirror inside China:
python -m pip install apache-flink==1.16.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
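Either way, a quick import check confirms that the package is usable:
python -c "import pyflink; print('PyFlink is installed')"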
Note: since Flink 1.11, PyFlink jobs can run locally on Windows, so you can develop and debug PyFlink jobs on Windows as well.
3. Write the code
The example below writes a small in-memory collection to Elasticsearch and covers four variants: an ES6 sink and an ES7 sink, each with both a static and a dynamic (per-record) index. Save it as elasticsearch.py:

import logging
import sys

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import DeliveryGuarantee
from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, \
    Elasticsearch7SinkBuilder, FlushBackoffType, ElasticsearchEmitter


def write_to_es6(env):
    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
        'file:///opt/software/flink-1.16.0/lib/flink-sql-connector-elasticsearch6-1.16.0.jar'
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    ds = env.from_collection(
        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
        type_info=Types.MAP(Types.STRING(), Types.STRING()))

    # ES6 still requires a document type, so static_index takes (index, id_field, type).
    es_sink = Elasticsearch6SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
        .set_hosts(['localhost:9200']) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_bulk_flush_max_actions(1) \
        .set_bulk_flush_max_size_mb(2) \
        .set_bulk_flush_interval(1000) \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
        .set_connection_username('foo') \
        .set_connection_password('bar') \
        .set_connection_path_prefix('foo-bar') \
        .set_connection_request_timeout(30000) \
        .set_connection_timeout(31000) \
        .set_socket_timeout(32000) \
        .build()

    ds.sink_to(es_sink).name('es6 sink')
    env.execute()


def write_to_es6_dynamic_index(env):
    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
        'file:///path/to/flink-sql-connector-elasticsearch6-1.16.0.jar'
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    ds = env.from_collection(
        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
        type_info=Types.MAP(Types.STRING(), Types.STRING()))

    # dynamic_index derives the target index from the 'name' field of each record.
    es_sink = Elasticsearch6SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id', 'bar')) \
        .set_hosts(['localhost:9200']) \
        .build()

    ds.sink_to(es_sink).name('es6 dynamic index sink')
    env.execute()


def write_to_es7(env):
    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
        'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    ds = env.from_collection(
        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
        type_info=Types.MAP(Types.STRING(), Types.STRING()))

    # ES7 dropped document types, so static_index only takes (index, id_field).
    es7_sink = Elasticsearch7SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
        .set_hosts(['localhost:9200']) \
        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
        .set_bulk_flush_max_actions(1) \
        .set_bulk_flush_max_size_mb(2) \
        .set_bulk_flush_interval(1000) \
        .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
        .set_connection_username('foo') \
        .set_connection_password('bar') \
        .set_connection_path_prefix('foo-bar') \
        .set_connection_request_timeout(30000) \
        .set_connection_timeout(31000) \
        .set_socket_timeout(32000) \
        .build()

    ds.sink_to(es7_sink).name('es7 sink')
    env.execute()


def write_to_es7_dynamic_index(env):
    ELASTICSEARCH_SQL_CONNECTOR_PATH = \
        'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'
    env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

    ds = env.from_collection(
        [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
        type_info=Types.MAP(Types.STRING(), Types.STRING()))

    es7_sink = Elasticsearch7SinkBuilder() \
        .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
        .set_hosts(['localhost:9200']) \
        .build()

    ds.sink_to(es7_sink).name('es7 dynamic index sink')
    env.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    print("start writing data to elasticsearch6")
    write_to_es6(env)
    write_to_es6_dynamic_index(env)

    print("start writing data to elasticsearch7")
    write_to_es7(env)
    write_to_es7_dynamic_index(env)
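The emitters above expect each element to be MAP(STRING, STRING). If your source stream carries tuples instead, a map() can reshape it first. A minimal sketch (the (id, name) tuples here are hypothetical, not part of the original example):

from pyflink.common import Types

# Hypothetical tuple source; reshape into the MAP(STRING, STRING) form the emitter expects.
ds = env.from_collection(
    [('1', 'ada'), ('2', 'luna')],
    type_info=Types.TUPLE([Types.STRING(), Types.STRING()]))
ds = ds.map(
    lambda t: {'id': t[0], 'name': t[1]},
    output_type=Types.MAP(Types.STRING(), Types.STRING()))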
4. Run the code
python elasticsearch.py
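Once the job finishes, you can query Elasticsearch directly to confirm the documents landed. A quick sanity check, assuming the requests package is installed and Elasticsearch is reachable at localhost:9200 (the index name 'foo' comes from the static_index calls above):

import requests

# Fetch everything in the static index 'foo'; expect the two documents ada and luna.
resp = requests.get('http://localhost:9200/foo/_search?pretty')
print(resp.text)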
With that, the introductory PyFlink DataStream API Elasticsearch sink example is complete.