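Multithreading is a minefield of a topic, and where there are threads there are locks. Python has the low-level thread module `_thread`. Wrap it once and you get `threading`; wrap it once more and you get `from concurrent.futures import ThreadPoolExecutor`.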
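To make the layering concrete, here is a minimal sketch that runs the same trivial job at each level. `square`, `worker` and the `results_*` lists are made-up names for illustration. The low-level `_thread` API has no `join()`, so the main thread waits on a lock that the worker releases:

```python
import _thread
import threading
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

# Layer 1: _thread. A bare function, no join(); completion is signalled by hand.
results_low = []
done = _thread.allocate_lock()
done.acquire()

def worker():
    results_low.append(square(3))
    done.release()            # wake the main thread

_thread.start_new_thread(worker, ())
done.acquire()                # blocks until worker() releases the lock

# Layer 2: threading. Thread objects with start()/join().
results_mid = []
t = threading.Thread(target=lambda: results_mid.append(square(3)))
t.start()
t.join()

# Layer 3: concurrent.futures. A pool that hands the results back.
with ThreadPoolExecutor(max_workers=4) as pool:
    results_high = list(pool.map(square, [1, 2, 3]))

print(results_low, results_mid, results_high)   # [9] [9] [1, 4, 9]
```

Each layer hides more bookkeeping: `threading` adds `join()`, and the executor also manages the threads and collects return values.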
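Lock

In `threading`, `Lock` is just the primitive lock from `_thread`: CPython's `threading.py` essentially does `_allocate_lock = _thread.allocate_lock` followed by `Lock = _allocate_lock` (the exact spelling varies between versions). Without a lock, even a simple shared counter goes wrong: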
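```python
import threading

a = 0

def change():
    global a
    for i in range(100000):
        a += 1   # read a, add 1, write it back: not atomic

threads = [threading.Thread(target=change) for i in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(a)
```

The expected result is 10,000,000 (100 threads × 100,000 increments), but the printed value may come out smaller and differ from run to run.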
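Why the count can come up short: `a += 1` is not one indivisible step. The standard `dis` module shows that it compiles to a load of `a`, an add, and a store back to `a`. If another thread runs between the load and the store, one of the two increments is lost. A quick way to look (exact opcode names vary across Python versions, so no output is shown here):

```python
import dis

a = 0

def change():
    global a
    a += 1

# Print the bytecode behind `a += 1`: a separate load, add and store.
dis.dis(change)

# The same instructions as (opname, argument) pairs, for inspection.
ops = [(ins.opname, ins.argval) for ins in dis.get_instructions(change)]
```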
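The fix is for all threads to share one lock and hold it around the increment. There are many kinds of locks, but as you probably know, nearly all of them are exposed to users the same way: `acquire()`/`release()`, or a `with lock:` block. One lock is easy to reason about. Two locks are less so, as in this slightly show-off example that prints Hello and World alternately: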
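```python
import time
from threading import Lock, Thread

lock_a = Lock()
lock_b = Lock()
lock_b.acquire()   # start with lock_b held so world() waits for the first Hello

def hello():
    for i in range(5):
        lock_a.acquire()   # wait for our turn
        time.sleep(0.1)
        print("Hello")
        lock_b.release()   # hand the turn to world()

def world():
    for i in range(5):
        lock_b.acquire()   # wait until hello() hands over
        time.sleep(0.1)
        print("World")
        lock_a.release()   # hand the turn back to hello()

Thread(target=hello).start()
Thread(target=world).start()
```

Each thread releases the lock the other one is waiting on. `lock_b` has to start out acquired: if both locks start free, both threads get their first lock immediately and print at the same time, in no particular order.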
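Printing makes the order hard to check automatically, so here is the same two-lock handoff recording into a list instead (a sketch; `output` is a name introduced here, and the sleeps are dropped). It relies on `Lock.release()` being allowed from a thread other than the one that acquired the lock:

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
lock_b.acquire()          # world() must wait for the first "Hello"
output = []

def hello():
    for _ in range(5):
        lock_a.acquire()  # wait for our turn
        output.append("Hello")
        lock_b.release()  # hand the turn to world()

def world():
    for _ in range(5):
        lock_b.acquire()
        output.append("World")
        lock_a.release()  # hand the turn back to hello()

threads = [threading.Thread(target=hello), threading.Thread(target=world)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(output)   # Hello, World, Hello, World, ... five pairs
```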
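RLock

A reentrant lock lets a thread that already holds the lock acquire it again, as many times as it likes. That sounds pointless, and most of the time it probably is, but consider the following scenario: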
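```python
from threading import RLock

class Change:
    def __init__(self):
        self.a = 1
        self.b = 1
        self.lock = RLock()

    def adda(self):
        with self.lock:
            self.a += 1

    def addb(self):
        with self.lock:
            self.b += 1

    def addab(self):
        # already holds self.lock when adda()/addb() acquire it again;
        # fine for an RLock
        with self.lock:
            self.adda()
            self.addb()
```

With a plain `Lock`, `addab` would deadlock: it holds the lock when it calls `adda`, and `adda` then blocks forever trying to acquire that same lock.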
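The difference is easy to see without risking a hang: a non-blocking second `acquire()` from the same thread fails on a `Lock` but succeeds on an `RLock`.

```python
import threading

lock = threading.Lock()
rlock = threading.RLock()

lock.acquire()
# A second blocking acquire by the same thread would hang forever on a Lock;
# blocking=False makes it return False instead.
second_lock = lock.acquire(blocking=False)
lock.release()

rlock.acquire()
second_rlock = rlock.acquire(blocking=False)  # same owner: succeeds at once
rlock.release()
rlock.release()   # an RLock must be released as many times as it was acquired

print(second_lock, second_rlock)   # False True
```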
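A simplified `RLock` can be built on a plain `Lock` by recording the owning thread and a recursion count:

```python
from threading import Lock, get_ident

class RLock:
    """Simplified reentrant lock (no timeouts, no non-blocking mode)."""

    def __init__(self):
        self.lock = Lock()
        self._owner = None   # thread id of the current holder
        self._count = 0      # how many times the holder has acquired it

    def acquire(self):
        me = get_ident()
        if self._owner == me:
            # already ours: just count the extra acquisition
            self._count += 1
            return True
        self.lock.acquire()  # blocks until the current owner fully releases
        self._owner = me
        self._count = 1
        return True

    __enter__ = acquire

    def release(self):
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if self._count == 0:
            # last matching release: give up the underlying lock
            self._owner = None
            self.lock.release()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
```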
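The owner check in `release()` mirrors the standard library: a `threading.RLock` can only be released by the thread that holds it, while a plain `Lock` can be released by any thread (the Hello/World handoff depends on exactly that). A quick check, with `release_from_other_thread` as a hypothetical helper written for this demo:

```python
import threading

def release_from_other_thread(lock):
    """Call lock.release() from a new thread; return the exception type, or None."""
    outcome = []

    def worker():
        try:
            lock.release()
            outcome.append(None)
        except RuntimeError as exc:
            outcome.append(type(exc))

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    return outcome[0]

plain = threading.Lock()
plain.acquire()
plain_result = release_from_other_thread(plain)      # None: any thread may release a Lock

reentrant = threading.RLock()
reentrant.acquire()
rlock_result = release_from_other_thread(reentrant)  # RuntimeError: not the owner
reentrant.release()                                  # the owner can still release it

print(plain_result, rlock_result)
```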
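Condition

`Condition` is known as a condition variable, and it is probably rarely used directly. You can give it a lock, and it lets threads be notified when some condition is met. The `queue` module is a good example: when `get` finds no data it calls `wait`, and when new data is put in it calls `notify` to wake the waiter. That is presumably why it is called a condition variable (which feels like a bit of a stretch 😕). Unlike `Lock` and `RLock`, it exists mainly to notify other threads. It has two main methods: `wait` (block until notified) and `notify` (wake up whoever is in `wait`).

The implementation idea is this: all threads share one lock A, plus a shared list of waiter locks. Each call to `wait` creates a new lock, appends it to the list, acquires it, and then releases lock A (`wait` may only be called while holding A, and if it kept A, no other thread could ever acquire it). Now for the clever part: it acquires the new lock a second time, deliberately deadlocking itself, so the thread is suspended until some other thread holding A calls `notify`, which releases the waiter's lock. The implementation: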
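```python
from collections import deque
from threading import Lock, get_ident

# RLock is the simplified RLock sketch above; the asserts rely on its
# _owner attribute, which threading.RLock does not expose.

class Condition:
    def __init__(self):
        self._lock = RLock()      # lock A, shared by every thread using this condition
        self._waiters = deque()   # one private lock per waiting thread

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def wait(self):
        # must be called while holding lock A (assumed to be held only once)
        assert get_ident() == self._lock._owner
        waiter = Lock()
        self._waiters.append(waiter)
        waiter.acquire()          # first acquire succeeds immediately
        self._lock.release()      # let other threads take lock A
        waiter.acquire()          # second acquire blocks until notify() releases it
        self._lock.acquire()      # take lock A back before returning
        return True

    def notify(self):
        # simplification: wakes *every* waiter, like threading.Condition.notify_all();
        # the real notify(n=1) wakes only one by default
        assert get_ident() == self._lock._owner
        for waiter in self._waiters.copy():
            waiter.release()
            self._waiters.remove(waiter)
```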
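The heart of that `wait()` is the second `acquire()` on a lock the thread already holds. In isolation, using only the standard library, it looks like this. `sleeper` is a hypothetical stand-in for the waiting thread, and the main thread plays the part of `notify()`:

```python
import threading
import time

waiter = threading.Lock()
events = []

def sleeper():
    events.append("waiting")
    waiter.acquire()          # first acquire succeeds at once
    waiter.acquire()          # second acquire blocks: the thread is now parked
    events.append("woken")

t = threading.Thread(target=sleeper)
t.start()
while not waiter.locked():    # wait until the sleeper holds the lock
    time.sleep(0.01)
events.append("notify")
waiter.release()              # any thread may release it: this is the wake-up
t.join()
print(events)   # ['waiting', 'notify', 'woken']
```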
Here is a producer/consumer built on it:

```python
import random
import threading
import time

class Producer(threading.Thread):
    def __init__(self, integers, condition):
        super().__init__()
        self.integers = integers
        self.condition = condition

    def run(self):
        for i in range(10):
            integer = random.randint(0, 256)
            with self.condition:
                self.integers.append(integer)
                self.condition.notify()   # wake the consumer
            time.sleep(0.5)

class Consumer(threading.Thread):
    def __init__(self, integers, condition):
        super().__init__()
        self.integers = integers
        self.condition = condition

    def run(self):
        with self.condition:
            while True:
                # drain everything already produced before going to sleep
                while self.integers:
                    integer = self.integers.pop()
                    print(integer)
                self.condition.wait()     # releases the lock while waiting

integers = []
condition = Condition()   # the Condition sketch above; threading.Condition also works
Producer(integers, condition).start()
Consumer(integers, condition).start()   # loops forever, never exits on its own
```
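Note that the consumer has to check the list before it waits. The following version looks simpler but is wrong:

```python
class Consumer(threading.Thread):
    def __init__(self, integers, condition):
        super().__init__()
        self.integers = integers
        self.condition = condition

    def run(self):
        with self.condition:
            while True:
                self.condition.wait()   # BUG: waits even if data is already there
                integer = self.integers.pop()
                print(integer)
```

`notify` only wakes threads that are already waiting; it is not remembered. If the producer appends an item and calls `notify` before this consumer reaches `wait()`, the notification is lost: the item sits in the list while the consumer blocks, and since each wakeup pops only one item, it may never be printed. Checking the shared state first, as the `while self.integers` loop in the previous consumer does, avoids this.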
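The standard `threading.Condition` offers `wait_for(predicate)`, which packages that check-then-wait loop: it only blocks while the predicate is false and re-checks it after every wakeup. A sketch of a consumer that cannot miss an early notification (all names here are illustrative):

```python
import threading

items = []
received = []
cond = threading.Condition()

def producer():
    for i in range(5):
        with cond:
            items.append(i)
            cond.notify()

def consumer():
    while len(received) < 5:
        with cond:
            # returns at once if items is already non-empty, so a notify()
            # sent before we started waiting does no harm
            cond.wait_for(lambda: items)
            received.append(items.pop(0))

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()   # producer first on purpose: some of its notifies may find no waiter
c.start()
p.join()
c.join()
print(received)   # [0, 1, 2, 3, 4]
```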
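Semaphore & BoundedSemaphore

There is a reason `Condition` came first: in `threading`, both semaphores and events are built on top of it. A semaphore is typically used to limit access to a scarce resource. Say you have 1000 threads: if every one of them opens its own database connection, the database may well fall over. Or take a crawler opening connections to a website: being too aggressive is not nice either. A semaphore handles both cases well.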
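A simplified implementation on top of the `Condition` above. `BoundedSemaphore` differs only in that releasing more times than it was acquired raises `ValueError`, which catches bugs where a release is duplicated:

```python
class Semaphore:
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition()
        self._value = value

    def acquire(self):
        with self._cond:
            # re-check after every wakeup: another thread may have taken the slot
            while self._value == 0:
                self._cond.wait()
            self._value -= 1
        return True

    def release(self):
        # must hold the condition's lock to update the counter and notify
        with self._cond:
            self._value += 1
            self._cond.notify()

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()


class BoundedSemaphore(Semaphore):
    def __init__(self, value=1):
        super().__init__(value)
        self._initial_value = value

    def release(self):
        with self._cond:
            if self._value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._value += 1
            self._cond.notify()
```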
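The crawler/database scenario with the standard `threading.Semaphore`: a minimal sketch where `fetch` is a hypothetical worker and `time.sleep` stands in for the real connection. The second part shows what `BoundedSemaphore` adds: an extra `release()` raises `ValueError` instead of silently raising the limit.

```python
import threading
import time

limit = threading.Semaphore(3)    # at most 3 "connections" at once
active = 0
peak = 0
counter_lock = threading.Lock()

def fetch():
    global active, peak
    with limit:                   # blocks while 3 workers are already inside
        with counter_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)          # stand-in for real I/O
        with counter_lock:
            active -= 1

threads = [threading.Thread(target=fetch) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(peak)   # never more than 3

bounded = threading.BoundedSemaphore(1)
bounded.acquire()
bounded.release()
raised = False
try:
    bounded.release()             # one release too many
except ValueError:
    raised = True
print(raised)
```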
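Event

`Event` is quite similar to `Condition`. The difference is that a `Condition` does not remember notifications: a `notify` issued while no thread is waiting is simply lost, so a later `wait` blocks (which is why, within a single thread, `notify` followed by `wait` just hangs). An `Event` keeps a flag, so if `set` has already been called, `wait` returns immediately, and this works even within a single thread. It has two main methods, `set` and `wait`, and neither requires acquiring a lock first, which makes it somewhat more convenient to use. The implementation: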
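```python
class Event:
    def __init__(self):
        self._cond = Condition()   # the Condition sketch above
        self._flag = False

    def is_set(self):
        return self._flag

    def set(self):
        with self._cond:
            self._flag = True
            # the Condition sketch above wakes every waiter on notify();
            # with threading.Condition this would need notify_all()
            self._cond.notify()

    def clear(self):
        with self._cond:
            self._flag = False

    def wait(self):
        with self._cond:
            if not self._flag:
                # the flag covers a set() that happened earlier, so only
                # block when nobody has called set() yet
                self._cond.wait()
            return True
```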
And the producer/consumer rewritten with an `Event`:

```python
import random
import threading
import time

class Producer(threading.Thread):
    def __init__(self, integers, event):
        super().__init__()
        self.integers = integers
        self.event = event

    def run(self):
        for i in range(10):
            integer = random.randint(0, 256)
            self.integers.append(integer)
            self.event.set()      # no lock needed before set()
            time.sleep(0.5)

class Consumer(threading.Thread):
    def __init__(self, integers, event):
        super().__init__()
        self.integers = integers
        self.event = event

    def run(self):
        while True:
            self.event.wait()
            # reset the flag before draining; without clear(), wait() would
            # keep returning immediately and this loop would busy-spin
            self.event.clear()
            while self.integers:
                integer = self.integers.pop()
                print(integer)

integers = []
event = Event()   # the Event sketch above; threading.Event also works
Producer(integers, event).start()
Consumer(integers, event).start()   # loops forever, never exits on its own
```
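The key difference between `Event` and `Condition` can be checked in a single thread with the standard classes. The timeouts are only there so the `Condition` half does not hang:

```python
import threading

# Event: a set() before wait() is remembered by the internal flag.
event = threading.Event()
event.set()
event_result = event.wait(timeout=1)      # returns immediately

# Condition: a notify() with nobody waiting is simply lost.
cond = threading.Condition()
with cond:
    cond.notify()                         # no waiter yet, nothing happens
    cond_result = cond.wait(timeout=0.2)  # nobody notifies again: times out

print(event_result, cond_result)   # True False
```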