大爱

Python异步并发模块concurrent.futures简析

2017-01-17

本文主要介绍python异步并发模块concurrent.futures。它非常简单易用,主要用来实现多线程和多进程的异步并发。

1. 模块安装

  1. python 3.x中自带了concurrent.futures模块

  2. python 2.7需要安装futures模块,使用命令pip install futures安装即可

pypi地址:https://pypi.python.org/pypi/futures/

2. Executor对象

class concurrent.futures.Executor

Executor是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类ThreadPoolExecutor或者ProcessPoolExecutor进行调用。

2.1 Executor.submit

函数原型:Executor.submit(fn, *args, **kwargs)

其中:

fn:需要异步执行的函数

*args, **kwargs:fn参数

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
#-*- coding:utf-8 -*-
from concurrent import futures

def test(num):
import time
return time.ctime(),num

with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test,1)
print future.result()

>>>
('Tue Jan 17 15:23:10 2017', 1)

2.2 Executor.map(func, *iterables, timeout=None)

相当于map(func, *iterables),但是func是异步执行。timeout的值可以是int或float,如果操作超时,会返回raisesTimeoutError;如果不指定timeout参数,则不设置超时间。

func:需要异步执行的函数

*iterables:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。

timeout:设置每次异步操作的超时时间

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#-*- coding:utf-8 -*-
from concurrent import futures

def test(num):
import time
return time.ctime(),num

data=[1,2,3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test,data):
print future

>>>
('Tue Jan 17 15:23:47 2017', 1)
('Tue Jan 17 15:23:47 2017', 2)
('Tue Jan 17 15:23:47 2017', 3)

2.3 Executor.shutdown(wait=True)

释放系统资源,在Executor.submit()或 Executor.map()等异步操作后调用。使用with语句可以避免显式调用此方法。

3. ThreadPoolExecutor对象

ThreadPoolExecutor类是Executor子类,使用线程池执行异步调用.

class concurrent.futures.ThreadPoolExecutor(max_workers)

使用max_workers数目的线程池执行异步调用

4. ProcessPoolExecutor对象

ThreadPoolExecutor类是Executor子类,使用进程池执行异步调用.

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

使用max_workers数目的进程池执行异步调用,如果max_workers为None则使用机器的处理器数目(如4核机器max_worker配置为None时,则使用4个进程进行异步并发)。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#-*- coding:utf-8 -*-
from concurrent import futures

def test(num):
import time
return time.ctime(),num

def muti_exec(m,n):
#m 并发次数
#n 运行次数

with futures.ProcessPoolExecutor(max_workers=m) as executor: #多进程
#with futures.ThreadPoolExecutor(max_workers=m) as executor: #多线程
executor_dict=dict((executor.submit(test,times), times) for times in range(m*n))

for future in futures.as_completed(executor_dict):
times = executor_dict[future]
if future.exception() is not None:
print('%r generated an exception: %s' % (times,future.exception()))
else:
print('RunTimes:%d,Res:%s'% (times, future.result()))

if __name__ == '__main__':
muti_exec(5,1)

>>>
RunTimes:0,Res:('Tue Jan 17 15:56:53 2017', 0)
RunTimes:4,Res:('Tue Jan 17 15:56:53 2017', 4)
RunTimes:3,Res:('Tue Jan 17 15:56:53 2017', 3)
RunTimes:1,Res:('Tue Jan 17 15:56:53 2017', 1)
RunTimes:2,Res:('Tue Jan 17 15:56:53 2017', 2)

建议使用多进程并发而不是多线程并发,理由如下。

5. Python GIL相关

要理解GIL的含义,我们需要从Python的基础讲起。像C++这样的语言是编译型语言,所谓编译型语言,是指程序输入到编译器,编译器再根据语言的语 法进行解析,然后翻译成语言独立的中间表示,最终链接成具有高度优化的机器码的可执行程序。编译器之所以可以深层次的对代码进行优化,是因为它可以看到整 个程序(或者一大块独立的部分)。这使得它可以对不同的语言指令之间的交互进行推理,从而给出更有效的优化手段。

与此相反,Python是解释型语言。程序被输入到解释器来运行。解释器在程序执行之前对其并不了解;它所知道的只是Python的规则,以及在执行过程 中怎样去动态的应用这些规则。它也有一些优化,但是这基本上只是另一个级别的优化。由于解释器没法很好的对程序进行推导,Python的大部分优化其实是 解释器自身的优化。

现在我们来看一下问题的症结所在。要想利用多核系统,Python必须支持多线程运行。作为解释型语言,Python的解释器必须做到既安全又高效。我们都知道多线程编程会遇到的问题,解释器要留意的是避免在不同的线程操作内部共享的数据,同时它还要保证在管理用户线程时保证总是有最大化的计算资源。

那么,不同线程同时访问时,数据的保护机制是怎样的呢?答案是解释器全局锁。从名字上看能告诉我们很多东西,很显然,这是一个加在解释器上的全局(从解释器的角度看)锁(从互斥或者类似角度看)。这种方式当然很安全,但是它有一层隐含的意思(Python初学者需要了解这个):对于任何Python程序,不管有多少的处理器,任何时候都总是只有一个线程在执行。

”为什么我全新的多线程Python程序运行得比其只有一个线程的时候还要慢?“许多人在问这个问题时还是非常犯晕的,因为显然一个具有两个线程的程序要比其只有一个线程时要快(假设该程序确实是可并行的)。事实上,这个问题被问得如此频繁以至于Python的专家们精心制作了一个标准答案:”不要使用多线程,请使用多进程”。

所以,对于计算密集型的,我还是建议不要使用python的多线程而是使用多进程方式,而对于IO密集型的,还是劝你使用多进程方式,因为使用多线程方式出了问题,最后都不知道问题出在了哪里,这是多么让人沮丧的一件事情!

6. 参考文档

http://pythonhosted.org/futures/

使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

扫描二维码,分享此文章