摘要 hadoop的两大核心是hdfs和mapreduce,hdfs是适合部署在廉价机器上的高度容错性系统,提供了强大的流式读写文件的能力,mapreduce则提供了大规模数据集的并行运算。
Hadoop中HDFS和MapReduce的进程
本地单机模式安装好hadoop服务后,可以使用start-all.sh
启动hadfs和mapredue服务,也可以使用start-hdfs.sh
和start-yarn.sh
分别启动。正常启动后,通过jps查看有5个进程:
- NameNode 管理文件系统的命名空间。它维护着文件系统树及整棵树内所有的文件和目录。HDFS集群有两类节点,并以管理者-工作者模式运行,即一个NameNode(管理者)和多个DataNode(工作者)[就独立模式而言,仅有一个NameNode(管理者)和一个DataNode(工作者)]。一个HDFS cluster包含一个NameNode和若干的DataNode,NameNode是master,主要负责管理hdfs文件系统,具体地包括namespace管理(其实就是目录结构),block管理(其中包括 filename->block,block->ddatanode list的对应关系)。NameNode 依赖来自每个 DataNode 的定期心跳(heartbeat)消息。每条消息都包含一个块报告,NameNode 可以根据这个报告验证块映射和其他文件系统元数据。如果 DataNode 不能发送心跳消息,NameNode 将采取修复措施,重新复制在该节点上丢失的块。
-
SecondaryNameNode NameNode节点的副本。NameNode进行数据snapshots进行备份,这样尽量降低NameNode崩溃之后,导致数据的丢失,其实所作的工作就是从nn获得fsimage和edits把二者重新合并然后发给NameNode,这样,既能减轻NameNode的负担又能保险地备份。
-
DataNode Datanode是文件系统的工作节点,他们根据客户端或者是namenode的调度存储和检索数据,并且定期向namenode发送他们所存储的块(block)的列表。
-
NodeManager NodeManager(NM)是YARN中每个节点上的代理,它管理Hadoop集群中单个计算节点,包括与ResourceManger保持通信,监督Container的生命周期管理,监控每个Container的资源使用(内存、CPU等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务(auxiliary service)。YARN/MRv2 Node Manager深入剖析—整体架构
-
ResourceManager ResourceManager负责集群中所有资源的统一管理和分配,它接收来自各个节点(NodeManager)的资源汇报信息,并把这些信息按照一定的策略分配给各个应用程序(实际上是ApplicationManager)。YARN/MRv2 Resource Manager深入剖析—RM总体架构
注意:没有JobTracker和TaskTracker进程。
小试牛刀
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.diudiu.mapreduce;
import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException; import java.util.stream.Stream;
public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = StringUtils.split(value.toString(), ' '); Stream.of(words).forEach((String x) ->{ try { context.write(new Text(x), new IntWritable(1)); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.diudiu.mapreduce;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable i : values) { sum = sum + i.get(); } context.write(key, new IntWritable(sum)); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| package com.diudiu.mapreduce;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class RunJob { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", "hdfs://master:30010"); configuration.set("yarn.resourcemanager.hostname", "master");
FileSystem fs = FileSystem.get(configuration);
Job job = Job.getInstance(configuration); job.setJarByClass(RunJob.class);
job.setJobName("Word Count"); job.setMapperClass(WcMapper.class); job.setReducerClass(WcReducer.class);
job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("/input/word"));
Path outpath = new Path("/output/wc"); if (fs.exists(outpath)) { fs.delete(outpath, true); } FileOutputFormat.setOutputPath(job, outpath);
boolean f = job.waitForCompletion(true); if (f) { System.out.println("job任务执行成功"); } } }
|