The previous article showed the code for connecting Flink to a Kafka cluster. In day-to-day statistical analysis we often use Flink SQL, so this post gives a working example of using Flink SQL to consume Kafka data and store it in MySQL. For more hands-on content, follow the WeChat public account "大数据开发运维架构".
Versions:
Flink 1.9.0
Kafka 0.10.0
MySQL 5.6.40
Without further ado, here is the code:
1. The MySQL database recommend contains a table named student. The create-table statement:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for student
-- ----------------------------
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(64) NULL DEFAULT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `course` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `score` double(128, 0) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
SET FOREIGN_KEY_CHECKS = 1;
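Note: as created above, the table has no primary key or unique index, so the REPLACE INTO statement used by the sink in step 3 behaves like a plain INSERT and replayed records will accumulate as duplicates. If you want id-based upserts, you could first add a primary key on id (for example: ALTER TABLE student ADD PRIMARY KEY (id)).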
2. The entity class corresponding to the student table:
package com.hadoop.ljs.flink.sql;
/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-01 07:50
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.sql
 */
public class Student {
    /* unique ID */
    int id;
    /* name */
    String name;
    /* course */
    String course;
    /* score */
    double score;

    public Student(Integer f0, String f1, String f2, Double f3) {
        id = f0;
        name = f1;
        course = f2;
        score = f3;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getCourse() {
        return course;
    }
    public void setCourse(String course) {
        this.course = course;
    }
    public double getScore() {
        return score;
    }
    public void setScore(double score) {
        this.score = score;
    }
}
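One small note on this class: if you also add a public no-argument constructor, Flink can treat Student as a standard POJO type; without one, records of this type fall back to generic (Kryo) serialization. This does not affect the example, it is just a tip.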
3. A custom sink class that writes the data into MySQL:
package com.hadoop.ljs.flink.sql;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-01 07:48
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.sql
 */
public class SinkStudent2MySQL extends RichSinkFunction<Student> {
    public static final String url = "jdbc:mysql://10.124.165.31:3306/recommend?useUnicode=true&characterEncoding=UTF-8";
    public static final String userName = "root";
    public static final String password = "123456a?";
    private static final long serialVersionUID = -4443175430371919407L;
    PreparedStatement ps;
    private Connection connection;
    /**
     * open() is called only once, when the sink is initialized
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "replace into student(id,name,course,score) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }
    @Override
    public void close() throws Exception {
        super.close();
        /* close the statement before the connection */
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
    /**
     * invoke() is called once for every incoming record
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student student, Context context) throws Exception {
        /* bind the fields of this record to the prepared statement and execute it */
        ps.setInt(1, student.getId());
        ps.setString(2, student.getName());
        ps.setString(3, student.getCourse());
        ps.setDouble(4, student.getScore());
        ps.executeUpdate();
    }
    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection(url, userName, password);
            System.out.println("MySQL connection established!");
        } catch (Exception e) {
            System.out.println("MySQL connection failed, error: " + e.getMessage());
        }
        return con;
    }
}
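Because this sink never touches the Flink runtime context, it can be exercised outside a Flink job for a quick check. Below is a minimal smoke-test sketch (the class name SinkStudent2MySQLTest is made up for illustration, and it assumes the MySQL instance configured above is reachable):

package com.hadoop.ljs.flink.sql;
import org.apache.flink.configuration.Configuration;
public class SinkStudent2MySQLTest {
    public static void main(String[] args) throws Exception {
        SinkStudent2MySQL sink = new SinkStudent2MySQL();
        /* open() builds the JDBC connection and prepares the statement */
        sink.open(new Configuration());
        /* write a single record; the Context argument is unused by this sink */
        sink.invoke(new Student(1001, "name1", "yuwen1", 81.0), null);
        sink.close();
    }
}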
4. The main class: it receives messages from Kafka, splits each line into fields, registers the stream as a temporary table, and then uses the custom SinkStudent2MySQL class to write the data into the student table:
package com.hadoop.ljs.flink.sql;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import java.util.Properties;
/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-03-01 07:47
 * @version: v1.0
 * @description: com.hadoop.ljs.flink.sql
 */
public class FlinkKafkaKerberosSQLConsumer {
    public static final String krb5Conf = "D:\\kafkaSSL\\krb5.conf";
    public static final String kafkaJaasConf = "D:\\kafkaSSL\\kafka_client_jaas.conf";
    public static final String topic = "topic2";
    public static final String consumerGroup = "test_topic2";
    public static final String bootstrapServer = "salver31.hadoop.unicom:6667,salver32.hadoop.unicom:6667";
    public static void main(String[] args) throws Exception {
        // Set the Kerberos/JAAS configuration on Windows; these can also be passed as -D JVM options
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", kafkaJaasConf);
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv, bsSettings);
        FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>(topic, new SimpleStringSchema(), getConsumerProperties());
        DataStream<String> stream = senv.addSource(myConsumer);
        // Drop records that do not have exactly four comma-separated fields
        DataStream<String> filtered = stream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (null == value || value.split(",").length != 4) {
                    return false;
                }
                return true;
            }
        });
        DataStream<Tuple4<Integer, String, String, Double>> map = filtered.map(new MapFunction<String, Tuple4<Integer, String, String, Double>>() {
            private static final long serialVersionUID = 1471936326697828381L;
            @Override
            public Tuple4<Integer, String, String, Double> map(String value) throws Exception {
                String[] split = value.split(",");
                return new Tuple4<>(Integer.valueOf(split[0]), split[1], split[2], Double.valueOf(split[3]));
            }
        });
        // Register the stream as a temporary table and specify its fields
        tableEnv.registerDataStream("student", map, "id,name,course,score");
        Table sqlQuery = tableEnv.sqlQuery("select id,name,course,score from student");
        DataStream<Tuple4<Integer, String, String, Double>> appendStream = tableEnv.toAppendStream(sqlQuery, Types.TUPLE(Types.INT, Types.STRING, Types.STRING, Types.DOUBLE));
        appendStream.print();
        /* Convert each record into a Student object and sink it to MySQL */
        appendStream.map(new MapFunction<Tuple4<Integer, String, String, Double>, Student>() {
            private static final long serialVersionUID = -4770965496944515917L;
            @Override
            public Student map(Tuple4<Integer, String, String, Double> value) throws Exception {
                return new Student(value.f0, value.f1, value.f2, value.f3);
            }
        }).addSink(new SinkStudent2MySQL());
        senv.execute("FlinkKafkaKerberosSQLConsumer");
    }
    /* Build the Kafka consumer configuration */
    private static Properties getConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", consumerGroup);
        props.put("auto.offset.reset", "earliest");
        /* For a Kerberos-secured cluster the following three settings are required */
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.kerberos.service.name", "kafka");
        props.put("sasl.mechanism", "GSSAPI");
        return props;
    }
}
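As the comment at the top of main notes, the two security file paths can also be supplied as -D JVM options. Another option, sketched below, is to read them from program arguments with Flink's ParameterTool; the --krb5 and --jaas argument names are made up for illustration, and the defaults fall back to the hard-coded Windows paths:

package com.hadoop.ljs.flink.sql;
import org.apache.flink.api.java.utils.ParameterTool;
public class KerberosArgsExample {
    public static void main(String[] args) {
        ParameterTool params = ParameterTool.fromArgs(args);
        // e.g. --krb5 /etc/krb5.conf --jaas /opt/kafka_client_jaas.conf
        System.setProperty("java.security.krb5.conf",
                params.get("krb5", "D:\\kafkaSSL\\krb5.conf"));
        System.setProperty("java.security.auth.login.config",
                params.get("jaas", "D:\\kafkaSSL\\kafka_client_jaas.conf"));
        System.out.println("krb5.conf = " + System.getProperty("java.security.krb5.conf"));
    }
}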
5. Here is the relevant part of pom.xml:
<properties>
    <flink.version>1.9.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <hbase.version>1.2.5</hbase.version>
    <kafka.version>0.10.1.0</kafka.version>
</properties>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.46</version>
</dependency>
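This is only an excerpt of the pom. Depending on how the rest of your project is set up, you will likely also need the core streaming module and the Java Table API bridge (for example flink-streaming-java_${scala.binary.version} and flink-table-api-java-bridge_${scala.binary.version}, both at ${flink.version}) so that the StreamExecutionEnvironment and StreamTableEnvironment classes used above resolve.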
6. Send data to Kafka; the fields of each record are separated by commas (","):
1001,name1,yuwen1,81
1002,name2,yuwen2,82
1003,name3,yuwen3,83
Screenshot of sending the data:
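Once the job is running and the three records above have been sent, you can check the result directly in MySQL. The snippet below is a minimal verification sketch (the class name CheckStudentTable is made up for illustration; it reuses the connection settings defined in SinkStudent2MySQL):

package com.hadoop.ljs.flink.sql;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class CheckStudentTable {
    public static void main(String[] args) throws Exception {
        /* open a connection with the same URL, user and password as the sink */
        try (Connection con = DriverManager.getConnection(SinkStudent2MySQL.url,
                SinkStudent2MySQL.userName, SinkStudent2MySQL.password);
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("select id,name,course,score from student")) {
            /* print every row that the Flink job wrote */
            while (rs.next()) {
                System.out.println(rs.getInt("id") + "," + rs.getString("name") + ","
                        + rs.getString("course") + "," + rs.getDouble("score"));
            }
        }
    }
}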
Recent articles have mostly been written in response to reader requests; if there is something else you would like covered, just leave a message on the public account. If you find this useful, please share it with your friends. Thanks for following!