JohnShen's Blog.

大数据入门草稿 - Hadoop

字数统计: 3.8k阅读时长: 17 min
2019/12/16 Share

HDFS、YARN、MapReduce

大数据基础

概述

4V

Volume 海量的数据规模;Variety 多样的数据类型;Velocity 快速的数据流转;Value 发现数据加载。

大数据在技术架构上带来的挑战

对现有数据库管理技术的挑战(TB以上结构化的存储);经典数据库技术没有考虑数据的多类别;实时性的技术挑战;网络架构、数据中心、运维的挑战。

大数据带来的额外挑战:数据隐私。

Google大数据技术

存储容量 GFS;读写速度 BigTable;计算效率 MapReduce

Hadoop

开源的分布式存储+分布式计算平台,可以对多机器的海量数据进行分布式处理的框架。其拥有成熟的生态圈,可存储在廉价的机器上。

Hadoop 主要包括四部分:

  • Hadoop Common:Hadoop 共用包;

  • Hadoop Distributed File System(HDFS):可提供高吞吐量的分布式文件系统;

  • Hadoop YARN:工作调度 & 资源管理;

  • Hadoop MapReduce:基于 YARN 可并行处理大数据集的框架

狭义的Hadoop即为这三者的总和,而广义的Hadoop是指Hadoop生态系统,生态系统中的每一个子系统只解决某一个特定问题域,是多个小而精的系统。

HDFS

特点:扩展性 & 容错性 & 海量数据存储

将文件切分成指定大小(可配置,默认128M)的数据块并以多副本的形式存储在多个机器上。

数据切分、多副本、容错等操作对用户是透明的。

YARN

Yet Another Resource Negotiator,负责整个集群资源的管理和调度(作业占用CPU及内存)。

特点:扩展性 & 容错性(Task异常进行一定次数的重试)& 多框架资源统一调度(跑Spark等额外类型的作业,14年Spark逐渐代替MapReduce称为Hadoop缺省执行引擎。)

MapReduce

Hadoop MapReduce 是 Google MapReduce 的克隆版

特点:扩展性 & 容错性 & 海量数据离线处理

HDFS

如果要自行设计一个简易的分布式文件系统,可能会将大小不同的文件以多副本的方式存在在多台机器上,并且记录文件存在哪台机器上等元数据。这样的缺点是:不管文件多大都直接放在一个节点上会造成网络瓶颈且很难进行并行数据处理;存储负载较难均衡,部分节点的利用率较低。

而HDFS的做法是将每个文件先进行拆分,每块大小可设置(如128M ),每个Block多副本的方式进行存储。大小一致的Block会让机器的存储负载好很多,且便于并行处理。

HDFS架构

1个Master(NameNode/NN) & N个Slaves(DataNode/DN)

  • NameNode:负责客户端请求的响应;负责元数据(文件的名称、副本系数、Block存放的DataNode)的管理;

  • DataNode:存储用户文件对应的数据块(Block);定期向NN发送心跳信息,汇报本身及所有Block信息,健康状况;

实际生产部署时, 一个机器部署NameNode,其他每个机器部署一个DataNode。

Replication Factor:副本系数(副本因子)。每个文件可以单独配置副本因子和 Block Size。

HDFS的文件只能写一次(除非 appends truncates),且在任意时间内只能有一个 writer 进行写操作,不支持多并发写。

Hadoop伪分布式安装

  1. JDK安装并添加至环境变量

  2. 安装ssh,并配置免密登录

    1
    2
    3
    sudo yum install ssh
    ssh-keygen -t rsa
    cp ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys
  3. 下载并解压 hadoop,hadoop-2.6.0-cdh5.7.0.tar.gz

  4. 更改 hadoop 配置 (hadoop_home/etc/hadoop) :

    • hadoop-env.sh 更改 JAVA_HOME

      1
      export JAVA_HOME=...
    • core-site.xml

      1
      2
      3
      4
      5
      6
      7
      8
      <property>
      <name>fs.defaultFS</name>
      <value>hdfs://xxx:8020</value>
      </property>
      <property>
      <name>hadoop.tmp.dir</name>
      <value>/home/xxx/tmp-hadoop</value>
      </property>
    • hdfs-site.xml

      1
      2
      3
      4
      <property>
      <name>dfs.replication</name>
      <value>1</value>
      </property>
    • slaves 文件添加 slave 信息,此处不做变动

  5. 启动HDFS:格式化文件系统,进入bin目录,ce,之后启动 hdfs,sbin/start-dfs.sh,使用jps查看 DataNode 以及 NameNode 是否启动成功,或者通过浏览器端口 50070。

  6. 停止HDFS:sbin/stop-dfs.sh

HDFS Shell

hdfs dfshadoop fs指令相同,直接输入后可以看见其他后续指令。

1
2
3
4
5
6
7
8
9
10
11
12
hdfs dfs -ls /
hdfs dfs -put test.log / 向HDFS存放文件
hdfs dfs -text /test.log 查看
hdfs dfs -cat /test.log 查看
hdfs dfs -mkdir /tt
hdfs dfs -mkdir -p /tt/a/b 递归创建
hdfs dfs -ls /tt/a
hdfs dfs -ls /tt/a/b
hdfs dfs -ls -R / 递归查看
hdfs dfs -get /test.log 从HDFS中获取文件
hdfs dfs -rm /test.log 删除文件
hdfs dfs -rm -R /tt 删除目录

HDFS Java API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
public static final String HDFS_PATH = "hdfs://10.211.55.6:8020";

FileSystem fileSystem = null;
Configuration configuration = null;

@Before
public void setUp() throws Exception {
System.out.println("HDFSApp.setUp");
configuration = new Configuration();
fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, "parallels");
}

@After
public void tearDown() throws Exception {
configuration = null;
fileSystem = null;
System.out.println("HDFSApp.tearDown");
}

/**
* 创建HDFS目录
*/
@Test
public void mkdir() throws Exception {
fileSystem.mkdirs(new Path("/hdfsapi/test2"));
}

/**
* 创建文件
*/
@Test
public void create() throws Exception {
FSDataOutputStream output = fileSystem.create(new Path("/hdfsapi/test2/a.txt"));
output.write("hello hadoop".getBytes());
output.flush();
output.close();
}

/**
* 查看HDFS文件的内容
*/
@Test
public void cat() throws Exception {
FSDataInputStream in = fileSystem.open(new Path("/hdfsapi/test2/a.txt"));
IOUtils.copyBytes(in, System.out, 1024);
in.close();
}


/**
* 重命名
*/
@Test
public void rename() throws Exception {
Path oldPath = new Path("/hdfsapi/test/a.txt");
Path newPath = new Path("/hdfsapi/test/b.txt");
fileSystem.rename(oldPath, newPath);
}

/**
* 上传文件到HDFS
*/
@Test
public void copyFromLocalFile() throws Exception {
Path localPath = new Path("/Users/Finch/Downloads/springboot2.1.7.pom");
Path hdfsPath = new Path("/hdfsapi/test");
fileSystem.copyFromLocalFile(localPath, hdfsPath);
}

/**
* 上传文件到HDFS
*/
@Test
public void copyFromLocalFileWithProgress() throws Exception {
InputStream in = new BufferedInputStream(
new FileInputStream(
new File("/Users/rocky/source/spark-1.6.1/spark-1.6.1-bin-2.6.0-cdh5.5.0.tgz")));

FSDataOutputStream output = fileSystem.create(new Path("/hdfsapi/test/spark-1.6.1.tgz"),
new Progressable() {
public void progress() {
System.out.print("."); //带进度提醒信息
}
});


IOUtils.copyBytes(in, output, 4096);
}


/**
* 下载HDFS文件
*/
@Test
public void copyToLocalFile() throws Exception {
Path localPath = new Path("/Users/rocky/tmp/h.txt");
Path hdfsPath = new Path("/hdfsapi/test/hello.txt");
fileSystem.copyToLocalFile(hdfsPath, localPath);
}

/**
* 查看某个目录下的所有文件
*
* 问题:我们已经在hdfs-site.xml中设置了副本系数为1,为什么此时查询文件看到的3呢?
* 如果你是通过hdfs shell的方式put的上去的那么,才采用默认的副本系数1
* 如果我们是java api上传上去的,在本地我们并没有手工设置副本系数,所以否则采用的是hadoop自己的副本系数
*
* 【设置副本系数:configuration.set("dfs.replication", "1");】
*/
@Test
public void listFiles() throws Exception {
FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/hdfsapi/test"));

for(FileStatus fileStatus : fileStatuses) {
String isDir = fileStatus.isDirectory() ? "文件夹" : "文件";
short replication = fileStatus.getReplication();
long len = fileStatus.getLen();
String path = fileStatus.getPath().toString();

System.out.println(isDir + "\t" + replication + "\t" + len + "\t" + path);
}

}

/**
* 删除
*/
@Test
public void delete() throws Exception{
fileSystem.delete(new Path("/"), true);
}

HDFS读写原理

通过漫画轻松掌握HDFS工作原理

HDFS优缺点

优点:数据冗余、硬件容错;可构建在廉价机器上;适合存储大文件。

缺点:数据访问延迟(想要极快进行数据检索并不现实);不适合小文件存储(小文件太多会导致NameNode占用内存信息越多,增加了NameNode压力)。

YARN

资源调度框架YARN在1.x时代只支持MapReduce任务,而在2.x之后可以让更多的计算框架(如Spark、Storm)运行在集群里面,不同的计算框架可以共享同一个HDFS的数据,享受整体的资源调度。

YARN架构

ResourceManager: RM

  • 整个集群同一时间提供服务的RM只有一个(可以使用主从解决单点问题),负责集群资源的统一管理和调度
  • 处理客户端的请求: 提交一个作业、杀死一个作业
  • 监控我们的NM,一旦某个NM挂了,那么该NM上运行的任务需要告诉我们的AM来如何进行处理

NodeManager: NM

  • 整个集群中有多个,负责自己本身节点资源管理和使用
  • 定时向RM汇报本节点的资源使用情况
  • 接收并处理来自RM的各种命令:启动Container
  • 处理来自AM的命令
  • 单个节点的资源管理

ApplicationMaster: AM

  • 每个应用程序对应一个:MR、Spark,负责应用程序的管理
  • 为应用程序向RM申请资源(core、memory),分配给内部task
  • 需要与NM通信:启动/停止task,task是运行在container里面,AM也是运行在container里面

Container

  • 封装了CPU、Memory等资源的一个容器
  • 是一个任务运行环境的抽象

Client

  • 提交作业
  • 查询作业的运行进度
  • 杀死作业

运行流程

https://github.com/heibaiying/BigData-Notes/blob/master/notes/Hadoop-YARN.md

  1. Client 提交作业到 YARN 上;
  2. Resource Manager 选择一个 Node Manager,启动一个 Container 并运行 Application Master 实例;
  3. Application Master 根据实际需要向 Resource Manager 请求更多的 Container 资源(如果作业很小, 应用管理器会选择在其自己的 JVM 中运行任务);
  4. Application Master 通过获取到的 Container 资源执行分布式计算。

YARN伪分布式环境搭建

1)mapred-site.xml [ mapred-site.xml.template 改名去后缀 ]

1
2
3
4
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

2)yarn-site.xml

1
2
3
4
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

3) 启动YARN相关的进程

1
sbin/start-yarn.sh

4)验证

  • jps:ResourceManager 、NodeManager
  • 访问8088端口

5)停止YARN相关的进程

1
sbin/stop-yarn.sh

提交mr作业到YARN上运行:

  • 使用Jar包:hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar

  • 使用命令hadoop jar,比如 hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3

MapReduce

优点:海量数据离线处理

缺点:实时流式计算

入门案例:wordcount: 统计文件中每个单词出现的次数,借用 MapReduce 可以实现分而治之。

MapReduce 框架专门用于键值对处理,它将作业的输入视为一组对,并生成一组对作为输出。输出和输出的 keyvalue 都必须实现Writable 接口。 key classes 还必须实现 WritableComparable 接口。

1
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

核心概念

Split: 交由MapReduce作业来处理的数据块,是MapReduce中最小的计算单元。HDFS中 blocksize 是HDFS中最小的存储单元 128M。默认情况下:他们两是一一对应的,当然我们也可以手工设置他们之间的关系(不建议)

InputFormat: 将我们的输入数据进行分片(split): InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;TextInputFormat: 处理文本格式的数据

OutputFormat: 输出

Combiner

Partitioner

MapReduce 2.x 架构

Java 版 WordCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* 使用MapReduce开发WordCount应用程序
**/
public class WordCountApp {

/**
* Map: 读取输入的文件内容
*/
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

LongWritable one = new LongWritable(1);

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 接收到的每一行数据
String line = value.toString();

// 按照指定的分割符进行拆分
String[] words = line.split(" ");
for (String word : words) {
// 通过上下文把map的处理结果输出
context.write(new Text((word)), one);
}
}
}

/**
* Reduce: 归并操作
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
// 求key出现的次数总和
sum += value.get();
}
// 将最终的统计结果输出
context.write(key, new LongWritable(sum));
}
}

/**
* 定义Driver:封装了MapReduce作业的所有信息
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();

// 准备清理已存在的输出目录, 在MR中,输出文件是不能事先存在的
Path outputPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(outputPath)) {
fileSystem.delete(outputPath,true);
System.out.println("output file exists, but is has deleted");
}

// 创建Job,通过参数设置Job的名称
Job job = Job.getInstance(configuration, "wordcount");

// 设置Job的处理类
job.setJarByClass(WordCountApp.class);

// 设置作业处理的输入路径
FileInputFormat.setInputPaths(job, new Path(args[0]));

// 设置map相关参数
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);

// 设置reduce相关参数
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

// 通过Job对象来设置Combiner处理类,在逻辑上和reduce是一样的
job.setCombinerClass(MyReducer.class);

// 设置作业处理完成后的输出路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

执行:

1
hadoop jar /home/hadoop/lib/hadoop-train-1.0.jar com.hadoop.mapreduce.WordCountApp hdfs://10.211.55.6:8020/hello.txt hdfs://10.211.55.6:8020/output/wc

Combiner

  • 本地的Reduce;
  • 减少Map Tasks输出的数据量及数据网络传输量;
  • 但适用场景有限,比如求和计数等,不适合求平均数等类似场景。

Partitioner

Partitioner决定Map Task输出的数据交由哪个Reduce Task处理。默认实现:分发的key的hash值对Reduce Task个数取模。

案例:手机销量手机按品牌分类收集至各个文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 接收到的每一行数据
String line = value.toString();

// 按照指定的分割符进行拆分
String[] words = line.split(" ");
// 通过上下文把map的处理结果输出
context.write(new Text((words[0])), new LongWritable(Long.parseLong(words[1])));
}
}

public static class MyPartitioner extends Partitioner<Text, LongWritable> {

public int getPartition(Text key, LongWritable value, int numPartitions) {
if(key.toString().equals("xiaomi")){
return 0;
}
if(key.toString().equals("huawei")){
return 1;
}
if(key.toString().equals("iphone7")) {
return 2;
}
return 3;
}
}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
...
// 设置Job的partition
job.setPartitionerClass(MyPartitioner.class);
// 设置4个reducer,每个分区一个
job.setNumReduceTasks(4);

FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

JobHistory

JobHistory是一个Hadoop自带的历史服务器,它用于记录已运行完的MapReduce信息到指定的HDFS目录下。可以通过HTTP页面访问获得信息。

mapred-site.xml 添加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<!-- jobhistory的通信地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>192.168.77.130:10020</value>
<description>MapReduce JobHistory Server IPC host:port</description>
</property>
<!-- jobhistory的web访问地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>192.168.77.130:19888</value>
<description>MapReduce JobHistory Server IPC host:port</description>
</property>
<!-- 任务运行完成后,history信息所存放的目录 -->
<property>
<name>mapreduce.jobhistory.done-dir</name>
<value>/history/done</value>
</property>
<!-- 任务运行中,history信息所存放的目录 -->
<property>
<name>mapreduce.jobhistory.intermediate-done-dir</name>
<value>/history/done_intermediate</value>
</property>

yarn-site.xml 添加:

1
2
3
4
5
<!-- 开启聚合日志 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

之后需要重启YARN访问,stop-yarn.sh 以及 start-yarn.sh。

启动 sbin/mr-jobhistory-daemon.sh start historyserver,在19888端口处就可以看到接下来的执行信息了。

https://blog.51cto.com/zero01/2093445

CATALOG
  1. 1. 大数据基础
    1. 1.1. 概述
      1. 1.1.1. 4V
      2. 1.1.2. 大数据在技术架构上带来的挑战
      3. 1.1.3. Google大数据技术
      4. 1.1.4. Hadoop
        1. 1.1.4.1. HDFS
        2. 1.1.4.2. YARN
        3. 1.1.4.3. MapReduce
  2. 2. HDFS
    1. 2.1. HDFS架构
    2. 2.2. Hadoop伪分布式安装
    3. 2.3. HDFS Shell
    4. 2.4. HDFS Java API
    5. 2.5. HDFS读写原理
    6. 2.6. HDFS优缺点
  3. 3. YARN
    1. 3.1. YARN架构
    2. 3.2. 运行流程
    3. 3.3. YARN伪分布式环境搭建
  4. 4. MapReduce
    1. 4.1. 核心概念
    2. 4.2. MapReduce 2.x 架构
    3. 4.3. Java 版 WordCount
      1. 4.3.1. Combiner
      2. 4.3.2. Partitioner
    4. 4.4. JobHistory