点击关注上方“知了小巷”,
设为“置顶或星标”,第一时间送达干货。
一、Linux & Shell
二、Hadoop
三、Zookeeper
四、Flume
五、Kafka
六、Hive
七、Sqoop
八、Azkaban
一、Linux & Shell
列一下比较熟悉的常用命令
df -h
find
ps -ef
iotop
free
tree
top
lsof -i
查看端口号、查看进程、查看磁盘空间
netstat
lsof -i
ps -ef
ps -aux
top
df -h
du --max-depth 1 -h
高级命令sed awk cut sort
单引号和双引号区别以及引号嵌套
单引号 ''; 双引号 ""
'$do_date' -> $do_date
"$do_date" -> 2020-11-16
"'$do_date'" -> '2020-11-16'
'"$do_date"' -> "$do_date"
union与union all区别(和SQL语义一致)
union 去重
union all 不去重
进程号未知,如何一条命令退出进程
比如flume
ps -ef | grep flume | -V grep | awk | xargs kill -9
写过哪些Shell脚本
启动停止、分发
#!/bin/bash
case $1 in
"start") {
for i in hadoop101 hadoop102 hadoop103 hadoop104 hadoop105 hadoop106
do
# ssh $i "绝对路径 启动命令"
done
};;
"stop"){
for i in hadoop101 hadoop102 hadoop103 hadoop104 hadoop105 hadoop106
do
# ssh $i "绝对路径 停止命令"
done
};;
# case命令可类比C语言的switch/case语句
# esac表示case语句块的结束
esac
数仓层级内部ods->...->ads
#!/bin/bash
# 定义变量
# 获取时间(传进来一个日期,或者当前日期减1)
# 一些公用变量定义到外面,比如库名、项目名、时间、自定义函数加上库名等
# 编写 sql = ""
# 执行sql 比如 hive -e ...
# 也可以单独把sql写到模板文件里面,使用sed命令给sql文件赋值变量,再执行
# sed -i "s|paramsdaystr|$daystr|g" xxx.sql
# 比如 impala-shell -i ip -f xxx.sql
# 为lzo压缩文件手动添加索引
数仓项目和关系存储(比如MySQL)的导入导出,比如sqoop datax
二、Hadoop
Hadoop基础
端口号
版本:2.x 3.x
| 版本 | HDFS-Web | YARN-WEB | 历史服务器 | 客户端访问端口 |
|---|---|---|---|---|
| 2.x | 50070 | 8088 | 19888 | 9000/8020 |
| 3.x | 9870 | 8088 | 19888 | 9000/8020 |
配置文件及配置项
版本:2.x 3.x
环境变量可直接配成系统的,/etc/profile/my_env.sh
| 2.x | hdfs-site.xml | core-site.xml | yarn-site.xml | mapred-site.xml | slaves |
|---|---|---|---|---|---|
| 3.x | hdfs-site.xml | core-site.xml | yarn-site.xml | mapred-site.xml | workers |
HDFS
HDFS读写流程 小文件有哪些危害
(1)占用NameNode内存
(2)增加MapTask数量,消耗过多资源怎么解决小文件问题
(1)har归档、自定义InputFormat(很少使用了)
(2)CombineTextInputFormat
将大量小文件合并到一起,统一做切片 (3)JVM重用
任务启动和结束时间比任务运行时间还要多,或者叠加耗时较长,避免多次启动和关闭所花费的时间;
JVM重用可以使得JVM实例在同一个job中重新使用N次,N的值在mapred-site.xml中配置:
mapreduce.job.jvm.numtasks (10~20;-1表示不限制N)HDFS副本数,默认3 数据块大小
| 版本或场景 | 块大小 |
|---|---|
| 1.x | 64M |
| 2.x | 128M |
| 3.x | 128M |
| 2.x/3.x本地 | 32M |
| Hive默认 | 256M |
| 业界 | 128M/256M |
MapReduce
Shuffle及其优化
3.1. Shuffle的定义
在MapReduce中,map方法之后,reduce方法之前,混洗的过程就是Shuffle。
本质是group by对数据进行分组(聚合)。
3.2. Map
Map阶段,两次排序(快排 + 归并) getPartition(标记数据是哪个分区):自定义分区器,尽可能将数据打散,避免数据倾斜; 环形缓冲区,内存-反向溢写(默认100M,可以调到200M):默认80%(可以调到90%~95%); 溢写:对Key的索引按照字典顺序进行快速排序,产生大量的溢写文件,然后对指定分区的数据进行归并(归并排序),最后持久化到磁盘,等待Reduce端来拉取数据; 在不影响最终业务逻辑的前提下,可以对溢写文件做一个Combiner;默认一次归并10个文件,服务器内存性能比较好,可以提高到15~20; 在落盘之前,可以对数据进行压缩,减少磁盘和后续的网络IO;采用Snappy或Lzo压缩算法。
对于压缩:
数据量小,仍然考虑快 数据量大,考虑切片Split(Bzip2和Lzop-要创建索引)
3.3. Reduce
去Map端拉取指定分区的数据,放到内存,内存不够溢写到磁盘(也会做归并),进入到Reduce方法;默认是一次拉5个MapTask的分区数据(可以调整到10~20); 内存,增加Reduce内存,归并 reduce方法,还可以做分组排序;
压缩,看实际需求:
如果数据需要永久保存,压缩的就越小越好; 作为下一个MR的输入:看上面的Map压缩。
3.4. 内存配置
生产服务器:128G;
NodeManager默认堆内存大小:8G 可调整为100G左右;
单个任务(Job)默认内存:8G;
MapTask默认内存:1G(注意MapTask堆内存的配置);
ReduceTask默认内存:1G(注意ReduceTask堆内存的配置)。
处理128M数据,需要1G内存;处理1G数据,需要8G内存;处理2G数据,需要16G内存...
数据是否支持切片,如果不支持切片,则内存需要往上调;默认如果1G数据不支持切片,需要4~6G内存。
还有就是总内存与堆内存,不同的配置项。
3.5. 工作线程池大小
dfs.namenode.handler.count=20*math.loge(8)
比如集群规模为8台,工作线程池大小设置为41左右:
int(20*math.loge(8)) = 41
3.6 MapReduce中,分片、分区、排序和分组(Group)的关系
分片的数量等于启动的MapTask的数量。默认情况下,分片的大小就是HDFS的blockSize;
分区是分割map每个节点的结果,按照key分别映射给不同的reduce。分区的设置需要与ReduceTaskNum配合使用。比如想要得到5个分区的数据结果。那么就得设置5个ReduceTask。另外,几个分区数,就会产生几个结果文件。
YARN
(1)YARN调度流程
(2)YARN调度策略 调度器:FIFO,公平,容量;
默认调度器:Apache->容量调度器;CDH->公平调度器;
(3)FIFO调度器特点:
单个队列,先进先出,同一时间只有一个APP执行。
(4)公平调度器特点:
多个队列,队列中每个APP公平享有队列资源,并发度是最高的;
队列之间也可以相互整合资源。
(5)容量调度器特点:
多个队列,每个队列分配相应的资源;底层是FIFO调度器;
一个队列中,优先满足先进入的APP;
队列之间可以相互整合(借用和抢占)资源。
(6)业界选择:并发度较高,选公平;普通企业,并发度不高,选容量。
(7)容量调度器默认只有一个default队列
按照框架/执行引擎创建队列:Hive Spark Flink;
按照业务模块或场景:登录注册、购物、物流模块;
好处:防止递归死循环把整个集群资源全部被耗尽;多队列相互解耦,降低风险;数据量特别大,无法保证所有任务都能正常完成,需要降级运行。
大数据集群基准测试
需要对HDFS读写性能和MR计算能力做测试,测试相关jar包在hadoop的share文件夹下面。
10T数据,需要花多长时间上传到HDFS;100T呢...读呢
Hadoop宕机
MR造成,控制YARN同时运行的APP,每个APP申请的最大内存
yarn.scheduler.maximum-allocation-mb(默认8GB)写文件块造成NameNode宕机。控制数据写入速度。
数据倾斜
大量数据进入同一个Reduce。
Map端提前聚合,进行Combiner,减少数据量传输
导致数据倾斜的Key,大量分布在不同的Mapper
(1)局部聚合(随机打散) + 全局聚合(去掉随机前缀)
(2)增加Reduce数量,提升并行度
(3)实现自定义分区(散列函数)空值处理
null +1
null +2
null +3
集群资源分配参数
假如有30台机器,跑MR任务的时候,发现5个Map任务全部分配到了同一台机器上:yarn.scheduler.fail.assignmultiple
需要关掉。
可以在一个节点上同时开启20个容器(考虑节点距离)。
三、Zookeeper
ZK选举
半数机制常用命令
ls create delete get安装多少台
10台服务器,安装 3;
20台服务器,安装 5;
50台 7,100台 11(不再增加);
台数多,通信时间变长,同步数据耗时,可用性会受到影响。Paxos算法
CAP (ZK是CP,强一致,分区容错)
四、Flume
Flume组成
taildir source
(1)断电续传、多目录
(2)版本:apache->1.7;CDH->1.6
(3)没有taildir source,怎么实现断点续传,自定义source
(4)原理:读取文件,偏移量offset
(5)Flume挂了怎么办?
有可能导致数据重复、但是不会丢失数据。
(6)处理重复数据
不处理:概率小,造成的负面影响可以忽略和接受;
处理:自定义source,实现事务(不建议);下游处理:hive的dwd层等去重,比如group by或开窗取第一条。
(7)是否支持递归:不支持
自定义实现。递归遍历文件夹 + taildirchannel
(1)file Channel 基于磁盘,可靠性高、性能低
(2)memory Channel 基于内存,可靠性低、性能高
(3)kafka Channel 数据存储在kafka集群(基于磁盘),可靠性高、性能优于memory Channel + kafka sink
kafka Channel版本:apache->1.6(有BUG 头+内容;parseAsFlumeEvent不起作用),1.7版本解决掉了这个BUG
(4)生产选择:下游是kafka,优先选择kafka Channel;如果下游不是kafka,金融类公司,选择file Channel;普通日志,选择memory Channelhdfs sink
小文件;文件大小,128M;时间:1~2小时;event个数:每个event大小不一样,文件块有大有小,不利于后期计算
Flume拦截器、选择器、监控器
拦截器
(1)常用到的拦截器
ETL拦截器:数据清洗;或者下游处理。
Flume更换为日志数据本身的时间。
(2)自定义拦截器
定义一个Java类,实现interceptor接口,重写4个方法;
初始化、关闭、单event处理、多event处理;
实现一个静态内部类Builder;
打包上传到flume/lib目录下面;
在配置文件中关联到 自定义类的全限定名+$builder。选择器
replicating 默认选择器:把数据发送到下游所有的Channel;
multiplexing 多路复用选择器:选择性发往自己指定的Channel。监控器
ganglia:尝试提交次数、最终成功的次数,如果差值比较大,说明发生了大量重试,flume性能需要优化。
(1)提高内存,默认2000M(2G) flume-env.sh 提高到4~6G (2)增加flume节点数 日志服务器配置:4~8、16G(业务应用)
Flume优化:
file Channel 配置多目录(挂载多块磁盘),提高吞吐量 调整内存大小;扩展flume节点数
Flume节点挂了怎么办?
(1)memory Channel有风险,可能会丢失数据,默认存储100个Event;最多丢失100个;File Channel默认存储100万个Event
(2)source有可能会重复数据,可以在下游做去重处理
五、Kafka
Kafka基础
Producer Broker Consumer Zookeeper ZK:没有Producer信息;存储了Broker相关信息;存储了Consumer相关信息(高低版本有差异) 生产环境多少台Broker节点:一般3台节点(2 * n + 1)足够、压测 + 动态调整确认;
n是生产者峰值的生产速率 * 副本 100;峰值50MB/秒的时候是3台服务器;压测:生产者峰值的生产速率、消费者消费的峰值消费速率 TopicPartition副本:2~3,2个的居多,默认是1个副本
副本越多,可靠性越高;通信效率会降低Kafka的数据量
100万日活、1人每天100条、1条日志0.5K~2K之间;
平均速率:100万 * 100条 (3600 * 24) = 1150条/秒;
每秒1MB/秒;
一天当中,哪个时间段,数据峰值最高,和业务本身高度相关,比如20M<50M。Kafka默认数据保存7天时间;或者保存3天。不同数据设置不同的保存时间,比如实时数仓(6个小时)。 磁盘空间预留多少
100G日志量 * 副本数2 * 3天 0.7 (保留30%磁盘余量)Kafka监控 Kafka Eagle:开源 分区数 一般3~10个 设置1个分区,然后做压测,得到生产者和消费者数据速率tp和tc;
期望吞吐量为t,并发度-分区数;
分区数 = t min(tp, tc);
假如100MB/秒,tp是20MB/秒,tc是50MB/秒;
分区数 = 100 20 = 5个分区。分区分配策略(消费者) RangeAssignor(The range assignor works on a per-topic basis);
3个KafkaConsumer实例消费10个分区:
0 1 2 3
4 5 6
7 8 9
RoundRobinAssignor(uniformly distributed);
注意可能导致数据倾斜的情况;Hash打散 + 轮询消费,减少数据倾斜。ISR
主要是解决:Leader挂掉了,下一个Leader是谁?在ISR中的都有机会称为新的Leader。
延迟时间、延迟条数(旧版本);新版本:延迟时间。有多少个Topic flume/canal -> kafka;满足下游所有消费者(比如:spark hive kylin flink);
业务模块或场景划分,避免重复解析。
Kafka Broker挂掉
上下游影响,上游积压,下游空跑。
flume:有Channel,可以缓冲数据;
日志服务器,有30天的数据。
消息丢失
ack配置:
0,只发送,不管发送结果,效率最高,可靠性差 1,发送,1个Leader应答,效率中等,可靠性中等 -1,发送,Leader和Follower都应答,效率最低、可靠性最高
生产选择:
金融类公司:-1;普通日志:选择1
消息重复
事务、幂等性 + ack=-1;交给下游处理(hive dwd;spark streaming等);
事务:单分区、单会话(Session);
金融公司:最好不要使用Kafka,可以使用RocketMQ;或者使用Kafka事务 + 幂等性。
普通日志:下游去重处理。
消息积压
消费能力不足,增加TopicPartition数量,同时提升消费组的消费者数量,消费者数=KafkaConsumer实例数=分区数;消费者所在机器的CPU核数也要相应增加。 加快消费者消费速度:下游数据处理及时能力有待提高,比如增加每批次拉取的数据量(batchSize)。
Kafka优化
日志保留策略:Kafka磁盘数据保存时间7天->3天 副本调整为2个 采用压缩 调整内存:默认1G -> 4~6G;可以考虑增加Broker节点数
Kafka高效读写数据
集群、分区、顺序读写600M/秒;随机读写100M/秒 零拷贝技术
Kafka传输一条2M的日志会有什么问题?
Kafka对于消息体的大小默认是单条最大1M,如果消息大于1M,生产者无法将消息推送到Kafka,或者消费者无法去消费Kafka里面的数据,需要调整配置:
# broker可复制的消息的最大字节数,默认为1M
replica.fetch.max.bytes: 1048576
# kafka会接收单个消息size的最大限制,默认1M左右
message.max.bytes: 1000012
# message.max.bytes必须小于replica.fetch.max.bytes
# 否则会导致replica之间同步数据失败
Kafka过期数据清理
保证数据没有被引用(没人消费);日志清理策略:delete和compact(压缩)
# 启用删除或压缩策略
# 生产一般都是直接过期删除
log.cleanup.policy=delete/compact
Kafka按照时间消费数据
可以根据时间拿到offset,再从对应offset开始消费
// partitionInfos
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new ArrayList<>();
// timestampsToSearch
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (PartitionInfo partitionInfo : partitionInfos) {
// topicPartitions
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
// timestampsToSearch
timestampsToSearch.put(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()), fetchDataTime);
}
consumer.assign(topicPartitions);
Map<TopicPartition, OffsetAndTimestamp> map = consumer.offsetsForTimes(timestampsToSearch);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : map.entrySet()) {
offsetTimestamp = entry.getValue();
if (offsetTimestamp != null) {
int partition = entry.getKey().partition();
long timestamp = offsetTimestamp.timestamp();
long offset = offsetTimestamp.offset();
// 设置读取消息的偏移量
consumer.seek(entry.getKey(), offset);
}
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
// ...
}
}
Kafka消费者拉取数据还是接收推送数据
拉取数据
Kafka数据顺序性
分区内有序;分区之间无序
六、Hive
Hive组成
默认derby数据库;一般存储在MySQL里面;
编译器 选择器 优化器 执行器;
默认引擎:MR 已过时,建议Tez、Spark;
MR引擎:统计周指标、月指标、年指标、数据量比较大
Tez引擎:即席查询,临时使用 测试 Spark引擎:每天的固定定时任务(基于内存和磁盘)
Hive与MySQL的区别
hive:数据量大的时候快,擅长大数据量查询 mysql:数据量小的时候快,小数据量的增删改查
内部表、外部表
删除数据:元数据和原始数据;
内部表:元数据和原始数据都会被删掉;外部表:只删除元数据
在生产环境使用中,什么时候创建内部表、外部表?
绝大多数表都是外部表;自己使用的(临时)表一般使用内部表。
4个by
order by:全局排序,慎用,易于导致数据倾斜
sort by:排序
distribute by:分区
分区内排序,sort by , distribute by
cluster by:排序、分区 字段相同时,使用cluster by
系统函数
日(date_add date_sub)、周(next_day)、月(date_format last_day); 解析json(get_json_object); 判空 nvl; 多行边一行 collect_set() sum...; 一行变多行 炸裂函数 explode()
自定义函数
UDF(一进一出:行)、UDTF(多进多出、一进多出)、UDAF(多进一出)
有系统函数可以解决,为什么还要用自定义函数??更加灵活、方便调试和定位错误(数据比较复杂,可以增加外部引用的jar包,比如地图经纬度计算、IP地址、json嵌套)
自定义UDF的步骤:定义类继承UDF、重写evaluate方法; 自定义UDTF的步骤:定义类继承GenericUDTF,重写三个方法 初始化(定义返回值名称、对返回值类型校验、对输入参数个数类型进行校验)、process(Object[0]=>jsonarray=>forward)、关闭资源close
打包并上传到HDFS路径下,在Hive客户端创建函数create function func_xxx...;变更jar包即可更新函数逻辑
窗口函数
rank
over topN
Hive优化
17 or 30+
mapjoin默认打开,不要关闭掉 行列过滤 join where => where join 分区 小文件处理 CombineHiveInputformat 减少切片Split,进而减少MapTask;
JVM重用 减少JVM结束和启动时间;
merge 在maponly任务中是默认打开的,将小于16M的文件合并到256M,如果是reduce任务,需要开启压缩 列存储 加快查询速度 提前Combiner 不能影响最终业务逻辑 合理选用引擎 mr tez spark 合理设置map个数和reduce个数 split = max(0, min(blocksize, Long.MAX)) 块大小
128M 数据 => 1G 内存
数据倾斜
怎么判断发生了数据倾斜?怎么判断优化起到了效果?
Reduce Tasks for job_xxxxxxxxxx_mmmm
其中有一个任务卡主99%,占用时间最长
大多数任务都已经执行完,个别1~2个任务没有执行完,卡在99%左右 不同数据类型关联产生数据倾斜(越界,int join string)
t_user#user_id int => t_log#user_id string空值分布 自定义分区,key 随机数或123...456
group by优于distinct(新版hive已优化);
mapjoin;
开启负载均衡,任务拆成两个阶段(set hive.groupby.skewindata=true);
设置reduce个数
// 倾斜案例
select xxx from t1 left join on t2 where t1.aid=t2.sid left join t3 on t1.uid=t3.uid
null量在千万级别,直接把null干掉;发现reduce只有一个,手动设置为800个;开启负载均衡,试试还是99%;数据类型,bigint vs string string,cast(t1.sid as string) = t2.sid,bigint超出了范围越界了(越界匹配,注意Hive新版本)。
Hive字段分隔符默认\001,\t
sqoop \t,数据导入会发生异常;内容和分隔符要有规范要求,数据清洗掉。
MySQL元数据备份: 元数据丢掉了,整个数仓就挂掉了,Keepalived;
MySQL utf8超过字节数问题: 主要是表情符号 注意采用utf8mb4
七、Sqoop
在使用Sqoop过程中遇到哪些问题,怎么解决的?
空值问题
hive mysql null \N null \N <==> null 4个参数(官方文档)一致性问题
hive => mysql 4map相互之间没有交互,并行执行,有可能既有成功又有失败;
数仓是为老板做决策提供依据的。指标可以算不出来,但是不能算错。
Sqoop每天向HDFS导入多少数据?
100万日活为例(目前交易量也上每日百万了):100万 * 1KB => 1G (SDC RDBMS => Kudu)
每天订单表、支付表数据多少?(确实公司机密)
30张表,平均也是百万行了...所以做了数据归档、分库分表
数据量大的表是平均值的 2~5倍之间,每天几百M
Sqoop在导数据的时候,是够遇到过数据倾斜?
Hadoop(Hive) Spark Flink Sqoop
没遇到,但还是可能发生数据倾斜(底层MR),一般发生数据倾斜的场景...
rownum() 生成一个严格均匀分布的字段(etl_id),然后指定为分割字段;split-by:按照某一列来切分表的工作单元;num-mappers:启动N个map来并行导入数据,默认4个。
Sqoop Parquet文件
列式存储 ==> mysql直接导会是乱码;使用临时表tmp行式存储、官网转换参数;ads层默认text即可。
每天什么时候执行Sqoop任务?一般执行多久
凌晨,一般00:30分 看具体数据表和数据量,一般40分钟~50分钟
Sqoop参数
举一反三;基本的JDBC参数;数据源 & 数据目标;指定表或query(自定义查询);空值处理;map个数;分隔符默认\001。
八、Azkaban
Azkaban中每天跑多少指标,什么时间开始执行?
DolphinScheduler同上,出现过什么问题...
正常每天100个左右,节假日、活动日 200个;
00:30开始执行,08点之前必须结束(万一完成不了,增加机器、减少分析的指标)
执行发生故障怎么办?
发送邮件、电话告警(第三方智能告警平台 睿象云 restful接口) 24小时有人值班
https://azkaban.github.io/
https://github.com/azkaban/azkaban/releases
通过VPN连上公司服务器 => 任务调度重试失败后手动重试、查看日志紧急排查是否存在特殊原因
猜你喜欢

点一下,代码无 Bug





