hadoop jar example.jar WordCount /file/input /file/output
复制
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
复制
public static void main(String[] args) throws Throwable {
//构建一个RunJar对象并执行run方法
new RunJar().run(args);
}
复制
获取 jar 文件,即 example.jar 拿到 jar 文件中的 MainClass 名称,即 WordCount.class 解压 jar 文件 通过反射拿到 MainClass 实例对象 拿到 main 方法的实例 通过反射调用 main 方法,也就是我们编写的应用程序的 mian 方法 在应用程序的 main 方法中,通过 Job.waitForCompletion() 等待应用程序执行完毕
public void run(String[] args) throws Throwable {
String usage = "RunJar jarFile [mainClass] args...";
...
//获取到Jar文件,即提交的example.jar文件
JarFile jarFile;
try {
jarFile = new JarFile(fileName);
} catch(IOException io) {
throw new IOException("Error opening job jar: " + fileName)
.initCause(io);
}
//拿到Jar文件的Main Class,如WordCount.class
Manifest manifest = jarFile.getManifest();
if (manifest != null) {
mainClassName = manifest.getMainAttributes().getValue("Main-Class");
}
jarFile.close();
...
mainClassName = mainClassName.replaceAll("/", ".");
File tmpDir = new File(System.getProperty("java.io.tmpdir"));
ensureDirectory(tmpDir);
//声明一个临时文件
final File workDir;
try {
workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
} catch (IOException ioe) {
...
}
...
//解压Jar文件
unJar(file, workDir);
ClassLoader loader = createClassLoader(file, workDir);
Thread.currentThread().setContextClassLoader(loader);
//通过反射拿到MainClass实例
Class<?> mainClass = Class.forName(mainClassName, true, loader);
//拿到main方法的实例
Method main = mainClass.getMethod("main", new Class[] {
Array.newInstance(String.class, 0).getClass()
});
String[] newArgs = Arrays.asList(args)
.subList(firstArg, args.length).toArray(new String[0]);
try {
//通过反射调用main方法
main.invoke(null, new Object[] { newArgs });
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
/**
* TODO 到此为止,跳转到Driver的main方法,也就是自己编写的应用程序的main方法
* 最后调用Job.waitForCompletion等待应用程序执行完毕
*/
}
复制
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
//TODO 提交任务
submit();
}
//打印执行的进度信息
...
return isSuccessful();
}
复制
设置当前Job的状态为DEFINE 设置启用NewAPI,2.x之后属于NewAPI 连接Yarn集群,获取到ResourceManager的代理对象 获取一个作业提交器,将作业提交到 Yarn 集群 提交之后,设置MR的状态为RUNNING
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
//1.设置当前Job的状态为DEFINE
ensureState(JobState.DEFINE);
//2.设置启用NewAPI,2.x之后属于NewAPI
setUseNewAPI();
//3.获取提交客户端,连接Yarn集群,获取到ResourceManager的代理对象
connect();
//4.获取一个提交器
//这里的cluster就是上面初始化的Cluster对象
//cluster.getFileSystem() ==> HDFS
//cluster.getClient() ==> YARNRunner
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
//调用提交器的submitJobInternal方法执行Job的提交
return submitter.submitJobInternal(Job.this, cluster);
}
});
//5.提交之后,设置MR的状态为RUNNING
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
复制
第3步:连接Yarn集群,获取到ResourceManager的代理对象
Job 的内部有一个 Cluster 类型的 cluster 成员变量
Cluster 的内部有一个 YARNRunner 类型的(ClientProtocol实现类) client 的成员变量
YARNRunner 的内部有一个 ResourceMgrDelegate 类型的 resMgrDelegate 成员变量
ResourceMgrDelegate 的内部有一个 YarnClientImpl 类型的(YarnClient子类) client 成员变量
YarnClientImpl 的内部有一个 ApplicationClientProtocol 类型的 rmClient 成员变量
a .在 connect() 方法中初始化了 Cluster 对象
private synchronized void connect()
throws IOException, InterruptedException, ClassNotFoundException {
if (cluster == null) {
cluster =
ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
public Cluster run()
throws IOException, InterruptedException,
ClassNotFoundException {
//初始化了Cluster对象
return new Cluster(getConfiguration());
}
});
}
}
复制
本地模式:LocalJobRunner
Yarn模式:YARNRunner
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
throws IOException {
synchronized (frameworkLoader) {
for (ClientProtocolProvider provider : frameworkLoader) {
LOG.debug("Trying ClientProtocolProvider : "
+ provider.getClass().getName());
ClientProtocol clientProtocol = null;
try {
if (jobTrackAddr == null) {
//local模式
//provider=LocalClientProtocolProvider
//clientProtocol=LocalJobRunner
clientProtocol = provider.create(conf);
} else {
//Yarn模式
//provider=YarnClientProtocolProvider
//clientProtocal=YARNRunner
clientProtocol = provider.create(jobTrackAddr, conf);
}
...
}
复制
public YARNRunner(Configuration conf) {
this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
}
复制
public ResourceMgrDelegate(YarnConfiguration conf) {
super(ResourceMgrDelegate.class.getName());
this.conf = conf;
//创建一个YarnClient,实现类是YarnClientImpl
this.client = YarnClient.createYarnClient();
init(conf);
start();
}
复制
//ResourceManager的代理对象
protected ApplicationClientProtocol rmClient;
复制
第4步:获取一个作业提交器,将作业提交到 Yarn 集群
生成或验证各种路径,将作业所需的资源进行上传 对输入数据源进行逻辑切片,并设置Map Task的数量 进行作业的提交
//验证作业的输出路径是否存在,如果存在会报错,
//正常情况是我们进行了配置,但是该路径在作业提交时是不存在的
checkSpecs(job);
//添加应用框架的路径到分布式缓存中
//DistributedCache:把应用的一些资源添加到分布式缓存中,那么程序在执行的时候,
//无论有多少个节点启动任务,这些节点都会自动把分布式缓存中的各种信息
//(小数据文件,配置信息...)同步到节点本地
addMRFrameworkToDistributedCache(conf);
//通过静态方法getStagingDir()获取作业执行时相关资源的存放路径
//默认是/tmp/hadoop-yarn/staging/提交作业用户名{$user}/.staging
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//TODO 记录作业提交的主机IP,主机名,并设置配置信息
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
//生成JobID
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
//构造提交作业的路径,jobStagingArea 后面拼接 jobID
//当提交一个 Job时,YARN的客户端会把该Job的一切要用的资源初始化并且存储在HDFS
//的工作目录中,以后哪个节点要执行 Task,就从HDFS 目录中拉取资源文件
//主要包括以下三类资源:
//1.xxx.jar
//2.job.xml
//3.shell启动命令
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
try {
//将jar文件和配置文件上传到上面获取的资源的提交目录 submitJobDir
copyAndConfigureFiles(job, submitJobDir);
//上传完成后获取完整的配置文件所在路径,也就是job.xml的路径
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
...
复制
第二部分:对输入数据源进行逻辑切片,并设置Map Task的数量
...
//获取逻辑切片的数量
int maps = writeSplits(job, submitJobDir);
//配置需要启动的 Map Task 的个数
conf.setInt(MRJobConfig.NUM_MAPS, maps);
...
复制
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
//TODO 根据配置的InputFormat类,通过反射创建对应的实例对象
// 默认为TextInputFormat,是FileInputFormat的子类
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
//TODO 核心方法,InputSplit就是切片对象,getSplits() 方法返回切片对象的集合
// 这里的getSplits()在FileInputFormat中实现,TextInputFormat并未进行重写
List<InputSplit> splits = input.getSplits(job);
//将集合转为数组
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
//对数组中的元素根据切片大小进行排序
Arrays.sort(array, new SplitComparator());
//TODO 将逻辑切片信息写入Job提交的HDFS目录中
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
//返回切片数量
return array.length;
}
复制
public List<InputSplit> getSplits(JobContext job) throws IOException {
...
//初始化切片对象的集合容器
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
//遍历输入文件
for (FileStatus file: files) {
Path path = file.getPath();
//拿到文件的大小
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//TODO 判断文件是否可以进行切分
// 如果可以进行切分
if (isSplitable(job, path)) {
//获取配置的块大小
long blockSize = file.getBlockSize();
//计算逻辑切片的大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
//bytesRemaining => 文件剩余大小
long bytesRemaining = length;
//TODO 判断文件的剩余大小是否大于切片大小的 1.1 倍
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
//进行切片
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
//更新剩余文件大小
bytesRemaining -= splitSize;
}
//TODO 如果剩余大小小于切片大小的 1.1 倍,且剩余大小不为0,那么将剩余的文件作为一个逻辑切片
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
//TODO 如果无法切分,将文件整体作为一个逻辑切片
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
//如果文件大小为0,不需要进行切片,添加一个空的逻辑切片
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
...
}
return splits;
}
复制
遍历输入的所有文件,获取文件大小 如果文件大小为 0 ,则不需要进行切片,直接添加一个空的逻辑切片 如果文件大小不为 0,判断该文件是否可以进行切分,如果不可以切分则将整体文件作为一个逻辑切片 如果文件可以进行切分,首先计算逻辑切片的大小,然后对文件进行切片,并更新剩余文件大小 如果剩余文件大小大于逻辑切片大小的 1.1 倍,则一直进行切分,直到该条件不满足位置 如果剩余文件不满足 1.1 倍逻辑切片大小,且剩余文件大小不为0,则将剩余文件作为一个逻辑切片 返回逻辑切片的集合,这里的逻辑切片用 InputSplit 对象表示
//将解析该job生成的配置文件,写入到HDFS上
writeConf(conf, submitJobFile);
复制
第三部分:进行作业的提交
//这里的 submitClient 为 YARNRunner 实例
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
复制
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException {
...
try {
//TODO 提交应用并获取 ApplicationId
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);
...
}
...
}
复制
文章转载自大数据记事本,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
大佬你只在这里发布吗?
2年前

评论
相关阅读
数据库国产化替代深化:DBA的机遇与挑战
代晓磊
1210次阅读
2025-04-27 16:53:22
2025年4月国产数据库中标情况一览:4个千万元级项目,GaussDB与OceanBase大放异彩!
通讯员
689次阅读
2025-04-30 15:24:06
数据库,没有关税却有壁垒
多明戈教你玩狼人杀
588次阅读
2025-04-11 09:38:42
国产数据库需要扩大场景覆盖面才能在竞争中更有优势
白鳝的洞穴
569次阅读
2025-04-14 09:40:20
【活动】分享你的压箱底干货文档,三篇解锁进阶奖励!
墨天轮编辑部
494次阅读
2025-04-17 17:02:24
一页概览:Oracle GoldenGate
甲骨文云技术
469次阅读
2025-04-30 12:17:56
GoldenDB数据库v7.2焕新发布,助力全行业数据库平滑替代
GoldenDB分布式数据库
461次阅读
2025-04-30 12:17:50
优炫数据库成功入围新疆维吾尔自治区行政事业单位数据库2025年框架协议采购!
优炫软件
353次阅读
2025-04-18 10:01:22
国产数据库图谱又上新|82篇精选内容全览达梦数据库
墨天轮编辑部
268次阅读
2025-04-23 12:04:21
关于征集数据库标准体系更新意见和数据库标准化需求的通知
数据库标准工作组
239次阅读
2025-04-11 11:30:08