暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

解析大数据spark操作mysql及hive

大数据Java张勇Linux数据库LTL 2021-04-26
1099


一、基本概念和用法

Spark SQL 还有一个能够使用 JDBC 从其他数据库读取数据的数据源。

当使用 JDBC 访问其它数据库时,应优先使用这种 JDBC 数据源而不是 JdbcRDD。这是因为结果是以数据框(DataFrame)返回的,这样 Spark SQL 可以轻松地对其进行操作,或与其它数据源进行连接。

因为这种 JDBC 数据源不需要用户提供 ClassTag,所以它也更适合在 Java 中使用。


二、工具类

1.jdbc连接工具类

    package com.web.zhangyong168.cn.scala.util
    import java.sql.{Connection, DriverManager}
    import java.util
    /**
    * @description : jdbc连接工具类
    * @author zhangyong
    * @version 1.0.0
    * @date 2019/12/1 13:55
    */


    object JDBCUtil {
      // Connection settings read from the "mysql" section of the config file.
      val properties = SparkJdbcUtil.getSparkJdbcConfig("mysql")
      val url: String = properties.getProperty("url")
      val driverClass: String = properties.getProperty("driver")
      val user: String = properties.getProperty("user")
      val password: String = properties.getProperty("password")
      // Total number of physical connections created so far (never exceeds poolSize).
      var driverConnNum: Int = 0
      /**
       * Number of connections created per refill batch.
       */
      var connectionNum: Int = 10
      /**
       * Hard upper bound on the total number of connections this pool may create.
       */
      val poolSize: Int = 200
      /**
       * Idle connections. NOTE(review): LinkedList is not thread-safe; this pool
       * assumes single-threaded use — confirm before sharing across threads.
       */
      val pool = new util.LinkedList[Connection]()

      /**
       * Loads the JDBC driver class; when the pool has already created poolSize
       * connections and all of them are checked out, polls until one is returned.
       *
       * Bug fix: the original compared the per-batch size (connectionNum) against
       * poolSize while getConnection grew connectionNum on every refill, so the
       * batch doubled each time and the cap was never honoured. It also recursed
       * on every retry, risking a stack overflow; a loop is used instead.
       */
      def getDriver(): Unit = {
        while (driverConnNum >= poolSize && pool.isEmpty) {
          print("当前暂无可用Connection")
          Thread.sleep(2000)
        }
        Class.forName(driverClass)
      }

      /**
       * Borrows a connection, creating a batch of up to connectionNum new ones
       * (capped so the total never exceeds poolSize) when the pool is empty.
       *
       * @return a pooled JDBC connection; return it via closeConn
       */
      def getConnection(): Connection = {
        if (pool.isEmpty) {
          getDriver()
          // Create at most one batch without exceeding the pool-wide cap.
          val batch = math.min(connectionNum, poolSize - driverConnNum)
          for (_ <- 1 to batch) {
            pool.push(DriverManager.getConnection(url, user, password))
            driverConnNum += 1
          }
        }
        pool.pop() // take an idle connection from the LinkedList-backed pool
      }

      /**
       * Returns a borrowed connection to the pool (the connection is NOT closed).
       *
       * @param connection the connection previously obtained from getConnection
       */
      def closeConn(connection: Connection): Unit = {
        pool.push(connection)
      }
    }



    2.构造SQL记录scala 语法

      package com.web.zhangyong168.cn.scala.util
      import scala.collection.mutable
      /**
      * @description : 记录
      * @author zhangyong
      * @version 1.0.0
      * @date 2019/12/2 13:23
      */
      class RecordOne {
      /**
      * Column names of the target table.
      */
      var columnName: List[String] = null
      /**
      * Column values, parallel to columnName.
      */
      var columnValue: Array[String] = null
      /**
      * Target table name.
      */
      var tableName: String = null
      /**
      * WHERE-clause conditions: column name -> placeholder name.
      */
      var whereOption: Map[String, String] = null

      /**
      * Builds an INSERT statement with #{column} placeholders,
      * e.g. "insert into t(a,b)values(#{a},#{b})".
      *
      * @return the INSERT SQL expression
      */
      def getInsertSqlExp: String = {
        val sb: StringBuilder = new StringBuilder()
        sb.append("insert into " + tableName + "(")
        if (columnValue.length > 0 && columnName.size == columnValue.length) {
          for (name <- columnName) {
            sb.append(name + ",")
          }
          // Turn the trailing comma into the closing parenthesis.
          sb.replace(sb.length - 1, sb.length, ")")
          sb.append("values(")
          for (name <- columnName) {
            sb.append("#{" + name + "},")
          }
        }
        println(sb.substring(0, sb.toString.length - 1) + ")")
        sb.substring(0, sb.toString.length - 1) + ")"
      }

      /**
      * Builds a DELETE statement with #{value} placeholders taken from
      * whereOption, e.g. "delete from t where 1=1  and id=#{id}".
      *
      * @return the DELETE SQL expression
      */
      def deleteSqlExp: String = {
        val sb: StringBuilder = new StringBuilder()
        sb.append("delete from " + tableName)
        if (whereOption != null && whereOption.nonEmpty) {
          sb.append(" where 1=1 ")
          for ((k, v) <- whereOption) {
            sb.append(" and " + k + "=#{" + v + "}")
          }
        }
        println(sb.toString())
        sb.toString()
      }

      /**
      * Builds an UPDATE statement. Columns that also appear in whereOption get
      * a disambiguated placeholder "#{k-k}" so the SET and WHERE values can be
      * bound separately.
      *
      * Bug fix: the original reassigned getColumnMap to a fresh single-entry
      * map on every loop iteration, so only the LAST column was remembered and
      * earlier columns in the WHERE clause were never disambiguated. It also
      * called .head on possibly-empty maps, which throws.
      *
      * @return the UPDATE SQL expression
      */
      def updateSqlExp: String = {
        val sb: StringBuilder = new StringBuilder()
        sb.append("update " + tableName)
        if (columnName.size > 0 && columnValue.size == columnName.size)
          sb.append(" set ")
        // Tracks every column that appeared in the SET clause.
        val getColumnMap: mutable.Map[String, String] = new mutable.HashMap[String, String]
        for (name <- columnName) {
          println("打印:" + name)
          if (name != null) {
            sb.append(name)
            sb.append("=#{" + name + "},")
            getColumnMap.put(name, name) // accumulate (was: map replaced each time)
          }
        }
        // Turn the trailing comma into a space.
        sb.replace(sb.length - 1, sb.length, " ")
        if (whereOption != null && whereOption.nonEmpty) {
          sb.append(" where 1=1 ")
          for ((k, v) <- whereOption) {
            if (getColumnMap.contains(k)) {
              // Column is also being SET: use the "#{k-k}" placeholder.
              sb.append(" and " + k + "=#{" + k + "-" + k + "} ")
            } else {
              sb.append(" and " + k + "=#{" + k + "} ")
            }
          }
        }
        println(sb.toString())
        sb.toString()
      }
      }


      2.构造SQL记录 java 语法

        package com.web.zhangyong168.cn.api.entity.storage;
        import com.web.zhangyong168.cn.entity.RecordOne;
        import java.util.HashMap;
        import java.util.Map;
        /**
        * @version 1.0.0
        * @Author zhangyong
        * @Description 记录
        * @Date 2019/10/19 15:42
        **/
        public class AccessOne extends RecordOne {
        /**
        * 表名
        */
        public String tableName;
        /**
        * 条件
        */
        public Map<String, Object> whereMap;
        /**
        * 添加单条
        * @return
        */
        public String getInsertSqlExp() {
        StringBuffer sb = new StringBuffer();
        sb.append("insert into " + tableName + "(");


          if(this.getColumnValues().length>0&&this.getColumnValues().length== this.getColumnNames().length)for (int i = 0; i < this.getColumnValues().length; i++) {
        sb.append(this.getColumnName(i) + ",");
        }
        sb.replace(sb.length() - 1, sb.length(), ")");
        sb.append("values(");
        for (int i = 0; i < this.getColumnValues().length; i++) {
        sb.append("#{" + this.getColumnName(i) + "},");
        }
        return sb.substring(0, sb.toString().length() - 1) + ")";
        }


        /**
        * 删除单条
        * @return
        */
        public String getDeleteSqlExp() {
        StringBuffer sb = new StringBuffer();
        sb.append("delete from " + tableName);
        if (whereMap.entrySet().size() > 0) {
        sb.append(" where 1=1 ");
        for (Map.Entry<String, Object> entry : whereMap.entrySet()) {
        sb.append(" and " + entry.getKey() + "=#{" + entry.getKey() + "}");
        }
        }
        return sb.toString();
        }


        /**
        * 修改单条
        * @return
        */
        public String getUpdateSqlExp() {
        StringBuffer sb = new StringBuffer();
        sb.append("update " + tableName + " ");


               if (this.getColumnValues().length > 0 && this.getColumnValues().length== this.getColumnNames().length)sb.append(" set ");
        Map<String, Object> getColumnMap = new HashMap<String, Object>();
        for (int i = 0; i < this.getColumnValues().length; i++) {
        if (this.getColumnName(i) != null) {
        sb.append(this.getColumnName(i));
        sb.append("=#{" + this.getColumnName(i) + "},");
         getColumnMap.put(this.getColumnName(i), this.getColumnName(i));
        }
        }
        sb.replace(sb.length() - 1, sb.length(), " ");
        if (whereMap.entrySet().size() > 0) {
        sb.append(" where 1=1 ");
           for (Map.Entry<StringObject> entry : whereMap.entrySet()) {
        String getKey = entry.getKey();
        if (getColumnMap.entrySet().size() > 0 && getColumnMap.containsKey(getKey)) {
              String mapColKey = getColumnMap.get(getKey).toString();
              if (mapColKey.equals(getKey)) {
        sb.append(" and " + entry.getKey()+ "=#{" + entry.getKey() + "-" + entry.getKey() + "} ");
        }


        } else {
        sb.append(" and " + entry.getKey() + "=#{" + entry.getKey() + "} ");
        }
        }


        }
        return sb.toString();
        }


        public String getTableName() {
        return tableName;
        }


        public void setTableName(String tableName) {
        this.tableName = tableName;
        }






        public Map<String, Object> getWhereMap() {
        return whereMap;
        }


        public void setWhereMap(Map<String, Object> whereMap) {
        this.whereMap = whereMap;
        }
        }


        3.SQL工具

          package com.web.zhangyong168.cn.scala.util
          import java.util
          import java.util.{ArrayList, List}
          import scala.beans.BeanProperty
          import scala.collection.mutable
          /** *
          *
          * @description SQL工具
          * @author 张勇
          * @version 0.0.1
          * @date 2019年12月2日下午5:25:39
          */
          class SQLHelper {
          /**
          * The SQL exactly as supplied by the caller.
          */
          @BeanProperty var sourceSql: String = null
          /**
          * The SQL after ${..} substitution, with every #{..} replaced by '?'.
          */
          @BeanProperty var convertSql: String = null
          /**
          * Names of the #{..} parameters in the order they appear in the SQL.
          */
          @BeanProperty var paramValueKeys: util.List[String] = null

          /**
          * Builds a helper for SQL that has no ${..} replacements.
          *
          * @param sourceSql the raw SQL
          * @return a new SQLHelper holding the converted SQL
          */
          def build(sourceSql: String): SQLHelper = {
            build(sourceSql, null)
          }

          /**
          * Converts MyBatis-style SQL: ${name} placeholders are substituted with
          * values from replaceParams, and #{name} placeholders become JDBC '?'
          * markers whose names are collected into paramValueKeys.
          *
          * Bug fixes vs. the original:
          *  - statements that had been garbled onto single lines are restored to
          *    valid Scala;
          *  - replaceParams is checked with contains BEFORE the value is read, so
          *    an unknown ${name} logs an error instead of throwing;
          *  - empty ("${}"/"#{}") or unknown placeholders are skipped with a
          *    moving cursor instead of re-scanning the same position forever.
          *
          * NOTE(review): paramName is interpolated into a regex; names containing
          * regex metacharacters would break — assumed to be plain identifiers.
          *
          * @param sourceSql     the raw SQL
          * @param replaceParams values for ${..} placeholders (may be null)
          * @return a new SQLHelper holding source SQL, converted SQL and keys
          */
          def build(sourceSql: String, replaceParams: mutable.Map[String, String]): SQLHelper = {
            val sqlHelper = new SQLHelper
            val paramValuePrefix = "#{"
            val paramValuePostfix = "}"
            val paramStringPrefix = "${"
            val paramStringPostfix = "}"
            if (sourceSql != null && sourceSql.length > 0) {
              var tmpSourceSql: String = sourceSql
              val tmpParamValueKeys: util.List[String] = new util.ArrayList[String]
              // Pass 1: literal substitution of ${name} placeholders.
              if (replaceParams != null && replaceParams.nonEmpty) {
                var from = 0
                var scanning = true
                while (scanning) {
                  val psPreIndex = tmpSourceSql.indexOf(paramStringPrefix, from)
                  val psPostIndex =
                    if (psPreIndex < 0) -1
                    else tmpSourceSql.indexOf(paramStringPostfix, psPreIndex)
                  if (psPreIndex < 0 || psPostIndex < 0) {
                    scanning = false // no complete placeholder left
                  } else if (psPostIndex - psPreIndex == paramStringPrefix.length) {
                    // "${}" with no name: report and skip past it.
                    println("查询错误: --> position[" + psPreIndex + "," + psPostIndex + "]")
                    from = psPostIndex + 1
                  } else {
                    val paramName = tmpSourceSql.substring(psPreIndex + paramStringPrefix.length, psPostIndex)
                    if (replaceParams.contains(paramName)) {
                      val paramValue = replaceParams(paramName)
                      val replaceString = "\\$\\{" + paramName + "\\}"
                      // A null value is replaced by a single space, as before.
                      tmpSourceSql = tmpSourceSql.replaceAll(replaceString,
                        if (paramValue != null) paramValue else " ")
                    } else {
                      println("查询错误:[" + paramName + "]")
                      from = psPostIndex + 1 // unknown name: leave it and move on
                    }
                  }
                }
              }
              // Pass 2: turn #{name} into '?' and record the parameter order.
              if (tmpSourceSql.length > 0) {
                var from = 0
                var scanning = true
                while (scanning) {
                  val psPreIndex = tmpSourceSql.indexOf(paramValuePrefix, from)
                  val psPostIndex =
                    if (psPreIndex < 0) -1
                    else tmpSourceSql.indexOf(paramValuePostfix, psPreIndex)
                  if (psPreIndex < 0 || psPostIndex < 0) {
                    scanning = false
                  } else if (psPostIndex - psPreIndex == paramValuePrefix.length) {
                    // "#{}" with no name: report and skip past it.
                    println("查询错误: --> position[" + psPreIndex + "," + psPostIndex + "]")
                    from = psPostIndex + 1
                  } else {
                    val paramName = tmpSourceSql.substring(psPreIndex + paramValuePrefix.length, psPostIndex)
                    tmpParamValueKeys.add(paramName)
                    tmpSourceSql = tmpSourceSql.replaceFirst("#\\{" + paramName + "\\}", "?")
                  }
                }
              }
              sqlHelper.setSourceSql(sourceSql)
              sqlHelper.setConvertSql(tmpSourceSql)
              sqlHelper.setParamValueKeys(tmpParamValueKeys)
            }
            sqlHelper
          }
          }

           

          4.jdbc操作数据库工具类

            package com.web.zhangyong168.cn.scala.util
            import java.sql.{Connection, PreparedStatement, ResultSet, Statement}
            import scala.collection.mutable
            /**
            * @description : jdbc操作数据工具类
            * @author zhangyong
            * @version 1.0.0
            * @date 2019/12/3 14:55
            */
            object JDBCQuery {
              private var conn: Connection = null
              var preparedStatement: PreparedStatement = null

              /**
              * Borrows a pooled JDBC connection.
              *
              * @return a connection from JDBCUtil's pool
              */
              def getDataConnect: Connection = {
                JDBCUtil.getConnection
              }

              /**
              * Releases all JDBC handles: result set and statement (if any) are
              * closed first, then the connection is returned to the pool.
              *
              * @param conn      connection to return to the pool (may be null)
              * @param statement statement to close (may be null)
              * @param rs        result set to close (may be null)
              */
              def closeConnect(conn: Connection, statement: Statement, rs: ResultSet) = {
                if (rs != null) rs.close()
                if (statement != null) statement.close()
                if (conn != null) JDBCUtil.closeConn(conn)
              }

              /**
              * Executes the DELETE statement described by recordOne.
              *
              * Bug fixes vs. the original:
              *  - the SQLHelper returned by build(...) was discarded, so the
              *    statement was prepared from a null convertSql — the result is
              *    now kept;
              *  - the invalid expression getParamValueKeys()[String] is replaced
              *    by an index loop over the java.util.List of keys;
              *  - the statement was executed twice and the PreparedStatement was
              *    never closed — it now runs once and is closed via closeConnect.
              *
              * @param storeId   which datastore config to use (oracle/mysql/tidb)
              * @param recordOne describes the table and WHERE conditions
              * @return true when the statement ran without throwing
              */
              def deleteQueryDemo(storeId: Int, recordOne: RecordOne): Boolean = {
                val map: mutable.Map[String, String] = new mutable.HashMap[String, String]()
                for ((k, v) <- recordOne.whereOption) {
                  map.put(k, v)
                }
                // Keep the helper that build(...) returns — the original threw it away.
                val sqlHelper = new SQLHelper().build(recordOne.deleteSqlExp, map)
                val conn = JDBCUtil.getConnection()
                val preparedStatement = conn.prepareStatement(sqlHelper.getConvertSql)
                println("========================打印删除SQL表达式======================")
                println(" " + sqlHelper.getConvertSql)
                println("===========================================================")
                val keys = sqlHelper.getParamValueKeys
                for (i <- 0 until keys.size()) {
                  // JDBC parameter indices are 1-based.
                  preparedStatement.setObject(i + 1, map(keys.get(i)))
                }
                println("========================打印删除SQL语句========================")
                println(" " + preparedStatement)
                println("==========================================================")
                preparedStatement.execute()
                closeConnect(conn, preparedStatement, null)
                true
              }
            }


            5.sparkSQL 操作mysql及hive互相导入数据

              package com.web.zhangyong168.cn.scala
              import java.util.Properties
              import com.web.zhangyong168.cn.scala.util.{SparkJdbcUtil, SparkUtil}
              import org.apache.spark.sql.SaveMode
              import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
              import org.apache.spark.sql.execution.datasources.jdbc2.JDBCSaveMode
              import scala.collection.JavaConverters
              /**
              * @description : dataframe 常用的基本操作
              * @author zhangyong
              * @version 1.0.0
              * @date 2019/12/5 14:55
              */
              object DataFremeUtil {
                // Run Hadoop operations as the root user.
                System.setProperty("HADOOP_USER_NAME", "root")
                // JDBC options for the MySQL source, loaded from config.
                val jdbcConf: Properties = SparkJdbcUtil.getSparkJdbcConfig("mysql")
                val jdbcMap = JavaConverters.propertiesAsScalaMapConverter(jdbcConf).asScala
                jdbcMap += (JDBCOptions.JDBC_BATCH_INSERT_SIZE -> "150")
                jdbcMap += (JDBCOptions.JDBC_TXN_ISOLATION_LEVEL -> "NONE")
                jdbcMap += (JDBCOptions.JDBC_TABLE_NAME -> "CLASS") // prefer upper-case table names
                val spark = SparkUtil.getSparkSessionWithHive("hive")

                // Read the MySQL table CLASS as a DataFrame.
                val df = spark.read.format("jdbc").options(jdbcMap).load()
                println("打印mysql数据库:CLASS 里面的SQL语句")
                df.show()
                df.persist() // cache: df is reused for the Hive import below

                // Read from Hive; conditions are allowed but the projection must be *
                // here — columns are selected below.
                val df2 = spark.sql(s"""select * from yunduo.calss""")
                // Keep only the columns that also exist in the target table,
                // preparing the Hive -> MySQL export.
                // FIX: this statement and df3.show() were garbled onto one line.
                val df3 = df2.selectExpr("uid", "name", "birthday")
                df3.show()
                df3.persist()

                jdbcMap += (JDBCOptions.JDBC_TABLE_NAME -> "CLASS") // prefer upper-case table names
                // NOTE(review): the option key is spelled "savamode" in the original;
                // confirm the jdbc2 datasource really expects this spelling before changing it.
                jdbcMap += ("savamode" -> JDBCSaveMode.Update.toString)

                // Write the Hive data into MySQL through the jdbc2 datasource.
                df3.write.format("org.apache.spark.sql.execution.datasources.jdbc2")
                  .options(jdbcMap).save()

                // Import the MySQL data into Hive, adding two extra columns:
                // count_date and count_type ('i' = insert, 'u' = update).
                // format may be "hive" or "orc". saveAsTable handles new columns
                // joining the Hive table; when the schema is unchanged, use
                // insertInto("ZYDB.TB_ROLE_LOGIN_LOG") instead.
                df.selectExpr("*", "current_date() as count_date", "'u' as count_type")
                  .write.mode(SaveMode.Overwrite).format("orc").saveAsTable("ZYDB.CLASS")
                // df.write.mode(SaveMode.Append).format("hive").insertInto("ZYDB.TB_ROLE_LOGIN_LOG")
              }



              文章转载自大数据Java张勇Linux数据库LTL,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

              评论