python 多线程并发锁-php 线程 并发
【python】多线程:锁、全局锁、Queue队列和线程池关于锁的使用方法
【介绍】关于如何上锁,拿到钥匙,解锁。
【注意事项】:lock.acquire()和lock.release()必须成对出现,否则可能造成死锁。
为了规避这个问题,可以使用上下文管理器来加锁。 如下:
import threading
lock = threading.Lock()
with lock:
# 这里写想要实现的代码
pass
【说明】with语句会在代码块执行前自动获取锁,执行结束后自动释放锁
为什么要“上”锁?
import threading
import time
g_num = 0
def test1(num):
global g_num
for i in range(num):
mutex.acquire() # 上锁
g_num += 1
mutex.release() # 解锁
print("---test1---g_num=%d"%g_num)
def test2(num):
global g_num
for i in range(num):
mutex.acquire() # 上锁
g_num += 1
mutex.release() # 解锁
print("---test2---g_num=%d"%g_num)
# 创建一个互斥锁
# 默认是未上锁的状态
mutex = threading.Lock()
# 创建2个线程,让他们各自对g_num加1000000次
p1 = threading.Thread(target=test1, args=(1000000,))
p1.start()
p2 = threading.Thread(target=test2, args=(1000000,))
p2.start()
# 等待计算完成
while len(threading.enumerate()) != 1:
time.sleep(1)
print("2个线程对同一个全局变量操作之后的最终结果是:%s" % g_num)
输出:
---test1---g_num=1909909
---test2---g_num=2000000
2个线程对同一个全局变量操作之后的最终结果是:2000000
【总结】进入互斥量后,结果符合预期。
关于死锁
[解释] 线程间共享多个资源时,如果两个线程分别占用部分资源,同时等待对方的资源,就会产生死锁。
让我们看一个例子:
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 对mutexA上锁
mutexA.acquire()
# mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
print(self.name+'----do1---up----')
time.sleep(1)
# 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
mutexB.acquire()
print(self.name+'----do1---down----')
mutexB.release()
# 对mutexA解锁
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 对mutexB上锁
mutexB.acquire()
# mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
print(self.name+'----do2---up----')
time.sleep(1)
# 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
mutexA.acquire()
print(self.name+'----do2---down----')
mutexA.release()
# 对mutexB解锁
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
【重要】标准锁对象(threading.Lock)不关心当前哪个线程拥有锁; 如果锁已经被占用,任何其他试图获取锁的线程都会被阻塞,包括已经拥有锁的线程也会被阻塞。
【获取和释放锁的语句也可以用Python的with来实现】
[知识提升] 如果一个线程修改了两个函数调用之间的共享资源,那么我们最终会得到不一致的数据。 【最直接的解决方案】也是在这个函数中使用lock。 然而,这是不可行的。 里面的两个访问函数会阻塞,因为外面的语句已经持有了锁。
备受争议的GIL(全局锁) 什么是GIL?
【说明】任何Python线程在执行之前,都必须先获得GIL锁。 然后,每执行完100个字节码,解释器就会自动释放GIL锁,让其他线程有机会执行。 这个GIL全局锁其实是锁住了所有线程的执行代码。 因此python 多线程并发锁,多线程只能在Python中交替执行。 即使在100核的CPU上运行100个线程,也只能使用1个核。
GIL执行过程
5). 设置线程休眠;
6). 解锁 GIL;
【重点】python解释器中任何时刻都只有一个线程在执行;
I/O密集型(输入、输出):
计算密集型(cpu总是被占用):
那么如何避免受到GIL的影响呢? 队列
说到多线程就不得不说Queue队列,这是一个线程向另一个线程发送数据最安全的方式。 创建一个由多个线程共享的 Queue 对象,这些线程使用 put() 和 get() 操作向队列添加或删除元素。
关于Queue队列的重要函数
from queue import Queue
# maxsize默认为0,不受限
# 一旦>0,而消息数又达到限制,q.put()也将阻塞
q = Queue(maxsize=0)
# 阻塞程序,等待队列消息。
q.get()
# 获取消息,设置超时时间
q.get(timeout=5.0)
# 发送消息
q.put()
# 等待所有的消息都被消费完
q.join()
# 以下三个方法,知道就好,代码中不要使用
# 查询当前队列的消息个数
q.qsize()
# 队列消息是否都被消费完,True/False
q.empty()
# 检测队列里消息是否已满
q.full()
生产者消费者模型(继承实现) 什么是生产者消费者模型?
某个模块负责生产+数据,可以认为是生产者;
另一个模块负责处理产生的数据,可以将其视为消费者。
在生产者和消费者之间增加一个缓冲区(queue队列实现),可以认为是一个store。
【生产者】===》【缓冲区】===》【消费者】
生产者消费者概念图
生产者消费者模型在线程池方面的优势
在Python3中python 多线程并发锁,创建线程池是通过concurrent.futures函数库中的ThreadPoolExecutor类实现的。
future对象:将在未来某个时刻完成操作的对象。 提交方法可以返回一个未来的对象。
先看例子:简单的线程池实现
#线程执行的函数
def add(n1,n2):
v = n1 + n2
print('add :', v , ', tid:',threading.currentThread().ident)
time.sleep(n1)
return v
#通过submit把需要执行的函数扔进线程池中.
#submit 直接返回一个future对象
ex = ThreadPoolExecutor(max_workers=3) #制定最多运行N个线程
f1 = ex.submit(add,2,3)
f2 = ex.submit(add,2,2)
print('main thread running')
print(f1.done()) #done 看看任务结束了没
print(f1.result()) #获取结果 ,阻塞方法
简单的线程池实现
import Queue
import threading
import time
'''
这个简单的例子的想法是通过:
1、利用Queue特性,在Queue里创建多个线程对象
2、那我执行代码的时候,去queue里去拿线程!
如果线程池里有可用的,直接拿。
如果线程池里没有可用,那就等。
3、线程执行完毕,归还给线程池
'''
class ThreadPool(object): #创建线程池类
def __init__(self,max_thread=20):#构造方法,设置最大的线程数为20
self.queue = Queue.Queue(max_thread) #创建一个队列
for i in xrange(max_thread):#循环把线程对象加入到队列中
self.queue.put(threading.Thread)
#把线程的类名放进去,执行完这个Queue
def get_thread(self):#定义方法从队列里获取线程
return self.queue.get()
def add_thread(self):#定义方法在队列里添加线程
self.queue.put(threading.Thread)
pool = ThreadPool(10)
def func(arg,p):
print arg
time.sleep(2)
p.add_thread() #当前线程执行完了,我在队列里加一个线程!
for i in xrange(300):
thread = pool.get_thread() #线程池10个线程,每一次循环拿走一个!默认queue.get(),如果队列里没有数据就会等待。
t = thread(target=func,args=(i,pool))
t.start()
'''
self.queue.put(threading.Thread) 添加的是类不是对象,在内存中如果相同的类只占一份内存空间
并且如果这里存储的是对象的话每次都的新增都得在内存中开辟一段内存空间
还有如果是对象的话:下面的这个语句就不能这么调用了!
for i in xrange(300):
thread = pool.get_thread()
t = thread(target=func,args=(i,pool))
t.start()
通过查看源码可以知道,在thread的构造函数中:self.__args = args self.__target = target 都是私有字段那么调用就应该这么写
for i in xrange(300):
ret = pool.get_thread()
ret._Thread__target = func
ret._Thread__args = (i,pool)
ret.start()
[地图方法]
返回值与提交的序列一致。 也就是说,它是有序的。
#下面是map 方法的简单使用.
#注意:map 返回是一个生成器 ,并且是有序的
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
print('thread id:',threading.currentThread().ident,' 访问了:',url)
#这里使用了requests 模块
return requests.get(url)
ex = ThreadPoolExecutor(max_workers=3)
#内部迭代中, 每个url 开启一个线程
res_iter = ex.map(get_html,URLS)
for res in res_iter:
#此时将阻塞 , 直到线程完成或异常
print('url:%s ,len: %d'%(res.url,len(res.text)))
【as_completed】
用于解决提交完成时,避免反复调用future.done或使用future.result。
concurrent.futures.as_completed(fs, timeout=None):返回一个在迭代期间阻塞的生成器。
【关联】map方法的返回是有顺序的,as_completed是先完成/失败再返回的线程。
【给我一个栗子】
#as_completed 返回一个生成器,用于迭代, 一旦一个线程完成(或失败) 就返回
URLS = ['http://www.baidu.com', 'http://www.qq.com', 'http://www.sina.com.cn']
def get_html(url):
time.sleep(1)
print('thread id:',threading.currentThread().ident,' 访问了:',url)
return requests.get(url) #这里使用了requests 模块
ex = ThreadPoolExecutor(max_workers=3) #最多3个线程
future_tasks = [ex.submit(get_html,url) for url in URLS] #创建3个future对象
for future in as_completed(future_tasks): #迭代生成器
try:
resp = future.result()
except Exception as e:
print('%s'%e)
else:
print('%s has %d bytes!'%(resp.url, len(resp.text)))
输出:
"""
thread id: 5160 访问了: http://www.baidu.com
thread id: 7752 访问了: http://www.sina.com.cn
thread id: 5928 访问了: http://www.qq.com
http://www.qq.com/ has 240668 bytes!
http://www.baidu.com/ has 2381 bytes!
https://www.sina.com.cn/ has 577244 bytes!
"""
【重点】关于回调函数add_done_callback(fn)
回调函数在调用线程完成后在同一线程中调用。
import os,sys,time,requests,threading from concurrent import futuresURLS = ['http://baidu.com','http://www.qq.com','http://www.sina.com.cn ' ]def load_url(url):print('tid:',threading.currentThread().ident,',url:',url)with requests.get(url) as resp:return resp.contentdef call_back(obj):打印 ('->>>>>>>>call_back , tid:',threading.currentThread().ident, ',obj:',obj)with futures.ThreadPoolExecutor(max_workers=3) as ex:# mp = { ex.submit(load_url,url) : url for URLS}mp = dict()for url in URLS:f = ex.submit(load_url,url)mp[f] = urlf.add_done_callback(call_back)for f in期货 .as_completed(mp):url = mp[f]try:data = f.result()except Exception as exc:print(exc, ',url:',url)else:print('url:', url, ' ,len:',len(data),',data[:20]:',data[:20])"""tid: 7128 ,url: tid: 7892 ,url: tid: 3712 ,url: -> > >>>>>>>call_back , tid: 7892 ,obj: url: ,len: 251215 ,data[:20]: b'\n>>>>>>>>call_back , tid: 3712 ,obj: url : ,len: 577333 ,数据[:20]: b'\n0