Python学习笔记-进程与线程-1

multiprocessing模块
为再子进程中运行任务、通信和共享数据,以及执行各种形式的同步提供支持。这个编程接口有意模仿threading模块中线程的编程接口。但和线程不同,进程没有任何共享状态,这一点需要重点强调。因此如果某个进程修改数据,改动只限于该进程内。
Process([group[,target[,name[,args[,kwargs]]]]])
练习1-1
#! /bin/python
 
import os
import multiprocessing
import time
 
def task(intval):
  while True:
    print('time is %s' % (time.ctime()))
    time.sleep(intval)
 
if __name__ =='__main__':
  proc=multiprocessing.Process(target=task,args=(3,))
  print('a single process(%s) begin...' % (os.getpid()))
  proc.start()
 
执行结果
time is Tue Feb  6 08:30:54 2018
time is Tue Feb  6 08:30:57 2018
time is Tue Feb  6 08:31:00 2018
time is Tue Feb  6 08:31:03 2018
time is Tue Feb  6 08:31:06 2018
time is Tue Feb  6 08:31:09 2018
time is Tue Feb  6 08:31:12 2018
time is Tue Feb  6 08:31:15 2018
time is Tue Feb  6 08:31:18 2018
time is Tue Feb  6 08:31:21 2018
也可继承Process类来实现:
Python学习笔记-进程与线程-1
 
练习1-2
#python简单的进程操作
#一个进程写入文件
#一个进程读取文件并输出

import multiprocessing
import time

def write():
    str='abcdefghijklmnopqrstuvwxyz'
    with open('a.txt','w') as f:
        for ch in str:
            f.write('%c%s' % (ch,'\n'))
            f.flush()
            time.sleep(1)
def read():
    with open('a.txt','r') as f:
        while True:
            str=f.read()
            print(str)
            #不能判断26,因为有换行符
            # if len(str)==26:
            #     break
            #f.seek(0)表示从文件开始位置读取
           # f.seek(0)
            time.sleep(2)

if __name__=='__main__':
    pw=multiprocessing.Process(target=write)
    pr=multiprocessing.Process(target=read)

    pw.start()
    pr.start()
 
is_alive()方法
如果p仍运行,返回True
join([timeout])方法
等待子进程p终止后再向下执行。timeout是可选的超时期限。进程可以被连接无数次,但如果连接自身则会出错。
run()方法
进程启动时运行的方法(进程任务!)。默认情况下,会调用传递给Process构造函数的target。定义进程的另一种方法是从Process类继承并重新实现run()方法。
start()方法
启动进程,并调用该子进程的run()方法。
terminate()方法
强制终止进程。如果调用此函数,进程立即被终止,同时不会进行任何清理工作。如果进程创建了它自己的子进程,这些进程都将变为僵尸进程。如果进程保存了一个锁定或通过进程间通信被调用,那么终止它可能会导致死锁,或io崩溃。
authkey属性
进程的身份验证键。除非显示设定,这是由os.urandom()函数生成的32位字符的字符串。其用途是为涉及网络连接的底层进程间通信提供安全性。
daemon属性
一个boolean标志,指示进程是否是后台进程。后台进程无法创建自己的新进程。daemon的值必须再start()函数启动进程之前设置。
exitcode属性
进程的整数退出码,如果进程仍在运行,其值为None,如果为负值,-N表示进程由信号N所终止。
name属性
pid属性
进程的整数进程id
Queue(maxsize)
创建共享的进程队列。maxsize是队列中允许的最大项数。可省略,省略后表示大小无限制。
cancel_join_thread()
close()
empty()
full()
get([block[,timeout]])
get_nowait()
join_thread()
put(item[,block[,timeout]])
put_nowait(item)
qsize()
 
练习2-1
#一个简单的使用队列在多个线程通信
#
from multiprocessing import Process,Queue
import codecs
import time

def write(q):
    list=['你好','我好','他也好']
    with codecs.open('a.txt','w','utf-8') as f:
        for str in list:
            f.write(''.join([str,'\n']))
            q.put(str)
            time.sleep(2)
def read(q):
    while True:
        str=q.get()
        print(str)

if __name__=='__main__':
    q=Queue()
    pw=Process(target=write,args=(q,))
    pr=Process(target=read,args=(q,))

    pw.start()
    pr.start()
JoinableQueue(maxsize)
创建可连接的共享进程队列,与Queue类似,但队列允许项目的使用者通知生产者,项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的
练习2-1中,由于read方法是死循环,因此主进程结束后,程序被挂起;可使用JoinableQueue+将消费进程设为守护进程的方式解决无意义的挂起问题。
练习3-1
import multiprocessing
import time

def producer(material,q):
    for str in material:
        q.put(str)
        print('生产者将数据%s放入队列'% str)
       # time.sleep(1)
def consumer(q):
    while True:
        str=q.get()
        print('消费者消费一个数据%s' % str)
        #通知生产者已消费一个数据
        q.task_done()

if __name__=='__main__':
    list=['一','二','三']
    q=multiprocessing.JoinableQueue()
    pc=multiprocessing.Process(target=consumer,args=(q,))
    #由于消费者进程是死循环,因此将其设置为守护进程
    pc.daemon=True
    pc.start()
    producer(list,q)
    #由于pc是守护进程,为防止主进程结束时pc尚未执行完毕,因此调用队列q的join方法阻塞,
    #等待所有数据均被处理(调用task_done()方法)后,再结束pc
    q.join()
    print('主进程将执行结束')
练习3-2
多个消费进程一起处理消息队列中的数据
import multiprocessing
import time
import os

def inp(p):
    while True:
        str = p.get()
        print("%s--%s" % (os.getpid(),str))
        p.task_done()
        time.sleep(1)

def out(list,p):
    for str in list:
        p.put(str)
        print('put a str(%s) into Queue' % str)

if __name__=='__main__':
    list=['aaa','bbb','ccc']
    p=multiprocessing.JoinableQueue()
    #创建两(多)个消费进程,一起处理消息队列中的数据
    proc=multiprocessing.Process(target=inp,args=(p,))
    proc.daemon=True
    proc2=multiprocessing.Process(target=inp,args=(p,))
    proc2.daemon=True

    proc.start()
    proc2.start()

    out(list,p)
    p.join()
输出结果:
put a str(aaa) into Queue
put a str(bbb) into Queue
put a str(ccc) into Queue
14996--aaa
4976--bbb
14996--ccc
由上可见,多个消费进程先到先得,多个消费进程一同处理一份消息队列中的数据
总的规则:发送数量少的大对象比发送数量多的小对象要好
 
在某些应用中,生产者需要通知消费者,它们不再生产任何项目而且应该关闭。为此,编写的代码中应该使用标志(sentinel)——指示完成的特殊值。下面的例子使用None作为标志说明这个概念:
import multiprocessing
import os

def cons(q):
    while True:
        str=q.get()
        if str is None:
            break
        print('%s--%s' % (os.getpid(),str))
def prod(list,q):
    for str in list:
        q.put(str)

if __name__=='__main__':
    q=multiprocessing.Queue()
    pc=multiprocessing.Process(target=cons,args=(q,))
    pc.start()
    list=['xxx','yyy','zzz']
    prod(list,q)
    #打一个值为None的标记,消费进程接收None后,跳出死循环,终止子进程
    q.put(None)
    #主进程等待子进程结束
    pc.join()
但上面的例子只适合一个消费进程的情况,如果有多个消费进程,需要为每个消费进程单独打标记,才能让每个子进程正常结束,非常麻烦,解决的办法就是管道
task_done()
使用者使用此方法发出信号,表示get()方法返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常
join()
生产者使用此方法进行阻塞,直到队列中的所有项目均被处理。阻塞将持续到为队列中的每个项目均调用task_done()方法为止
Pipe([duplex])
在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1conn2是表示管道两端的Connection对象。默认情况下,管道是双向的。如果duplex值为Falseconn1只能用于接收,而conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法。
Pipe()方法返回的Connection对象的实例具有以下方法和属性
注意理解下面两个示例中Connection对象的inputoutput
input:将数据放入管道
output:从管道中取出
简单来说,就是a发送,b接收,b发送,a接收,但实际仍是一个Connection,区别指示接收的顺序不同
Python学习笔记-进程与线程-1
 
练习 4-1
import multiprocessing

def cons(pipe):
    output_p, input_p = pipe
    # 由于消费端不需要向管道放入数据,因此关闭
    input_p.close()
    while True:
        try:
            item = output_p.recv()
            print(item)
            # 如果生产端关闭管道,则消费端会接收到EOFError异常,如出现此异常则表示
            # 工作已结束,可以跳出死循环并结束子进程了
        except EOFError:
            break
    print('consumer done!')

def prod(list, input_p):
    for item in list:
        '''
        向管道中放入数据
        '''
        input_p.send(item)

if __name__ == '__main__':
    list = ['hello', 'ok', 'welcome']
    output_p, input_p = multiprocessing.Pipe()
    proc = multiprocessing.Process(target=cons, args=((output_p,input_p),))
    '''
    启动消费进程,让其等待处理数据
    '''
    proc.start()
    '''
    output_p用不到,因此关闭
    '''
    output_p.close()
    '''
    将数据放入管道,
    '''
    prod(list, input_p)
    '''
    数据全部放入管道后,可以将input_p关闭了
    '''
    input_p.close()
    '''
    主进程等待消费进程处理完毕
    '''
    proc.join()
练习4-2
import multiprocessing
import os
import codecs

def write(pipe):
    output_p, input_p = pipe
    input_p.close()
    while True:
        try:
            item = output_p.recv()
            with codecs.open('a.txt', 'a','utf-8') as f:
                f.write('%s\n' % item)
                f.flush()
                print(item)
        except EOFError:
            break

def suplly(list,input_p):
    for item in list:
        input_p.send(item)

if __name__=='__main__':
    output_p,input_p=multiprocessing.Pipe()
    proc=multiprocessing.Process(target=write,args=((output_p,input_p),))
    proc.start()
    output_p.close()

    suplly(['中国','人民','万岁'],input_p)

    input_p.close()

    proc.join()
 
close()
fileno()
poll()
recv()
recv_bytes([maxlength])
recv_bytes_into(buffer[,offset])
send()
send_bytes(buffer[,offset[,size]])