一、hadoop简介
HADOOP集群具体来说包含两个集群:HDFS集群和YARN集群,两者逻辑上分离,但物理上常在一起
HDFS集群:负责海量数据的存储,集群中的角色主要有 NameNode / DataNode
YARN集群:负责海量数据运算时的资源调度,集群中的角色主要有 ResourceManager /NodeManager
二、hdfs的工作机制
(一)、概述
-
HDFS集群分为两大角色:NameNode、DataNode
-
NameNode负责管理整个文件系统的元数据
-
DataNode 负责管理用户的文件数据块
-
文件会按照固定的大小(blocksize)切成若干块后分布式存储在若干台datanode上
-
每一个文件块可以有多个副本,并存放在不同的datanode上
-
Datanode会定期向Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量
-
HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进行
(二)、HDFS写数据流程
1、概述
客户端要向HDFS写数据,首先要跟namenode通信以确认可以写文件并获得接收文件block的datanode,然后,客户端按顺序将文件逐个block传递给相应datanode,并由接收到block的datanode负责向其他datanode复制block的副本
2、详细步骤解析
1、根namenode通信请求上传文件,namenode检查目标文件是否已存在,父目录是否存在
2、namenode返回是否可以上传
3、client请求第一个 block该传输到哪些datanode服务器上
4、namenode返回3个datanode服务器ABC
5、client请求3台dn中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将整个pipeline建立完成,逐级返回客户端
6、client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应答队列等待应答
7、当一个block传输完成之后,client再次请求namenode上传第二个block的服务器。
(三)、HDFS读数据流程
1、概述
客户端将要读取的文件路径发送给namenode,namenode获取文件的元信息(主要是block的存放位置信息)返回给客户端,客户端根据返回的信息找到相应datanode逐个获取文件的block并在客户端本地进行数据追加合并从而获得整个文件
2、详细步骤解析
1、跟namenode通信查询元数据,找到文件块所在的datanode服务器
2、挑选一台datanode(就近原则,然后随机)服务器,请求建立socket流
3、datanode开始发送数据(从磁盘里面读取数据放入流,以packet为单位来做校验)
4、客户端以packet为单位接收,现在本地缓存,然后写入目标文件
(四)、NAMENODE工作机制
1、NAMENODE职责
负责客户端请求的响应
元数据的管理(查询,修改)
2、元数据管理
namenode对数据的管理采用了三种存储形式:
(1)内存元数据(NameSystem)
(2)磁盘元数据镜像文件
(3)数据操作日志文件(可通过日志运算出元数据)
(1)元数据存储机制
A、内存中有一份完整的元数据(内存meta data)
B、磁盘有一个“准完整”的元数据镜像(fsimage)文件(在namenode的工作目录中)
C、用于衔接内存metadata和持久化元数据镜像fsimage之间的操作日志(edits文件)注:当客户端对hdfs中的文件进行新增或者修改操作,操作记录首先被记入edits日志文件中,当客户端操作成功后,相应的元数据会更新到内存meta.data中
(2)元数据手动查看
可以通过hdfs的一个工具来查看edits中的信息
1
2
3
4 1bin/hdfs oev -i edits -o edits.xml
2bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml
3
4
(3)元数据的checkpoint
每隔一段时间,会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merge(这个过程称为checkpoint) ##### checkpoint操作的触发条件配置参数:
1
2
3
4
5
6
7
8
9
10 1dfs.namenode.checkpoint.check.period=60 #检查触发条件是否满足的频率,60秒
2dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
3#以上两个参数做checkpoint操作时,secondary namenode的本地工作目录
4dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}
5
6dfs.namenode.checkpoint.max-retries=3 #最大重试次数
7dfs.namenode.checkpoint.period=3600 #两次checkpoint之间的时间间隔3600秒
8dfs.namenode.checkpoint.txns=1000000 #两次checkpoint之间最大的操作记录
9
10
checkpoint的附带作用:
namenode和secondary namenode的工作目录存储结构完全相同,所以,当namenode故障退出需要重新恢复时,可以从secondary namenode的工作目录中将fsimage拷贝到namenode的工作目录,以恢复namenode的元数据
(五)、DATANODE的工作机制
1、概述
(1)Datanode工作职责:
存储管理用户的文件块数据
定期向namenode汇报自身所持有的block信息(通过心跳信息上报)(这点很重要,因为,当集群中发生某些block副本失效时,集群如何恢复block初始副本数量的问题)
1
2
3
4
5
6
7 1<property>
2 <name>dfs.blockreport.intervalMsec</name>
3 <value>3600000</value>
4 <description>Determines block reporting interval in milliseconds.</description>
5</property>
6
7
(2)Datanode掉线判断时限参数:
datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:
1
2
3
4 1timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。
2而默认的heartbeat.recheck.interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。
3
4
需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。所以,举个例子,如果heartbeat.recheck.interval设置为5000(毫秒),dfs.heartbeat.interval设置为3(秒,默认),则总的超时时间为40秒。
1
2
3
4
5
6
7
8
9
10 1<property>
2 <name>heartbeat.recheck.interval</name>
3 <value>2000</value>
4</property>
5<property>
6 <name>dfs.heartbeat.interval</name>
7 <value>1</value>
8</property>
9
10
(六)、HDFS的java操作
hdfs在生产应用中主要是客户端的开发,其核心步骤是从hdfs提供的api中构造一个HDFS的访问客户端对象,然后通过该客户端对象操作(增删改查)HDFS上的文件
首先是依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1 <dependency>
2 <groupId>org.apache.hadoop</groupId>
3 <artifactId>hadoop-client</artifactId>
4 <version>2.6.1</version>
5 </dependency>
6 <!--<dependency>-->
7 <!--<groupId>org.springframework.boot</groupId>-->
8 <!--<artifactId>spring-boot-starter-test</artifactId>-->
9 <!--<scope>test</scope>-->
10 <!--</dependency>-->
11 <dependency>
12 <groupId>junit</groupId>
13 <artifactId>junit</artifactId>
14 <version>4.10</version>
15 </dependency>
16
17
常见文件操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 1package com.hadoop.hdfs;
2import org.apache.hadoop.conf.Configuration;
3import org.apache.hadoop.fs.*;
4import org.junit.Before;
5import org.junit.Test;
6
7import java.io.FileNotFoundException;
8import java.io.IOException;
9import java.net.URI;
10
11public class HdfsClient {
12
13FileSystem fs = null;
14
15@Before
16public void init() throws Exception {
17 // 构造一个配置参数对象,设置一个参数:我们要访问的hdfs的URI
18 // 从而FileSystem.get()方法就知道应该是去构造一个访问hdfs文件系统的客户端,以及hdfs的访问地址
19 // new Configuration();的时候,它就会去加载jar包中的hdfs-default.xml
20 // 然后再加载classpath下的hdfs-site.xml
21 Configuration conf = new Configuration();
22 conf.set("fs.defaultFS", "hdfs://mini1:9000");
23 /**
24 * 参数优先级: 1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、然后是服务器的默认配置
25 */
26 conf.set("dfs.replication", "3");
27 // 获取一个hdfs的访问客户端,根据参数,这个实例应该是DistributedFileSystem的实例
28 // fs = FileSystem.get(conf);
29
30 // 如果这样去获取,那conf里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份标识已经是hadoop用户
31 fs = FileSystem.get(new URI("hdfs://mini1:9000"), conf, "root");
32
33
34}
35/**
36 * 往hdfs上传文件
37 *
38 * @throws Exception
39 */
40@Test
41public void testAddFileToHdfs() throws Exception {
42
43 // 要上传的文件所在的本地路径
44 Path src = new Path("/home/youjin/ruanjian/apache-tomcat-8.0.53.tar.gz");
45 // 要上传到hdfs的目标路径
46 Path dst = new Path("/apache-tomcat-8.0.53.tar.gz");
47 fs.copyFromLocalFile(src, dst);
48 fs.close();
49}
50/**
51 * 从hdfs中复制文件到本地文件系统
52 *
53 * @throws IOException
54 * @throws IllegalArgumentException
55 */
56@Test
57public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
58 fs.copyToLocalFile(new Path("/aaa"), new Path("/home/youjin/test"));
59 fs.close();
60}
61
62@Test
63public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
64
65 // 创建目录
66 fs.mkdirs(new Path("/a1/b1/c1"));
67
68 // 删除文件夹 ,如果是非空文件夹,参数2必须给值true
69 fs.delete(new Path("/aaa"), true);
70
71 // 重命名文件或文件夹
72 fs.rename(new Path("/a1"), new Path("/a2"));
73
74}
75
76/**
77 * 查看目录信息,只显示文件
78 *
79 * @throws IOException
80 * @throws IllegalArgumentException
81 * @throws FileNotFoundException
82 */
83@Test
84public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
85
86 // 思考:为什么返回迭代器,而不是List之类的容器
87 RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
88
89 while (listFiles.hasNext()) {
90 LocatedFileStatus fileStatus = listFiles.next();
91 System.out.println(fileStatus.getPath().getName());
92 System.out.println(fileStatus.getBlockSize());
93 System.out.println(fileStatus.getPermission());
94 System.out.println(fileStatus.getLen());
95 BlockLocation[] blockLocations = fileStatus.getBlockLocations();
96 for (BlockLocation bl : blockLocations) {
97 System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
98 String[] hosts = bl.getHosts();
99 for (String host : hosts) {
100 System.out.println(host);
101 }
102 }
103 System.out.println("--------------为angelababy打印的分割线--------------");
104 }
105}
106
107
108/**
109 * 查看文件及文件夹信息
110 *
111 * @throws IOException
112 * @throws IllegalArgumentException
113 * @throws FileNotFoundException
114 */
115@Test
116public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
117
118 FileStatus[] listStatus = fs.listStatus(new Path("/"));
119
120 String flag = "d-- ";
121 for (FileStatus fstatus : listStatus) {
122 if (fstatus.isFile()) flag = "f-- ";
123 System.out.println(flag + fstatus.getPath().getName());
124 }
125}
126}
127
128
通过流的方式访问hdfs:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114 1package com.hadoop.hdfs;
2
3import org.apache.hadoop.conf.Configuration;
4import org.apache.hadoop.fs.*;
5import org.apache.hadoop.io.IOUtils;
6import org.junit.Before;
7import org.junit.Test;
8
9import java.io.File;
10import java.io.FileOutputStream;
11import java.io.IOException;
12import java.net.URI;
13
14public class StreamAccess {
15 FileSystem fs = null;
16
17@Before
18public void init() throws Exception {
19
20 Configuration conf = new Configuration();
21 fs = FileSystem.get(new URI("hdfs://mini1:9000"), conf, "root");
22
23}
24@Test
25public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException {
26
27 //先获取一个文件的输入流----针对hdfs上的
28 FSDataInputStream in = fs.open(new Path("/apache-tomcat-8.0.53.tar.gz"));
29
30 //再构造一个文件的输出流----针对本地的
31 FileOutputStream out = new FileOutputStream(new File("/home/youjin/test/apache-tomcat-8.0.53.tar.gz"));
32
33 //再将输入流中数据传输到输出流
34 IOUtils.copyBytes(in, out, 4096);
35
36
37}
38
39/**
40 * hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度
41 * 用于上层分布式运算框架并发处理数据
42 * @throws IllegalArgumentException
43 * @throws IOException
44 */
45@Test
46public void testRandomAccess() throws IllegalArgumentException, IOException{
47 //先获取一个文件的输入流----针对hdfs上的
48 FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
49
50
51 //可以将流的起始偏移量进行自定义
52 in.seek(22);
53
54 //再构造一个文件的输出流----针对本地的
55 FileOutputStream out = new FileOutputStream(new File("/home/youjin/test/iloveyou.line.2.txt"));
56
57 IOUtils.copyBytes(in,out,19L,true);
58
59}
60
61
62
63/**
64 * 显示hdfs上文件的内容
65 * @throws IOException
66 * @throws IllegalArgumentException
67 */
68@Test
69public void testCat() throws IllegalArgumentException, IOException{
70
71 FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
72
73 IOUtils.copyBytes(in, System.out, 1024);
74}
75
76/**
77 * 获取一个文件的所有block位置信息,然后读取指定block中的内容
78 * @throws IllegalArgumentException
79 * @throws IOException
80 */
81
82@Test
83public void testCat1() throws IllegalArgumentException, IOException{
84
85 FSDataInputStream in = fs.open(new Path("/wordcount/input/somewords.txt"));
86 //拿到文件信息
87 FileStatus[] listStatus = fs.listStatus(new Path("/wordcount/input/somewords.txt"));
88 //获取这个文件的所有block的信息
89 BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen());
90 //第一个block的长度
91 long length = fileBlockLocations[0].getLength();
92 //第一个block的起始偏移量
93 long offset = fileBlockLocations[0].getOffset();
94
95 System.out.println(length);
96 System.out.println(offset);
97
98 //获取第一个block写入输出流
99// IOUtils.copyBytes(in, System.out, (int)length);
100 byte[] b = new byte[4096];
101
102 FileOutputStream os = new FileOutputStream(new File("/home/youjin/test/block0"));
103 while(in.read(offset, b, 0, 4096)!=-1){
104 os.write(b);
105 offset += 4096;
106 if(offset>=length) return;
107 };
108 os.flush();
109 os.close();
110 in.close();
111}
112}
113
114