暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

基于SPARK SQL 读写ORACLE 的简单案例分析常见问题(二)

偷功 2016-04-27
526

3无法找到无法找到驱动类的问题及其解决方法


扩展知识:JVM的类加载机制——该知识是解决这一类问题的根本方法。


下面的解决方法主要从JVM进程的classpath配置出发去分析。


在Spark框架中,为JVM进程的classpath配置提供了以下几种方式:


1. 使用环境变量SPARK_CLASSPATH:将jar包放置在类查找路径,比如前面的SPARK_CLASSPATH,设置该变量后可以查看4040(默认应用监控端口)的环境设置,此时需要在driver和executor的classpath的路径中放置所需的jar包。


由前面启动过程中的日志可知,这种方式已经废弃,对应的方式是直接设置driver和executor的classpath相关的配置属性。


2. 使用配置属性分别设置driver和executor执行时加载jar包的路径。

    a)    spark.executor.extraClassPath

    b)    spark.driver.extraClassPath


取代使用环境变量进行配置的方法。由于通常情况下,driver和executor的进程不在同一个节点上,因此分别给出各自的配置属性。


和前面使用环境变量进行配置的方式一样,在设置相应的classpath路径之后,需要将对应的jar包部署到各个节点上的该classpath路径下。


说明:上述两种方法都是通过设置JVM进程的classpath属性,为进程提供jar包查找路径的。而对应的jar包,需要人为地部署到各个节点的对应路径下。


补充:另外可以通过提交命令的命令行选项--driver-class-path来设置driver端进程的classpath,原理类似,因此不作为单独一种。


注意:--conf命令项方式设置时,--confPROP=VALUE的等号中间不要加空格….


3. 除了前面两种方式外,Spark框架还提供了第三种方法,可以通过提交命令(spark-submit或spark-shell)的命令行选项--jars,自动上传、下载所需jar包,并同时将下载的jar包放入JVM进程的classpath路径中。


说明:通过该命令行选项设置的jar包,会通过http服务(注意,该服务在driver端启动后才会启动)上传jar包,并在executor端执行时下载该jar包,然后放入classpath。即,此时不需要手动部署到各个节点上,但每个提交的应用都会增加jar包上传、下载的网络IO、磁盘IO开销。


下面针对提交时的--master选项、--deploymode的选项分别进行解析。


不同选项的简单说明如下:


1. 当--master选项为local时,对应为in-process方式,此时仅一个进程(local-cluster时也对应一个进程,内部通过实例模拟),因此,对应的classpath实际上使用的都是driver短对应进程的classpath。即只需要配置driver的classpath即可。


2. 当--master选项为集群的MasterURL(本文主要基于Standalone模式的集群)时,对应driver和executor是以不同的进程方式启动,因此需要分别进行设置。并且,在不同的部署模式(--deploymode)下也会有细节上的差异(本质上还是根据JVM类加载机制):

    a)   --deploymode为client时:driver进程在当前提交节点上启动

    b)   --deploymode为cluster时:driver进程提交到集群中,由集群调度Master负责分配节点,并在分配的节点上启动driver进程。


说明:不同的部署模式和jar包的上传、下载有关,即在使用--jars方式时会有所差异,其关键点在于,jar包的上传、下载是通过driver进程启动过程中启动的http服务来完成的,当指定的jar包是以本地文件系统的路径提供时,在另一个节点启动的driver进程中的http服务根据该路径上传jar包时,也会根据本地文件系统指定的路径去上传,所以此时必须保证由Master节点分配给driver的节点,对应的该路径上也存在需要上传的jar包。


因此,建议在使用cluster部署模式提交应用程序时,所使用的路径尽可能与节点无关,比如使用hdfs文件系统的路径等。


3.1   测试类的设计


关于下面使用的SparkTest.jar中的com.TestClass测试类,分别在driver端和executor端同时访问Oracle的表数据,即两处都需要加载Oracle的驱动器类。


通过这种方式,方便分别测试driver端和executor端与各自的classpath配置及其jar包放置等相关的内容。


3.1.1    包含Driver与Executor执行逻辑的代码


如下所示:


object TestJarwithOracle {

  // 硬编码

  valurl = "jdbc:oracle:thin:@xx.xx.xx.xx:1521:dbsid"

  valuser = "userName"

  valpassword = "password"

  valdbtable = "TABLE_TEST"

 

  defmain(args: Array[String]) {

   val conf = new SparkConf().setAppName(this.getClass.getSimpleName)

   val sc = new SparkContext(conf)

   val sqlContext = new SQLContext(sc)

 

   val logRDD = sc.parallelize( List( ("name","action", "date", 1), ("name","action", "date", 2) ))

   // 处理逻辑:在Driver端

    deleteRecodes("date")

 

   // 处理逻辑:位于Executor端

   logRDD.foreachPartition(insertInto)

   sc.stop()

  }

 

  defdeleteRecodes(date: String): Unit = {

   var conn: Connection = null

   var ps: PreparedStatement = null

   val sql = s"delete from $dbtable where log_date in ('$date')"

   println(sql)

   try {

     Class.forName("oracle.jdbc.driver.OracleDriver")

     conn = DriverManager.getConnection(url, user, password)

     ps = conn.prepareStatement(sql)

     ps.executeUpdate()

    }catch {

      case e: Exception => e.printStackTrace

    }finally {

     if (ps != null) {

       ps.close()

     }

     if (conn != null) {

       conn.close()

     }

    }

  }

 

  definsertInto(iterator: Iterator[(String, String, String, Int)]): Unit = {

    var conn: Connection = null

   var ps: PreparedStatement = null

   val sql = s"insert into $dbtable( USER_ACTION, USER_NAME, LOG_DATE,ACTION_CNT) values (?, ?, ?, ?)"

 

   try {

     //conn =DriverManager.getConnection("jdbc:mysql://localhost:3306/spark","root", "123456")

     Class.forName("oracle.jdbc.driver.OracleDriver")

     conn = DriverManager.getConnection(url, user, password)

     iterator.foreach(data => {

       ps = conn.prepareStatement(sql)

       ps.setString(1, data._1)

       ps.setString(2, data._2)

       ps.setString(3, data._3)

       ps.setInt(4, data._4)

       // ps.setInt(3, data._3)

       ps.executeUpdate()

     }

     )

    }catch {

     case e: Exception => e.printStackTrace

    }finally {

     if (ps != null) {

       ps.close()

 

     }

     if (conn != null) {

       conn.close()

     }

    }

  }

}


3.1.2    代码说明


前面给出了从oracle(其他数据库基本一样)中读取的简单案例,对应insert的话,和普通oracle表的insert类似,需要注意的是连接的创建位置(可以参考Spark官网的流部分)。


大致原理简单描述如下:


1. Driver端创建的对象需要序列化到Executor端才能使用,当特定对象(如数据库连接)与具体节点绑定(如hostname绑定)时,即使序列化成功,在Executor端反序列化后对象也不能使用(比如反序列化时的初始化失败或hostname不同导致连接等无法使用等)


2. 一个分区对应一个task,Executor上的执行单位为task。

在一个执行单位中复用,即针对分区提供一个连接——可以复用连接池。

比如:rdd.foreachPartition(insertInto)


对应的function :insertInto,和普通的数据库insert方式是一样的(可以采用批量插入),针对每个分区Partition,然后获取或构建一个数据库连接,通过该连接将分区的Iterator数据插入到数据库表。





文章转载自偷功,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论