一、基本概念和用法
Spark SQL 还有一个能够使用 JDBC 从其他数据库读取数据的数据源。
当使用 JDBC 访问其它数据库时,应该优先使用这种 JDBC 数据源,而不是 JdbcRDD。这是因为结果是以数据框(DataFrame)返回的,这样就可以轻松地在 Spark SQL 中进行处理,或与其它数据源进行连接。
因为这种 JDBC 数据源不需要用户提供 ClassTag,所以它也更适合在 Java 中使用。
二、工具类
1.jdbc连接工具类
package com.web.zhangyong168.cn.scala.util

import java.sql.{Connection, DriverManager}
import java.util

/**
 * JDBC connection helper backed by a simple LinkedList pool.
 *
 * Connection settings (url/driver/user/password) are read once from the
 * "mysql" section of the project configuration via SparkJdbcUtil.
 *
 * NOTE(review): the pool is NOT thread-safe (plain LinkedList, unsynchronized
 * vars) — confirm all callers run on a single thread before sharing this object.
 *
 * @author zhangyong
 * @version 1.0.0
 * @date 2019/12/1 13:55
 */
object JDBCUtil {
  // Read mysql connection settings from the project configuration file.
  val properties = SparkJdbcUtil.getSparkJdbcConfig("mysql")
  val url: String = properties.getProperty("url")
  val driverClass: String = properties.getProperty("driver")
  val user: String = properties.getProperty("user")
  val password: String = properties.getProperty("password")
  var driverConnNum: Int = 0

  /** Number of connections created per refill batch. */
  val batchSize: Int = 10

  /**
   * Total number of physical connections created so far.
   * FIX: the original used this both as the batch size and as the running
   * count, incrementing it inside `for (i <- 1 to connectionNum)` — every
   * refill doubled the counter (10 -> 20 -> 40 ...), so the pool-size guard
   * never worked as intended. It now only counts created connections.
   */
  var connectionNum: Int = 0

  /** Hard cap on the number of connections this pool may create. */
  val poolSize: Int = 200

  /** Idle connections, used as a LIFO stack (push/pop). */
  val pool = new util.LinkedList[Connection]()

  /**
   * Load the JDBC driver class, or wait (2 s, retrying) while the pool is
   * exhausted — i.e. every allowed connection has been created and none has
   * been returned yet.
   */
  def getDriver(): Unit = {
    if (connectionNum >= poolSize && pool.isEmpty) {
      print("当前暂无可用Connection")
      Thread.sleep(2000)
      getDriver()
    } else {
      Class.forName(driverClass)
    }
  }

  /**
   * Borrow a connection, refilling the pool with up to `batchSize` new
   * connections (never exceeding `poolSize` in total) when it is empty.
   *
   * @return a pooled JDBC connection; return it with [[closeConn]]
   */
  def getConnection(): Connection = {
    if (pool.isEmpty) {
      getDriver()
      // Never create more than poolSize connections overall.
      val toCreate = math.min(batchSize, poolSize - connectionNum)
      for (_ <- 1 to toCreate) {
        pool.push(DriverManager.getConnection(url, user, password))
        connectionNum += 1
      }
    }
    pool.pop()
  }

  /**
   * Return a borrowed connection to the pool (the connection stays open).
   *
   * @param connection the connection obtained from [[getConnection]]
   */
  def closeConn(connection: Connection): Unit = {
    pool.push(connection)
  }
}
2.构造SQL记录scala 语法
package com.web.zhangyong168.cn.scala.util

import scala.collection.mutable

/**
 * Holds the pieces of a single table record (table name, column names/values,
 * where-conditions) and builds MyBatis-style SQL expressions from them, using
 * `#{name}` placeholders for bind parameters.
 *
 * @author zhangyong
 * @version 1.0.0
 * @date 2019/12/2 13:23
 */
class RecordOne {
  /** Column names (parallel to columnValue). */
  var columnName: List[String] = null
  /** Column values (parallel to columnName). */
  var columnValue: Array[String] = null
  /** Target table name. */
  var tableName: String = null
  /** Where-conditions: column -> value. */
  var whereOption: Map[String, String] = null

  /**
   * Build a single-row INSERT expression, e.g.
   * `insert into t(a,b)values(#{a},#{b})`.
   * Columns are only emitted when names and values line up.
   *
   * @return the INSERT SQL expression
   */
  def getInsertSqlExp: String = {
    val sb: StringBuilder = new StringBuilder
    sb.append("insert into " + tableName + "(")
    if (columnValue.length > 0 && columnName.size == columnValue.length) {
      val names: List[String] = columnName
      for (i <- columnName.indices) {
        sb.append(names(i) + ",")
      }
      // Swap the trailing comma for the closing parenthesis of the column list.
      sb.replace(sb.length - 1, sb.length, ")")
      sb.append("values(")
      for (i <- columnValue.indices) {
        sb.append("#{" + names(i) + "},")
      }
    }
    println(sb.substring(0, sb.toString.length - 1) + ")")
    // Drop the trailing comma and close the values list.
    sb.substring(0, sb.toString.length - 1) + ")"
  }

  /**
   * Build a DELETE expression, e.g.
   * `delete from t where 1=1  and id=#{id}`.
   *
   * @return the DELETE SQL expression
   */
  def deleteSqlExp: String = {
    val sb: StringBuilder = new StringBuilder
    sb.append("delete from " + tableName)
    if (whereOption.size > 0) {
      sb.append(" where 1=1 ")
      // FIX: the original bound "#{" + v + "}" (the VALUE); the placeholder
      // must be the column key, consistent with insert/update and with the
      // Java AccessOne implementation.
      for ((k, _) <- whereOption) {
        sb.append(" and " + k + "=#{" + k + "}")
      }
    }
    println(sb.toString())
    sb.toString()
  }

  /**
   * Build an UPDATE expression. A where-key that is also an updated column
   * gets the disambiguated placeholder `#{k-k}` so the SET value and the
   * WHERE value can be bound separately.
   *
   * @return the UPDATE SQL expression
   */
  def updateSqlExp: String = {
    val sb: StringBuilder = new StringBuilder
    sb.append("update " + tableName)
    if (columnName.size > 0 && columnValue.size == columnName.size)
      sb.append(" set ")
    // Columns that appear in the SET clause, used below to detect collisions
    // with where-keys.
    val getColumnMap: mutable.Map[String, String] = new mutable.HashMap[String, String]
    for (i <- columnName.indices) {
      if (columnName(i) != null) {
        sb.append(columnName(i))
        sb.append("=#{" + columnName(i) + "},")
        // FIX: the original RE-ASSIGNED `getColumnMap = mutable.Map(...)`
        // each iteration, keeping only the last column; accumulate instead.
        getColumnMap.put(columnName(i), columnName(i))
      }
    }
    // Swap the trailing comma for a space.
    sb.replace(sb.length - 1, sb.length, " ")
    // FIX: original checked whereOption.head._1.size, which throws on an
    // empty map.
    if (whereOption.nonEmpty) {
      sb.append(" where 1=1 ")
      for ((k, _) <- whereOption) {
        if (getColumnMap.contains(k)) {
          // Key collides with an updated column: use the "#{k-k}" placeholder.
          sb.append(" and " + k + "=#{" + k + "-" + k + "} ")
        } else {
          sb.append(" and " + k + "=#{" + k + "} ")
        }
      }
    }
    println(sb.toString())
    sb.toString()
  }
}
2.构造SQL记录 java 语法
package com.web.zhangyong168.cn.api.entity.storage;

import com.web.zhangyong168.cn.entity.RecordOne;

import java.util.HashMap;
import java.util.Map;

/**
 * Builds MyBatis-style single-record SQL expressions (insert/delete/update)
 * from the column data inherited from {@link RecordOne} plus a table name and
 * a map of where-conditions.
 *
 * @version 1.0.0
 * @Author zhangyong
 * @Description record access helper
 * @Date 2019/10/19 15:42
 **/
public class AccessOne extends RecordOne {
    /** Target table name. */
    public String tableName;
    /** Where-conditions: column -> value. */
    public Map<String, Object> whereMap;

    /**
     * Build a single-row INSERT expression, e.g.
     * {@code insert into t(a,b)values(#{a},#{b})}.
     *
     * @return the INSERT SQL expression
     */
    public String getInsertSqlExp() {
        // StringBuilder: no synchronization needed (was StringBuffer).
        StringBuilder sb = new StringBuilder();
        sb.append("insert into ").append(tableName).append("(");
        // FIX: the original `if` (no braces) guarded only the first loop, so
        // when names/values did not line up the replace/values code below
        // still ran and corrupted the statement.
        if (this.getColumnValues().length > 0
                && this.getColumnValues().length == this.getColumnNames().length) {
            for (int i = 0; i < this.getColumnValues().length; i++) {
                sb.append(this.getColumnName(i)).append(",");
            }
            // Swap the trailing comma for the closing parenthesis.
            sb.replace(sb.length() - 1, sb.length(), ")");
            sb.append("values(");
            for (int i = 0; i < this.getColumnValues().length; i++) {
                sb.append("#{").append(this.getColumnName(i)).append("},");
            }
        }
        // Drop the trailing comma and close the values list.
        return sb.substring(0, sb.toString().length() - 1) + ")";
    }

    /**
     * Build a DELETE expression, e.g.
     * {@code delete from t where 1=1  and id=#{id}}.
     *
     * @return the DELETE SQL expression
     */
    public String getDeleteSqlExp() {
        StringBuilder sb = new StringBuilder();
        sb.append("delete from ").append(tableName);
        if (!whereMap.isEmpty()) {
            sb.append(" where 1=1 ");
            for (Map.Entry<String, Object> entry : whereMap.entrySet()) {
                sb.append(" and ").append(entry.getKey())
                  .append("=#{").append(entry.getKey()).append("}");
            }
        }
        return sb.toString();
    }

    /**
     * Build an UPDATE expression. A where-key that is also an updated column
     * gets the disambiguated placeholder {@code #{k-k}} so the SET value and
     * the WHERE value can be bound separately.
     *
     * @return the UPDATE SQL expression
     */
    public String getUpdateSqlExp() {
        StringBuilder sb = new StringBuilder();
        sb.append("update ").append(tableName).append(" ");
        // Columns that appear in the SET clause, used below to detect
        // collisions with where-keys.
        Map<String, Object> getColumnMap = new HashMap<String, Object>();
        // FIX: the original `if` (no braces) guarded only `sb.append(" set ")`;
        // the column loop and the comma replace ran unconditionally.
        if (this.getColumnValues().length > 0
                && this.getColumnValues().length == this.getColumnNames().length) {
            sb.append(" set ");
            for (int i = 0; i < this.getColumnValues().length; i++) {
                if (this.getColumnName(i) != null) {
                    sb.append(this.getColumnName(i));
                    sb.append("=#{").append(this.getColumnName(i)).append("},");
                    getColumnMap.put(this.getColumnName(i), this.getColumnName(i));
                }
            }
            // Swap the trailing comma for a space.
            sb.replace(sb.length() - 1, sb.length(), " ");
        }
        if (!whereMap.isEmpty()) {
            sb.append(" where 1=1 ");
            for (Map.Entry<String, Object> entry : whereMap.entrySet()) {
                String getKey = entry.getKey();
                if (getColumnMap.containsKey(getKey)) {
                    // Key collides with an updated column: "#{k-k}" placeholder.
                    sb.append(" and ").append(getKey)
                      .append("=#{").append(getKey).append("-").append(getKey).append("} ");
                } else {
                    sb.append(" and ").append(getKey)
                      .append("=#{").append(getKey).append("} ");
                }
            }
        }
        return sb.toString();
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public Map<String, Object> getWhereMap() {
        return whereMap;
    }

    public void setWhereMap(Map<String, Object> whereMap) {
        this.whereMap = whereMap;
    }
}
3.SQL工具
package com.web.zhangyong168.cn.scala.util

import java.util
import java.util.{ArrayList, List}

import scala.beans.BeanProperty
import scala.collection.mutable

/** ***
 * Converts a MyBatis-style SQL string into a JDBC-ready statement:
 * `${name}` placeholders are inlined with their string values and `#{name}`
 * placeholders are replaced with `?`, recording the key order for binding.
 *
 * @description SQL helper
 * @author 张勇
 * @version 0.0.1
 * @date 2019年12月2日下午5:25:39
 */
class SQLHelper {
  /** The original SQL as passed in. */
  @BeanProperty var sourceSql: String = null
  /** The converted SQL with `?` placeholders. */
  @BeanProperty var convertSql: String = null
  /** Bind-parameter keys in the order their `?` appears in convertSql. */
  @BeanProperty var paramValueKeys: List[String] = null

  /** Build with no `${}` string parameters. */
  def build(sourceSql: String): SQLHelper = {
    build(sourceSql, null)
  }

  /**
   * Build a new, populated SQLHelper (the receiver is NOT modified).
   *
   * @param sourceSql     the SQL containing `${}` / `#{}` placeholders
   * @param replaceParams values for `${}` placeholders (may be null)
   * @return a fresh SQLHelper carrying sourceSql, convertSql and the key list
   */
  def build(sourceSql: String, replaceParams: mutable.Map[String, String]): SQLHelper = {
    val sqlHelper = new SQLHelper
    val paramValuePrefix = "#{"
    val paramValuePostfix = "}"
    val paramStringPrefix = "${"
    val paramStringPostfix = "}"
    val replaceParamValuePrefix = "#\\{"
    val replaceParamValuePostfix = "\\}"
    val replaceParamStringPrefix = "\\$\\{"
    val replaceParamStringPostfix = "\\}"
    if (sourceSql != null && sourceSql.length > 0) {
      var tmpSourceSql: String = sourceSql
      val tmpParamValueKeys: util.List[String] = new util.ArrayList[String]

      // --- Pass 1: inline ${name} string parameters. ---
      if (replaceParams != null && replaceParams.size > 0) {
        var psPreIndex = tmpSourceSql.indexOf(paramStringPrefix)
        while (psPreIndex > -1) {
          val psPostIndex = tmpSourceSql.indexOf(paramStringPostfix, psPreIndex)
          if (psPostIndex < 0) {
            // Unterminated "${" — report and stop scanning.
            // FIX: the original re-scanned from index 0 and looped forever.
            println("查询错误: --> position[" + psPreIndex + "," + psPostIndex + "]")
            psPreIndex = -1
          } else if (psPostIndex - psPreIndex == paramStringPrefix.length) {
            // Empty "${}" — report and skip past it.
            println("查询错误: --> position[" + psPreIndex + "," + psPostIndex + "]")
            psPreIndex = tmpSourceSql.indexOf(paramStringPrefix, psPostIndex + 1)
          } else {
            val paramName =
              tmpSourceSql.substring(psPreIndex + paramStringPrefix.length, psPostIndex)
            // FIX: the original called replaceParams.get(paramName).head BEFORE
            // the contains check, throwing NoSuchElementException on a missing key.
            if (replaceParams.contains(paramName)) {
              val paramValue = replaceParams(paramName)
              val replaceString =
                replaceParamStringPrefix + paramName + replaceParamStringPostfix
              // A null value is replaced with a single space.
              val replacement = if (paramValue != null) paramValue else " "
              // NOTE(review): replaceAll treats '$' and '\' in the replacement
              // specially — confirm parameter values never contain them.
              tmpSourceSql = tmpSourceSql.replaceAll(replaceString, replacement)
              psPreIndex = tmpSourceSql.indexOf(paramStringPrefix)
            } else {
              println("查询错误:[" + paramName + "]")
              // FIX: skip past the unresolved placeholder instead of
              // re-scanning from 0 (which looped forever).
              psPreIndex = tmpSourceSql.indexOf(paramStringPrefix, psPostIndex + 1)
            }
          }
        }
      }

      // --- Pass 2: turn #{name} value parameters into JDBC '?' markers. ---
      if (tmpSourceSql.length > 0) {
        var psPreIndex = tmpSourceSql.indexOf(paramValuePrefix)
        while (psPreIndex > -1) {
          val psPostIndex = tmpSourceSql.indexOf(paramValuePostfix, psPreIndex)
          if (psPostIndex < 0) {
            // Unterminated "#{" — report and stop scanning.
            println("查询错误: --> position[" + psPreIndex + "," + psPostIndex + "]")
            psPreIndex = -1
          } else if (psPostIndex - psPreIndex == paramValuePrefix.length) {
            // Empty "#{}" — report and skip past it (FIX: was an infinite loop).
            println("查询错误: --> position[" + psPreIndex + "," + psPostIndex + "]")
            psPreIndex = tmpSourceSql.indexOf(paramValuePrefix, psPostIndex + 1)
          } else {
            val paramName =
              tmpSourceSql.substring(psPreIndex + paramValuePrefix.length, psPostIndex)
            // Record the key so values can be bound in '?' order later.
            tmpParamValueKeys.add(paramName)
            tmpSourceSql = tmpSourceSql.replaceFirst(
              replaceParamValuePrefix + paramName + replaceParamValuePostfix, "?")
            psPreIndex = tmpSourceSql.indexOf(paramValuePrefix)
          }
        }
      }

      sqlHelper.setSourceSql(sourceSql)
      sqlHelper.setConvertSql(tmpSourceSql)
      sqlHelper.setParamValueKeys(tmpParamValueKeys)
    }
    sqlHelper
  }
}
4.jdbc操作数据库工具类
package com.web.zhangyong168.cn.scala.util

import java.sql.{Connection, PreparedStatement, ResultSet, Statement}

import scala.collection.mutable

/**
 * JDBC query helper built on JDBCUtil (connection pool), SQLHelper
 * (placeholder conversion) and RecordOne (SQL expression builder).
 *
 * @author zhangyong
 * @version 1.0.0
 * @date 2019/12/3 14:55
 */
object JDBCQuery {
  private var conn: Connection = null
  var preparedStatement: PreparedStatement = null

  /**
   * Borrow a pooled connection.
   *
   * @return a connection from JDBCUtil's pool
   */
  def getDataConnect: Connection = {
    JDBCUtil.getConnection
  }

  /**
   * Release all JDBC resources; any argument may be null.
   * Closed in reverse acquisition order: result set, statement, then the
   * connection is returned to the pool.
   *
   * @param conn      connection to return to the pool
   * @param statement statement to close
   * @param rs        result set to close
   */
  def closeConnect(conn: Connection, statement: Statement, rs: ResultSet) = {
    if (rs != null) rs.close()
    if (statement != null) statement.close()
    if (conn != null) JDBCUtil.closeConn(conn)
  }

  /**
   * Execute the DELETE built from recordOne's where-conditions.
   *
   * @param storeId   datastore selector (oracle/mysql/tidb config) —
   *                  NOTE(review): currently unused; confirm intent
   * @param recordOne supplies the table and where-conditions
   * @return true when the statement was executed
   */
  def deleteQueryDemo(storeId: Int, recordOne: RecordOne): Boolean = {
    val flag = true
    val map: mutable.Map[String, String] = new mutable.HashMap[String, String]()
    for ((k, v) <- recordOne.whereOption) {
      map.put(k, v)
    }
    // FIX: build() returns a NEW populated SQLHelper; the original discarded
    // the return value and then read null fields from the empty receiver.
    val sqlHelper = (new SQLHelper).build(recordOne.deleteSqlExp, map)
    println("start to getConnection")
    val conn = JDBCUtil.getConnection()
    val stmt = conn.prepareStatement(sqlHelper.getConvertSql)
    try {
      println("========================打印删除SQL表达式======================")
      println(" " + sqlHelper.getConvertSql)
      println("===========================================================")
      // Bind the where-values in the order SQLHelper recorded the '?' keys.
      var index: Int = 1
      val it = sqlHelper.getParamValueKeys.iterator()
      while (it.hasNext) {
        stmt.setObject(index, map(it.next()))
        index += 1
      }
      println("========================打印删除SQL语句========================")
      println(" " + stmt)
      println("==========================================================")
      // FIX: the original called execute twice, running the DELETE two times.
      stmt.execute()
    } finally {
      // FIX: also close the statement (the original leaked it).
      closeConnect(conn, stmt, null)
    }
    flag
  }
}
5.sparkSQL 操作mysql及hive互相导入数据
package com.web.zhangyong168.cn.scala

import java.util.Properties

import com.web.zhangyong168.cn.scala.util.{SparkJdbcUtil, SparkUtil}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.execution.datasources.jdbc2.JDBCSaveMode
import scala.collection.JavaConverters

/**
 * Demonstrates common DataFrame operations: reads a MySQL table through the
 * JDBC data source, copies rows into a Hive table (with extra audit columns),
 * and writes a projection back to MySQL via the custom "jdbc2" data source.
 *
 * NOTE(review): this object's body runs as a script at first reference —
 * it opens a SparkSession and performs reads/writes on initialization.
 *
 * @author zhangyong
 * @version 1.0.0
 * @date 2019/12/5 14:55
 */
object DataFremeUtil {
  // Run as root against HDFS (presumably a dev cluster — verify for prod).
  System.setProperty("HADOOP_USER_NAME","root")
  // JDBC connection properties for the "mysql" profile, exposed as a mutable
  // scala map so Spark JDBC options can be added below.
  val jdbcConf:Properties =SparkJdbcUtil.getSparkJdbcConfig("mysql")
  val jdbcMap=JavaConverters.propertiesAsScalaMapConverter(jdbcConf).asScala
  jdbcMap +=(JDBCOptions.JDBC_BATCH_INSERT_SIZE -> "150")
  jdbcMap +=(JDBCOptions.JDBC_TXN_ISOLATION_LEVEL -> "NONE")
  jdbcMap +=(JDBCOptions.JDBC_TABLE_NAME -> "CLASS") // prefer upper-case table names
  // Hive-enabled SparkSession from the project helper.
  val spark=SparkUtil.getSparkSessionWithHive("hive")
  // Read the MySQL table CLASS into a DataFrame.
  val df=spark.read.format("jdbc").options(jdbcMap).load()
  println("打印mysql数据库:CLASS 里面的SQL语句")
  df.show()
  df.persist() // cache in memory — df is reused for the Hive write below
  // Conditions are allowed here, but the select list can only be "*".
  // NOTE(review): "yunduo.calss" looks like a typo for "class" — confirm the
  // actual Hive table name before changing it.
  val df2=spark.sql(s"""select * from yunduo.calss""")
  // Keep only these columns (the Hive side has just these) in preparation for
  // the MySQL -> Hive import.
  val df3=df2.selectExpr("uid","name","birthday")
  df3.show()
  df3.persist()
  jdbcMap +=(JDBCOptions.JDBC_TABLE_NAME -> "CLASS") // prefer upper-case table names
  // NOTE(review): the key "savamode" may be a typo for "savemode" — verify
  // against the jdbc2 data source's expected option name.
  jdbcMap +=("savamode"-> JDBCSaveMode.Update.toString)
  // Write the projection back to MySQL through the custom jdbc2 data source
  // (i.e. load Hive data into the MySQL database).
  df3.write.format("org.apache.spark.sql.execution.datasources.jdbc2").options(jdbcMap).save()
  // Import the MySQL data into a Hive table, adding two audit columns:
  // count_date and count_type ('i' = insert, 'u' = update). The format may be
  // "hive" or "orc". Use saveAsTable when new columns are being added to Hive;
  // if no new columns are added, use insertInto("ZYDB.TB_ROLE_LOGIN_LOG").
  df.selectExpr("*","current_date() as count_date","'u' as count_type").write.mode(SaveMode.Overwrite).format("orc").saveAsTable("ZYDB.CLASS")
  // If no new columns are added, use insertInto instead:
  // df.write.mode(SaveMode.Append).format("hive").insertInto("ZYDB.TB_ROLE_LOGIN_LOG")
}
文章转载自大数据Java张勇Linux数据库LTL,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




