day24:队列、线程
# 1.Queue的使用
# 步骤1:导入模块
from multiprocessing import Queue
#步骤2:创建一个队列
q = Queue(3)#可接一个整数,表示队列的容量,如果省略则表示不设上限。
print(q.empty())#True
#步骤3:往队列添加消息
# 格式:put(obj, block=True, timeout=None)
#obj:消息对象
q.put('消息1')
print(q.empty())#False
print('~~~~~~~~~~q.qsize:',q.qsize())#1
q.put('消息2')
print(q.full())#False
q.put('消息3')
#判断队列状态的方法
q.full()#判断队列是否满足,返回一个布尔值,表示当前队列是否满了。
print(q.full())#True
q.empty()#判断队列是否为空,返回一个布尔值,表示当前队列是否为空
q.qsize()#返回一个数值,表示当前队列的消息数量
#步骤4:从队列中取消息
value = q.get()
print(value)#消息1
value = q.get()
print(value)#消息2
value = q.get()
print(value)#消息2
print(q.empty())#True
#队列阻塞
from multiprocessing import Queue
q = Queue(3)
# 存
# 格式:put(obj, block=True, timeout=None)
q.put(1)
q.put(2)
q.put(3)
# q.put(4)#由于默认为阻塞状态,程序将会阻塞在这里
#参数block:默认为真,表示如果队列已经满,程序阻塞。
# 如果false,表示不需要阻塞,如果队列已经满,将会抛出异常。
#参数timeout:表示阻塞时间,单位为秒。
try:
# q.put(4,block=False)
q.put(4,block=True,timeout=2)
except:
print('消息队列已经满,现在有消息数量:%s'%(q.qsize()))
print('~~~~~~~~~~~~~~~~~~~~~~')
#put方式的另一种形式
# q.put_nowait(5) 等价于 q.put(5,block=False)
# try:
# q.put_nowait(5)
# except:
# print('消息队列已经满,现在有消息数量:%s'%(q.qsize()))
#推荐方式,先判断消息队列是否已经满了,再往里放
if not q.full():
q.put(6)
#~~~~~~~~~~~~~~~~~~~~~~~~~~~华丽的分隔线~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# 取
# 格式:get(block=True, timeout=None)
q.get()
q.get()
q.get()
print('当前队列的消息个数:%s'%(q.qsize()))
# q.get()#如果队列为空,一直阻塞
# q.get(block=False)
# q.get(block=True,timeout=2)
#在读取消息时,判断是否为空
if not q.empty():
q.get()
# 另一种写法
# q.get_nowait()#强取 等价于 q.get(block=False)
print('~~~~~~~~~~~~~~~~~~~~')
#案例:队列操作案例
from multiprocessing import Process
from multiprocessing import Queue
import time
import random
#需求:
#在父进程当中创建两个子进程,一个往Queue里写数据,一个从Queue里读取数据:
#写操作
def write(q):
# 由于进程之间的通信只能通过队列来完成,
# 所以队列中最后一个元素"Exit"为退出读取进程的标志
for value in ['A','B','C','D','Exit']:
print('Put %s from Queue'%(value))
q.put(value)
time.sleep(random.random())
#读操作
def read(q):
print('我要从队列当中读取数据了~')
while True:
if not q.empty():
value = q.get()
if value == 'Exit':
break
else:
print('Get %s from Queue'%(value))
time.sleep(random.random())
if __name__ == '__main__':
#创建一个队列
q = Queue()
#创建两个进程
pw = Process(target=write,args=(q,))
pr = Process(target=read,args=(q,))
#启动
pw.start()#写入
# pw.join()#此时pw进程结束
pr.start()
# pr.join()
print('所有读写进程结束')
#多线程
from threading import Thread
from urllib import request
def downloader(url):
file_name = url.split('/')[-1]
response = request.urlopen(url)
content = response.read()
with open(file_name,'wb') as fp:
fp.write(content)
print('子线程任务完成!')
if __name__ == '__main__':
#主进程下面有一个主线程
url_list = [
'http://www.langlang2017.com/img/logo.png',
'http://www.langlang2017.com/img/langlang1.png'
]
#创建线程
thread_list = []
for url in url_list:
t = Thread(target=downloader,args=(url,))
t.start()#启动线程
thread_list.append(t)
for t in thread_list:
t.join()
print('~~~~~~主线程执行完成~~~~~~~')
#temp
from threading import Thread
import time,random
def downloader(name):
for i in range(10):
print('线程:%s-------------%d'%(name,i))
time.sleep(random.random())
print("---------子线程执行完成----------")
#创建线程
if __name__ == '__main__':
#主进程下有一个主线程
url_list=["http://www.langlang2017.com/img/banner1.png",
"http://www.langlang2017.com/img/banner2.png"]
#创建线程
thread_list=[]
for name in ["线程--小红",'线程--小明']:
t=Thread(target=downloader,args=(name,))
t.start()#启动线程
thread_list.append(t)
for t in thread_list:
t.join()
print("=================主线程执行完成================")
#线程数量
from threading import Thread
import threading
import time
def sing():
for i in range(3):
print('唱第%d首歌'%(i))
time.sleep(1)
def dance():
for i in range(3):
print('跳第%d段舞'%(i))
time.sleep(1)
if __name__ == '__main__':
t1 = Thread(target=sing)
t2 = Thread(target=dance)
t1.start()
t2.start()
#查看线程的数量
while True:
count = len(threading.enumerate())
print('当前线程数量:',count)
if count <=1:
break
#注意:至少得有一个主线程
#线程实现的第二种方式:
from threading import Thread
class MyThread(Thread):
def __init__(self,url):
Thread.__init__(self)
self.url=url
def run(self):
print("线程的业务逻辑代码",self.url)
if __name__ == '__main__':
url=1
t=MyThread(url)
t.start()
#线程共享全局变量
from threading import Thread
from multiprocessing import Process
g_num=100
def work1():
global g_num
for i in range(3):
g_num+=1
print("线程一结果为",g_num)
# print("进程一结果为", g_num)
def work2():
global g_num
for i in range(3):
g_num+=1
print("线程二结果为",g_num)
# print("进程二结果为", g_num)
if __name__ == '__main__':
t1=Thread(target=work1)
t1.start()
t2=Thread(target=work2)
t2.start()
#多线程共享全局变量,多进程不能共享全局变量
# p1=Process(target=work1)
# p1.start()
# p2=Process(target=work2)
# p2.start()
#线程非安全
from threading import Thread
from multiprocessing import Process
g_num=0
def work1():
global g_num
for i in range(1000000):
g_num+=1
def work2():
global g_num
for i in range(1000000):
g_num+=1
if __name__ == '__main__':
# work1()
# work2()
# print(g_num)
t1=Thread(target=work1)
t2=Thread(target=work2)
t1.start()
t2.start()
t1.join()
t2.join()
print("g_num:",g_num)
#解决方案:互斥锁
from threading import Thread,Lock
g_num=0
def work1():
global g_num
for i in range(1000000):
mutex.acquire()#加锁
g_num+=1
mutex.release()#解锁
def work2():
global g_num
for i in range(1000000):
mutex.acquire()
g_num+=1
mutex.release()
mutex=Lock()#创建锁
if __name__ == '__main__':
t1=Thread(target=work1)
t2=Thread(target=work2)
t1.start()
t2.start()
t1.join()
t2.join()
print("g_num:",g_num)
#加锁后会将并行操作变成串行操作