Although the title says "threads", gevent actually uses "greenlets", which might be better translated as "micro-threads".
1. Thread pool
import time
from gevent.threadpool import ThreadPool

def my_func(text, num):
    print(text, num)

# A pool of up to 100 worker threads
pool = ThreadPool(100)
start = time.time()
for i in range(100000):
    pool.spawn(my_func, "Hello", i)
# Wait for every submitted task to finish
pool.join()
delay = time.time() - start
print('Take %.3f seconds' % delay)
2. One producer, multiple consumers
import time
import gevent
from gevent.queue import JoinableQueue

q = JoinableQueue()

def consumer():
    while True:
        item = q.get()
        try:
            print("consume", item)
        finally:
            # Mark the item as processed so that q.join() can return
            q.task_done()

# Start
start = time.time()

# Start consumers
jobs = [gevent.spawn(consumer) for i in range(20)]

# Make something to be consumed
for i in range(100000):
    # time.sleep(0.1)
    q.put(i)

# Block until every queued item has been marked done
q.join()
# gevent.joinall(jobs)
delay = time.time() - start
print('Take %.3f seconds' % delay)
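For comparison, the same task_done()/join() handshake can be sketched with the standard library's queue.Queue and ordinary OS threads instead of greenlets. This is only an illustrative sketch, not the code from this post: run_demo, its parameters, and the results list are hypothetical names made up for the example.

```python
import queue
import threading


def run_demo(num_items=100, num_consumers=4):
    """Hypothetical demo: one producer (the caller), several consumer threads."""
    q = queue.Queue()
    results = []
    lock = threading.Lock()

    def consumer():
        while True:
            item = q.get()
            try:
                with lock:
                    results.append(item)
            finally:
                # Every get() must be paired with exactly one task_done()
                q.task_done()

    # Daemon threads never exit their loop; they are dropped at interpreter exit
    for _ in range(num_consumers):
        threading.Thread(target=consumer, daemon=True).start()

    # The single producer: enqueue all the work
    for i in range(num_items):
        q.put(i)

    # Returns only after task_done() has been called for every item put
    q.join()
    return sorted(results)


if __name__ == "__main__":
    print(len(run_demo()))
```

The point of the pairing is that q.join() waits for processing to finish rather than for the queue to become empty, so no item is still in flight when it returns.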
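3. Multiple producers (using a Pool), one consumer (a separate thread)
This one is a bit unconventional. Suppose the producers are I/O-bound jobs that run in the Pool, while there is only one consumer.
It uses kill, which is not very elegant; suggestions from the experts are welcome.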
import time
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import Queue
from gevent.queue import Empty

q = Queue()

def consumer():
    while True:
        try:
            # Wait at most 0.1 s; Empty is raised on timeout.
            # (q.get(0.1) would pass 0.1 as `block`, not as `timeout`.)
            item = q.get(timeout=0.1)
        except Empty:
            continue
        print("consume", item)

def producer(n):
    for i in range(10):
        # time.sleep(0.1)
        q.put(1000 * n + i)

# Start
start = time.time()

# Start producers using the pool
pool = ThreadPool(10)
for i in range(100):
    pool.spawn(producer, i)

job = gevent.spawn(consumer)

# Wait for the producers, then forcibly stop the consumer
pool.join()
job.kill()
# gevent.joinall(jobs)
delay = time.time() - start
print('Take %.3f seconds' % delay)
Update: we can improve this a little. Once all the jobs in the pool have finished, put a special "StopIteration" marker into the queue so that the consumer exits :-)
import time
import random
import gevent
from gevent.threadpool import ThreadPool
from gevent.queue import Queue

q = Queue()

def consumer():
    while True:
        # item = q.get(timeout=0.1)
        item = q.get()
        # StopIteration is the "no more items" marker
        if item is StopIteration:
            break
        print("consume", item)

def producer(n):
    for i in range(10):
        # time.sleep(random.random())
        q.put(1000 * n + i)

# Start
start = time.time()

# Start producers using the pool
pool = ThreadPool(10)
for i in range(100):
    pool.spawn(producer, i)

job = gevent.spawn(consumer)

# Once all producers are done, tell the consumer to stop and wait for it
pool.join()
q.put(StopIteration)
job.join()
# gevent.joinall(jobs)
delay = time.time() - start
print('Take %.3f seconds' % delay)
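The sentinel idea does not depend on gevent. As a rough sketch under that assumption, here is the same shutdown sequence written with only the standard library (queue.Queue, threading, and concurrent.futures.ThreadPoolExecutor). The names run_pipeline and _STOP are hypothetical and exist only for this illustration; a private object is used as the marker where the post uses StopIteration.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sentinel object; the post uses StopIteration for the same purpose
_STOP = object()


def run_pipeline(num_producers=10, items_each=10):
    """Hypothetical demo: pooled producers, one consumer, sentinel shutdown."""
    q = queue.Queue()
    consumed = []

    def producer(n):
        for i in range(items_each):
            q.put(1000 * n + i)

    def consumer():
        while True:
            item = q.get()
            if item is _STOP:
                break
            consumed.append(item)

    worker = threading.Thread(target=consumer)
    worker.start()

    # Leaving the with-block waits for all submitted producer tasks
    with ThreadPoolExecutor(max_workers=4) as pool:
        for n in range(num_producers):
            pool.submit(producer, n)

    # All real items are already queued, so the sentinel is the last entry
    q.put(_STOP)
    worker.join()
    return consumed


if __name__ == "__main__":
    print(len(run_pipeline()))
```

Because the marker is enqueued only after every producer has finished, the consumer drains all real items before it sees the marker, so nothing is lost the way it can be with kill.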
Hello, in what scenarios would you use ThreadPool?
I don't quite understand the intent here; could you explain it a little?