InputFormat, OutputFormat, and SerDe in Hive

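Preface

By default, Hive uses TextInputFormat, where each line is one record. Within a record (one line), fields are separated by ^A by default.

Sometimes we have to deal with multi-line, structured documents and need to load them into Hive. In that case we have to write a custom InputFormat, OutputFormat, and SerDe.

First, let's sort out how the three relate to each other. Quoting the official Hive documentation directly: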

SerDe is a short name for "Serializer and Deserializer."
Hive uses SerDe (and FileFormat) to read and write table rows.
HDFS files --> InputFileFormat --> <key, value> --> Deserializer --> Row object
Row object --> Serializer --> <key, value> --> OutputFileFormat --> HDFS files

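To summarize, when Hive reads a file on HDFS, it processes it as follows:

(1) It calls the InputFormat to cut the file into documents. Each document is one row (Row).
(2) It calls the SerDe's Deserializer to split each Row into fields.

When Hive executes an INSERT and writes Rows to a file, it calls the SerDe's Serializer and then the OutputFormat, i.e. the reverse of the read order.

This article customizes the InputFormat, OutputFormat, and SerDe so that Hive can work with the following custom document format: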

<DOC>
id=1
name=a
</DOC>
<DOC>
id=2
name=b
</DOC>
<DOC>
id=3
name=c
</DOC>
<DOC>
id=4
name=d
</DOC>

As shown above, each document is delimited by <DOC> and </DOC>, and each line inside a document has the form key=value.

1. Custom InputFormat

Hive's InputFormat comes from the corresponding part of Hadoop. Note that it uses the old mapred API.

package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class DocFileInputFormat extends TextInputFormat implements
		JobConfigurable {

	@Override
	public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
			JobConf job, Reporter reporter) throws IOException {
		reporter.setStatus(split.toString());
		return new DocRecordReader(job, (FileSplit) split);
	}
}

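This implementation omits details such as compression and decompression; see the official Hadoop implementation if you need them. Because the class inherits its splitting behavior from TextInputFormat, a document that straddles a split boundary is also not handled by this simple reader.

The InputFormat above merely implements the interface. The logic that cuts the input into documents lives in DocRecordReader.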

package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class DocRecordReader implements RecordReader<LongWritable, Text> {

	// Reader
	private LineRecordReader reader;
	// The current line key (byte offset) and line content, reused across calls
	private LongWritable lineKey = null;
	private Text lineValue = null;
	// Doc related
	private StringBuilder sb = new StringBuilder();
	private boolean inDoc = false;
	private final String DOC_START = "<DOC>";
	private final String DOC_END = "</DOC>";

	public DocRecordReader(JobConf job, FileSplit split) throws IOException {
		reader = new LineRecordReader(job, split);
		lineKey = reader.createKey();
		lineValue = reader.createValue();
	}

	@Override
	public void close() throws IOException {
		reader.close();
	}

	@Override
	public boolean next(LongWritable key, Text value) throws IOException {
		while (true) {
			// get current line
			if (!reader.next(lineKey, lineValue)) {
				break;
			}
			if (!inDoc) {
				// not in doc, check if <doc>
				if (lineValue.toString().startsWith(DOC_START)) {
					// reset doc status
					inDoc = true;
					// clean buff
					sb.delete(0, sb.length());
				}
			} else {
				// indoc, check if </doc>
				if (lineValue.toString().startsWith(DOC_END)) {
					// reset doc status
					inDoc = false;
					// set kv and return
					key.set(key.get() + 1);
					value.set(sb.toString());
					return true;
				} else {
					if (sb.length() != 0) {
						sb.append("\n");
					}
					sb.append(lineValue.toString());
				}
			}
		}
		return false;
	}

	@Override
	public float getProgress() throws IOException {
		return reader.getProgress();
	}

	@Override
	public LongWritable createKey() {
		return new LongWritable(0);
	}

	@Override
	public Text createValue() {
		return new Text("");
	}

	@Override
	public long getPos() throws IOException {
		return reader.getPos();
	}

}

The code above uses LineRecordReader to read each line of the split. To save memory, lineKey and lineValue are reused across calls.
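Reusing the key and value holders has one consequence worth spelling out: whoever consumes the reader must copy the content before the next call overwrites it, which is what DocRecordReader does when it appends lineValue.toString() to its buffer. The following is a minimal plain-Java sketch of that effect. MiniReader is a hypothetical stand-in for a mapred-style reader and is not a Hadoop class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustration only: MiniReader is a hypothetical stand-in for a
// mapred-style RecordReader; it is not part of Hadoop.
final class ReuseSketch {

    static final class MiniReader {
        private final Iterator<String> lines;

        MiniReader(List<String> lines) {
            this.lines = lines.iterator();
        }

        // Fills the caller-supplied holder instead of allocating a new one.
        boolean next(StringBuilder value) {
            if (!lines.hasNext()) {
                return false;
            }
            value.setLength(0);
            value.append(lines.next());
            return true;
        }
    }

    // Keeps references to the shared holder: every entry ends up identical.
    static List<String> collectWithoutCopy(List<String> input) {
        MiniReader reader = new MiniReader(input);
        StringBuilder value = new StringBuilder();
        List<StringBuilder> kept = new ArrayList<>();
        while (reader.next(value)) {
            kept.add(value); // the same object every time
        }
        List<String> out = new ArrayList<>();
        for (StringBuilder sb : kept) {
            out.add(sb.toString());
        }
        return out;
    }

    // Copies the content before the next call overwrites the holder.
    static List<String> collectWithCopy(List<String> input) {
        MiniReader reader = new MiniReader(input);
        StringBuilder value = new StringBuilder();
        List<String> out = new ArrayList<>();
        while (reader.next(value)) {
            out.add(value.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("id=1", "id=2", "id=3");
        System.out.println(collectWithoutCopy(input)); // [id=3, id=3, id=3]
        System.out.println(collectWithCopy(input));    // [id=1, id=2, id=3]
    }
}
```

The saving is one allocation per line; the price is that the holder's content is only valid until the next call.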

2. Custom OutputFormat

OutputFormat handles writing. Note that here we can no longer just copy the corresponding Hadoop interface; we have to implement HiveOutputFormat.

package com.coder4.hive;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

@SuppressWarnings({ "rawtypes" })
public class DocFileOutputFormat<K extends WritableComparable, V extends Writable>
		extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {

	public RecordWriter getHiveRecordWriter(JobConf job, Path outPath,
			Class<? extends Writable> valueClass, boolean isCompressed,
			Properties tableProperties, Progressable progress)
			throws IOException {
		FileSystem fs = outPath.getFileSystem(job);
		FSDataOutputStream out = fs.create(outPath);

		return new DocRecordWriter(out);
	}
}

Similarly, the actual logic is in the following RecordWriter:

package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;

public class DocRecordWriter implements RecordWriter {

	private FSDataOutputStream out;
	private final String DOC_START = "<DOC>";
	private final String DOC_END = "</DOC>";

	public DocRecordWriter(FSDataOutputStream o) {
		this.out = o;
	}

	@Override
	public void close(boolean abort) throws IOException {
		out.flush();
		out.close();
	}

	@Override
	public void write(Writable wr) throws IOException {
		write(DOC_START);
		write("\n");
		write(wr.toString());
		write("\n");
		write(DOC_END);
		write("\n");
	}

	private void write(String str) throws IOException {
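		// Use the encoded byte count, not the character count, so that
		// non-ASCII text is not truncated.
		byte[] bytes = str.getBytes("UTF-8");
		out.write(bytes, 0, bytes.length);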
	}

}
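One detail matters when writing strings to a byte stream such as FSDataOutputStream: the length passed to write(byte[], int, int) is a byte count, and for non-ASCII text it differs from String.length(). The sketch below uses a ByteArrayOutputStream so that it runs without Hadoop. The class and method names are made up for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Illustration only: shows why the byte count, not the character count,
// must be used when writing an encoded string.
final class ByteLengthSketch {

    // Writes the complete UTF-8 encoding of str.
    static void writeUtf8(ByteArrayOutputStream out, String str) {
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
    }

    // Uses the character count as the byte count: non-ASCII text is cut short.
    static void writeTruncated(ByteArrayOutputStream out, String str) {
        byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, str.length());
    }

    public static void main(String[] args) {
        // 7 characters, but 11 bytes in UTF-8 (two 3-byte characters).
        String s = "name=\u4e2d\u6587";

        ByteArrayOutputStream full = new ByteArrayOutputStream();
        writeUtf8(full, s);
        System.out.println(full.size()); // 11

        ByteArrayOutputStream cut = new ByteArrayOutputStream();
        writeTruncated(cut, s);
        System.out.println(cut.size()); // 7
    }
}
```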

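3. Custom SerDe or UDF?

With the custom InputFormat and OutputFormat in place, each split is already broken into multiple Rows (documents).

Next, each Row has to be split into fields. There are two options:

(1) Write a UDF that splits a Row into key/value pairs and returns them as a Map<K, V>. The table then only needs a single STRING column.
(2) Implement a SerDe that converts a Row directly into the table's columns.

Let's look at the UDF approach first. It fits scenarios such as JSON parsing, where the field names are not fixed (or change often).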

package com.coder4.hive;

import java.util.Map;
import org.apache.hadoop.hive.ql.exec.UDF;

public class DocToMap extends UDF {
	public Map<String, String> evaluate(String s) {
		return Doc.deserialize(s);
	}
}

Here Doc.deserialize is just a custom method; it does not need to override anything or implement any interface.
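The Doc class itself is not listed in this article. As a rough idea of what it could look like, here is a hypothetical sketch that parses key=value lines into a map. The behavior for malformed lines (skipping them) and for values that contain '=' (keeping everything after the first '=') are assumptions of this sketch, not something the original code confirms.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: one plausible shape for the Doc class used by
// DocToMap. The real class is not shown in the article.
final class Doc {

    private Doc() {
    }

    // Parses "key=value" lines into a map, preserving the line order.
    // Lines without '=' or with an empty key are skipped.
    static Map<String, String> deserialize(String s) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (s == null) {
            return fields;
        }
        for (String line : s.split("\n")) {
            int eq = line.indexOf('=');
            if (eq <= 0) {
                continue;
            }
            // Everything after the first '=' belongs to the value.
            fields.put(line.substring(0, eq), line.substring(eq + 1));
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(deserialize("id=1\nname=a")); // {id=1, name=a}
    }
}
```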

It is used as follows:

CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  doc STRING
)
STORED AS
  INPUTFORMAT 'com.coder4.hive.DocFileInputFormat'
  OUTPUTFORMAT 'com.coder4.hive.DocFileOutputFormat'
LOCATION '/user/heyuan.lhy/doc/'
;

add jar /xxxxxxxx/hive-test.jar;

CREATE TEMPORARY FUNCTION doc_to_map AS 'com.coder4.hive.DocToMap';

SELECT
    raw['id'],
    raw['name']
FROM
(
    SELECT 
        doc_to_map(doc) raw
    FROM
        test_table
) t;

4. Custom SerDe

Choosing a custom SerDe takes a little more work.

This part mainly follows a blog post and the official source code:

http://svn.apache.org/repos/asf/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java

http://blog.cloudera.com/blog/2012/12/how-to-use-a-serde-in-apache-hive/

package com.coder4.hive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MySerDe extends AbstractSerDe {

	// params
	private List<String> columnNames = null;
	private List<TypeInfo> columnTypes = null;
	private ObjectInspector objectInspector = null;
	// separators
	private String nullString = null;
	private String lineSep = null;
	private String kvSep = null;

	@Override
	public void initialize(Configuration conf, Properties tbl)
			throws SerDeException {
		// Read sep
		lineSep = "\n";
		kvSep = "=";
		nullString = tbl.getProperty(Constants.SERIALIZATION_NULL_FORMAT, "");

		// Read Column Names
		String columnNameProp = tbl.getProperty(Constants.LIST_COLUMNS);
		if (columnNameProp != null && columnNameProp.length() > 0) {
			columnNames = Arrays.asList(columnNameProp.split(","));
		} else {
			columnNames = new ArrayList<String>();
		}

		// Read Column Types
		String columnTypeProp = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
		// default all string
		if (columnTypeProp == null) {
			String[] types = new String[columnNames.size()];
			Arrays.fill(types, 0, types.length, Constants.STRING_TYPE_NAME);
			columnTypeProp = StringUtils.join(types, ":");
		}
		columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProp);

		// Check column and types equals
		if (columnTypes.size() != columnNames.size()) {
			throw new SerDeException("len(columnNames) != len(columnTypes)");
		}

		// Create ObjectInspectors from the type information for each column
		List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>();
		ObjectInspector oi;
		for (int c = 0; c < columnNames.size(); c++) {
			oi = TypeInfoUtils
					.getStandardJavaObjectInspectorFromTypeInfo(columnTypes
							.get(c));
			columnOIs.add(oi);
		}
		objectInspector = ObjectInspectorFactory
				.getStandardStructObjectInspector(columnNames, columnOIs);

	}

	@Override
	public Object deserialize(Writable wr) throws SerDeException {
		// Split to kv pair
		if (wr == null)
			return null;
		Map<String, String> kvMap = new HashMap<String, String>();
		Text text = (Text) wr;
		for (String kv : text.toString().split(lineSep)) {
			// limit 2: keep values that themselves contain the separator
			String[] pair = kv.split(kvSep, 2);
			if (pair.length == 2) {
				kvMap.put(pair[0], pair[1]);
			}
		}

		// Set according to col_names and col_types
		ArrayList<Object> row = new ArrayList<Object>();
		String colName = null;
		TypeInfo type_info = null;
		Object obj = null;
		for (int i = 0; i < columnNames.size(); i++) {
			colName = columnNames.get(i);
			type_info = columnTypes.get(i);
			obj = null;
			if (type_info.getCategory() == ObjectInspector.Category.PRIMITIVE) {
				PrimitiveTypeInfo p_type_info = (PrimitiveTypeInfo) type_info;
				switch (p_type_info.getPrimitiveCategory()) {
				case STRING:
					obj = StringUtils.defaultString(kvMap.get(colName), "");
					break;
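				case LONG:
					try {
						obj = Long.parseLong(kvMap.get(colName));
					} catch (NumberFormatException e) {
						// missing or malformed value: leave the field NULL
					}
					break;
				case INT:
					// the Java object inspector for INT expects an Integer
					try {
						obj = Integer.parseInt(kvMap.get(colName));
					} catch (NumberFormatException e) {
						// missing or malformed value: leave the field NULL
					}
					break;
				default:
					// other primitive types are not handled: field stays NULL
					break;
				}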
			}
			row.add(obj);
		}

		return row;
	}

	@Override
	public ObjectInspector getObjectInspector() throws SerDeException {
		return objectInspector;
	}

	@Override
	public SerDeStats getSerDeStats() {
		// Not supported yet
		return null;
	}

	@Override
	public Class<? extends Writable> getSerializedClass() {
		// Rows are exchanged with the file format as Text
		return Text.class;
	}

	@Override
	public Writable serialize(Object arg0, ObjectInspector arg1)
			throws SerDeException {
		// Not supported yet
		return null;
	}

}

The final Hive table definition is:

add jar /xxxxxxxx/hive-test.jar;

CREATE EXTERNAL TABLE IF NOT EXISTS test_table
(
  id BIGINT,
  name STRING
)
ROW FORMAT SERDE 'com.coder4.hive.MySerDe'
STORED AS
  INPUTFORMAT 'com.coder4.hive.DocFileInputFormat'
  OUTPUTFORMAT 'com.coder4.hive.DocFileOutputFormat'
LOCATION '/user/heyuan.lhy/doc/'
;

Our custom SerDe splits the content of each <DOC> block on key=value; if the key name is id or name, the value is placed into the corresponding column.
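The per-column conversion step can be sketched on its own, without Hive on the classpath. The sketch below maps a raw string to a Java object according to a Hive type name ("string", "int", "bigint"). FieldConverterSketch and its methods are hypothetical names. Returning an Integer for int and a Long for bigint reflects the assumption that the standard Java object inspectors expect exactly those classes.

```java
// Illustration only: a standalone version of the per-column conversion
// a SerDe performs. Class and method names are hypothetical.
final class FieldConverterSketch {

    // Converts a raw field value according to the declared Hive type name.
    // A missing string becomes ""; a missing or malformed number becomes null.
    static Object convert(String raw, String hiveType) {
        switch (hiveType) {
        case "string":
            return raw == null ? "" : raw;
        case "int":
            return parseOrNull(raw, false);
        case "bigint":
            return parseOrNull(raw, true);
        default:
            // Types not covered by this sketch are reported as null.
            return null;
        }
    }

    private static Object parseOrNull(String raw, boolean asLong) {
        if (raw == null) {
            return null;
        }
        try {
            if (asLong) {
                return Long.valueOf(raw.trim());
            }
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("4", "bigint").getClass().getSimpleName()); // Long
        System.out.println(convert("4", "int").getClass().getSimpleName());    // Integer
        System.out.println(convert("oops", "int"));                            // null
    }
}
```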

5. Test and results

First, we put a file with the following content into the HDFS directory /user/heyuan.lhy/doc/:

<DOC>
id=1
name=a
</DOC>
<DOC>
id=2
name=b
</DOC>
<DOC>
id=3
name=c
</DOC>
<DOC>
id=4
name=d
</DOC>

With the table schema defined as in section 4, we run a SELECT:

SELECT * FROM test_table;
OK
1       a
2       b
3       c
4       d

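As you can see, the id and name fields were parsed out separately.

Because our SerDe does not implement the serialize method, writing is not supported.

If writing is needed, the UDF + Map approach can be used instead.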
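For readers who want to fill in the write side, the core of a serialize method would be the inverse of deserialize: turn a row back into key=value lines. The following plain-Java sketch shows only that string-building step. In a real SerDe the values would be read through the ObjectInspector and the result wrapped in a Text, neither of which is shown here, and RowSerializerSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: builds the key=value body of one document from a row.
// Not a Hive API; the ObjectInspector handling of a real SerDe is omitted.
final class RowSerializerSketch {

    // Joins column names and values as "name=value" lines.
    // Null values are written as nullString.
    static String serialize(List<String> columnNames, List<?> row,
            String nullString) {
        if (columnNames.size() != row.size()) {
            throw new IllegalArgumentException("column/value count mismatch");
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columnNames.size(); i++) {
            if (i > 0) {
                sb.append('\n');
            }
            Object value = row.get(i);
            sb.append(columnNames.get(i)).append('=')
                    .append(value == null ? nullString : value.toString());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Prints two lines: "id=1" and "name=a".
        System.out.println(serialize(Arrays.asList("id", "name"),
                Arrays.asList(1L, "a"), ""));
    }
}
```

A writer such as DocRecordWriter would then wrap this body in <DOC> and </DOC> lines.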

 
