在系统中,当需要处理的数据非常大时,数据出现损坏的可能性会大很多。因此需要进行数据完整性检查;
Hadoop 采用 RPC 来实现进程间的通信,采用的是 Writable 序列化机制。序列化有两个目的:进程间通信与数据持久性存储。
在传输之前,可以对文件先进行压缩,这样做有两大好处,这俩好处也对于处理大量数据十分重要:减少存储空间与加速数据在网络和磁盘上的传输。
对于某些应用,往往需要一种特殊的数据结构来存储数据,所以 Hadoop 为此开发了更高层次的容器:SequenceFile 和 Mapfile.
一、数据完整性
一般情况下,可用通过验证校验和的方式来检查数据的完整性。
校验和(checksum)是指再数据处理和数据通信领域用于校验一组终端数据项的和。
数据在传输之前会生成一个校验和,当传输过去后会再次计算校验和,若不一致,则判定数据已损坏。
另外,文件系统重启时也会计算校验和。
1. HDFS 的数据完整性
除了在写入和读取时会计算验证校验和来验证数据的完整性外,HDFS 还会定期计算 block 的校验和以验证数据完整性。
1) 向 HDFS 中写入数据时的验证
Client 向 HDFS 发送写入请求,最后一个接收 block 副本的 DataNode 会计算校验和,并与 Client 发送的校验和进行比较,相同存入并返回信息,错误就会给 Client 抛出一个 IOException 错误。
此外,DataNode 复制其他数据节点的数据时,也会通过比较校验和的方式对数据进行验证。
2) 从 HDFS 中读取数据时的验证
每个数据节点都会将每次验证后的校验和与校验和的更新时间持久保存到某个日志中。
Client 从数据节点读取数据时,会将数据放置在 Client 的缓存中,然后重新计算校验和并于数据节点所存储的最新校验和进行比较。
3) DataNode 后台守护进程定期检测
解决的是物理存储媒介损坏的问题,每个数据节点都会在后台运行一个 DataBlockScanner 进程,该进程定期检查数据节点上的块,以便在 Client 读取损坏的块前将其及时检测并修复。
此外,当发现某块副本的校验和和日志中的数据不一致,数据节点就会将其标注为已损坏。当数据节点向元数据节点发送心跳时,元数据节点就会让数据节点处理损坏的数据块。
2. 验证数据完整性
为了验证,用户可用去编写程序调用客户端校验类和校验和文件系统类。
1) 客户端校验(LocalFileSystem)类
例如,当调用该类来写入一个“a.txt”的文件时,文件系统会自动创建一个“.a.txt.crc”的隐藏文件。
2) 校验和文件系统(ChecksumFileSystem)类
二、序列化与反序列化
Java 序列化会将 Java 对象转换为字节序列的过程,Java 反序列化是将字节序列恢复到 Java 对象的过程。
例如,将 Java 图片对象序列化,通过网络进行传输,最后将其反序列化以获得需要的对象。
1. 序列化
简单来讲,序列化就是将对象转化为便于传输的格式,例如二进制、字节数组、json、xml等。Hadoop 使用的时 Writable。
1) Writable 接口
Writable 接口针对 DataOutput 和 DataInput 定义了两个方法:write() 和 readFields() 。分别实现对数据的序列化和反序列化。
2) Writable 类
3) 自定义 Writable 类
有时候,还是需要自己构建新类滴,因此,实现自定义的 Writable 也是十分有必要滴,下方代码构建了一个 ListWritable 类(Writable 集合类中没有提供),用于实现 Writable 接口从而达到 List 集合的效果。
2. 反序列化
简单来讲,是对应的,是序列化的逆过程。
三、数据压缩
Hadoop 的数据量过大,有必要对文件进行压缩,减少文件存储所使用的存储空间,也可以加快数据在网络和磁盘上的传输速度。
1. 压缩与解压缩方法 Codec
1) CompressionCodec 接口
CompressionCodec 接口定义了数据的压缩和解压缩方法,分别是 createOutputStream() 和 createInputStream()。
使用 createOutputStream() 方法,可以对未压缩的数据新建一个 CompressionOutputStream 对象,对输出流的数据进行压缩。
使用 createInputStream() 方法,可以对输入流读取的数据进行解压缩,解压缩的数据新构建一个 CompressionInputStream 对象。
在 Eclipse 中运行 Compressor() 测试方法,可以在本地系统的指定位置(如“E:/data”)生成一个压缩文件(如“c.txt.gz”);
运行 UnCompressor() 测试方法,下方的“Console”窗口中会输出将压缩文件解压缩后所得的文件内容。
2) CompressionCodecFactory 类
当读取一个压缩文件时,CompressionCodecFactory 类可以根据文件扩展名来推断使用哪个 Codec 算法。
利用 CompressionCodecFactory 提供的 getCodec() 方法,可以接收一个 Path 对象,并将文件扩展名映射到相应的 Codec 方法中。
在 Eclipse 中运行 FileDecompressor() 测试方法,可以在与压缩文件(即 hdfs://hadoop00:9000/mywork/test/c.txt.gz)相同的目录下得到一个解压缩后的文件。
2. 压缩与输入分片
Hadoop 的数据量过大,压缩后是否支持切分也是很有必要的。例如,一个 2GB 的文件切分成 16 个块(默认每个块128MB),如果该文件作为 MapReduce 的作业输入,将会创建 16 个分片,每个分片启动一个 Map任务单独处理分片数据。
假设压缩后的文件大小为 1.6GB,那么块个数就会降低到 13 个,此时 Map 任务也会减少,由此可用提高作业效率。
当然压缩格式需要支持切分功能(如 bzip2),对于不支持切分的,emmm。
四、Hadoop 文件的数据结构
其中,SequenceFile 和 MapFile 较典型,MapFile 类型文件是排序后带有索引的 SequenceFile 文件。
1. SequemceFile
是序列化后的二进制文件,以键值对的方式记录数据。
在 Eclipse 中运行 SequemceFileWriteDemo() 测试方法,可以将数据写入到 SequemceFile 文件(即 hdfs://hadoop00:9000/mywork/test/sequencedemo.seq)。
在 虚拟机中,通过执行下行命令可以查看该文件内容:hdfs dfs -text /mywork/test/sequencedemo.seq
在 Eclipse 中运行 SequemceFileReadDemo() 测试方法,可以从 SequemceFile 文件中读取数据,下方的“Console”窗口中会输出日志。
2. MapFile
在 Eclipse 中运行 MapFileWriteDemo() 测试方法,可以将数据写入到 MapFile 文件(即 hdfs://hadoop00:9000/mywork/test/mapfiledemo.map)。
在虚拟机中,通过执行下行命令可以分别查看该文件的 index 和 data 内容:hdfs dfs -text /mywork/test/mapfiledemo.map/index
和 hdfs dfs -text /mywork/test/mapfiledemo.map/data
。
在 Eclipse 中运行 MapFileReadDemo() 测试方法,可以从 MapFile 文件中读取数据,此时下方的“Console”窗口中会输出全部文件内容及 key 值为 10 的内容。
评论区
欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~