A few "thread"-related examples in gevent

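Although the title says "threads", gevent is really built on greenlets, which are perhaps better described as "micro-threads". One caveat: gevent.spawn creates greenlets, while gevent.threadpool.ThreadPool, used in several examples below, runs its tasks on real OS threads.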

1. Thread pool

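import time
from gevent.threadpool import ThreadPool

def my_func(text, num):
    # The work done by each task: just print its arguments.
    print('%s %s' % (text, num))

# A pool of at most 100 worker threads.
pool = ThreadPool(100)
start = time.time()
for i in range(100000):
    pool.spawn(my_func, "Hello", i)
# Wait until every submitted task has finished.
pool.join()
delay = time.time() - start
print('Took %.3f seconds' % delay)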
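
Since the introduction is about greenlets, here is a minimal sketch of the same pattern with gevent.pool.Pool, which schedules greenlets rather than OS threads. The results list, the smaller task count, and the gevent.sleep(0) call (standing in for real I/O) are assumptions added for illustration; they are not part of the original example.

```python
import gevent
from gevent.pool import Pool

results = []  # hypothetical: collects the arguments each task received

def my_func(text, num):
    gevent.sleep(0)  # yield to other greenlets, standing in for real I/O
    results.append((text, num))

pool = Pool(100)  # at most 100 greenlets run at the same time
for i in range(1000):
    pool.spawn(my_func, "Hello", i)  # waits for a free slot when the pool is full
pool.join()  # wait until every greenlet has finished

print('Ran %d tasks' % len(results))
```

Greenlets only switch at blocking gevent calls, so this variant suits I/O-bound work; CPU-bound tasks would not run in parallel here.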

2. One producer, multiple consumers

import time
import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()

def consumer():
    # Runs forever: take an item, process it, then mark it as done.
    while True:
        item = q.get()
        try:
            print('consume %s' % item)
        finally:
            q.task_done()

# Start timing
start = time.time()

# Start 20 consumer greenlets
jobs = [gevent.spawn(consumer) for i in range(20)]

# Produce the items to be consumed
for i in range(100000):
    #time.sleep(0.1)
    q.put(i)
# Block until task_done() has been called for every item
q.join()

# The consumers loop forever, so they are not joined here.
#gevent.joinall(jobs)
delay = time.time() - start
print('Took %.3f seconds' % delay)

3. Multiple producers (using a Pool), one consumer (in its own greenlet)

This one is slightly unconventional: suppose the producers are I/O-bound jobs spawned from a Pool, and there is only one consumer.

It uses kill, which is not very elegant; suggestions from more experienced readers are welcome.

import time
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import Queue
from gevent.queue import Empty

q = Queue()

def consumer():
    while True:
        try:
            # The timeout must be passed by keyword: the first
            # positional parameter of get() is block.
            item = q.get(timeout=0.1)
            print('consume %s' % item)
        except Empty:
            pass

def producer(n):
    for i in range(10):
        #time.sleep(0.1)
        q.put(1000*n+i)

# Start timing
start = time.time()

# Start the producers using a pool
pool = ThreadPool(10)
for i in range(100):
    pool.spawn(producer, i)

job = gevent.spawn(consumer)

# Wait for all producers, then stop the consumer.
# Items still in the queue at this point may be left unconsumed.
pool.join()
job.kill()

delay = time.time() - start
print('Took %.3f seconds' % delay)
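
One detail in the consumer is worth checking: the signature of Queue.get is get(block=True, timeout=None), so a bare number passed positionally is read as block, not as a timeout. The small sketch below shows the keyword form raising Empty on an empty queue; the helper name try_get is hypothetical and only there for illustration.

```python
from gevent.queue import Queue, Empty

def try_get(q, timeout):
    """Return the next item, or None if nothing arrives within `timeout` seconds."""
    try:
        return q.get(timeout=timeout)  # timeout passed by keyword
    except Empty:
        return None

q = Queue()
first = try_get(q, 0.05)   # the queue is empty, so this times out
q.put("job")
second = try_get(q, 0.05)  # an item is waiting, so it is returned
```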

Update: this can be improved a little. Once all the jobs in the pool have finished, put a special "StopIteration" item on the queue so that the consumer exits :-)

import time
import random
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import Queue

q = Queue()

def consumer():
    while True:
        item = q.get()
        # StopIteration is the sentinel that tells the consumer to exit.
        if item is StopIteration:
            break
        print('consume %s' % item)

def producer(n):
    for i in range(10):
        #time.sleep(random.random())
        q.put(1000*n+i)

# Start timing
start = time.time()

# Start the producers using a pool
pool = ThreadPool(10)
for i in range(100):
    pool.spawn(producer, i)

job = gevent.spawn(consumer)

# Once all producers are done, send the sentinel and wait for the consumer.
pool.join()
q.put(StopIteration)
job.join()

delay = time.time() - start
print('Took %.3f seconds' % delay)
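
The sentinel idea above can also be written with queue iteration: as far as I know, iterating over a gevent Queue keeps calling get() and stops once StopIteration comes out of the queue. The sketch below assumes that behaviour, and for simplicity runs the producers as greenlets instead of in a ThreadPool; the consumed list and the small item counts are illustrative only.

```python
import gevent
from gevent.queue import Queue

q = Queue()
consumed = []  # hypothetical: records what the consumer received

def consumer():
    # The loop ends when the StopIteration sentinel is taken off the queue.
    for item in q:
        consumed.append(item)

def producer(n):
    for i in range(3):
        q.put(1000 * n + i)

job = gevent.spawn(consumer)
producers = [gevent.spawn(producer, n) for n in range(4)]

gevent.joinall(producers)  # wait for every producer to finish
q.put(StopIteration)       # tell the consumer to stop
job.join()

print('Consumed %d items' % len(consumed))
```

This removes the explicit sentinel check from the consumer, at the cost of relying on the queue's iteration convention.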
