3无法找到无法找到驱动类的问题及其解决方法
扩展知识:JVM的类加载机制——该知识是解决这一类问题的根本方法。
下面的解决方法主要从JVM进程的classpath配置出发去分析。
在Spark框架中,为JVM进程的classpath配置提供了以下几种方式:
1. 使用环境变量SPARK_CLASSPATH:将jar包放置在类查找路径,比如前面的SPARK_CLASSPATH,设置该变量后可以查看4040(默认应用监控端口)的环境设置,此时需要在driver和executor的classpath的路径中放置所需的jar包。
由前面启动过程中的日志可知,这种方式已经废弃,对应的方式是直接设置driver和executor的classpath相关的配置属性。
2. 使用配置属性分别设置driver和executor执行时加载jar包的路径。
a) spark.executor.extraClassPath
b) spark.driver.extraClassPath
取代使用环境变量进行配置的方法。由于通常情况下,driver和executor的进程不在同一个节点上,因此分别给出各自的配置属性。
和前面使用环境变量进行配置的方式一样,在设置相应的classpath路径之后,需要将对应的jar包部署到各个节点上的该classpath路径下。
说明:上述两种方法都是通过设置JVM进程的classpath属性,为进程提供jar包查找路径的。而对应的jar包,需要人为地部署到各个节点的对应路径下。
补充:另外可以通过提交命令的命令行选项--driver-class-path来设置driver端进程的classpath,原理类似,因此不作为单独一种。
注意:--conf命令项方式设置时,--confPROP=VALUE的等号中间不要加空格….
3. 除了前面两种方式外,Spark框架还提供了第三种方法,可以通过提交命令(spark-submit或spark-shell)的命令行选项--jars,自动上传、下载所需jar包,并同时将下载的jar包放入JVM进程的classpath路径中。
说明:通过该命令行选项设置的jar包,会通过http服务(注意,该服务在driver端启动后才会启动)上传jar包,并在executor端执行时下载该jar包,然后放入classpath。即,此时不需要手动部署到各个节点上,但每个提交的应用都会增加jar包上传、下载的网络IO、磁盘IO开销。
下面针对提交时的--master选项、--deploymode的选项分别进行解析。
不同选项的简单说明如下:
1. 当--master选项为local时,对应为in-process方式,此时仅一个进程(local-cluster时也对应一个进程,内部通过实例模拟),因此,对应的classpath实际上使用的都是driver短对应进程的classpath。即只需要配置driver的classpath即可。
2. 当--master选项为集群的MasterURL(本文主要基于Standalone模式的集群)时,对应driver和executor是以不同的进程方式启动,因此需要分别进行设置。并且,在不同的部署模式(--deploymode)下也会有细节上的差异(本质上还是根据JVM类加载机制):
a) --deploymode为client时:driver进程在当前提交节点上启动
b) --deploymode为cluster时:driver进程提交到集群中,由集群调度Master负责分配节点,并在分配的节点上启动driver进程。
说明:不同的部署模式和jar包的上传、下载有关,即在使用--jars方式时会有所差异,其关键点在于,jar包的上传、下载是通过driver进程启动过程中启动的http服务来完成的,当指定的jar包是以本地文件系统的路径提供时,在另一个节点启动的driver进程中的http服务根据该路径上传jar包时,也会根据本地文件系统指定的路径去上传,所以此时必须保证由Master节点分配给driver的节点,对应的该路径上也存在需要上传的jar包。
因此,建议在使用cluster部署模式提交应用程序时,所使用的路径尽可能与节点无关,比如使用hdfs文件系统的路径等。
3.1 测试类的设计
关于下面使用的SparkTest.jar中的com.TestClass测试类,分别在driver端和executor端同时访问Oracle的表数据,即两处都需要加载Oracle的驱动器类。
通过这种方式,方便分别测试driver端和executor端与各自的classpath配置及其jar包放置等相关的内容。
3.1.1 包含Driver与Executor执行逻辑的代码
如下所示:
object TestJarwithOracle {
// 硬编码
valurl = "jdbc:oracle:thin:@xx.xx.xx.xx:1521:dbsid"
valuser = "userName"
valpassword = "password"
valdbtable = "TABLE_TEST"
defmain(args: Array[String]) {
val conf = new SparkConf().setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val logRDD = sc.parallelize( List( ("name","action", "date", 1), ("name","action", "date", 2) ))
// 处理逻辑:在Driver端
deleteRecodes("date")
// 处理逻辑:位于Executor端
logRDD.foreachPartition(insertInto)
sc.stop()
}
defdeleteRecodes(date: String): Unit = {
var conn: Connection = null
var ps: PreparedStatement = null
val sql = s"delete from $dbtable where log_date in ('$date')"
println(sql)
try {
Class.forName("oracle.jdbc.driver.OracleDriver")
conn = DriverManager.getConnection(url, user, password)
ps = conn.prepareStatement(sql)
ps.executeUpdate()
}catch {
case e: Exception => e.printStackTrace
}finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close()
}
}
}
definsertInto(iterator: Iterator[(String, String, String, Int)]): Unit = {
var conn: Connection = null
var ps: PreparedStatement = null
val sql = s"insert into $dbtable( USER_ACTION, USER_NAME, LOG_DATE,ACTION_CNT) values (?, ?, ?, ?)"
try {
//conn =DriverManager.getConnection("jdbc:mysql://localhost:3306/spark","root", "123456")
Class.forName("oracle.jdbc.driver.OracleDriver")
conn = DriverManager.getConnection(url, user, password)
iterator.foreach(data => {
ps = conn.prepareStatement(sql)
ps.setString(1, data._1)
ps.setString(2, data._2)
ps.setString(3, data._3)
ps.setInt(4, data._4)
// ps.setInt(3, data._3)
ps.executeUpdate()
}
)
}catch {
case e: Exception => e.printStackTrace
}finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close()
}
}
}
}
3.1.2 代码说明
前面给出了从oracle(其他数据库基本一样)中读取的简单案例,对应insert的话,和普通oracle表的insert类似,需要注意的是连接的创建位置(可以参考Spark官网的流部分)。
大致原理简单描述如下:
1. Driver端创建的对象需要序列化到Executor端才能使用,当特定对象(如数据库连接)与具体节点绑定(如hostname绑定)时,即使序列化成功,在Executor端反序列化后对象也不能使用(比如反序列化时的初始化失败或hostname不同导致连接等无法使用等)
2. 一个分区对应一个task,Executor上的执行单位为task。
在一个执行单位中复用,即针对分区提供一个连接——可以复用连接池。
比如:rdd.foreachPartition(insertInto)
对应的function :insertInto,和普通的数据库insert方式是一样的(可以采用批量插入),针对每个分区Partition,然后获取或构建一个数据库连接,通过该连接将分区的Iterator数据插入到数据库表。





