多线程是个坑爹的课题,有多线程就有锁。Python中有低级线程模块_thread、再封装一下就是threading、再来一下就是from concurrent.futures import ThreadPoolExecutor
。其中涉及到的锁大概有Lock、RLock、Condition、Semaphore、BoundedSeamphore、Event。这么多看起来怪吓人的,其实这么多的锁全部只是由Lock演化出来的
Lock Lock = _thread._allocate_lock
这个是直接由最低级别的_thread直接引用过来的。创建锁的时候是处于未被锁定的状态。这应该是最容易被理解和使用的一种锁了。刚学习的时候铁定写过类似下面这种示例。本质原因是python中的原子操作是针对单条指令。而a+=1被翻译成了多条指令。执行过程中任何时刻可能被打断
1 2 3 4 5 6 7 8 9 10 11 a = 0 def change (): global a for i in range (100000 ): a += 1 t = [threading.Thread(target=change) for i in range (100 )] [i.start() for i in t] [i.join() for i in t] print(a)
解决方法就是所有线程共享同一个锁,虽然锁有很多种,但是几乎都是这样暴露给用户使用的 ,这是大家都知道的。一个锁都好理解。如果有两个锁就不是那么好理解了,比如这个有点装逼的例子。交叉打印Hello和World
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 lock_a = Lock() lock_b = Lock() def hello (): for i in range (5 ): lock_a.acquire() time.sleep(0.1 ) print("Hello" ) lock_b.release() def world (): for i in range (5 ): lock_b.acquire() time.sleep(0.1 ) print("World" ) lock_a.release() Thread(target=hello).start() Thread(target=world).start()
RLock 可重入锁允许线程获得锁之后该线程可以无数次的再次获得锁。看起来没什么用,大概也确实没啥用吧,不过考虑一下下面这种场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class Change : def __init__ (self ): self.a = 1 self.b = 1 self.lock = RLock() def adda (self ): with self.lock: self.a += 1 def addb (self ): with self.lock: self.b += 1 def addab (self ): with self.lock: self.adda() self.addb()
你有一个类。其中adda改变a,addb改变b.还提供方法addab同时调用前两者。如果没有可重入锁那么你将不得不把两个函数粘贴到addab下面。它的实现原理是获取锁的时候根据get_ident得到当前线程标识。如果和当前锁的标识一样则仅仅是给值加1,否则就尝试去获取锁。简要代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from threading import Lock, get_identclass RLock : def __init__ (self ): self.lock = Lock() self._owner = None self._count = 0 def acquire (self ): me = get_ident() if self._owner == me: self._count += 1 return rc = self.lock.acquire() if rc: self._owner = me self._count = 1 __enter__ = acquire def release (self ): if self._owner != get_ident(): raise RuntimeError() self._count -= 1 if self._count == 0 : self._owner = None self.lock.release() def __exit__ (self, exc_type, exc_val, exc_tb ): self.release()
Condition 被称为条件变量,这个应该是很少被使用的。它允许传入一个锁,当满足条件的时候触发通知(可以看一下queue模块的实现,当get不到数据的时候就执行wait等待,新加入数据的时候执行notify通知唤醒),这应该是它被称为条件变量的原因(感觉都有点牵强😕),和Lock以及RLock不同。这个东东主要是用来通知其他线程的。大概有两个方法wait(阻塞直到被通知),notify(唤醒wait)。 实现的思路是所有的线程共享同一个锁A,同时有一个共用的锁列表。每当调用wait的时候生成一个新的锁放入锁列表,然后获得该锁,再释放锁A(因为wait必须在获得锁A才能调用,如果不释放则其他线程无法获得锁A)。最绝的来了,再次获得新生成的锁造成死锁 。由此该线程被挂起。直到其他获得锁A的线程执行notify操作。将wait的锁释放。 实现代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from collections import dequeclass Condition : def __init__ (self ): self._lock = RLock() self._waiters = deque() def __enter__ (self ): return self._lock.__enter__() def __exit__ (self, *args ): return self._lock.__exit__(*args) def wait (self ): assert get_ident() == self._lock._owner waiter = Lock() self._waiters.append(waiter) waiter.acquire() self._lock.release() waiter.acquire() self._lock.acquire() del waiter def notify (self ): assert get_ident() == self._lock._owner for waiter in self._waiters.copy(): waiter.release() self._waiters.remove(waiter)
这个东东吧其实不太适合被直接使用。但是网上还是有人硬生生凑了一个消费者生产者的例子出来做演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import threadingimport randomimport timeclass Producer (threading.Thread ): def __init__ (self, integers, condition ): super (Producer, self).__init__() self.integers = integers self.condition = condition def run (self ): for i in range (10 ): integer = random.randint(0 , 256 ) with self.condition: self.integers.append(integer) self.condition.notify() time.sleep(0.5 ) class Consumer (threading.Thread ): def __init__ (self, integers, condition ): super (Consumer, self).__init__() self.integers = integers self.condition = condition def run (self ): with self.condition: while True : while self.integers: integer = self.integers.pop() print(integer) self.condition.wait() integers = [] condition = Condition() Producer(integers, condition).start() Consumer(integers, condition).start()
这个例子有个地方需要注意。notify和wait并不是一一对应的。不代表执行了notify,wait部分一定会被执行。所以如果消费者部分这样写会造成部分没有被处理
1 2 3 4 5 6 7 8 9 10 11 12 13 class Consumer (threading.Thread ): def __init__ (self, integers, condition ): super (Consumer, self).__init__() self.integers = integers self.condition = condition def run (self ): with self.condition: while True : self.condition.wait() integer = self.integers.pop() print(integer)
Semaphore & BoundedSeamphore 上面先介绍Condition是有原因的,因为多线程信号量和事件都是基于它生成的。信号量常用于限制对有限资源的访问。比如你有1000个线程,如果每一个线程都去创建数据库连接,那么数据库可能会崩。或者爬虫创建对网站的连接,太过凶残并不好,这个时候用信号量来处理就不错。
这里如果直接使用Lock来构建。那么一个锁同时只允许一个线程来获得显然是不达不到要求的。祭出刚才构建的Condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class Semaphore : def __init__ (self, value=1 ): self._lock = Condition() self.value = value def __enter__ (self ): with self._lock: if self.value == 0 : self._lock.wait() else : self.value -= 1 def __exit__ (self, exc_type, exc_val, exc_tb ): self.value += 1 self._lock.notify() class BoundedSemaphore (Semaphore ): def __init__ (self, value=1 ): super (BoundedSemaphore, self).__init__(value=value)
Event Event和Condition差不多。区别就是上面提到的,使用Condition,如果在单线程,那么当notify的时候是无法触发wait的,再调用wait线程会被阻塞。但是Event就支持在单线程使用。 提供两个主要方法,set和wait。但是它并不需要先获得锁。使用好像更方便一点,实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Event : def __init__ (self ): self._cond = Condition() self._flag = False def set (self ): with self._cond: self._flag = True self._cond.notify() def clear (self ): with self._cond: self._flag = False def wait (self ): with self._cond: signaled = self._flag if not signaled: signaled = self._cond.wait() return signaled
如果把刚才那个生产者消费者换到它上面来,大概是这个样子了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import threadingimport randomimport timeclass Producer (threading.Thread ): def __init__ (self, integers, condition ): super (Producer, self).__init__() self.integers = integers self.condition = condition def run (self ): for i in range (10 ): integer = random.randint(0 , 256 ) self.integers.append(integer) self.condition.set () time.sleep(0.5 ) class Consumer (threading.Thread ): def __init__ (self, integers, condition ): super (Consumer, self).__init__() self.integers = integers self.condition = condition def run (self ): while True : while self.integers: integer = self.integers.pop() print(integer) self.condition.wait() integers = [] condition = Event() Producer(integers, condition).start() Consumer(integers, condition).start()
当然,没啥必要用这些比较低级的东东。能用通用高级数据结构就直接用,这样大家都比较好理解,总结就是只是给共享资源加锁用Lock、限制资源访问使用BoundedSeamphore,唤醒相关线程使用Event