
Flink in Action: FlinkSQL Consuming from a Kerberos-Enabled Kafka Cluster and Writing to MySQL

大数据开发运维架构 2020-03-01

    The previous article showed the code for connecting Flink to a Kafka cluster. For day-to-day statistical analysis we often use FlinkSQL, so here is a working example of FlinkSQL consuming Kafka data and writing it into MySQL. For more hands-on content, follow the WeChat public account "大数据开发运维架构".

Versions:

    Flink 1.9.0

    Kafka 0.10.0

    MySQL 5.6.40

Without further ado, here is the code:

1. The MySQL database recommend contains a table student; the create-table statement:

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
    -- ----------------------------
    -- Table structure for student
    -- ----------------------------
    DROP TABLE IF EXISTS `student`;
    CREATE TABLE `student` (
      `id` int(64) NULL DEFAULT NULL,
      `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      `course` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
      `score` double NULL DEFAULT NULL  -- double(128, 0) would round scores to integers
    ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;

    SET FOREIGN_KEY_CHECKS = 1;
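
Note: as defined above, student has no primary or unique key, so the REPLACE INTO statement used by the sink in step 3 degenerates into a plain INSERT, and reprocessed records accumulate as duplicates. If id is meant to be unique, run ALTER TABLE student ADD PRIMARY KEY (id); so the replace actually upserts.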

2. The entity class corresponding to the student table:

    package com.hadoop.ljs.flink.sql;
    /**
     * @author: Created By lujisen
     * @company ChinaUnicom Software JiNan
     * @date: 2020-03-01 07:50
     * @version: v1.0
     * @description: com.hadoop.ljs.flink.sql
     */
    public class Student {
        /* unique ID */
        int id;
        /* name */
        String name;
        /* course */
        String course;
        /* score */
        double score;

        /* a no-arg constructor keeps Student POJO-serializable for Flink */
        public Student() {
        }

        public Student(Integer f0, String f1, String f2, Double f3) {
            id = f0;
            name = f1;
            course = f2;
            score = f3;
        }
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String getCourse() {
            return course;
        }
        public void setCourse(String course) {
            this.course = course;
        }
        public double getScore() {
            return score;
        }
        public void setScore(double score) {
            this.score = score;
        }
    }

3. A custom sink class that writes the data to MySQL:

    package com.hadoop.ljs.flink.sql;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    /**
     * @author: Created By lujisen
     * @company ChinaUnicom Software JiNan
     * @date: 2020-03-01 07:48
     * @version: v1.0
     * @description: com.hadoop.ljs.flink.sql
     */
    public class SinkStudent2MySQL extends RichSinkFunction<Student> {

        public static final String url = "jdbc:mysql://10.124.165.31:3306/recommend?useUnicode=true&characterEncoding=UTF-8";
        public static final String userName = "root";
        public static final String password = "123456a?";
        private static final long serialVersionUID = -4443175430371919407L;
        PreparedStatement ps;
        private Connection connection;

        /**
         * open() is called only once, when the sink is initialized
         * @param parameters
         * @throws Exception
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            connection = getConnection();
            String sql = "replace into student(id,name,course,score) values(?, ?, ?, ?);";
            ps = this.connection.prepareStatement(sql);
        }

        @Override
        public void close() throws Exception {
            super.close();
            // close the statement before the connection that owns it
            if (ps != null) {
                ps.close();
            }
            if (connection != null) {
                connection.close();
            }
        }

        /**
         * invoke() is called once for every record
         * @param context
         * @throws Exception
         */
        @Override
        public void invoke(Student student, Context context) throws Exception {
            /* bind each field of the record to the prepared statement */
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getCourse());
            ps.setDouble(4, student.getScore());
            ps.executeUpdate();
        }

        private static Connection getConnection() {
            Connection con = null;
            try {
                Class.forName("com.mysql.jdbc.Driver");
                con = DriverManager.getConnection(url, userName, password);
                System.out.println("MySQL connection established!");
            } catch (Exception e) {
                System.out.println("MySQL connection failed, error: " + e.getMessage());
            }
            return con;
        }
    }
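
To sanity-check the sink in isolation before wiring it to Kafka, a minimal sketch like the following can help (it assumes the MySQL instance above is reachable; the class name TestSink is only illustrative):

    package com.hadoop.ljs.flink.sql;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TestSink {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // a fixed element instead of Kafka input; the row should show up in the student table
            env.fromElements(new Student(1001, "name1", "yuwen1", 81.0))
               .addSink(new SinkStudent2MySQL());
            env.execute("SinkStudent2MySQLTest");
        }
    }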

4. The main class: it receives messages from Kafka, splits each line, registers the stream as a temporary table, and uses the custom SinkStudent2MySQL class to write the data into the student table:

    package com.hadoop.ljs.flink.sql;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import java.util.Properties;
    /**
     * @author: Created By lujisen
     * @company ChinaUnicom Software JiNan
     * @date: 2020-03-01 07:47
     * @version: v1.0
     * @description: com.hadoop.ljs.flink.sql
     */
    public class FlinkKafkaKerberosSQLConsumer {
        public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
        public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
        public static final String topic = "topic2";
        public static final String consumerGroup = "test_topic2";
        public static final String bootstrapServer = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";

        public static void main(String[] args) throws Exception {

            // set the Kerberos/JAAS configs on Windows; they can also be passed as -D JVM options
            System.setProperty("java.security.krb5.conf", krb5Conf);
            System.setProperty("java.security.auth.login.config", kafkaJaasConf);

            StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
            senv.setParallelism(1);
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv, bsSettings);

            FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), getConsumerProperties());
            DataStream<String> stream = senv.addSource(myConsumer);
            // drop malformed lines; the filtered stream must be assigned and used downstream
            DataStream<String> filtered = stream.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    return value != null && value.split(",").length == 4;
                }
            });
            DataStream<Tuple4<Integer, String, String, Double>> map = filtered.map(new MapFunction<String, Tuple4<Integer, String, String, Double>>() {
                private static final long serialVersionUID = 1471936326697828381L;
                @Override
                public Tuple4<Integer, String, String, Double> map(String value) throws Exception {
                    String[] split = value.split(",");
                    return new Tuple4<>(Integer.valueOf(split[0]), split[1], split[2], Double.valueOf(split[3]));
                }
            });
            // register the stream as a temporary table and specify the fields
            tableEnv.registerDataStream("student", map, "id,name,course,score");
            Table sqlQuery = tableEnv.sqlQuery("select id,name,course,score from student");
            DataStream<Tuple4<Integer, String, String, Double>> appendStream = tableEnv.toAppendStream(sqlQuery, Types.TUPLE(Types.INT, Types.STRING, Types.STRING, Types.DOUBLE));
            appendStream.print();
            /* convert each row to a Student instance and sink it to MySQL */
            appendStream.map(new MapFunction<Tuple4<Integer, String, String, Double>, Student>() {
                private static final long serialVersionUID = -4770965496944515917L;
                @Override
                public Student map(Tuple4<Integer, String, String, Double> value) throws Exception {
                    return new Student(value.f0, value.f1, value.f2, value.f3);
                }
            }).addSink(new SinkStudent2MySQL());

            senv.execute("FlinkKafkaKerberosSQLConsumer");
        }

        /* build the Kafka consumer configuration */
        private static Properties getConsumerProperties() {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServer);
            props.put("group.id", consumerGroup);
            props.put("auto.offset.reset", "earliest");
            /* for a Kerberos-secured cluster the following three settings are mandatory */
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.kerberos.service.name", "kafka");
            props.put("sasl.mechanism", "GSSAPI");
            return props;
        }
    }
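
The kafka_client_jaas.conf referenced above follows the standard JAAS layout for GSSAPI. A typical keytab-based client entry looks like the sketch below; the keytab path and principal are placeholders for your own environment:

    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        keyTab="D:\\kafkaSSL\\client.keytab"
        principal="kafka-client@EXAMPLE.COM";
    };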

5. The relevant parts of pom.xml:

    <properties>
        <flink.version>1.9.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hbase.version>1.2.5</hbase.version>
        <kafka.version>0.10.1.0</kafka.version>
    </properties>

    <!-- the DataStream API itself; the connector and planner expect it on the classpath -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>

6. Send data to Kafka; the fields of each record are separated by commas (","):

    1001,name1,yuwen1,81
    1002,name2,yuwen2,82
    1003,name3,yuwen3,83
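
To produce these test lines, a minimal Java producer sketch against the same Kerberos-secured cluster could look like this (it reuses the addresses and security settings from step 4; the class name StudentProducer is only illustrative):

    package com.hadoop.ljs.flink.sql;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;

    public class StudentProducer {
        public static void main(String[] args) {
            // same Kerberos JVM settings as the consumer
            System.setProperty("java.security.krb5.conf", "D:\\kafkaSSL\\krb5.conf");
            System.setProperty("java.security.auth.login.config", "D:\\kafkaSSL\\kafka_client_jaas.conf");
            Properties props = new Properties();
            props.put("bootstrap.servers", "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // the same three mandatory Kerberos settings as on the consumer side
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.kerberos.service.name", "kafka");
            props.put("sasl.mechanism", "GSSAPI");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("topic2", "1001,name1,yuwen1,81"));
                producer.send(new ProducerRecord<>("topic2", "1002,name2,yuwen2,82"));
                producer.send(new ProducerRecord<>("topic2", "1003,name3,yuwen3,83"));
            }
        }
    }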

Screenshot of sending the data:

    Recent articles have been written from requests left by followers; if you have other needs, just leave a message on the public account. If you found this useful, please share it with your friends. Thanks for following!

