线程

进程:最小资源单位
为什么要有进程的切换,如果没有就只能执行一个任务,如只能听音乐,结束后才能看写博客等等。
线程:最小执行单位
一个进程中的所有线程共享这个进程中的资源。cup里面执行的是线程。进程是代表资源的管理,他来管理这些线程。一进程中最少有一个线程,
在这个线程里面可以开多个子线程。进程中同样可以开子进程。
创建子线程目的:同时处理很多任务)

创建子线程方式一:直接创建threading.Thread类对象

import threading
import time
import os

def music():
    print('music begin time: %s' % time.ctime())
    print(os.getpid(), os.getppid())
    time.sleep(5)
    print('music end time: %s' % time.ctime())

def game():
    print('game begin time: %s' % time.ctime())
    print(os.getpid(), os.getppid())
    time.sleep(10)
    print('game end time: %s' % time.ctime())

join方法:只能控制主线程的执行,不能控制别的线程的执行。
if __name__ == '__main__':
    print(os.getpid(), os.getppid())
    t1 = threading.Thread(target=music)
    t2 = threading.Thread(target=game)
    t1.start()  # 开启与主线程下的子线程1,此子线程是和主线程一同执行的
    t2.start()  # **子线程二
    t1.join()   # 执行完t1线程才能执行主线程以下语句
    print('ending....')
    t1.join()
    print('kdfj')

运行结果


1996 7160  # 此文件的进程id,执行时此进程时创建了一个线程,线程再创建此线程的子线程,父文件进程id,也就是pycharm(操作系统分配的),
music begin time: Wed Sep 19 19:30:11 2018
1996 7160
game begin time: Wed Sep 19 19:30:11 2018
1996 7160
music end time: Wed Sep 19 19:30:16 2018
ending....
game end time: Wed Sep 19 19:30:21 2018

setDaemon方法:守护线程

import threading
from time import ctime, sleep, time
import time

def ListenMusic(name):
    print('begin listen to %s.%s' % (name, time()))
    sleep(2)
    print('end listening %s' % (time()))

def RecordBlog(title, con):
    print('begin recording the %s!, %s, %s' % (title, con, time()))
    sleep(2)
    print('end recording %s!, %s, %s' % (title, con, time()))
threads = []
t1 = threading.Thread(target=ListenMusic, args=('水手',))
t2 = threading.Thread(target=RecordBlog, args=('python线程',))
threads.append(t1)
threads.append(t2)
print(threads[0], threads[1])  # <Thread(Thread-1, initial)> <Thread(Thread-2, initial)>

if __name__ == '__main__':
    t1.setDaemon(True)  # daemon:守护者,t2被主线程守护,跟随主线程的任务而结束。你有我有,你没有我没有,主线程退出了我子线程不执行了,我要跟着主线程一起退。如果t1和t2都守护,主线程的代码执行完,他俩跟着主线程一起退。如果t1守护,而t2没有守护,那么主线程代码执行完后等着t2执行完才退,如果t2执行完之t1代码也会执行完,t1依然执行。守护是守护主线程真正结束而退出该进程的层面而言,而不是相对代码执行完成与否而言的。
    t2.setDaemon(True)
    t1.start()
    t2.start()
    for t in threads:
        t1.setDaemon(True)
        t.start()
    t1.start()
        print(t.getName())

    print('count', threading.active_count())
        print(t.getName())
        print('count: ', threading.active_count())
        t.join()
    t.join()

    sleep(3)
    print('主线程结束 %s' % ctime())
    for i in threading.enumerate():
        print(i)
    while threading.active_count() == 1:
        print('all over %s ' % ctime())
    

创建子线程方式二:继承的方式创建类

class MyTd(threading.Thread):
    def __init__(self, num, target, args):
        threading.Thread.__init__(self)
        self._target = target
        self._args = args
        self.num = num
    def run(self):  # 重写父类中run方法
        print('running on number: %s ' % self.num)
        time.sleep(3)

if __name__ == '__main__':
    t1 = MyTd(6, target=ListenMusic, args=('水手1',))
    t2 = MyTd(8, target=RecordBlog, args=('水手2', '快递费借口拒绝开放'))
    t1.start()
    t2.start()
    print('ending......%s'% time())

同步锁

并发:一个cpu轮流分配时间片给多个任务,
并行:同时:同一个时间点,多个任务同时执行,并行一定是并发,并发一定不是并行。并行是并发的子集。
同步:和io同步,相当于打电话,等待接收到对方的信息才进行下一步。
异步:相当于发信息,我可以先回复或处理别的信息再回去处理没有回复的信息。不会等待io或阻塞。


 # 1 开100个线程,累加或累减的操作

import threading

def sub():
    global num
    num -= 1   # 每次结果减1
l = []
num = 100

for i in range(100):  # 虽然开了100个子线程,但是由于有pid锁,一个进程一个锁,一个进程只能跑一个线程,相当于串行或并发。
    t = threading.Thread(target=sub)
    t.start()
    l.append(t)


for t in l:
    t.join()  # 主线程等待每个t执行完再执行

print(num)


# 2 开100个线程,累加或累减的操作
import threading

def sub():
    global num
    temp = num  # 全局变量赋值给temp
    num = temp - 1  # temp - 1 赋值给num,num还是全局变量,和上题效果一样,只是多了一次赋值的操作
l = []
num = 100

for i in range(100):
    t = threading.Thread(target=sub)
    t.start()
    l.append(t)


for t in l:
    t.join()
print(num)  # 0

# 3
import threading
import time
lock = threading.Lock()
def sub(i):
    global num
    lock.acquire()  # 锁定以下内容,没有cpu没有经过时间轮寻。变成串行了
    print('%s begin time : %s' % (i, time.time()), num)
    temp = num
    time.sleep(0.001) # 每个线程轮流抢占cpu,拿到num时可能是10,去cpu执行完代码后返回给全局变量num9,而这时另外一条线早已经拿到num的值时开始他出去时的值等着排队的,cpu是不会闲着。回来后num还是9,因为是属于又一次的赋值。
    num = temp - 1
    print('%s end time: %s' % (i, time.time()), num)
    lock.release()
l = []
num = 10

for i in range(10):
    t = threading.Thread(target=sub, args=(i,))
    t.start()
    l.append(t)


for t in l:
    t.join()
print(num)  # 每次执行结果不一样

递归锁

这个类实现可重入的锁定对象。一个可重入的锁必须由获取它的线程释放。曾经一次线程获得了可重入锁,同一线程可以获取它。在没有阻塞的情况下,线程必须每次释放一次


import threading
import time
class MyThread(threading.Thread):
    def action(self):
        r_lock.acquire()
        print(self.name, 'got lock', time.ctime())
        time.sleep(2)
        r_lock.acquire()

        print(self.name, 'got lock', time.ctime())
        time.sleep(1)

        r_lock.release()
        r_lock.release()

    def run(self):
        self.action()

if __name__ == '__main__':
    r_lock = threading.RLock()
    l = []
    for i in range(5):
        t = MyThread()
        t.start()
        l.append(t)

    for i in l:
        i.join()

    print('end')

运行结果如下

Thread-1 got lock Sat Sep 22 10:52:46 2018
Thread-1 got lock Sat Sep 22 10:52:48 2018
Thread-2 got lock Sat Sep 22 10:52:49 2018
Thread-2 got lock Sat Sep 22 10:52:51 2018
Thread-3 got lock Sat Sep 22 10:52:52 2018
Thread-3 got lock Sat Sep 22 10:52:54 2018
Thread-4 got lock Sat Sep 22 10:52:55 2018
Thread-4 got lock Sat Sep 22 10:52:57 2018
Thread-5 got lock Sat Sep 22 10:52:58 2018
Thread-5 got lock Sat Sep 22 10:53:00 2018
end

队列

线程中的队列,导入单独的queue模块,常用的是Queue类,import queue

The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics. It depends on the availability of thread support in Python; see the threading module.

The module implements three types of queue, which differ only in the order in which the entries are retrieved. In a FIFO queue, the first tasks added are the first retrieved. In a LIFO queue, the most recently added entry is the first retrieved (operating like a stack). With a priority queue, the entries are kept sorted (using the heapq module) and the lowest valued entry is retrieved first.

Internally, the module uses locks to temporarily block competing threads; however, it is not designed to handle reentrancy within a thread.

该模块实现了多生产者,多消费者队列。当必须在多个线程之间安全地交换信息时,它在线程编程中特别有用。此模块中的类实现了所有必需的锁定语义。这取决于Python中线程支持的可用性; 看 模块。queueQueuethreading

该模块实现了三种类型的队列,它们的区别仅在于检索条目的顺序。在FIFO 队列中,添加的第一个任务是第一个检索的任务。在 LIFO队列中,最近添加的条目是第一个检索的条目(像堆栈一样运行)。使用优先级队列,条目将保持排序(使用heapq模块),并首先检索最低值的条目。

在内部,模块使用锁来临时阻止竞争线程; 但是,它不是为处理线程内的重入而设计的。

该模块定义了以下类和异常:queue

class (maxsize = 0 )queue.Queue
FIFO 队列的构造函数。 maxsize是一个整数,用于设置可以放入队列的项目数的上限。达到此大小后,插入将阻止,直到消耗队列项。如果 maxsize小于或等于零,则队列大小为无限大。

class (maxsize = 0 )queue.LifoQueue
LIFO 队列的构造函数。 maxsize是一个整数,用于设置可以放入队列的项目数的上限。达到此大小后,插入将阻止,直到消耗队列项。如果 maxsize小于或等于零,则队列大小为无限大。

class (maxsize = 0 )queue.PriorityQueue
优先级队列的构造函数。 maxsize是一个整数,用于设置可以放入队列的项目数的上限。达到此大小后,插入将阻止,直到消耗队列项。如果 maxsize小于或等于零,则队列大小为无限大。

首先检索最低值的条目(最低值条目是返回的条目sorted(list(entries))[0])。条目的典型模式是以下形式的元组:。(priority_number, data)

异常queue.Empty
在对象为空时调用非阻塞get()(或 get_nowait())时引发异常。Queue

异常queue.Full
在对象已满时调用非阻塞put()(或 put_nowait())时引发异常。Queue

队列对象(,或)提供下面描述的公共方法。QueueLifoQueuePriorityQueue

Queue.qsize()
返回队列的大致大小。注意,qsize()> 0不保证后续的get()不会阻塞,qsize()<maxsize也不保证put()不会阻塞。

Queue.empty()
True如果队列为空False则返回,否则返回。如果empty()返回True,则不保证对put()的后续调用不会阻塞。类似地,如果empty()返回False,则不保证对get()的后续调用不会阻塞。

Queue.full()
True如果队列已满,False则返回,否则返回。如果full()返回True,则不保证对get()的后续调用不会阻塞。同样,如果full()返回False,则不保证对put()的后续调用不会阻塞。

Queue.put(item,block = True,timeout = None )
将项目放入队列。如果可选的args 块为true且timeout为 None(默认值),则在必要时阻塞,直到有空闲插槽可用。如果 timeout是一个正数,则它会阻止最多超时秒,Full如果在该时间内没有可用的空闲槽,则会引发异常。否则(块为假),如果空闲插槽立即可用,则将项目放在队列上,否则引发Full异常(在这种情况下忽略超时)。

Queue.put_nowait(项目)
相当于。put(item, False)

Queue.get(block = True,timeout = None )
从队列中删除并返回一个项目。如果可选的args 块为true且 timeout为None(默认值),则在必要时阻止,直到某个项可用为止。如果timeout是一个正数,它会阻止最多超时秒,Empty如果在该时间内没有可用的项,则会引发异常。否则(块为假),如果一个项立即可用则返回一个项,否则引发Empty异常(在这种情况下忽略超时)。

Queue.get_nowait()
相当于get(False)。

提供了两种方法来支持跟踪队列 d任务是否已由守护程序使用者线程完全处理。

Queue.task_done()
指示以前的队列 d任务已完成。由队列使用者线程使用。对于每个get()用于获取任务的人,后续调用 task_done()告诉队列任务的处理完成。

如果a join()当前正在阻止,则它将在所有项目都已处理后恢复(意味着task_done()已收到已put()进入队列的每个项目的呼叫)。

提出一个ValueErrorif被调用的次数比放入队列中的项目多一次。

Queue.join()
阻止直到队列中的所有项目都被获取并处理。

每当项目添加到队列时,未完成任务的计数就会增加。每当消费者线程调用task_done()以指示该项目已被检索并且其上的所有工作都已完成时,计数就会下降。当未完成任务的数量降至零时,join()取消阻止。

queue.Queue(4) 队列默认容量为3,默认FIFO模式:first in first out 先进先出

import threading,time
import queue
q = queue.Queue(4)  # 队列容量为3,默认FIFO模式:first in first out  先进先出
q.put(15)
q.put('hello')
q.put({'name': 'laura'})
q.put_nowait(56)  # 相当于:q.put(56, block=False) 如果队列已经满了,我就不等了nowait,终止程序并报错raise Full:queue.Full,如果直接写q.put(56)就会阻塞,不会报错但程序也不会执行下面的语句。(阻塞了)等到队列腾出空间。类似于input阻塞。
print(q.qsize())  # 4
print(q.empty())  # False
print(q.full())   # True
while 1:
    data = q.get()   # 如果队列为空,就一直等待存值。阻塞。
    print(data)
    print('-------')

queue.LifoQueue() 后进先出模式,Last in first out 栈模式


q = queue.LifoQueue()  # 
q.put(12)
q.put('hello')
q.put('world')

while 1:
    data = q.get()
    print(data)
    print('-------')

运行结果:

world
 -------
 hello
 -------
 12
 -------

queue.PriorityQueue() 按优先级顺序检索打开条目的队列的 ,可用于构造堆模式。 Variant of Queue that retrieves open entries in priority order (lowest first).


q = queue.PriorityQueue()  # Variant of Queue that retrieves open entries in priority order (lowest first).
q.put([3, 20])
q.put([9, 'kid'])
q.put([5, 'iris'])
while 1:
    data = q.get()
    print(data)
    print('----')

运行结果

 [3, 20]
 ----
 [5, 'iris']
 ----
 [9, 'kid']
 ----

线程中生产者与消费者模型

import time,random
import queue,threading

q = queue.Queue()  # Create a queue object with a given maximum size. If maxsize is <= 0, the queue size is infinite.如果参数为空,队列的容量是无限的# 创建一个队列对象 queue模块中的源码写着:# NOTE: These are incomplete!未完待续,what???

def Producer(name):  # 定义生产者函数
    count = 0
    while count < 10:
        print('making hamburger......')
        time.sleep(5)  # 制作一个汉堡包5秒钟
        q.put(count)  # 制作完成后放入队列
        print('Producer %s has produced %s hamburger......' % (name, count))
        count += 1
        q.task_done()  # 告诉队列我放了一个值进去了
        print('ok ....')

def Consumer(name):  # 定义消费者函数
    count = 0
    while count < 10:
        t = random.randrange(5)
        print('waiting %ss......' %t)
        time.sleep(t)  # 等待制作完成的时间
        data = q.get()  # 取一个值
        print('eating....')
        q.join()  # 告诉对列我已经取走一个值了,执行完这句才会再放内容到队列 执行完这句q.task_done接收到了这条消息,代码才可以往下走。
        time.sleep(4)  # 吃汉堡包的时间
        print('Consumer %s has eat %s hamburger' % (name, data))

p1 = threading.Thread(target=Producer, args=('A君',))  # 子线程一
c1 = threading.Thread(target=Consumer, args=('B君',))  # 子线程二
c2 = threading.Thread(target=Consumer, args=('C君',))
c3 = threading.Thread(target=Consumer, args=('D君',))

p1.start()
c1.start()
c2.start()
c3.start()

event事件

import threading, time
class Boss(threading.Thread):
    def run(self):  # 用对象调用start方法可激发此方法
        print('Boss: 今晚大家都要加班到10点')
        print(event.isSet())  # False  # 此时event还没有设置
        event.set()  # 此时设置event,只能这条线程先走,别的都不能运行
        time.sleep(5)
        print('Boss: 可以下班了。')
        print(event.isSet())
        event.set()  # set the internal flag to true.All threads waiting for it to become true are awakened. (所有内部含有event.wait()的线程将被唤醒)Threads that call wait() once the flag is true will not block at all.
class Worker(threading.Thread):
    def run(self):
        event.wait()  # 一旦event被设定,等同于pass,等待直到有event.set()才可以执行以下语句 Block until the internal flag is true
        print('Worker: 哎...命苦啊!')
        time.sleep(1)
        event.clear()  # Reset the internal flag to false.set刚好相反
        event.wait()   # 重新发起事件等待 threads calling wait() will block(阻塞) until set() is called to set the internal flag(event中内部标志) to true again.
        print('Worker: OhYeah')

if __name__ == '__main__':
    event = threading.Event()

    threads = []
    for i in range(5):
        threads.append(Worker())
    threads.append(Boss())
    for t in threads:
        t.start()
    for i in threads:
        i.join()
    print('end')

执行结果:

Boss: 今晚大家都要加班到10False
Worker:...命苦啊!
Worker:...命苦啊!
Worker:...命苦啊!Worker:...命苦啊!
Worker:...命苦啊!

Boss: 可以下班了。
False
Worker: OhYeah
Worker: OhYeah
Worker: OhYeah
Worker: OhYeahWorker: OhYeah

end

信号量


import threading,time

class myThread(threading.Thread):
    def run(self):
        if semaphore.acquire():  # Acquire a semaphore, decrementing the internal counter by one  # 相当与lock.acquire方法,上锁的过程
            print(self.name)
            time.sleep(3)
            semaphore.release()  #Release a semaphore, incrementing the internal counter by one  # lock.release() 释放锁的过程
if __name__ == '__main__':
    semaphore = threading.Semaphore(5)  # 指定锁的数量
    thrs = []
    for i in range(100):
        thrs.append(myThread())
    for t in thrs:
        t.start()

Pool

concurrent.futures 模块提供了高度封装的异步调用接口

  • ThreadPoolExecutor: 线程池,提供异步调用
  • ProcessPoolExecutor:进程池,提供异步调用
    Both implemet the same interface,which is defined by the abstract Executor class.
    两者都继承自一个接口,也就是说用法相同。

方法:

  • submit(fn, *args, **kwargs)
    异步提交任务 第一个参数为函数名,第二个给函数的参数,并且只能按位置参数传参

  • map(func, *iterables, timeout=None, chunksize=1)
    取代for循环submit的操作

  • shutdown(wait=True)
    相当于进程池的pool.close() + pool.join() 操作
    wait = True 等待池内所有任务执行完毕回收完资源后才继续
    wait = False 立即返回,并不会等待池内的任务执行完毕
    但不管wait参数为何值,整个程序都会等到所有任务执行完毕
    submit和map必须在shutdown之前。

  • result(timeout=None)
    取得结果

  • add_done_callback(fn)
    回调函数

异步 线程池


import time
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread  # 查看线程id
# 第一步:定义提交给线程池的任务
def func(i):
    print('thread', i, current_thread())  # 线程id
    time.sleep(1)
    print('thread %s end' % i)
# 第二步:实例化一个线程池
tp = ThreadPoolExecutor(5)  # 线程池里面5个线程  ,一个线程池建议起5 * cpu_count 个线程
# 第三步:向池里面提交任务
for i in range(20):  # 提交20个线程任务
    ret = tp.submit(func, i)  # 提交一个任务,返回结果赋值给ret
# 第四步:选择是否阻塞等到所有线程池里面的任务都结束
tp.shutdown()  # 等待提交的任务执行完后再执行以下语句 也就是tp.join() + tp.close()
print(ret)

运行结果

线程

# 获取返回值

import time
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread as cthread

def func(i):
    print('thread', i, cthread().ident)
    time.sleep(1)
    print('thread %s end' % i)
    return i * '*'

tp = ThreadPoolExecutor(5)
ret_l = []
for i in range(20):
    ret = tp.submit(func, i)
    ret_l.append(ret)
for ret in ret_l:
    print(ret.result())
print('主线程')

map

import time
from concurrent.futures import ThreadPoolExecutor
def func(i):
    print('thread', i)
    time.sleep(1)
    print('thread %s end' % i)
    return i * '*'

tp = ThreadPoolExecutor(5)
res = tp.map(func, range(20))  # map不用提交也可以获取结果,简化操作
for i in res: print(i)  # 并发获取结果的过程

回调函数

import time
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread as cthread

def func(i):
    print('thread', i, cthread().ident)  # 该回调函数由子线程执行。
    time.sleep(1)
    print('thread %s end' % i)
    return i * '*'
def call_back(arg):
    print('call back: ', cthread().ident)  # 该回调函数由子线程执行。
    print('ret: ', arg.result())

tp = ThreadPoolExecutor(5)
ret_l = []
for i in range(20):
    tp.submit(func, i).add_done_callback(call_back)  # 提交任务并且把该任务的返回值交给回调函数并做进一步处理
print('主线程', print(cthread().ident))

ProcessPoolExcutor 和 TreadPoolExecutor 类用法一样。只是一个是进程池,一个是线程池,唯一不同的是回调函数中进程池依然是主进程调用,线程池中是子线程调用。