Although the title says "threads", gevent actually uses "greenlets", which might be better described as "micro-threads".
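To make the "micro-thread" idea concrete, here is a minimal sketch of two greenlets taking turns inside a single OS thread. The `worker` function and the `order` list are hypothetical names introduced only for this illustration; `gevent.spawn`, `gevent.sleep` and `gevent.joinall` are gevent's own calls.

```python
import gevent

# Records which greenlet ran each step (hypothetical helper for this sketch)
order = []

def worker(name, steps):
    for i in range(steps):
        order.append((name, i))
        # gevent.sleep(0) yields control so that other greenlets get a turn
        gevent.sleep(0)

# Spawn two greenlets and wait for both to finish
jobs = [gevent.spawn(worker, "a", 3), gevent.spawn(worker, "b", 3)]
gevent.joinall(jobs)
print(order)
```

A greenlet only gives up control at points such as `gevent.sleep` or cooperative I/O, so a function that never yields will keep every other greenlet waiting.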
1. Thread pool
import time
from gevent.threadpool import ThreadPool

def my_func(text, num):
    print('%s %s' % (text, num))

# A pool of up to 100 worker threads
pool = ThreadPool(100)
start = time.time()
for i in range(100000):
    pool.spawn(my_func, "Hello", i)
# Wait until every submitted task has finished
pool.join()
delay = time.time() - start
print('Took %.3f seconds' % delay)
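One point worth noting: `gevent.threadpool.ThreadPool` runs its tasks in real OS threads, not in greenlets. For comparison, the sketch below does the same kind of fan-out with the greenlet-based `gevent.pool.Pool`. The `results` list and the `gevent.sleep(0.01)` call, standing in for cooperative I/O, are assumptions made for this illustration.

```python
import gevent
from gevent.pool import Pool

# Collects finished work items (hypothetical helper for this sketch)
results = []

def my_func(text, num):
    # Stand-in for cooperative I/O; yields to other greenlets while "waiting"
    gevent.sleep(0.01)
    results.append((text, num))

# At most 100 greenlets run at once; spawn() waits when the pool is full
pool = Pool(100)
for i in range(1000):
    pool.spawn(my_func, "Hello", i)
# Wait until every greenlet in the pool has finished
pool.join()
print(len(results))
```

Which pool fits better depends on the job: greenlets suit work that yields cooperatively, while a thread pool suits calls that would otherwise block the whole process.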
2. One producer, multiple consumers
import time
import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()

def consumer():
    while True:
        item = q.get()
        try:
            print('consume %s' % item)
        finally:
            # Mark the item as processed so that q.join() can return
            q.task_done()

# Start
start = time.time()
# Start the consumers
jobs = [gevent.spawn(consumer) for i in range(20)]
# Produce something to be consumed
for i in range(100000):
    #time.sleep(0.1)
    q.put(i)
# Block until every item has been marked as done
q.join()
#gevent.joinall(jobs)
delay = time.time() - start
print('Took %.3f seconds' % delay)
3. Multiple producers (using a Pool), one consumer (in its own greenlet)
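This one is a bit unconventional. Suppose the producers are I/O-bound jobs run by the Pool, while there is only one consumer.
It relies on kill, which is not very elegant; suggestions from the experts are welcome.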
import time
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import Queue
from gevent.queue import Empty

q = Queue()

def consumer():
    while True:
        try:
            # Wait at most 0.1 s for an item; q.get(0.1) would pass 0.1 as `block`, not as `timeout`
            item = q.get(timeout=0.1)
        except Empty:
            continue
        print('consume %s' % item)

def producer(n):
    for i in range(10):
        #time.sleep(0.1)
        q.put(1000*n+i)

# Start
start = time.time()
# Start the producers using the pool
pool = ThreadPool(10)
for i in range(100):
    pool.spawn(producer, i)
job = gevent.spawn(consumer)
pool.join()
# Stop the consumer; items still in the queue at this point are not consumed
job.kill()
#gevent.joinall(jobs)
delay = time.time() - start
print('Took %.3f seconds' % delay)
Update: we can improve this a little. Once all jobs in the pool have finished, put a special "StopIteration" item into the queue so that the consumer exits :-)
import time
import random
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import Queue

q = Queue()

def consumer():
    while True:
        item = q.get()
        # StopIteration serves as the sentinel that tells the consumer to exit
        if item is StopIteration:
            break
        print('consume %s' % item)

def producer(n):
    for i in range(10):
        #time.sleep(random.random())
        q.put(1000*n+i)

# Start
start = time.time()
# Start the producers using the pool
pool = ThreadPool(10)
for i in range(100):
    pool.spawn(producer, i)
job = gevent.spawn(consumer)
pool.join()
# All producers are done; enqueue the sentinel so the consumer exits after draining the queue
q.put(StopIteration)
job.join()
#gevent.joinall(jobs)
delay = time.time() - start
print('Took %.3f seconds' % delay)
Hi, in what scenarios would you use threadpool?
I don't quite understand the purpose here; could you explain a little?
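One possible answer to the question above, offered as a sketch and not as the author's own explanation: a thread pool is useful when a call blocks without yielding to gevent, for example an unpatched `time.sleep` or a C extension. Handing that call to a `ThreadPool` lets other greenlets keep running in the meantime. The `ticker` and `blocking_call` functions and the `ticks` list are hypothetical names used only for this illustration.

```python
import time
import gevent
from gevent.threadpool import ThreadPool

# Timestamps recorded by the ticker greenlet (hypothetical helper)
ticks = []

def ticker():
    # A cooperative greenlet that should keep running during the blocking call
    for _ in range(5):
        ticks.append(time.time())
        gevent.sleep(0.05)

def blocking_call():
    # Unpatched time.sleep blocks the OS thread it runs in
    time.sleep(0.3)
    return "done"

pool = ThreadPool(2)
t = gevent.spawn(ticker)
# Run the blocking call in a worker thread instead of the main thread
result = pool.spawn(blocking_call)
# get() waits cooperatively, so the ticker greenlet keeps running meanwhile
value = result.get()
t.join()
print('%s %d' % (value, len(ticks)))
```

If `blocking_call()` were invoked directly in the main thread, the ticker could not run until it returned.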