Hadoop: The Definitive Guide 第四章 Hadoop I/O

1、为了HDFS中的保证数据完整性,Hadoop使用Checksum的方法,每io.bytes.per.checksum个字节计算一个CRC-32的CheckSum。默认是512字节,生成4字节的checksum,所以在空间开销上

2、Hadoop/HDFS支持压缩,当数据规模很大的时候,不仅可以节省空间,还可以减少网络I/O等的消耗。

3、在支持的压缩算法中,压缩比:bzip2 > gzip > lzo,速度:lzo > gzip > bzip2。

解码器是使用上述压缩算法的接口,
gzip 对应的codec是 org.apache.hadoop.io.compress.GzipCodec
bzip2 对应的codec是 org.apache.hadoop.io.compress.BZip2Codec

用法:
(1)直接用Codec.createOutputStream()
或者
(2)CompressionCodecFactory.getCodec(Path p),Path的后缀决定了工厂类返回的Codec。

4、在Hadoop中的配置支持的Codec:

io.compression.codecs 决定了采用哪种压缩。
使用Native代码可以大幅提升Hadoop中压缩/解压缩的性能!
见:http://wiki.apache.org/hadoop/NativeHadoop

然而,bzip和lzo都是不支持split的,所以用在hdfs中会非常影响性能,至于bzip2是可以用的。因此,一般都会用SequenceFile(它支持压缩)。

5、在Map/Reduce任务中设置压缩

设置Reduce结果的压缩:

Configuration conf = new COnfiguration();
conf.setBoolean("mapred.output.compress", true);
conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);

设置map阶段输出为压缩

mapred.compress.map.output -> true
mapred.map.output.compression.codec -> GzipCodec.class

但是这些设置压缩的API已经被废除,建议用SequenceFile?

6、序列化,如果需要自定义输出格式,需要自己实现Writable接口,能对性能有很大提升。

7、SequenceFile:实际就是Key-Value对的文件存储格式。
Key是任意的Writable
Value是任意的Writable
我们可以实现将许多小文件转化为SequenceFile,以方便Map/Reduce处理。
实际上,现在Hadoop处理时,都会将数据转为SequenceFile格式,无论是性能还是压缩上的考量。

8、写SequrenceFile
要写SQ文件,关键的是要获得SequrenceFile.Write,重载的方法很多,看一个最基本的:
Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass)
具体key-value写时用append()方法。

fs不解释了; conf是配置,默认空也行; path是存放的位置; keyClass是key的Class, valClass是value的Class。

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SeqWrite {

	private static final String[] data = { "a,b,c,d,e,f,g", "h,i,j,k,l,m,n",
			"o,p,q,r,s,t", "u,v,w,x,y,z", "0,1,2,3,4", "5,6,7,8,9," };

	// Write an Sequrence File
	public static void main(String[] args) throws Exception {

		// Configuration
		Configuration conf = new Configuration();

		// HDFS File Sytem
		FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"),
				conf);

		// Seq File Path
		Path path = new Path("test.seq");

		// Open Seq Writer and write all key-values
		SequenceFile.Writer writer = null;
		IntWritable key = new IntWritable();
		Text value = new Text();
		try {
			writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
					value.getClass());
			for (int i = 0; i < 10000; i++) {
				key.set(i);
				value.set(SeqWrite.data[i % SeqWrite.data.length]);
				writer.append(key, value);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(writer);
		}
	}
}

9、读SequenceFile

与写不太相同,直接new SequenceFile.Reader()就可以了,然后反复调用next()方法。

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SeqRead {

	public static void main(String[] args) throws Exception {
		// Configuration
		Configuration conf = new Configuration();

		// HDFS File Sytem
		FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"),
				conf);

		// Seq File Path
		Path path = new Path("test.seq");

		// Read SequenceFile
		SequenceFile.Reader reader = null;
		try {
			// Get Reader
			reader = new SequenceFile.Reader(fs, path, conf);
			// Get Key/Value Class
			Writable key = (Writable) ReflectionUtils.newInstance(
					reader.getKeyClass(), conf);
			Writable value = (Writable) ReflectionUtils.newInstance(
					reader.getValueClass(), conf);
			// Read each key/value
			while (reader.next(key, value)) {
				System.out.println(key + "\t" + value);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(reader);
		}

	}
}

结果(部分):

9989	5,6,7,8,9,
9990	a,b,c,d,e,f,g
9991	h,i,j,k,l,m,n
9992	o,p,q,r,s,t
9993	u,v,w,x,y,z
9994	0,1,2,3,4
9995	5,6,7,8,9,
9996	a,b,c,d,e,f,g
9997	h,i,j,k,l,m,n
9998	o,p,q,r,s,t
9999	u,v,w,x,y,z

至此,我们已经可以读、写SequenceFile了。

10、通过命令行读取seq文件。

其实我们刚才的Read,在Hadoop的fs命令中已经实现啦!

./bin/hadoop fs -text test.seq |head
#结果(部分)
0       a,b,c,d,e,f,g
1       h,i,j,k,l,m,n
2       o,p,q,r,s,t
3       u,v,w,x,y,z
4       0,1,2,3,4
5       5,6,7,8,9,
6       a,b,c,d,e,f,g
7       h,i,j,k,l,m,n
8       o,p,q,r,s,t
9       u,v,w,x,y,z

11、SequenceFile提供了SequenceFile.Sorter,用于自定义排序?

12、SequenceFile的压缩。

写:压缩分为Record和Block两种,前者只压缩Value,后者key、value都压缩。
强烈提醒:如果再Seq文件上用压缩,必须安装Hadoop-Native(0.20.203后,默认都配置好了)。
需要注意的是,请使用bin/hadoop来运行!

./bin/hadoop SeqWrite2
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;

//An Compressed version to wrie Seq File
public class SeqWrite2 {

	private static final String[] data = { "a,b,c,d,e,f,g", "h,i,j,k,l,m,n",
			"o,p,q,r,s,t", "u,v,w,x,y,z", "0,1,2,3,4", "5,6,7,8,9," };

	// Write an Sequrence File
	public static void main(String[] args) throws Exception {

		// Configuration
		Configuration conf = new Configuration();

		// HDFS File Sytem
		FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"),
				conf);

		// Seq File Path
		Path path = new Path("test2.seq");

		// Open Seq Writer and write all key-values
		SequenceFile.Writer writer = null;
		IntWritable key = new IntWritable();
		Text value = new Text();
		try {
			// Get writer with compress
			writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
					value.getClass(), SequenceFile.CompressionType.RECORD,
					new GzipCodec());
			for (int i = 0; i < 10000; i++) {
				key.set(i);
				value.set(SeqWrite2.data[i % SeqWrite2.data.length]);
				writer.append(key, value);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(writer);
		}
	}
}

压缩存储完毕后,是435.68,原来是317.34,比原来还大?
可能是由于value太小了。
此外,还可以Block策略而非Record来压缩。

读取压缩,和原来读取一样,无需设置压缩。

13、MapFile,允许通过key直接访问的SequenceFile,可以理解为java.util.Map。

14、MapFile的用法和SequenceFile基本一致,MapFile就是一个sort的SequenceFile。
注意读MapFile时支持直接用Key来读取。

Leave a Reply

Your email address will not be published. Required fields are marked *