Python多进程

Python实现多进程的方式主要有两种,一种方式是使用os模块中的fork方法,另一种方法是使用multiprocessing模块。这两种方法的区别在于前者仅适用于Unix/Linux操作系统,对Windows不支持,后者则是跨平台的实现方式。由于现在很多爬虫程序都是运行在Unix/Linux操作系统上,所以两种方式都需要了解一下。

一、使用os模块中的fork方式实现多进程

Python的os模块封装了常见的系统调用,其中就有fork方法。fork方法来自于Unix/Linux操作系统中提供的一个fork系统调用,这个方法非常特殊:普通的方法都是调用一次,返回一次;而fork方法是调用一次,返回两次。原因在于操作系统将当前进程(父进程)复制出一份进程(子进程),这两个进程几乎完全相同(可以理解为fork之后的代码进行拷贝);于是fork方法分别在父进程和子进程中返回。子进程中永远返回0,父进程中返回的是子进程的pid(进程ID)。

示例:

再次提醒,fork只用于Linux/Unix中,这里在Linux/Unix中执行:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2019/4/7 15:19
# @Author  : One Fine
# @File    : fork_demo.py

import os
import time

pid = os.fork()
print("onefine")
if pid == 0:
    print('子进程{0}, 父进程{1}'.format(os.getpgid(), os.getppid()))
else:
    print('我是父进程{0}'.format(pid))

time.sleep(2)
运行:
[[email protected] TEST_TEMP]# python3 fork_demo.py 
onefine
我是父进程16752
onefine
Traceback (most recent call last):
  File "fork_demo.py", line 7, in <module>
    print('子进程{0}, 父进程{1}'.format(os.getpgid(), os.getppid()))
TypeError: getpgid() missing required argument 'pid' (pos 1)

1 修改:

if pid == 0:
    print('子进程{0}, 父进程{1}'.format(os.getpgid(pid), os.getppid()))
else:
    print('我是父进程{0}'.format(pid))

再次重新运行:

[[email protected] TEST_TEMP]# python3 fork_demo.py 
onefine
我是父进程16797
onefine
子进程16796, 父进程16796
[[email protected] TEST_TEMP]# 

2 修改:

print("onefine")
pid = os.fork()
if pid == 0:
    print('子进程{0}, 父进程{1}'.format(os.getpgid(pid), os.getppid()))
else:
    print('我是父进程{0}'.format(pid))

time.sleep(2)

运行:

[[email protected] TEST_TEMP]# python3 fork_demo.py 
onefine
我是父进程16848
子进程16847, 父进程16847
[[email protected] TEST_TEMP]# 

3 修改:

print("onefine")
pid = os.fork()
if pid == 0:
    print('子进程{0}, 父进程{1}'.format(os.getpgid(pid), os.getppid()))
else:
    print('我是父进程{0}'.format(pid))

执行:

[[email protected] TEST_TEMP]# python3 fork_demo.py 
onefine
我是父进程16860
子进程16859, 父进程16859
[[email protected] TEST_TEMP]#

4 修改:

pid = os.fork()
print("onefine")
if pid == 0:
    print('子进程{0}, 父进程{1}'.format(os.getpgid(pid), os.getppid()))
else:
    print('我是父进程{0}'.format(pid))

执行:

[[email protected] TEST_TEMP]# python3 fork_demo.py 
onefine
我是父进程16872
[[email protected] TEST_TEMP]# onefine
子进程16871, 父进程1

[[email protected] TEST_TEMP]# python3 fork_demo.py 
onefine
我是父进程16876
[[email protected] TEST_TEMP]# onefine
子进程16875, 父进程1

[[email protected] TEST_TEMP]# 

截图仔细看:
Python多进程
由于父进程已经结束退出,子进程还没有结束。sleep的作用就体现再此,使得父进程等待子进程执行完毕之后,父进程退出时携子进程一起退出,不然父进程退出了子进程就没有了父进程。所以这里出现打印中断的情况。

总结:
父(os.getppid())-> 我(os.getpid())-> 子(pid)

二、使用multiprocessing模块创建多进程

multiprocessing模块提供了一个Process类来描述一个进程对象。创建子进程时,只需要传入一个执行函数和函数的参数,即可完成一个Process实例的创建,用start()方法启动进程,用join()方法实现进程间的同步。

# 多进程编程
import multiprocessing
from time import sleep


def get_html(n):
    sleep(n)
    print("sub progress success")
    return n


if __name__ == '__main__':
    progress = multiprocessing.Process(target=get_html, args=(2,))
    print(progress.pid)  # 查看进程id
    progress.start()
    print(progress.pid)  # start之后才有效果
    progress.join()
    print("main progress end")

执行:

None
22672
sub progress success
main progress end

自定义:

import multiprocessing

class MyProgress(multiprocessing.Process):
	# 重新run方法
    def run(self):
        pass

使用进程池

当要启动大量的子进程的时候,使用上面介绍的方法虽可以满足需求,但使用进程池批量创建子进程的方式更加常见,因为当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态生成多个进程;如果是上百个或者上千个目标,手动去限制进程数量却又太过于繁琐,这时候进程池Pool发挥作用的时候就到了。
Pool可以提供指定数量的进程供用户调用,默认大小是CPU的核数。当有新的请求提交到Pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;如果进程池中的进程数已经达到规定最大值,那么该请求就会等待,直到进程池中有进程结束,才会创建新的进程来处理它。

import multiprocessing
from time import sleep

def get_html(n):
    sleep(n)
    print("sub progress success")
    return n

if __name__ == '__main__':
    # 使用进程池

    # 指明进程池中进程的个数,若不指明默认为cpu的数量
    # pool = multiprocessing.Pool(4)  # 或者下面这种
    pool = multiprocessing.Pool(multiprocessing.cpu_count())

    # import os
    # print(os.cpu_count())  # cpu的数量

    # 提交一个任务
    result = pool.apply_async(get_html, args=(3,))

    # 在pool.join()之前调用pool.close(),使pool不再接收任务
    pool.close()

    # 等待所有任务完成
    pool.join()
    print(result.get())  # 得到执行结果

执行:

sub progress success
3

注意:Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能再继续添加新的Process了。

imap、imap_unordered方法介绍

from time import sleep
import multiprocessing

def get_html(n):
    sleep(n)
    print("sub progress success")
    return n

if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    # imap,第二个参数是一个可迭代的对象
    for result in pool.imap(get_html, [1, 5, 3]):
        print("{} sleep success".format(result))

执行:

sub progress success
1 sleep success
sub progress success
sub progress success
5 sleep success
3 sleep success
from time import sleep
import multiprocessing

def get_html(n):
    sleep(n)
    print("sub progress success")
    return n

if __name__ == '__main__':
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    # imap_unordered,第二个参数是一个可迭代的对象
    for result in pool.imap_unordered(get_html, [1, 5, 3]):
        print("{} sleep success".format(result))

执行:

sub progress success
1 sleep success
sub progress success
3 sleep success
sub progress success
5 sleep success

三、进程间通信

from multiprocessing import Process
from time import sleep
from queue import Queue


def producer(queue):
    queue.put("onefine")
    sleep(2)


def consumer(queue):
    sleep(2)
    data = queue.get()
    print(data)


if __name__ == '__main__':
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue, ))
    my_consumer = Process(target=consumer, args=(queue, ))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

运行:

Traceback (most recent call last):
  File "D:/Test.py", line 27, in <module>
    my_producer.start()
  File "c:\users\xxx\appdata\local\programs\python\python37\Lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "c:\users\xxx\appdata\local\programs\python\python37\Lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "c:\users\xxx\appdata\local\programs\python\python37\Lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "c:\users\xxx\appdata\local\programs\python\python37\Lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "c:\users\xxx\appdata\local\programs\python\python37\Lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

使用多进程编程的时候不能使用queue中的Queue,我们可以使用multiprocessing中提供的Queue:

from multiprocessing import Process, Queue
from time import sleep
# from queue import Queue


def producer(queue):
    queue.put("onefine")
    sleep(2)


def consumer(queue):
    sleep(2)
    data = queue.get()
    print(data)


if __name__ == '__main__':
    queue = Queue(10)
    my_producer = Process(target=producer, args=(queue, ))
    my_consumer = Process(target=consumer, args=(queue, ))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

运行:

onefine
使用全局变量会怎样?
from multiprocessing import Process
from time import sleep


def producer(value):
    value += 1
    sleep(2)


def consumer(value):
    sleep(2)
    print(value)


if __name__ == '__main__':
    value = 1
    my_producer = Process(target=producer, args=(value, ))
    my_consumer = Process(target=consumer, args=(value, ))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

运行:

1

总结:全局变量不能使用于多进程编程,但可以使用于多线程。

注意:multiprocessing中提供的Queue不能用于多线程中的通信。

from multiprocessing import Queue, Pool
from time import sleep

def producer(queue):
    queue.put("onefine")
    sleep(2)

def consumer(queue):
    sleep(2)
    data = queue.get()
    print(data)


if __name__ == '__main__':
    queue = Queue(10)
    pool = Pool(2)

    pool.apply_async(producer, args=(queue, ))
    pool.apply_async(consumer, args=(queue, ))
    pool.close()
    pool.join()

执行之后没有任何显示!

那要怎样进行进程中的通信呢??

from multiprocessing import Pool, Manager
from time import sleep


def producer(queue):
    queue.put("onefine")
    sleep(2)


def consumer(queue):
    sleep(2)
    data = queue.get()
    print(data)


if __name__ == '__main__':
	# 对Manager实例化之后里面有个Queue
    queue = Manager().Queue(10)
    pool = Pool(2)
    pool.apply_async(producer, args=(queue, ))
    pool.apply_async(consumer, args=(queue, ))
    pool.close()
    pool.join()

输出:

onefine

利用pip(管道)进行进程中的通信

from multiprocessing import Pool, Pipe, Process
from time import sleep


def producer(pipe):
    pipe.send("onefine")
    sleep(2)


def consumer(pipe):
    sleep(2)
    print(pipe.recv())


if __name__ == '__main__':
    # 实例化pipe之后返回两个
    receive_pipe, send_pipe = Pipe()
    # 注意pipe只能适用于两个指定进程中的通信

    my_producer = Process(target=producer, args=(send_pipe, ))
    my_consumer = Process(target=consumer, args=(receive_pipe, ))

    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

输出:

onefine

Pipe的性能高于queue

dict进行通信

from multiprocessing import Manager, Process


def add_data(p_dict, key, value):
    p_dict[key] = value


if __name__ == "__main__":
    progress_dict = Manager().dict()

    first_progress = Process(target=add_data, args=(progress_dict, 'onefine1', 963))
    second_progress = Process(target=add_data, args=(progress_dict, 'onefine2', 123))

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()

    print(progress_dict)

输出:

{'onefine1': 963, 'onefine2': 123}