大数据概念
-
什么是大数据?
大数据是指高速(velocity)涌现的大量(volume)多样化(variety)具有一定价值(value)并且真实(veracity)的数据,其特性可简单概括为5V。
-
原理流程
- 数据采集
大数据首先需要将来自不同来源和应用的数据汇集在一起。需要导入和处理数据、执行格式化操作,以符合业务分析所要求的形式。
- 数据存储
大数据对存储要求较高。存储解决方案可以部署在本地,也可以部署在云端。可以采用任何形式存储数据,根据需要为数据集设置处理要求,引入必要的处理引擎。
- 数据分析
数据分析是根据数据内容及业务要求对数据进行处理的过程,在该过程中需要根据不同的数据量和业务形式选择合适的处理引擎,大数据开发过程有时也会涉及到机器学习相关算法
Hadoop简介
Hadoop是一个适合海量数据的分布式存储和分布式计算的平台
hadoop的处理思想,分而治之
扩容能力(Scalable):能可靠(reliably)地存储和处理PB级别的数据。如果数据量更大,存储不下了,再增加节点就可以了。
成本低(Economical):可以通过普通机器组成的服务器集群来分发以及处理数据.这些服务器集群可达数千个节点。
高效率(Efficient):通过分发计算程序,hadoop可以在数据所在节点上(本地)并行地(parallel)处理他们,这使得处理非常的迅速
可靠性(Reliable):hadoop能够自动地维护数据的多份副本,并且在任务失败后能够自动地重新部署(redeploy)计算任务.
hdfs shell 常用操作
上传:hdfs dfs -put 本地路径 HDFS路径
下载:hdfs dfs -get HDFS路径 本地路径
创建目录:hdfs dfs -mkdir HDFS路径
查看对应路径中的内容:hdfs dfs -ls /
查看文件内容:
- hdfs dfs -cat HDFS路径
- hdfs dfs -tail HDFS路径
- hdfs dfs -tail -f HDFS路径 监听HDFS中的文件内容
- hdfs dfs -text HDFS路径 可以查看被Hadoop压缩的压缩文件
追加信息到文件中:hdfs dfs -appendToFile words.txt /input/words.txt
删除文件: hdfs dfs -rm -r -f /output
查看指定目录下对应空间占用情况:hdfs dfs -du -h /
复制:hdfs dfs -cp 源路径 目标路径
重命名:hdfs dfs -mv 源路径 目标路径
修改文件权限:hdfs dfs -chmod 735 目标路径
查看存储空间:hdfs dfs -df 查看整个HDFS中空间使用情况
查看状态:hdfs dfsadmin -report
存储时遇到的问题
-
数据需要切成多少块,每块的大小是多少?
HDFS中的文件会被分割成若干个块(Block)进行存储。默认情况下,每个块的大小为128MB(在早期版本中默认是64MB)。你可以根据具体需求在HDFS配置中调整块的大小。
-
数据是以什么样的形式保存,是否可以进行压缩?
数据在HDFS中以块的形式保存,每个块在磁盘上都是一个文件。HDFS支持多种数据压缩格式,如Gzip等。压缩数据可以减少存储空间并提高I/O性能。压缩与解压缩操作通常在数据读写时由MapReduce等计算框架处理。
-
数据的存储位置,提供一个标准?
HDFS使用主从架构,由一个NameNode和多个DataNode组成。NameNode负责管理文件系统的命名空间和元数据,DataNode负责实际存储数据块。每个数据块在集群中的多个DataNode上有多个副本(默认是3个),以保证数据的高可用性和可靠性。
-
需要将数据的存储位置记录在某个地方,对应的顺序?
NameNode记录所有数据块的位置及其对应的DataNode。它维护文件系统的元数据,包括文件到块的映射、块到DataNode的映射、文件的访问权限等。客户端访问文件时,NameNode提供所需的元数据信息。
-
对于文件的描述信息需要进行存储(对数据的描述信息以及存储位置等称元数据)?
NameNode存储了所有的元数据,包括:
- 文件到块的映射
- 块到DataNode的映射
- 文件的目录结构
- 文件和目录的权限、所有者和时间戳信息
这些元数据存储在NameNode的内存中,并定期写入磁盘中的FsImage文件和编辑日志(Edits log)以保证数据的持久性
-
对于存储的数据如果丢失了一部分,那么该怎么处理?
HDFS通过副本机制来处理数据丢失问题。每个数据块有多个副本,分布在不同的DataNode上。具体处理机制如下:
- 数据块丢失监测:NameNode会周期性地与DataNode通信,检查每个块的健康状态。如果某个块的副本少于指定数量(通常是3个),就认为数据块丢失。
- 数据块重复制:NameNode会选择新的DataNode来创建丢失副本的副本,从剩余的健康副本复制数据块。这一过程自动进行,保证了数据的高可用性。
通过这些机制,HDFS保证了数据的可靠性和高可用性,即使部分DataNode失效或数据块丢失,也能快速恢复数据。
-
数据持久化
内存中会存在这样的问题:内存当断电时,会导致内存数据丢失,为了保证数据的安全性,于是需要将数据进行持久化操作,持久化是将数据保存在磁盘当中,如果对内存中的数据,直接将数据以快照的形式保存在磁盘中,会导致部分的数据可能会存在丢失,那么如果以客户端操作命令将其追加到磁盘文件中,那么会导致最终的持久化文件会非常大
针对上述的问题,又如下策略:
- 将客户端中的操作记录不断的保存到一个文件中成为edits文件
- 于是需要定期的将edits文件和内存中的数据进行合并,生成对应的fsimage文件
edits文件保存了部分用户的所有操作记录
fsimage文件时保存了部分元数据信息的
1、当Hadoop第一次安装启动后,会先在Data目录下生成空的edits文件和空的fsimage文件,edits文件用于追加用户的写和修改操作记录
2、当程序在不断的运行过程中,由于客户端会不断上传修改文件,导致edits文件很大,于是当SecondaryNameNode发起Checkpoint请求之后,所有客户端的写操作追加到一个新的edits文件
3、对于Secondary NameNode 会将Fsimage文件和edits文件合并生成一个新的Fsimage文件
4、当Hadoop要重启之后,原先内存中的数据都消失了,之后通过NameNode加载新的edits文件和新的Fslmage文件到内存中进行合并就生成完成的元数据信息了Secondary NameNode的checkpoins请求触发的机制:
1.CheckPoint发起时机:1.fs.checkpoint.period指定两次checkpiont的最大时间间隔,默认3600秒。
2.fs.checkpoint.size 规定edits文件的最大值,一旦超过这个值则强制checkpoint,不管是否到达最大时间间隔默认大小是64M
Hadoop分布式集群中计算存在的问题
-
如果要对巨大量的单个文件数据进行计算,那么单台计算机不能完成工作,需要使用多台服务器,每台服务器处理多少的数据量?
在Hadoop中,文件被分成多个块(block),每个块的大小可以配置(默认128MB)。当对一个巨大的单个文件进行计算时,这些块会被分配到不同的DataNode(数据节点)上进行并行处理。Hadoop通过MapReduce来实现这种分布式计算。
- Map阶段:
- 输入文件被切分成多个块。
- 每个块被分配给一个Mapper任务,Mapper会读取该块并进行相应的处理。
- 多个Mapper任务并行运行,处理不同的块。
- 处理的数据量:
- 每个Mapper任务处理一个块的数据。由于块的大小是固定的(比如128MB),所以每台服务器(运行Mapper任务的DataNode)处理的数据量就是块大小乘以该服务器上运行的Mapper任务数量。
- 服务器的数量和每个服务器上Mapper任务的数量取决集群的规模和配置。
- Map阶段:
-
如果每个服务器处理自身存储的数据,那么最终的结果如何进行汇集到一个或多个计算机中进行求和计算?
- Shuffle和Sort阶段:
- Map任务输出的结果(中间数据)会经过Shuffle阶段,将具有相同键的记录发送到同一个Reduce任务。
- Shuffle阶段包括数据的分组和排序,为Reduce任务做准备。
- Reduce阶段:
- Reduce任务接收到由Shuffle阶段整理好的数据进行汇总处理。
- Reduce任务可以计算局部汇总,例如求和操作。
- 最终结果的汇集:
- 多个Reduce任务的输出结果可以写到HDFS中,形成最终结果文件。
- 结果可以在单个Reduce任务中完成,也可以通过后处理。
- Shuffle和Sort阶段:
MapReduce
常用的writable实现类
- 布尔型(boolean) BooleanWritable
- 字节型(byte) ByteWritable
- 整型(int)IntWritable / VIntWritable
- 浮点型(float) FloatWritable
- 双精度浮点型(double) DoubleWritable
- 长整型(long) LongWritable / VLongWritable
- 字符串 String Text
mapreduce中的方法
-
小文件合并
当MapReduce的数据源中小文件过多,那么根据FileInputFormat类中GetSplit函数加载数据,会产生大量的切片从而启动过多的MapTask任务,MapTask启动过多那么会导致申请过多资源,并且MapTask启动较慢,执行过程较长,效率较低
如何解决?
可以使用MR中的CombineTextInputFormat类,在形成数据切片时,可以对小文件进行合并。
-
输出类
对于文本文件输出MapReduce中使用FileOutputFormat类作为默认输出类,但是如果要对输出的结果文件进行修改,那么需要对输出过程进行自定义。
而自定义输出类需要继承FileOutputFormat 并在RecordWriter中根据输出逻辑将对应函数进行重写 -
关联分析
map阶段的主要任务是对不同文件中的数据打标签
reduce阶段进行实际的连接操作
-
过滤
在MapReduce过程中可以根据判断逻辑选择适当的数据进行写出,同时MapReduce过程中允许只存在有Map过程
-
序列化
序列化 (Serialization)是将对象的状态信息转换为可以存储或传输的形式的过程。在序列化期间,对象将其当前状态写入到临时或持久性存储区。以后,可以通过从存储区中读取或反序列化对象的状态,重新创建该对象。
当两个进程在进行远程通信时,彼此可以发送各种类型的数据。无论是何种类型的数据,都会以二进制序列的形式在网络上传送。发送方需要把这个对象转换为字节序列,才能在网络上传送;接收方则需要把字节序列再恢复为对象。把对象转换为字节序列的过程称为对象的序列化。把字节序列恢复为对象的过程称为对象的反序列化。
在Hadoop中可以自定义序列化类。
通过实现WritableComparable接口,写出write(序列化操作)和readFields(去序列化操作)方法。
MapReduce优化
-
IO阶段:
问题:大量的网络传输,会降低MR执行效率
解决方案:
- 采用数据压缩的方式,减少网络IO的时间
- 通过Combine或者提前过滤数据减少数据传输量
- 适当备份,因为备份多可以本地化生成map任务
-
Reduce阶段
执行效率慢
解决方案:
- 合理设置Reduce数量
- 使用MapJoin规避使用Reduce,减少shuffle
- 使Key分配均匀,避免数据倾斜的产生
-
数据输入
问题:合并小文件:因为大量小文件会产生大量的Map任务,而任务的装载比较耗时,从而导致MR运行较慢
解决方案:
- 在读取计算前,对小文件进行合并
-
Map阶段
多次溢写会产生多个溢写文件,并且最终需要合并成一个结果文件
解决方案:
-
增大触发spill的内存上限,减少spill次数,从而减少磁盘IO
io.sort.mb:环形缓冲区大小
sort.spill.percent:默认溢出率为(80%)
-
不影响业务逻辑前提下,先进行Combine处理,减少I/O
-
MapReduce进阶
mapreduce默认输入处理类
InputFormat:抽象类,只是定义了两个方法
FileInputFormat:
- FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。
TextInputFormat:
- 是默认的处理类,处理普通文本文件
- 文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value
- 默认以\n或回车键作为一行记录
RecordReader
每一个InputSplit都有一个RecordReader,作用是把InputSplit中的数据解析成Record,即<k1,v1>。
在TextInputFormat中的RecordReader是LineRecordReader,每一行解析成一个<k1,v1>。其中,k1表示偏移量,v1表示行文本内容
shuffle
广义的Shuffle过程是指,在Map函数输出数据之后并且在Reduce函数执行之前的过程。在Shuffle过程中,包含了对数据的分区、溢写、排序、合并等操作
Shuffle源码主要的内容包含在 MapOutputCollector 的子实现类中,而该类对象表示的就是缓冲区的对象,该类中的函数有如下:
public void init(Context context ) throws IOException, ClassNotFoundException; public void collect(K key, V value, int partition ) throws IOException, InterruptedException; public void close() throws IOException, InterruptedException; public void flush() throws IOException, InterruptedException, ClassNotFoundException;
自定义分区
默认分区下,如果Reduce的数量大于1,那么会使用HashPartitioner对Key进行做Hash计算,之后再对计算得到的结果使用reduce数量进行取余得到分区编号,每个reduce获取固定编号中的数据进行处理
自定义分区需要重写分区方法,根据不同的数据计算得到不同的分区编号
// 添加自定义分区类 job.setPartitionerClass(Partitioner.class); // 按照需求给出分区个数 job.setNumReduceTasks();
Combine
combiner发生在map端的reduce操作。
- 作用是减少map端的输出,减少shuffle过程中网络传输的数据量,提高作业的执行效率。
- combiner仅仅是单个map task的reduce,没有对全部map的输出做reduce。
如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。
注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以,Combine适合于等幂操作,比如累加,最大值等。求平均数不适合
YARN
YARN的核心概念
- ResourceManager(RM):
- 负责集群的资源管理和分配
- 包括两个主要组件:调度器(Scheduler)和应用程序管理器(ApplicationManager)。
- NodeManager(NM):
- 负责集群的资源管理和分配。
- 包括两个主要组件:调度器(Scheduler)和应用程序管理器(ApplicationManager)。
- ApplicationMaster(AM):
- 每个应用程序(如MapReduce作业)都有一个专门的ApplicationMaster,负责申请资源并与NodeManger协调以运行任务。
- Container:
- 资源的抽象表示,包含特定数量的CPU、内存等资源。任务在容器中运行。
yarn工作流程
yarn是如何执行一个mapreduce job的?
首先,Resource Manager会为每一个application(比如一个用户提交的MapReduce Job) 在NodeManager里面申请一个container,然后在该container里面启动一个Application Master。 container在Yarn中是分配资源的容器(内存、cpu、硬盘等),它启动时便会相应启动一个JVM。然后,Application Master便陆续为application包含的每一个task(一个Map task或Reduce task)向Resource Manager申请一个container。等每得到一个container后,便要求该container所属的NodeManager将此container启动,然后就在这个container里面执行相应的task
等这个task执行完后,这个container便会被NodeManager收回,而container所拥有的JVM也相应地被退出。
yarn 调度器
- FIFO Scheduler也叫先进先出调度器,根据业务提交的顺序,排成一个队列,先提交的先执行,并且执行时可以申请整个集群中的资源。逻辑简单,使用方便,但是容易导致其他应用获取资源被阻塞,所以生产过程中很少使用该调度器
- Capacity Scheduler 也称为容量调度器,是Apache默认的调度策略,对于多个部门同时使用一个集群获取计算资源时,可以为每个部门分配一个队列,而每个队列中可以获取到一部分资源,并且在队列内部符合FIFO Scheduler调度规则
- Fair Scheduler 也称为公平调度器,现在是CDH默认的调度策略,公平调度在也可以在多个队列间工作,并且该策略会动态调整每个作业的资源使用情况