hadoop

1.hadoop的优势

  1. 高可靠:维护多个副本,数据不易丢失

  2. 高扩展:集群方便添加节点

  3. 高效:MapReduce并行处理程序

  4. 高容错:提交任务失败会再次提交

2.hadoop的组成

  • MapReduce:负责计算

    Map阶段并行处理输入的数据

    Reduce阶段对Map结果进行汇总

  • Yarn:资源调度

    Yarn主要由ResourceManager,NodeManager,ApplicationMaster和Container等组件构成。

    ResourceManager:接受客户端的请求,监控NodeManager,整个集群的资源调度和分配

    NodeManager:管理单个节点的资源,处理来自ReourceManager和ApplicationMaster的命令

    ApplicationMaster:负责数据的切分,为应用程序申请并且分配资源

    Container:封装资源,如cpu,内存

  • Hdfs:数据存储

    hdfs主要由NameNode,DataNode,SecondaryNameNode组成

    NameNode:负责处理客户端的请求,储存元数据

    DataNode:负责在本地文件系统中存储文件数据块

    SecondaryNameNode:保存NameNode元数据镜像fsimage,定时到NameNode获取edit logs,并合并到fsimage,一旦有了新的fsimage,SecondaryNameNode将其拷贝到NameNode,NameNode在下次重新启动的过程中会加载这个fsimage,减少重启时间。

  • Commons:辅助工具

三.Hdfs

3.1hdfs的读写流程

  • HDFS写文件流程

​ 1.客户端向NameNode发送写数据的请求
2.NameNode响应客户端的请求
3.客户端向NameNode请求发送第一个block的数据,NameNode返回可以写数据的DataNode。
4.客户端根据就近原则,选择DataNode建立连接,第一个DataNode建立连接后,会与其它的DataNode建立连接,建立一个成功的数据传输通道
5.客户端开始向第一个DataNode上传Block,以packet为单位,第一个DataNode收到packet后,会依次传递给后面的DataNode.
6.第一个block上传完毕后,会上传第二个。按照这个步骤依次上传

  • HDFS读文件流程
  1. 客户端发送读文件的请求
  2. NameNode返回目标文件的元信息(目标文件的块,块所在的位置)
  3. 客户端向最近的DataNode发送读数据的请求
  4. DataNode开始向客户端传输数据,客户端以Packet为单位接受

NameNode工作机制:启动时加载fsimage和Editlogs到内存,当客户端请求对元数据进行增删改的时候,NameNode将操作记录到Editlogs中,然后在内存中对元数据进行增删改

Secondary NameNode:会定时的向NameNode发送CheckPoint请求,将Editlogs和fsimage拷贝到Secondary NameNode进行合并,并且将新的fsimage返回给NameNode。

DataNode工作机制:一个数据块在DataNode上以文件形式存储在磁盘上,包括两个文件,一个是数据本身,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。DataNode启动后向NameNode注册,通过后,周期性(1小时)的向NameNode上报所有的块信息。DataNode会与Name进行心跳检测,每三秒一次,心跳返回结果带有NameNode给DataNode命令,如果超过10分钟没有收到某个DataNode的心跳,则认为该节点不可用。

3.2 副本节点选择

假设复制因为3
第一个副本:存储在客户端所在的节点(如果客户端不在集群中,则随机选择一个节点)
第二个副本:存储在与客户端相同机架的不同节点
第三个副本:存储在不同的机架上

3.3.数据完整性

hdfs写入数据的时候会计算校验和,读数据的时候也会计算校验和,客户端读数据的时候也会计算校验和
除了读写之外,datanode还跑着一个后台进程(DataBlockScanner)来定期校验存在在它上面的block,因为除了读写过程中会产生数据错误以外,硬件本身也会产生数据错误。

如果客户端发现有block坏掉,会恢复这个块
1. 客户端在跑出ChecksumException之前,会向NameNode报告错误的block以及它所在的DataNode,
2. NameNode将这个block标记已损坏,NameNode就不会再指向这个block,
3.namenode会把一个好的block复制到datanode
4.NameNode删除坏的block

四.MapReduce

4.1.MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程
1)MrAppMaster:负责整个程序运行过程的资源调度
2)MapTask:负责Map阶段的整个数据处理流程
3)ReduceTask:负责Reduce阶段的整个数据处理流程

4.2.文件切片

​ 1)一个job的MapTask的个数,取决于文件切片的数量
2)一个切片需要一个MapTask
3)默认情况下,切片大小为blockSize计公式(Math.max(minSize, Math.min(maxSize, blockSize));
4)切片时不考虑整体数据,针对每一个文件进行切割

默认的TextInputFormat对于每个文件进行切片,不管文件多小。

CombineTextInputFormat:用于小文件过多的场景,它可以从逻辑上将小文件划分到一个切片中

1).需要设置虚拟存储切片的最大值:CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m

2).切片机制
先按照虚拟存储进行逻辑分块,小于虚拟存储分成一块,大于虚拟存储小于虚拟存储×2的,分两块,对半分。切片的时候小于虚拟存储块的与下一块合并,大于虚拟存储的单独作为一个块

测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:

1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)

最终会形成3个切片,大小分别为:

(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

KeyValueTextInputFormat:也是按行读取数据,键值对<Text,Text>,默认按tab键分割,tab键前面的为key,后面的为value。可以通过conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR,”\t”)设置

NLineInputFormat:不在按block进行分片,而是按照指定的行数进行分片,NLineInputFormat.setNumLinesPerSplit(job,num);

4.3.Shuffle机制

Shuffle实在map方法之后,reduce方法之前。如图:

1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

3)多个溢出文件会被合并成大的溢出文件

4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据

6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)

7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

4.4.Partition 分区

将统计结果按照条件输出到不同的文件

默认分区

1
2
3
public int getPartition(K2 key, V2 value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

自定义 Partitioner 步骤

  1. 自定义类继承 Partitioner, 重写 getPartition()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 1 获取电话号码的前三位
String preNum = key.toString().substring(0, 3);
int partition = 4;
// 2 判断是哪个省
if ("136".equals(preNum)) {
partition = 0;
}else if ("137".equals(preNum)) {
partition = 1;
}else if ("138".equals(preNum)) {
partition = 2;
}else if ("139".equals(preNum)) {
partition = 3;
}
return partition;
}
}
  1. 在 job 驱动中,设置自定义 partitioner:
1
job.setPartitionerClass(CustomPartitioner.class);
  1. 自定义 partition 后,要根据自定义 partitioner 的逻辑设置相应数量的 reduce task
1
job.setNumReduceTasks(5);

注意:

如果reduceTask的个数大于分区数,则会生成几个空文件
如果reduceTask的个数小于分区数并且大于一,则会抛出IOException
如果reduceTask的个数等于1,生成一个文件
分区号必须从0开始

4.5排序

  • 部分排序

MapReduce 根据输入记录的键对数据集排序。 保证输出的每个文件内部排序。

  • 全排序

何用 Hadoop 产生一个全局排序的文件?最简单的方法是使用一个分区。但该方法在处理大型文件时效率极低, 因为一台机器必须处理所有输出文件,从而完全丧失了MapReduce 所提供的并行架构

  • 辅助排序( GroupingComparator 分组):

在Reduce端对键进行排序。应用于:在接受的key为bean对象时,想让一个或几个字段相同的key进入到同一个reduce方法时,可以采用分组排序。

4.6 Combiner 合并

combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件。combiner 组件的父类就是 Reducer。combiner 和 reducer 的区别在于运行的位置:Combiner 是在每一个 maptask 所在的节点运行;
Reducer 是接收全局所有 Mapper 的输出结果;Combiner 的意义就是对每一个 maptask 的输出进行局部汇总,以减小网络传输量。

combiner 能够应用的前提是不能影响最终的业务逻辑,而且, combiner 的输出 kv 应该跟 reducer 的输入 kv 类型要对应起来。

比如:文件中有两行数据 

3 5 7 

2 6   

要对他们求平均值,如果使用 combiner进行局部汇总,最终结果为

Combiner :(3+5+7)/2=5 ,(2+6)/2=4

reducer:(5+4)/2

不使用combiner,结果为

(3+5+7+2+6)/5=23/5

可以看到结果不相等,所有如果要使用combiner,前提是不影响业务逻辑

自定义combiner:

( 1)自定义一个 combiner 继承 Reducer,重写 reduce 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class WordcountCombiner extends Reducer<Text, IntWritable, Text,
IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// 1 汇总操作
int count = 0;
for(IntWritable v :values){
count = v.get();
}
// 2 写出
context.write(key, new IntWritable(count));
}
}

( 2)在 job 驱动类中设置

1
job.setCombinerClass(WordcountCombiner.class);

4.7.MapTask,ReduceTask工作机制

MapTask:

 1.reader阶段,通过RecoderReader从InputSplit中读取key/value,
 2.map阶段,将读取的key/value经过map处理形成新的key/value,
3.Collector收集阶段:将形成的新的key/value输出到环形缓冲区,进行分区和排序
4.溢写:当环形缓冲区的数据到达一定程度后,会将数据输出到本地磁盘,形成一个临时文件,在溢写的过程中也会进行分区和排序,当溢写的文件数量到达一定程度,会对这些文件进行合并。(sortAndSpill())
5.合并:当所有数据处理完成后,MapTask会将所有生成的临时文件进行合并,合并过程中是以分区为单位进行,以确保每个分区内部有序。 this.mergeParts();

ReduceTask:

​ 1.Copy阶段:ReduceTask会从不同的MapTask中拷贝对应分区的数据,放在内存中,如果数据大小超过阈值,会将数据溢写到磁盘。
2.Merge:ReduceTask将拷贝来的数据进行合并排序,防止内存中的文件过多
3.Sort:基于key的排序,确保将相同的key的数据聚集在一起
4.分组排序:将相同的key的数输入到reduce中
5.reduce:reduce函数,进行数据的处理,处理后的数据写到hdfs中(调用outputformat)

4.8.join操作

  • Map端的join

将小表置于内存中, 对于大表的一个纪录我们在内存中查找即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class MapJoin {
public static class MapJoinMapper extends Mapper<Object, Text, Text, Text> {
public Map<String, String> joinData = new HashMap();
//执行连接操作
public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
String[] values = value.toString().split("\t");
context.write(new Text(joinData.get(values[0])), value);
}
//加载小表
public void setup(Context context) throws IOException, InterruptedException{
Path[] path = DistributedCache.getLocalCacheFiles(context.getConfiguration());
BufferedReader reader = new BufferedReader(new FileReader(path[0].toString()));
String str = null;
while((str = reader.readLine()) != null) {
String[] s = str.split("\t");
joinData.put(s[0], s[1]);
}
}
}

public static class MapJoinReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
int ci = 0;
double total = 0.0;
for(Text val : values) {
ci ++;
String[] v = val.toString().split("\t");
total += Float.parseFloat(v[1]);
}
String str = String.format("%d\t%f", ci, total);
context.write(key, new Text(str));
}
}

public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);

Job job = new Job(conf, "MapJoin");
//设置相关类
job.setJarByClass(MapJoin.class);
job.setMapperClass(MapJoinMapper.class);
job.setReducerClass(MapJoinReducer.class);

//设置map输出格式
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

//设置输入输出文件
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));

//等待作业执行完毕
System.exit(job.waitForCompletion(true)?0:1);
}
}
  • Reduce端的join

将Map端输出的key相同即可reduce端进行连接,在Shuffle需要处理的数据太多,会导致效率很低、

五.Yarn

Yarn主要由ResourceManager,NodeManager,ApplicationMaster和Container等组件构成。
ReourceManager:处理客户端请求,监控NodeManager,启动或监控ApplicationMaster,资源的调度和分配
NodeManager:管理单个节点的资源,处理来自ReourceManager命令,处理来自ApplicationMaster的命令
ApplicationMaster:负责数据的切分,为应用程序申请并且分配资源
Container:封装资源,如cpu,内存

yarn运行机制

 1).Client调用job.waitForCompletion方法,向整个集群提交MR作业
2).Client向RM请求job
3).RM返回job资源提交的路径和id
4).Client提交资源(文件分片信息,配置文件信息)
5).资源提交完毕后,向RM申请MRAppMaster
6).RM收到请求,初始化Task,将job添加到容量调度器(任务调度器)
7).空闲的NM领取到该job
8).NM创建一个Container,并产生MRAppMaster,下载client提交的资源
9).MRAppMaster向RM申请多个MapTask.
10).RM将MapTask分配给其它NM
11).MapTask开始运行,进行分区排序等操作
12).MRAppMaster在所有的MapTask任务完成后,向RM申请ReduceTask
13)ReduceTask获取MapTask分区的数据
14)程序运行完毕,MR会自动向RM申请注销自己

六.任务调度器

FIFO调度器:按照作业的优先级高低,再按照作业的到达时间先后选择被执行.

容量调度器(Capacity Scheduler):支持多个队列,每个队列分配一定的资源量,每个队列采用FIFO调度策略,为了防止同一个用户作业独占队列中的资源,该调度器会对同一用户提交的作业占资源量进行限定,调度时,首先按以下策略选择一个合适队列, :计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值,选择一个该比值最小的队列.

公平调度器(Fair Scheduler):支持多个队列,每个队列分配一定的资源量,每个队列的job都会按照优先级分配一定的资源,公平调度器job的优先级是根据差额来计算的,差额是指理想状态下获得的资源与实际获得的资源的差额,差额越小,优先级越高

文章目录
  1. 1. 1.hadoop的优势
  2. 2. 2.hadoop的组成
  3. 3. 三.Hdfs
    1. 3.1. 3.1hdfs的读写流程
    2. 3.2. 3.2 副本节点选择
    3. 3.3. 3.3.数据完整性
  4. 4. 四.MapReduce
    1. 4.1. 4.1.MapReduce进程
    2. 4.2. 4.2.文件切片
    3. 4.3. 4.3.Shuffle机制
    4. 4.4. 4.4.Partition 分区
    5. 4.5. 4.5排序
    6. 4.6. 4.6 Combiner 合并
    7. 4.7. 4.7.MapTask,ReduceTask工作机制
    8. 4.8. 4.8.join操作
  5. 5. 五.Yarn
  6. 6. 六.任务调度器
|
载入天数...载入时分秒...