Python多进程
Python实现多进程的方式主要有两种,一种方式是使用os模块中的fork
方法,另一种方法是使用multiprocessing
模块。这两种方法的区别在于前者仅适用于Unix/Linux操作系统,对Windows不支持,后者则是跨平台的实现方式。由于现在很多爬虫程序都是运行在Unix/Linux操作系统上,所以两种方式都需要了解一下。
一、使用os模块中的fork方式实现多进程
Python的os模块封装了常见的系统调用,其中就有fork方法。fork方法来自于Unix/Linux操作系统中提供的一个fork系统调用,这个方法非常特殊:普通的方法都是调用一次,返回一次;而fork方法是调用一次,返回两次。原因在于操作系统将当前进程(父进程)复制出一份进程(子进程),这两个进程几乎完全相同(可以理解为fork之后的代码进行拷贝);于是fork方法分别在父进程和子进程中返回。子进程中永远返回0,父进程中返回的是子进程的pid(进程ID)。
示例:
再次提醒,fork只用于Linux/Unix中,这里在Linux/Unix中执行:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/4/7 15:19
# @Author : One Fine
# @File : fork_demo.py
import os
import time
pid = os.fork()
print("onefine")
if pid == 0:
print('子进程{0}, 父进程{1}'.format(os.getpgid(), os.getppid()))
else:
print('我是父进程{0}'.format(pid))
time.sleep(2)
运行:
[[email protected] TEST_TEMP]# python3 fork_demo.py
onefine
我是父进程16752
onefine
Traceback (most recent call last):
File "fork_demo.py", line 7, in <module>
print('子进程{0}, 父进程{1}'.format(os.getpgid(), os.getppid()))
TypeError: getpgid() missing required argument 'pid' (pos 1)
1 修改:
if pid == 0:
print('子进程{0}, 父进程{1}'.format(os.getpgid(pid), os.getppid()))
else:
print('我是父进程{0}'.format(pid))
再次重新运行:
[[email protected] TEST_TEMP]# python3 fork_demo.py
onefine
我是父进程16797
onefine
子进程16796, 父进程16796
[[email protected] TEST_TEMP]#
2 修改:
print("onefine")
pid = os.fork()
if pid == 0:
print('子进程{0}, 父进程{1}'.format(os.getpgid(pid), os.getppid()))
else:
print('我是父进程{0}'.format(pid))
time.sleep(2)
运行:
[[email protected] TEST_TEMP]# python3 fork_demo.py
onefine
我是父进程16848
子进程16847, 父进程16847
[[email protected] TEST_TEMP]#
3 修改:
print("onefine")
pid = os.fork()
if pid == 0:
print('子进程{0}, 父进程{1}'.format(os.getpgid(pid), os.getppid()))
else:
print('我是父进程{0}'.format(pid))
执行:
[[email protected] TEST_TEMP]# python3 fork_demo.py
onefine
我是父进程16860
子进程16859, 父进程16859
[[email protected] TEST_TEMP]#
4 修改:
pid = os.fork()
print("onefine")
if pid == 0:
print('子进程{0}, 父进程{1}'.format(os.getpgid(pid), os.getppid()))
else:
print('我是父进程{0}'.format(pid))
执行:
[[email protected] TEST_TEMP]# python3 fork_demo.py
onefine
我是父进程16872
[[email protected] TEST_TEMP]# onefine
子进程16871, 父进程1
[[email protected] TEST_TEMP]# python3 fork_demo.py
onefine
我是父进程16876
[[email protected] TEST_TEMP]# onefine
子进程16875, 父进程1
[[email protected] TEST_TEMP]#
截图仔细看:
由于父进程已经结束退出,子进程还没有结束。sleep的作用就体现再此,使得父进程等待子进程执行完毕之后,父进程退出时携子进程一起退出,不然父进程退出了子进程就没有了父进程。所以这里出现打印中断的情况。
总结:
父(os.getppid())-> 我(os.getpid())-> 子(pid)
二、使用multiprocessing模块创建多进程
multiprocessing模块提供了一个Process类来描述一个进程对象。创建子进程时,只需要传入一个执行函数和函数的参数,即可完成一个Process实例的创建,用start()方法启动进程,用join()方法实现进程间的同步。
# 多进程编程
import multiprocessing
from time import sleep
def get_html(n):
sleep(n)
print("sub progress success")
return n
if __name__ == '__main__':
progress = multiprocessing.Process(target=get_html, args=(2,))
print(progress.pid) # 查看进程id
progress.start()
print(progress.pid) # start之后才有效果
progress.join()
print("main progress end")
执行:
None
22672
sub progress success
main progress end
自定义:
import multiprocessing
class MyProgress(multiprocessing.Process):
# 重新run方法
def run(self):
pass
使用进程池
当要启动大量的子进程的时候,使用上面介绍的方法虽可以满足需求,但使用进程池批量创建子进程的方式更加常见,因为当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态生成多个进程;如果是上百个或者上千个目标,手动去限制进程数量却又太过于繁琐,这时候进程池Pool发挥作用的时候就到了。
Pool可以提供指定数量的进程供用户调用,默认大小是CPU的核数。当有新的请求提交到Pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;如果进程池中的进程数已经达到规定最大值,那么该请求就会等待,直到进程池中有进程结束,才会创建新的进程来处理它。
import multiprocessing
from time import sleep
def get_html(n):
sleep(n)
print("sub progress success")
return n
if __name__ == '__main__':
# 使用进程池
# 指明进程池中进程的个数,若不指明默认为cpu的数量
# pool = multiprocessing.Pool(4) # 或者下面这种
pool = multiprocessing.Pool(multiprocessing.cpu_count())
# import os
# print(os.cpu_count()) # cpu的数量
# 提交一个任务
result = pool.apply_async(get_html, args=(3,))
# 在pool.join()之前调用pool.close(),使pool不再接收任务
pool.close()
# 等待所有任务完成
pool.join()
print(result.get()) # 得到执行结果
执行:
sub progress success
3
注意:Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能再继续添加新的Process了。
imap、imap_unordered方法介绍
from time import sleep
import multiprocessing
def get_html(n):
sleep(n)
print("sub progress success")
return n
if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
# imap,第二个参数是一个可迭代的对象
for result in pool.imap(get_html, [1, 5, 3]):
print("{} sleep success".format(result))
执行:
sub progress success
1 sleep success
sub progress success
sub progress success
5 sleep success
3 sleep success
from time import sleep
import multiprocessing
def get_html(n):
sleep(n)
print("sub progress success")
return n
if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
# imap_unordered,第二个参数是一个可迭代的对象
for result in pool.imap_unordered(get_html, [1, 5, 3]):
print("{} sleep success".format(result))
执行:
sub progress success
1 sleep success
sub progress success
3 sleep success
sub progress success
5 sleep success
三、进程间通信
from multiprocessing import Process
from time import sleep
from queue import Queue
def producer(queue):
queue.put("onefine")
sleep(2)
def consumer(queue):
sleep(2)
data = queue.get()
print(data)
if __name__ == '__main__':
queue = Queue(10)
my_producer = Process(target=producer, args=(queue, ))
my_consumer = Process(target=consumer, args=(queue, ))
my_producer.start()
my_consumer.start()
my_producer.join()
my_consumer.join()
运行:
Traceback (most recent call last):
File "D:/Test.py", line 27, in <module>
my_producer.start()
File "c:\users\xxx\appdata\local\programs\python\python37\Lib\multiprocessing\process.py", line 112, in start
self._popen = self._Popen(self)
File "c:\users\xxx\appdata\local\programs\python\python37\Lib\multiprocessing\context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "c:\users\xxx\appdata\local\programs\python\python37\Lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "c:\users\xxx\appdata\local\programs\python\python37\Lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "c:\users\xxx\appdata\local\programs\python\python37\Lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
使用多进程编程的时候不能使用queue中的Queue,我们可以使用multiprocessing中提供的Queue:
from multiprocessing import Process, Queue
from time import sleep
# from queue import Queue
def producer(queue):
queue.put("onefine")
sleep(2)
def consumer(queue):
sleep(2)
data = queue.get()
print(data)
if __name__ == '__main__':
queue = Queue(10)
my_producer = Process(target=producer, args=(queue, ))
my_consumer = Process(target=consumer, args=(queue, ))
my_producer.start()
my_consumer.start()
my_producer.join()
my_consumer.join()
运行:
onefine
使用全局变量会怎样?
from multiprocessing import Process
from time import sleep
def producer(value):
value += 1
sleep(2)
def consumer(value):
sleep(2)
print(value)
if __name__ == '__main__':
value = 1
my_producer = Process(target=producer, args=(value, ))
my_consumer = Process(target=consumer, args=(value, ))
my_producer.start()
my_consumer.start()
my_producer.join()
my_consumer.join()
运行:
1
总结:全局变量不能使用于多进程编程,但可以使用于多线程。
注意:multiprocessing中提供的Queue不能用于多线程中的通信。
from multiprocessing import Queue, Pool
from time import sleep
def producer(queue):
queue.put("onefine")
sleep(2)
def consumer(queue):
sleep(2)
data = queue.get()
print(data)
if __name__ == '__main__':
queue = Queue(10)
pool = Pool(2)
pool.apply_async(producer, args=(queue, ))
pool.apply_async(consumer, args=(queue, ))
pool.close()
pool.join()
执行之后没有任何显示!
那要怎样进行进程中的通信呢??
from multiprocessing import Pool, Manager
from time import sleep
def producer(queue):
queue.put("onefine")
sleep(2)
def consumer(queue):
sleep(2)
data = queue.get()
print(data)
if __name__ == '__main__':
# 对Manager实例化之后里面有个Queue
queue = Manager().Queue(10)
pool = Pool(2)
pool.apply_async(producer, args=(queue, ))
pool.apply_async(consumer, args=(queue, ))
pool.close()
pool.join()
输出:
onefine
利用pip(管道)进行进程中的通信
from multiprocessing import Pool, Pipe, Process
from time import sleep
def producer(pipe):
pipe.send("onefine")
sleep(2)
def consumer(pipe):
sleep(2)
print(pipe.recv())
if __name__ == '__main__':
# 实例化pipe之后返回两个
receive_pipe, send_pipe = Pipe()
# 注意pipe只能适用于两个指定进程中的通信
my_producer = Process(target=producer, args=(send_pipe, ))
my_consumer = Process(target=consumer, args=(receive_pipe, ))
my_producer.start()
my_consumer.start()
my_producer.join()
my_consumer.join()
输出:
onefine
Pipe的性能高于queue
dict进行通信
from multiprocessing import Manager, Process
def add_data(p_dict, key, value):
p_dict[key] = value
if __name__ == "__main__":
progress_dict = Manager().dict()
first_progress = Process(target=add_data, args=(progress_dict, 'onefine1', 963))
second_progress = Process(target=add_data, args=(progress_dict, 'onefine2', 123))
first_progress.start()
second_progress.start()
first_progress.join()
second_progress.join()
print(progress_dict)
输出:
{'onefine1': 963, 'onefine2': 123}