1. 分布式文件系统HDFS
1.1 测试安装好的hadoop集群
1.在hadoop的同级目录input中创建两个文件 2.将两个文件拷贝到hadoop目录下的in目录(注意这个in目录不是linux实际目录)
bin/hadoop dfs -put ../input in
复制
3.查看这两个文件
bin/hadoop dfs -ls ./in/*
复制
4.提交运行作业
bin/hadoop jar hadoop-0.x.x-examples.jar wordcount in out
复制
其中hadoop-0.x.x-examples.jar为源码中的样例,wordcount表示该jar包中的某一项功能,in输入目录,out输出目录
列出相应的目录 in和out
bin/hadoop dfs -ls
复制
查看out中的计算结果
bin/hadoop dfs -cat ./out/*
复制
1.2 通过web了解Hadoop的活动
通过浏览器和http访问jobtracker所在节点xxxxx端口监控jobtracker 可以查看作业运行日志,使用的节点等 通过浏览器和http访问namenode所在节点的xxxx端口监控集群
1.3 数据写在哪(从os角度看)
配置文件中定义了文件的存放位置 数据+元数据(xx.meta)
1.4 HDFS设计的基础和目标
硬件错误是常态。因此需要冗余。 流式数据访问。即数据批量读取而非随机读取,Hadoop擅长做数据分析而不是事务处理 大规模数据集 简单一致模型。为了降低系统复杂度,对文件采用一次性写多次读的逻辑设计,即文件一经写入,关闭,就再也不能修改 程序采用“数据就近”原则分配节点执行
1.5 HDFS体系结构
NameNode DataNode 事务日志 映像文件 SecondaryNameNode
1.5.1 NameNode
管理文件系统的命名空间 记录每个文件数据块在各个Datanode上的位置和副本信息 协调客户端对文件的访问 记录命名空间内的改动或空间本身属性的改动 Namenode使用事务日志记录HDFS元数据的变化。使用映像文件存储文件系统的命名空间,包括文件映射,文件属性等。找到元数据保存的地方(注意:同级目录下有一些其他目录) current目录下fsimage即映象文件
1.5.2 Datanode
负责所在物理节点的存储管理 一次写入,多次读取(不修改) 文件由数据块组成,典型的块大小是64MB 数据块尽量散步到各个节点
1.5.3 读取数据的流程
客户端要访问HDFS中的一个文件 首先从namenode获得组成这个文件的数据块位置列表 根据列表知道存储数据块的datanode 访问datanode获取数据 Namenode并不参与数据实际传输
1.6 HDFS的可靠性
1.冗余副本策略 可以在hdfs-site.xml中设置复制因子指定副本数量 所有数据块都有副本 Datanode启动时,遍历本地文件系统,产生一份hdfs数据块和本地文件的对应关系列表(blockreport)汇报给namenode 2.机架策略 集群一般是放在不同的机架上,机架间的带宽要比机架内带宽要小 HDFS的“机架感知” 一般在本机架存放一个副本,在其他机架再存放别的副本,这样可以防止机架失效时丢失数据,可以提高带宽利用率 3.心跳机制 Namenode周期性从datanode接收心跳信号和块报告 Namenode根据块报告验证元数据 没有按时发送心跳的datanode会被标记为宕机,不会再给它任何I/O请求 如果datanode失效造成副本数量下降,并且低于预先设置的阈值,namenode会检测出这些数据块,并在合适的时机进行重新复制 引发重新复制的原因还包括数据副本本身损坏,磁盘错误,复制因子被增大等。 4.安全模式 Namenode启动时会先经过一个“安全模式”阶段 安全模式阶段不会产生数据写 在此阶段Namenode收集各个datanode的报告,当数据块达到最小副本数以上时,会被认为是“安全”的 在一定比例(可设置)的数据块被确定为“安全”后,再过若干时间,安全模式结束 当检测到副本数不足的数据块时,该块会被复制到达最小副本数。 5.校验和 在文件创立时,每个数据块都产生校验和 校验和会作为单独一个隐藏文件保存在命名空间下 客户端获取数据时可以检查校验和是否相同,从而发现数据块是否损坏 如果正在读取的数据块损坏,则可以继续读取其它副本。 6.回收站 删除文件时,其实是放入回收站/trash 回收站里的文件可以快速恢复 可以设置一个时间阈值,当回收站里文件的存放时间超过这个阈值,就被彻底删除,并且释放占用的数据块。 7.元数据保护 映像文件刚和事务日志是Namenode的核心数据。可以配置为拥有多个副本 副本会降低Namenode的处理速度,但增加安全性 Namenode依然是单点,如果发生故障要手工切换 8.快照机制 支持存储某个时间点的映像,需要时可以使数据重返这个时间点的状态
1.7 HDFS文件操作
命令行方式 API方式
1.7.1 列出HDFS下的文件
注意,hadoop没有当前目录的概念,也没有cd命令
bin/hadoop dfs -ls
假设列出的目录有in目录,再去查看in目录
bin/hadoop dfs -ls ./in复制
1.7.2 上传文件到HDFS
bin/hadoop dfs -put ../abc abc
前面一个文件在本地的位置(采用linux风格的目录),后一个表示hadoop中的文件
上传文件后,使用bin/hadoop dfs -ls 查看,可以看到hadoop中的abc文件夹复制
1.7.3 将HDFS的文件复制到本地
bin/hadoop dfs -get abc ./xyz
将abc文件拷贝到当前目录下的xyz文件中复制
1.7.4 删除HDFS下的文档
bin/hadoop dfs -rmr abc
复制
1.7.5 查看HDFS基本统计信息
bin/hadoop dfsadmin -report
复制
1.7.6 查看hdfs文件夹大小
hadoop fs -du -s -h /user/hive/warehouse/ods.db
复制
1.7.7 进入和退出安全模式
进入安全模式
bin/hadoop dfsadmin -safemode enter
复制
退出安全模式
bin/hadoop dfsadmin -safemode leave
复制
1.8 其他一些操作
1.8.1 怎样添加节点?
在新节点安装好hadoop 把namenode的有关配置文件复制到该节点 修改masters和slaves文件,增加该节点 设置ssh免密码进出该节点 单独启动该节点上的datanode和tasktracker(hadoop-daemoon.sh start datanode/tasktracker) 运行start-balancer.sh进行数据负载均衡
1.8.2 启动某些特定后台进程而非所有后台进程
观察Start-all.sh脚本的内容,其实就是分别启动各个脚本-对应特定的后台进程
1.8.3 负载均衡
作用:当节点出现故障,或新增节点时,数据块分布可能不均匀,负载均衡可以重新平衡各个datanode上数据块的分布
1.9 Hadoop API
上传本地文件到HDFS
import org.apache.hadoop.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CopyFile{
public static void main(String[] args) throws Exception{
Configuration conf=new Configuration();
//conf.addResources(new Path("conf/hadoop-default.xml"));
//conf.addResources(new Path("conf/hadoop-site.xml"));
FileSystem hdfs=FileSystem.get(conf);
Path src=new Path("xxx.txt");
Path dst=new Path("/");
hdfs.copFormLocalFile(src,dst);
for(FileStatus file:files){
System.out.println(file.getPath());
}
}
}复制
创建HDFS文件
import org.apache.hadoop.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CreateFile{
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();
bte[] buff="hello word!".getBytes();
FileSystem hdfs=FileSystem.get(conf);
Path dfs=new Path("/test");
FSDataOutputStream outputstream=hdfs.create(dfs);
outputStream.write(buff,0,buff.length);
}
}复制
重命名HDFS文件
import org.apache.hadoop.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class Rename{
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();
FileSystem hdfs=FileSystem.get(conf);
Path frpath=new Path("/test");
Path topath=new Path("/test1");
boolean isRename=hdfs.rename(frpath,topath);
}
}复制
查看HDFS文件的最后修改时间
import org.apache.hadoop.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class GetLTime{
public void main(String[] args) throws Exception{
Configuration conf=new Configuration();
FileSystem hdfs=FileSystem.get(conf);
Path fpath=new Path("/test1");
FileStatus fileStatus=hdfs.getFileStatus(fpath);
long modificationTime=fileStatus.getModificationTime();
System.out.println("Modification times:"+modificationTime);
}
}复制
2. Map-Reduce体系架构与实战
2.1 Map-Reduce编程模型
input->map->shuffle->reduce->output
复制
2.2 Map-Reduce原理
2.2.1 Mapper
Map-reduce的思想就是“分而治之” Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”执行 “简单任务”有几个含义:1.数据或计算规模相对于原任务要大大缩小;2.就近计算,即会被分配到存放了所需数据的节点进行计算;3.这些小任务可以并行计算,彼此间几乎没有依赖关系。
2.2.2 Reducer
对map阶段的结果进行汇总 Reducer的数目由mapred-site.xml配置文件里的项目mapred.reduce.tasks决定。缺省值为1,用户可以覆盖之。
2.2.3 Shuffler
在mapper和reducer中间的一个步骤(可以没有) 可以把mapper的输出按照某种key值重新切分和组合成n份,把key值符合某种范围的输出到特定的reducer那里去处理 可以简化reducer过程
2.2.4 例子
Mapper
public static class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable>{
private final static IntWritable one=new IntWritable(1);
private Text word=new Text();
public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
System.out.println("key="+key.toString());//添加查看key值
System.out.println("value="=value.toString());//添加查看value值
StringTokenizer itr=new StringTokenizer(value.toString());
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word,one);
}
}
}复制
Reducer
public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result=new IntWritable();
public void reduce(Text key,Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
int sum=0;
for(IntWritable val:values){
sum+=val.get();
}
result.set(sum);
context.write(key,result);
}
}复制
运行mapper和reducer
public static void main(String[] args) throws Exception{
Configuration conf=new Configuration();
String[] otherArgs=new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length!=2){
System.err.println("Usage:wordcount<in><out>");
System.exit(2);
}
Job job=new Job(conf,"word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)? 0:1);
}复制
2.3 性能调优
究竟需要多少个reducer? 输入:大文件优于小文件 减少网络传输:压缩map的输出 优化每个节点能运行的任务数:mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum(缺省值均为2)
2.4 调度机制
缺省为先入先出作业队列调度 支持公平调度器 支持容量调度器
2.5 任务执行优化
推测式优化:即如果jobtracker发现有拖后腿的任务,会再启动一个相同的备份任务,然后哪个先执行完就会kill去另一个。因此在监控网页上经常能看到正常执行完的作业有被kill掉的任务 推测式执行缺省打开,但如果是代码问题,并不能解决问题,而且会使集群更慢,通过在mapred-site.xml配置文件中设置mapred.map.tasks.speculative.excution和mapred.reduce.tasks.speculative.execution可以为map任务或reduce任务开启或关闭推测式执行 重用JVM,可以省去启动新的JVM消耗时间,在mapred-site.xml配置文件中设置mapred.job.reuse.jvm.num.tasks设置单个JVM上运行的最大任务数(1,>1或-1表示没有限制) 忽略模式,任务在读取数据失败2次后,会把数据位置告诉jobtracker,后者重新启动该任务并且在遇到所记录的坏数据时直接跳过(缺省关闭,用SkipBadRecord方法打开)
2.6 错误处理机制:硬件故障
硬件故障是指jobtracker故障或tasktracker故障 jobtracker是单点,若发生故障目前hadoop还无法处理,唯有选择最牢靠的硬件作为jobtracker Jobtracker通过心跳(周期1分钟)信号了解tasktracker是否发生故障或负载过于严重 Jobtracker将从任务节点列表中移除发生故障的tasktracker 如果故障节点在执行map任务并且尚未完成,jobtracker会要求其他节点继续执行尚未完成的reduce任务。
2.7 错误处理机制:任务失败
由于代码缺陷或进程崩溃引起任务失败 JVM自动退出,向tasktracker父进程发送方错误信息,错误信息也会写入到日志 Tasktracker监听程序会发现进程退出,或进程很久没有更新信息送回,将任务标记为失败 标记失败任务后,任务计数器减去1以便接收新任务,并通过心跳信号告诉jobtracker任务失败的信息 Jobtrack获悉任务失败后,将把该任务重新放入调度队列,重新分配出去再执行 如果一个任务失败超过4次(可以设置),将不会再被执行,同时作业也宣布失败。
2.8 审计日志
把log4j.logger.org.apache.hadoop.fs.FSNamesystem.audit=WARN改为“INFO”可以打开审计日志。每个HDFS事件都会在namenode的log中写入一行记录。
2.9 第三方工具
Ganglia Chukwa Openstack
3. Hadoop中重要的进程
3.1 Namenode
HDFS的守护进程 每个进程都有一个 记录文件如何分割成数据块的,以及这些数据存放的节点位置 对内存和I/O进行集中管理 是单点,发生故障将使集群奔溃
3.2 Secondar NameNode(辅助名称节点)
监控HDFS状态的辅助后台程序 每个集群都有一个 与NameNode进行通讯,定期保存HDFS元数据快照 当NameNode故障可以作为备用NameNode使用 需要手动开启
3.3 DataNode
每台从服务器都运行一个 负责HDFS数据块读写到本地文件系统
3.4 JobTracker
用于处理作业(用户提交代码)后台程序 决定有哪些文件参与处理,然后切割task并分配节点 监控task,重启失败的task(于不同的节点) 每个集群只有唯一一个JobTracker,位于Master节点。
3.5 TaskTracker
位于slave节点上,与datanode结合(代码与数据一起的原则) 管理各自节点上的task(由jobtracker分配) 每个节点只有一个tasktracker,但一个tasktracker可以启动多个JVM,用于并行执行map或reduce任务 与jobtracker交互
3.6 Master和Slave
Master:Namenode,Secondary Namenode,Jobtracker.浏览器(用于观看管理页面),其他Hadoop工具。 Slave:Tasktracker、Datanode Master不是唯一的
3.7 三种运行模式
单机模式:安装简单,几乎不用做任何配置,仅限于调试用途 伪分布模式:在单节点上同时启动namenode、datanode、jobtracker、tasktracker、secondary namenode等5个进程,模拟分布式运行的各个节点 完全分布模式:正常的Hadoop集群,由多个各司其职的节点构成
扫码关注公众号
文章转载自程序猿小P,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
相关阅读
Hologres x 函数计算 x Qwen3,对接MCP构建企业级数据分析 Agent
阿里云大数据AI技术
639次阅读
2025-05-06 17:24:44
一页概览:Oracle GoldenGate
甲骨文云技术
503次阅读
2025-04-30 12:17:56
GoldenDB数据库v7.2焕新发布,助力全行业数据库平滑替代
GoldenDB分布式数据库
481次阅读
2025-04-30 12:17:50
优炫数据库成功入围新疆维吾尔自治区行政事业单位数据库2025年框架协议采购!
优炫软件
369次阅读
2025-04-18 10:01:22
XCOPS广州站:从开源自研之争到AI驱动的下一代数据库架构探索
韩锋频道
318次阅读
2025-04-29 10:35:54
千万级数据秒级响应!碧桂园基于 EMR Serverless StarRocks 升级存算分离架构实践
阿里云大数据AI技术
303次阅读
2025-04-27 15:28:51
Coco AI 入驻 GitCode:打破数据孤岛,解锁智能协作新可能
极限实验室
255次阅读
2025-05-04 23:53:06
优炫数据库成功应用于晋江市发展和改革局!
优炫软件
204次阅读
2025-04-25 10:10:31
首批!百度智能云向量数据库以优异成绩通过中国信通院向量数据库性能测试
百度智能云
192次阅读
2025-05-08 19:35:25
优炫数据库四个案例入选《2024网信自主创新调研报告》
优炫软件
191次阅读
2025-04-22 10:12:23