Hadoop Streaming Python的一个简单Wrapper

Map 端比较简单,这里就不贴了;下面是 Reduce 端的封装。

class Reduce:
    """Base class for a Hadoop Streaming style reducer.

    Lines of the form ``key<KV_SEP>value`` are read from standard input.
    Consecutive lines that share the same key are collected into one group,
    and each completed group is handed to :meth:`reduce`. Whatever strings
    :meth:`reduce` produces are written to standard output unchanged.

    Grouping only looks at *adjacent* lines, so the input is expected to
    arrive sorted (or at least clustered) by key.

    Subclasses override :meth:`reduce`; they may also override ``KV_SEP``.
    """

    # Separator between the key and the value on every input line.
    KV_SEP = "\t"

    def __init__(self):
        # Key of the group currently being accumulated (None = no group yet).
        self.last_key = None
        # Values collected so far for ``last_key``.
        self.value_list = []

    def reduce(self, key, value_list):
        """Turn one complete group into output records.

        Args:
            key: The key shared by every value in the group.
            value_list: Values of the group in input order. Each value is
                the raw remainder of its input line, so it still carries
                the trailing newline when the line had one. The list is
                cleared in place once the group has been emitted; copy it
                if it must outlive this call.

        Returns:
            An iterable of strings to write to standard output. The base
            implementation emits nothing.
        """
        return ()

    def _flush(self):
        """Emit the pending group (if any) and reset the accumulator."""
        if self.last_key is None:
            return
        # ``or ()`` tolerates a subclass ``reduce`` that returns None.
        for record in self.reduce(self.last_key, self.value_list) or ():
            yield record
        del self.value_list[:]
        self.last_key = None

    def processLine(self, key, value):
        """Feed one ``(key, value)`` pair into the reducer.

        This is a generator: it yields the output records of the previous
        group when ``key`` differs from the key seen so far, and yields
        nothing otherwise. The generator must be iterated for the pair to
        be processed at all.

        Args:
            key: Key of the current line. Passing ``None`` acts as an
                end-of-input marker: the pending group is emitted and
                ``value`` is ignored.
            value: Value of the current line.

        Yields:
            Strings produced by :meth:`reduce` for a completed group.
        """
        if key is None:
            # End-of-input sentinel: emit what is pending, store nothing.
            for record in self._flush():
                yield record
            return

        if self.last_key is not None and self.last_key != key:
            # Key changed: the previous group is complete.
            for record in self._flush():
                yield record

        # The current value always belongs to the (possibly new) group.
        self.last_key = key
        self.value_list.append(value)

    def work(self):
        """Run the reducer over standard input, writing to standard output.

        Lines that do not contain ``KV_SEP`` are skipped silently. Only the
        first separator on a line splits key from value.
        """
        # Imported locally so the class does not rely on a module-level import.
        import sys

        for line in sys.stdin:
            parts = line.split(self.KV_SEP, 1)
            if len(parts) != 2:
                continue
            key, value = parts
            for record in self.processLine(key, value):
                sys.stdout.write(record)

        # Emit the last group, which no key change ever triggered.
        for record in self._flush():
            sys.stdout.write(record)

 

Leave a Reply

Your email address will not be published.