暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

如何在flink程序中访问达梦数据库?

奥思立方 2021-07-30
1076

背景

使用flink消费kafka中事件,计算规则以及维度数据保存在达梦数据库中。

达梦数据库

达梦数据库管理系统是达梦公司推出的具有完全自主知识产权的高性能数据库管理系统,简称DM。

安装达梦数据库驱动

将达梦的jdbc驱动安装到maven local仓库中

start cmd k "%mvn% install:install-file -Dfile=Dm7JdbcDriver17.jar -DgroupId=com.dm -DartifactId=Dm7JdbcDriver -Dversion=1.7 -Dpackaging=jar"

复制

引入maven依赖

        <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.7.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.dm</groupId>
<artifactId>Dm7JdbcDriver</artifactId>
<version>1.7</version>
<scope>compile</scope>
</dependency>

复制

通过jdbc进行连接

public class DmRichMapFunction<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
private static final Logger logger = LoggerFactory.getLogger(DmRichMapFunction.class);
private static final String dmjdbcString = "dm.jdbc.driver.DmDriver";
protected Connection connect = null;

public void loadJdbcDriver() throws SQLException {
try {
System.out.println("Loading JDBC Driver...");
// 加载 JDBC 驱动程序
//DriverManager.registerDriver(new dm.jdbc.driver.DmDriver());
Class.forName(dmjdbcString);
} catch (ClassNotFoundException e) {
throw new SQLException("Load JDBC Driver Error1: " + e.getMessage());
} catch (Exception ex) {
throw new SQLException("Load JDBC Driver Error : "
+ ex.getMessage());
}
}


@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ParameterTool parameterTool = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
String user = parameterTool.get("u", "");
String passwd = parameterTool.get("p", "");
String address = parameterTool.get("a", "localhost:3306");
String url = "jdbc:dm://" + address;

logInfo("url:" + url + ",user:" + user );
loadJdbcDriver();
connect = DriverManager.getConnection(url, user, passwd);
}

@Override
public void close() throws Exception {

if (connect != null) {
connect.close();
}
super.close();
}


@Override
public void flatMap(IN value, Collector<OUT> out) throws Exception {

}
}

复制

过程中遇到的问题

默认的flink-java中是未引入jdbc的支持,需要单独引入flink-jdbc开始想通过withParameters(configuration)方法传递数据库相关参数到FlatMapFunction中,后面发现该方法只在DataSet上使用而不可以在DataStream上使用。最后采用getConfig().setGlobalJobParameters方法传递配置参数。

environment.getConfig().setGlobalJobParameters(parameterTool);
ParameterTool parameterTool = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

复制


文章转载自奥思立方,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论