背景
使用flink消费kafka中事件,计算规则以及维度数据保存在达梦数据库中。
达梦数据库
达梦数据库管理系统是达梦公司推出的具有完全自主知识产权的高性能数据库管理系统,简称DM。
安装达梦数据库驱动
将达梦的jdbc驱动安装到maven local仓库中
start cmd k "%mvn% install:install-file -Dfile=Dm7JdbcDriver17.jar -DgroupId=com.dm -DartifactId=Dm7JdbcDriver -Dversion=1.7 -Dpackaging=jar"
复制
引入maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.dm</groupId>
<artifactId>Dm7JdbcDriver</artifactId>
<version>1.7</version>
<scope>compile</scope>
</dependency>复制
通过jdbc进行连接
public class DmRichMapFunction<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
private static final Logger logger = LoggerFactory.getLogger(DmRichMapFunction.class);
private static final String dmjdbcString = "dm.jdbc.driver.DmDriver";
protected Connection connect = null;
public void loadJdbcDriver() throws SQLException {
try {
System.out.println("Loading JDBC Driver...");
// 加载 JDBC 驱动程序
//DriverManager.registerDriver(new dm.jdbc.driver.DmDriver());
Class.forName(dmjdbcString);
} catch (ClassNotFoundException e) {
throw new SQLException("Load JDBC Driver Error1: " + e.getMessage());
} catch (Exception ex) {
throw new SQLException("Load JDBC Driver Error : "
+ ex.getMessage());
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ParameterTool parameterTool = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
String user = parameterTool.get("u", "");
String passwd = parameterTool.get("p", "");
String address = parameterTool.get("a", "localhost:3306");
String url = "jdbc:dm://" + address;
logInfo("url:" + url + ",user:" + user );
loadJdbcDriver();
connect = DriverManager.getConnection(url, user, passwd);
}
@Override
public void close() throws Exception {
if (connect != null) {
connect.close();
}
super.close();
}
@Override
public void flatMap(IN value, Collector<OUT> out) throws Exception {
}
}复制
过程中遇到的问题
•默认的flink-java中是未引入jdbc的支持,需要单独引入flink-jdbc•开始想通过withParameters(configuration)方法传递数据库相关参数到FlatMapFunction中,后面发现该方法只在DataSet上使用而不可以在DataStream上使用。最后采用getConfig().setGlobalJobParameters方法传递配置参数。
environment.getConfig().setGlobalJobParameters(parameterTool);
ParameterTool parameterTool = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();复制
文章转载自奥思立方,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
SQLark V3.4 更新 | 百灵说,PG 和春天一起来了!
达梦数据
46次阅读
2025-04-17 10:41:51
达梦数据共享集群在湖北银行绩效考核系统上线
达梦数据
37次阅读
2025-04-23 11:19:07
权威双认达梦实力丨武汉数博会秀创新成果,行业峰会斩获三省殊荣
达梦数据
31次阅读
2025-04-24 09:53:34
达梦数据2024年年度报告正式发布
达梦数据
26次阅读
2025-04-15 09:49:32
全民国家安全教育日|守护数据堡垒 筑牢安全防线
达梦数据
20次阅读
2025-04-15 09:49:33
达梦“1+X”证书|数据库管理系统职业技能等级证书申报工作已启动
达梦数据
17次阅读
2025-04-11 15:34:22
达梦数据加入“可信数据空间发展伙伴计划”
达梦数据
15次阅读
2025-04-25 10:10:11
七载征程,图启新章 | 蜀天梦图七周年:以自主之“图”,绘就数字未来
达梦数据
13次阅读
2025-04-28 10:18:37
达梦数据2025年第一次投资者交流会成功召开
达梦数据
12次阅读
2025-05-06 10:21:10
达梦数据X久其金建 以安全高效智能方案革新企业EPM
达梦数据
10次阅读
2025-04-25 10:10:10