
Consuming SLS with Confluent-Kafka-Python over the Kafka Protocol and Persisting into a StarRocks Database

skylines 2024-05-02

This post shows how to use Confluent-Kafka-Python to consume Alibaba Cloud SLS log content over the Kafka protocol and persist it into a StarRocks database. Alibaba Cloud's own Kafka consumption guide is available here: 使用Confluent-Kafka-Python实现Kafka消费_日志服务(SLS)-阿里云帮助中心 (aliyun.com)

The overall flow is roughly:

SLS-->Confluent-Kafka-Python-->Kafka-->Confluent-Kafka-Python-->StarRocks

From this flow it is clear that, in addition to Alibaba Cloud's API, completing the consumption task requires Python, Kafka, and the Confluent-Kafka-Python client. Below, the task is split into several parts around these components, each with its own test, to walk through how the consumption is implemented.

This assumes Python is already installed; in the environment used here, Python 3 is available.

--Install Kafka

Kafka download: Apache Kafka

    --Create the installation directories
    mkdir -p /opt/soft/kafka_data
    mkdir -p /opt/soft/kafka_data/zookeeper
    mkdir -p /opt/soft/kafka_data/log
    mkdir -p /opt/soft/kafka_data/log/kafka
    mkdir -p /opt/soft/kafka_data/log/zookeeper




    --Add the Kafka configuration
    # vi server.properties
    broker.id=0
    # "port" and "host.name" are no longer valid broker properties in Kafka 3.x;
    # use "listeners" to bind the address and port instead
    listeners=PLAINTEXT://172.17.7.11:9092
    log.dirs=/opt/soft/kafka_data/log/kafka
    zookeeper.connect=localhost:2181


    --Add the ZooKeeper configuration
    # vi zookeeper.properties
    dataDir=/opt/soft/kafka_data/zookeeper
    dataLogDir=/opt/soft/kafka_data/log/zookeeper
    clientPort=2181
    maxClientCnxns=100
    tickTime=2000
    initLimit=10
    syncLimit=5


    --Add a script to start Kafka
    vi kafka_start.sh
    #!/bin/sh
    # Start ZooKeeper
    /opt/kafka_2.12-3.6.2/bin/zookeeper-server-start.sh /opt/kafka_2.12-3.6.2/config/zookeeper.properties &

    sleep 3  # wait 3 seconds before continuing

    # Start Kafka
    /opt/kafka_2.12-3.6.2/bin/kafka-server-start.sh /opt/kafka_2.12-3.6.2/config/server.properties &




    --Add a script to stop Kafka
    vi kafka_stop.sh
    #!/bin/sh
    # Stop Kafka first (the stop scripts take no config-file argument)
    /opt/kafka_2.12-3.6.2/bin/kafka-server-stop.sh &

    sleep 3  # wait 3 seconds before continuing

    # Then stop ZooKeeper
    /opt/kafka_2.12-3.6.2/bin/zookeeper-server-stop.sh &


--Install Confluent-Kafka and mysql-connector-python

    pip install confluent-kafka mysql-connector-python

These two Python client packages are installed so that Python can connect to Kafka and to the StarRocks database (over its MySQL protocol), linking the log producer and consumer sides with the storage side.
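Before wiring everything together, it can be worth confirming that both endpoints answer. The sketch below is my addition, not part of the original article; it reuses the broker address, StarRocks address, and placeholder credentials that appear later in this post.

    # Sketch: sanity-check connectivity to Kafka and StarRocks.
    from confluent_kafka.admin import AdminClient
    import mysql.connector

    # List the local broker's topics; times out with an exception if unreachable.
    admin = AdminClient({"bootstrap.servers": "172.17.7.11:9092"})
    print(list(admin.list_topics(timeout=5).topics))

    # Run a trivial query over StarRocks' MySQL protocol.
    conn = mysql.connector.connect(host="172.17.199.36", port=9030,
                                   user="user", password="XXXXXXXXX")
    cur = conn.cursor()
    cur.execute("SELECT 1")
    print(cur.fetchone())
    conn.close()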

--Configuration for consuming SLS

    import os

    from confluent_kafka import Consumer

    endpoint = "cn-huhehaote.log.aliyuncs.com"
    """
    An Alibaba Cloud account AccessKey grants access to all APIs and carries high
    risk. It is strongly recommended to create a RAM user (via the RAM console)
    and use it for API access and day-to-day operations.
    This example reads the AccessKey ID and Secret from environment variables;
    you can also keep them in a configuration file if your business requires it.
    Never hard-code the AccessKey ID/Secret in source code: the credentials could leak.
    """
    accessKeyId = os.getenv("SLS_ACCESS_KEY_ID")
    accessKeySecret = os.getenv("SLS_ACCESS_KEY_SECRET")
    project = "etl-dev"
    logstore = "test"
    port = "10012"
    groupId = "kafka-test2112"

    kafkaEndpoint = "{}.{}:{}".format(project, endpoint, port)

    c = Consumer({
        "bootstrap.servers": kafkaEndpoint,
        "sasl.mechanism": "PLAIN",
        "security.protocol": "sasl_ssl",
        "sasl.username": project,
        "sasl.password": "%s#%s" % (accessKeyId, accessKeySecret),
        "group.id": groupId,
        "enable.auto.commit": "true",
        "auto.commit.interval.ms": 30000,
        "session.timeout.ms": 120000,
        "auto.offset.reset": "earliest",
        "max.poll.interval.ms": 130000,
        "heartbeat.interval.ms": 5000,
    })

    c.subscribe([logstore])

    try:
        while True:
            msg = c.poll(1.0)

            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            print('Received message: {}'.format(msg.value().decode('utf-8')))
    finally:
        c.close()

You can use the sample code from Alibaba Cloud's documentation directly: fill in the configuration parameters as prompted, and the SLS logstore content can be consumed over the Kafka protocol.
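One caveat with the sample's enable.auto.commit=true is that offsets may be committed before a message has actually been processed. If you need at-least-once handling, a possible variant (my sketch, assuming "enable.auto.commit" is set to "false" in the configuration above) is to commit each offset explicitly after processing:

    # Sketch: at-least-once consumption; requires "enable.auto.commit": "false".
    try:
        while True:
            msg = c.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue
            process(msg.value())  # "process" is a hypothetical handler for your logic
            c.commit(message=msg, asynchronous=False)  # commit this offset synchronously
    finally:
        c.close()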

--Consume the Kafka content into StarRocks

    cat >kafka_test_001.py
    from confluent_kafka import Consumer, KafkaError
    import mysql.connector
    import json

    # Kafka consumer configuration
    kafka_conf = {
        'bootstrap.servers': '172.17.7.11',  # replace with your Kafka broker address
        'group.id': '100001',                # replace with your consumer group ID
        'auto.offset.reset': 'earliest',
    }

    # StarRocks connection configuration
    starrocks_conf = {
        'host': '172.17.199.36',  # replace with your StarRocks host
        'port': 9030,             # replace with your StarRocks port
        'user': 'user',           # replace with your StarRocks user
        'password': 'XXXXXXXXX',  # replace with your StarRocks password
        'database': 'mytest',     # replace with your StarRocks database
    }

    # Create the Kafka consumer
    consumer = Consumer(kafka_conf)
    consumer.subscribe(['mykafka_test03'])  # replace with your Kafka topic

    # Create the StarRocks connection
    starrocks_connection = mysql.connector.connect(**starrocks_conf)
    cursor = starrocks_connection.cursor()

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print(f"Kafka error: {msg.error()}")
                    break

            try:
                # Parse the Kafka message payload
                message_content = json.loads(msg.value().decode('utf-8'))

                # Build the INSERT statement; table and columns must match the StarRocks schema
                sql = "INSERT INTO mytest_kafka_tab (id, city, country) VALUES (%s, %s, %s)"
                data = (message_content['id'], message_content['city'], message_content['country'])

                # Execute the statement and commit to StarRocks
                cursor.execute(sql, data)
                starrocks_connection.commit()

            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
            except mysql.connector.Error as e:
                print(f"Error inserting data into StarRocks: {e}")

    finally:
        # Clean up the Kafka consumer and the StarRocks connection
        consumer.close()
        cursor.close()
        starrocks_connection.close()
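Committing one transaction per message keeps the script simple but limits throughput on a busy logstore. A possible refinement, not from the original article, is to buffer rows and flush them in batches with executemany; the sketch below reuses cursor and starrocks_connection from the script above, with BATCH_SIZE as an assumed tuning knob.

    # Sketch: batch INSERTs to StarRocks to cut per-row round trips.
    BATCH_SIZE = 500  # assumed value; tune to your message rate
    buffer = []

    def flush_rows(rows):
        if not rows:
            return
        cursor.executemany(
            "INSERT INTO mytest_kafka_tab (id, city, country) VALUES (%s, %s, %s)",
            rows,
        )
        starrocks_connection.commit()
        rows.clear()

    # Inside the poll loop, append instead of executing row by row:
    #     buffer.append((message_content['id'],
    #                    message_content['city'],
    #                    message_content['country']))
    #     if len(buffer) >= BATCH_SIZE:
    #         flush_rows(buffer)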


With that, the whole data pipeline is connected end to end. Below are isolated tests of the individual consumption steps.

--Test 1: consume JSON data from Kafka and write it into StarRocks

##Create a topic

    1. In one terminal, start a console producer (pushes messages to mykafka_test03):
    cd /opt/kafka_2.12-3.6.2/bin/  # enter the Kafka bin directory
    ./kafka-console-producer.sh --broker-list 172.17.7.11:9092 --topic mykafka_test03  # (note: mykafka_test03 is the topic to create)

    2. In another terminal, start a console consumer (reads messages from mykafka_test03):
    cd /opt/kafka_2.12-3.6.2/bin/  # enter the Kafka bin directory
    ./kafka-console-consumer.sh --bootstrap-server 172.17.7.11:9092 --topic mykafka_test03  # consume messages from the mykafka_test03 topic
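As an alternative to the console producer, the same test rows can be pushed from Python; the sketch below (my addition) uses confluent_kafka.Producer against the same broker and topic:

    # Sketch: produce JSON test rows to the mykafka_test03 topic from Python.
    import json
    from confluent_kafka import Producer

    p = Producer({"bootstrap.servers": "172.17.7.11:9092"})
    rows = [
        {"id": 1001, "city": "New York", "country": "America"},
        {"id": 1002, "city": "Los Angeles", "country": "America"},
    ]
    for row in rows:
        p.produce("mykafka_test03", json.dumps(row).encode("utf-8"))
    p.flush()  # block until all queued messages are delivered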

##Feed in test data

              {"id":1001,"city":"New York","country":"Amarica"}
              {"id":1002,"city":"Los Angeles","country":"Amarica"}
              {"id":1003,"city":"Chicago","country":"Amarica"}
              {"id":1004,"city":"Florid","country":"Amarica"}
              {"id":1006,"city":"Florid","country":"Amarica"}


              {"id":"1005","city":"New York","country":"Amarica"}
              复制
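Note that the last row carries id as a string rather than an integer. StarRocks will usually coerce '1005' into an INT column, but a defensive guard in the consumer (my sketch below, not in the original) makes the behavior explicit by coercing or skipping malformed rows before the INSERT:

    # Sketch: validate/coerce a decoded row before inserting into StarRocks.
    def to_row(content):
        try:
            return (int(content["id"]), str(content["city"]), str(content["country"]))
        except (KeyError, TypeError, ValueError) as e:
            print(f"Skipping malformed row {content!r}: {e}")
            return None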

##Results

#Producer side (screenshot omitted)

#Consumer side (screenshot omitted)

#StarRocks storage side (screenshot omitted)

--Test 2: write SLS data into StarRocks with Confluent-Kafka-Python

#Test snippet

    cat >new_test.py
    import json
    import pymysql

    # StarRocks connection configuration
    starrocks_config = {
        'host': '172.17.199.36',
        'port': 9030,  # StarRocks MySQL-protocol port
        'user': 'user',
        'password': 'xxxxxxxx',
        'database': 'example_db'
    }

    # Assume the JSON data is stored in a file
    json_file_path = '/root/pythondir/jsonfile/mytest.json'

    # Read the JSON string from the file
    with open(json_file_path, 'r') as file:
        json_str = file.read()

    # Parse the JSON string into a Python dict
    log_entry = json.loads(json_str)

    # Map and convert the data to match the StarRocks table structure;
    # adjust this to the actual schema of your table
    starrocks_data = (
        log_entry['winlog']['record_id'],
        log_entry['winlog']['api'],
        log_entry['winlog']['computer_name'],
        log_entry['log']['level'],
        log_entry['host']['hostname'],
        log_entry['host']['mac'],
        log_entry['host']['architecture'],
        log_entry['host']['os']['name'],
        log_entry['host']['os']['platform'],
        log_entry['host']['os']['kernel'],
        log_entry['host']['os']['type'],
        log_entry['host']['os']['build'],
        log_entry['host']['os']['version'],
        log_entry['host']['os']['family'],
        log_entry['host']['id'],
        log_entry['host']['ip'],  # host_ip is ARRAY<STRING> in the table; a list value may need serializing before binding
        log_entry['agent']['type'],
        log_entry['agent']['version'],
        log_entry['winlog']['channel'],
        log_entry['winlog']['provider_name'],
        log_entry['winlog']['task'],
        log_entry['winlog']['keywords'],
        log_entry['winlog']['event_id'],
        log_entry['winlog']['event_data']['TargetUserName'],
        log_entry['winlog']['event_data']['TargetDomainName'],
        log_entry['winlog']['event_data']['IpAddress'],
        log_entry['winlog']['event_data']['LogonGuid'],
        log_entry['@timestamp'],
        log_entry['message'],
        log_entry['event']['action'],
        log_entry['event']['outcome'],
        log_entry['event']['provider'],
        log_entry['event']['code'],
        log_entry['event']['kind']
        # ...map any further fields here
    )

    # Function that inserts a row into StarRocks
    def insert_into_starrocks(data):
        # Build the INSERT statement (34 columns, 34 placeholders)
        insert_sql = """
        INSERT INTO alicloud_sls_event_log_audit_tbl (record_id, api, computer_name, level, hostname, mac, architecture, os_name, platform, kernel, type, build, version, family, host_id, host_ip, agent_type, agent_version, winlog_channel, winlog_provider_name, winlog_task, winlog_keywords, winlog_event_id, winlog_event_data_tuname, winlog_event_data_tdoname, winlog_event_data_ip, winlog_event_data_logid, timestamp, message, event_action, event_outcome, event_provider, event_code, event_kind)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s);
        """

        # Connect to the StarRocks database
        connection = pymysql.connect(**starrocks_config)
        try:
            with connection.cursor() as cursor:
                # Execute the statement
                cursor.execute(insert_sql, data)
                # Commit the transaction
                connection.commit()
                print("Data inserted into StarRocks successfully.")
        except Exception as e:
            print(f"Failed to insert data into StarRocks: {e}")
        finally:
            connection.close()

    # Call the function to insert the row
    insert_into_starrocks(starrocks_data)
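One fragility in the mapping above: any missing key in the nested JSON raises a KeyError and aborts the whole insert. A small helper (my sketch, not part of the original script) makes the extraction tolerant of absent fields:

    # Sketch: fetch a nested key path, returning None when any level is missing.
    from functools import reduce

    def get_path(obj, *keys):
        return reduce(lambda d, k: d.get(k) if isinstance(d, dict) else None, keys, obj)

    # Example: get_path(log_entry, 'winlog', 'event_data', 'IpAddress')
    # behaves like log_entry['winlog']['event_data']['IpAddress'] but yields
    # None instead of raising when the event lacks that field.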

#Storage table schema

    CREATE TABLE `alicloud_sls_event_log_audit_tbl` (
        `record_id` int(11) NOT NULL COMMENT "",
        `api` varchar(65533) NULL COMMENT "",
        `computer_name` varchar(65533) NULL COMMENT "",
        `level` varchar(65533) NULL COMMENT "",
        `hostname` varchar(65533) NULL COMMENT "",
        `mac` varchar(65533) NULL COMMENT "",
        `architecture` varchar(65533) NULL COMMENT "",
        `os_name` varchar(65533) NULL COMMENT "",
        `platform` varchar(65533) NULL COMMENT "",
        `kernel` varchar(65533) NULL COMMENT "",
        `type` varchar(65533) NULL COMMENT "",
        `build` varchar(65533) NULL COMMENT "",
        `version` varchar(65533) NULL COMMENT "",
        `family` varchar(65533) NULL COMMENT "",
        `host_id` varchar(65533) NULL COMMENT "",
        `host_ip` ARRAY<STRING> NULL COMMENT "",
        `agent_type` varchar(65533) NULL COMMENT "",
        `agent_version` varchar(65533) NULL COMMENT "",
        `winlog_channel` varchar(65533) NULL COMMENT "",
        `winlog_provider_name` varchar(65533) NULL COMMENT "",
        `winlog_task` varchar(65533) NULL COMMENT "",
        `winlog_keywords` varchar(65533) NULL COMMENT "",
        `winlog_event_id` varchar(65533) NULL COMMENT "",
        `winlog_event_data_tuname` varchar(65533) NULL COMMENT "",
        `winlog_event_data_tdoname` varchar(65533) NULL COMMENT "",
        `winlog_event_data_ip` varchar(65533) NULL COMMENT "",
        `winlog_event_data_logid` varchar(65533) NULL COMMENT "",
        `timestamp` varchar(65533) NULL COMMENT "",
        `message` varchar(65533) NULL COMMENT "",
        `event_action` varchar(65533) NULL COMMENT "",
        `event_outcome` varchar(65533) NULL COMMENT "",
        `event_provider` varchar(65533) NULL COMMENT "",
        `event_code` varchar(65533) NULL COMMENT "",
        `event_kind` varchar(65533) NULL COMMENT ""
    ) ENGINE=OLAP
    PRIMARY KEY(`record_id`)
    DISTRIBUTED BY HASH(`record_id`)
    PROPERTIES (
        "replication_num" = "2",
        "in_memory" = "false",
        "enable_persistent_index" = "true",
        "replicated_storage" = "true",
        "compression" = "LZ4"
    );
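After running new_test.py, a quick query over the MySQL protocol confirms the row landed; a sketch reusing the starrocks_config dict from the script above:

    # Sketch: verify inserted rows via StarRocks' MySQL protocol.
    import pymysql

    connection = pymysql.connect(**starrocks_config)
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT record_id, computer_name, event_code "
                "FROM alicloud_sls_event_log_audit_tbl "
                "ORDER BY record_id DESC LIMIT 5"
            )
            for row in cursor.fetchall():
                print(row)
    finally:
        connection.close()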

#Results (screenshot omitted)

That is the full detail of this SLS consumption walkthrough. I hope it is useful to you. Thanks!


