
30. Getting Started with PyFlink -- Using the DataStream API to Write to Elasticsearch

大数据技能圈 2023-01-05

PyFlink requires Python 3.6, 3.7, 3.8, or 3.9. Run the following command to check that your Python version meets this requirement:

    python --version

1. Create a Python 3.9 virtual environment. For details, see the earlier posts below, or use the command-line sketch that follows this list:

    ① Jupyter Notebook: introduction, installation, and usage

    ② Configuring multiple Python versions in Jupyter Notebook
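
If you prefer to set up the environment directly from a terminal, a minimal sketch (the environment name pyflink-env is just an illustrative placeholder, not from the original post):

    python3.9 -m venv pyflink-env
    source pyflink-env/bin/activate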

2. Install PyFlink:

    python -m pip install apache-flink==1.16.0

If the download is slow, you can use a mirror inside China:

    python -m pip install apache-flink==1.16.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

Note: since Flink 1.11, PyFlink jobs can run locally on Windows, so you can develop and debug PyFlink jobs on Windows as well.
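
To confirm that the installation works before moving on, you can run a tiny job; this smoke test is an illustrative sketch, not part of the original walkthrough:

    # smoke_test.py -- minimal check that PyFlink is installed correctly (illustrative)
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # A small in-memory source printed to stdout; if this runs without
    # errors, the installation is working.
    env.from_collection([1, 2, 3]).print()
    env.execute('smoke_test')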

3. Write the code

elasticsearch.py

    import logging
    import sys

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import DeliveryGuarantee
    from pyflink.datastream.connectors.elasticsearch import Elasticsearch6SinkBuilder, \
        Elasticsearch7SinkBuilder, FlushBackoffType, ElasticsearchEmitter


    def write_to_es6(env):
        # The connector jar must be on the classpath; adjust the path to your setup.
        ELASTICSEARCH_SQL_CONNECTOR_PATH = \
            'file:///opt/software/flink-1.16.0/lib/flink-sql-connector-elasticsearch6-1.16.0.jar'
        env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

        ds = env.from_collection(
            [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
            type_info=Types.MAP(Types.STRING(), Types.STRING()))

        # Elasticsearch 6 still uses document types, so static_index takes the
        # index ('foo'), the id field ('id'), and the document type ('bar').
        es_sink = Elasticsearch6SinkBuilder() \
            .set_emitter(ElasticsearchEmitter.static_index('foo', 'id', 'bar')) \
            .set_hosts(['localhost:9200']) \
            .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
            .set_bulk_flush_max_actions(1) \
            .set_bulk_flush_max_size_mb(2) \
            .set_bulk_flush_interval(1000) \
            .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
            .set_connection_username('foo') \
            .set_connection_password('bar') \
            .set_connection_path_prefix('foo-bar') \
            .set_connection_request_timeout(30000) \
            .set_connection_timeout(31000) \
            .set_socket_timeout(32000) \
            .build()

        ds.sink_to(es_sink).name('es6 sink')

        env.execute()


    def write_to_es6_dynamic_index(env):
        ELASTICSEARCH_SQL_CONNECTOR_PATH = \
            'file:///path/to/flink-sql-connector-elasticsearch6-1.16.0.jar'
        env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

        ds = env.from_collection(
            [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
            type_info=Types.MAP(Types.STRING(), Types.STRING()))

        # dynamic_index reads the index name from each record's 'name' field at runtime.
        es_sink = Elasticsearch6SinkBuilder() \
            .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id', 'bar')) \
            .set_hosts(['localhost:9200']) \
            .build()

        ds.sink_to(es_sink).name('es6 dynamic index sink')

        env.execute()


    def write_to_es7(env):
        ELASTICSEARCH_SQL_CONNECTOR_PATH = \
            'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'
        env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

        ds = env.from_collection(
            [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
            type_info=Types.MAP(Types.STRING(), Types.STRING()))

        # Elasticsearch 7 dropped document types, so static_index only takes
        # the index name and the id field.
        es7_sink = Elasticsearch7SinkBuilder() \
            .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
            .set_hosts(['localhost:9200']) \
            .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
            .set_bulk_flush_max_actions(1) \
            .set_bulk_flush_max_size_mb(2) \
            .set_bulk_flush_interval(1000) \
            .set_bulk_flush_backoff_strategy(FlushBackoffType.CONSTANT, 3, 3000) \
            .set_connection_username('foo') \
            .set_connection_password('bar') \
            .set_connection_path_prefix('foo-bar') \
            .set_connection_request_timeout(30000) \
            .set_connection_timeout(31000) \
            .set_socket_timeout(32000) \
            .build()

        ds.sink_to(es7_sink).name('es7 sink')

        env.execute()


    def write_to_es7_dynamic_index(env):
        ELASTICSEARCH_SQL_CONNECTOR_PATH = \
            'file:///path/to/flink-sql-connector-elasticsearch7-1.16.0.jar'
        env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

        ds = env.from_collection(
            [{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
            type_info=Types.MAP(Types.STRING(), Types.STRING()))

        es7_sink = Elasticsearch7SinkBuilder() \
            .set_emitter(ElasticsearchEmitter.dynamic_index('name', 'id')) \
            .set_hosts(['localhost:9200']) \
            .build()

        ds.sink_to(es7_sink).name('es7 dynamic index sink')

        env.execute()


    if __name__ == '__main__':
        logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)

        print("start writing data to elasticsearch6")
        write_to_es6(env)
        write_to_es6_dynamic_index(env)

        print("start writing data to elasticsearch7")
        write_to_es7(env)
        write_to_es7_dynamic_index(env)

4. Run the code:

    python elasticsearch.py
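
To check that the documents actually landed in Elasticsearch, a hedged verification sketch is shown below. It assumes the elasticsearch Python client is installed (python -m pip install elasticsearch) and should be run from a different directory, since the job script above is named elasticsearch.py and would shadow the client package; it is not part of the original post:

    # verify_es.py -- illustrative check (assumes the 7.x elasticsearch client)
    from elasticsearch import Elasticsearch

    es = Elasticsearch(['http://localhost:9200'])
    # Fetch everything from the static index 'foo' used by write_to_es7 above.
    resp = es.search(index='foo', body={'query': {'match_all': {}}})
    for hit in resp['hits']['hits']:
        print(hit['_id'], hit['_source'])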

With this, the PyFlink DataStream API Elasticsearch connector example is complete.

For more hands-on examples, follow the 字节智传 WeChat official account.

Previous highlights

16. FlinkSQL: reading and writing data with the Hive catalog

17. PyFlink: reading and writing data with the Hive catalog

18. PyFlink: building a data warehouse with the Hive catalog

22. Hands-on offline data warehouse project

23. Hands-on real-time data warehouse project

24. Getting started with PyFlink -- DataStream API wordcount example

25. Getting started with PyFlink -- Table API wordcount example

26. Getting started with PyFlink -- converting between Table and DataStream

27. Happy New Year, everyone -- onward into 2023!

28. Getting started with PyFlink -- processing JSON data with the Table API

29. Getting started with PyFlink -- DataStream API basic operators example

