零基础学 Python(76):线程消息通信机制

作者: 王炳明 分类: Python 基础教程 发布时间: 2021-07-14 12:51 热度:1,547

查看本系列教程目录:请点击:零基础小白入门 Python 系列教程


前面我已经向大家介绍了,如何使用创建线程,启动线程。相信大家都会有这样一个想法,线程无非就是创建一下,然后再start()下,实在是太简单了。

可是要知道,在真实的项目中,实际场景可要我们举的例子要复杂的多得多,不同线程的执行可能是有顺序的,或者说他们的执行是有条件的,是要受控制的。如果仅仅依靠前面学的那点浅薄的知识,是远远不够的。

那今天,我们就来探讨一下如何控制线程的触发执行。

要实现对多个线程进行控制,其实本质上就是消息通信机制在起作用,利用这个机制发送指令,告诉线程,什么时候可以执行,什么时候不可以执行,执行什么内容。

经过我的总结,线程中通信方法大致有如下三种:

  • threading.Event
  • threading.Condition
  • queue.Queue

接下来我们来一一探讨下。


1. Event事件

Python提供了非常简单的通信机制 Threading.Event,通用的条件变量。多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活

关于Event的使用也超级简单,就三个函数

event = threading.Event()

# 重置event,使得所有该event事件都处于待命状态
event.clear()

# 等待接收event的指令,决定是否阻塞程序执行
event.wait()

# 发送event指令,使所有设置该event事件的线程执行
event.set()

举个例子来看下。

import time
import threading


class MyThread(threading.Thread):
    def __init__(self, name, event):
        super().__init__()
        self.name = name
        self.event = event

    def run(self):
        print('Thread: {} start at {}'.format(self.name, time.ctime(time.time())))
        # 等待event.set()后,才能往下执行
        self.event.wait()
        print('Thread: {} finish at {}'.format(self.name, time.ctime(time.time())))


threads = []
event = threading.Event()

# 定义五个线程
[threads.append(MyThread(str(i), event)) for i in range(1,5)]

# 重置event,使得event.wait()起到阻塞作用
event.clear()

# 启动所有线程
[t.start() for t in threads]

print('等待5s...')
time.sleep(5)

print('唤醒所有线程...')
event.set()

执行一下,看看结果

Thread: 1 start at Sun May 13 20:38:08 2018
Thread: 2 start at Sun May 13 20:38:08 2018
Thread: 3 start at Sun May 13 20:38:08 2018
Thread: 4 start at Sun May 13 20:38:08 2018

等待5s...

唤醒所有线程...
Thread: 1 finish at Sun May 13 20:38:13 2018
Thread: 4 finish at Sun May 13 20:38:13 2018
Thread: 2 finish at Sun May 13 20:38:13 2018
Thread: 3 finish at Sun May 13 20:38:13 2018

可见在所有线程都启动(start())后,并不会执行完,而是都在self.event.wait()止住了,需要我们通过event.set()来给所有线程发送执行指令才能往下执行。

2. Condition

Condition和Event 是类似的,并没有多大区别。

同样,Condition也只需要掌握几个函数即可。

cond = threading.Condition()

# 类似lock.acquire()
cond.acquire()

# 类似lock.release()
cond.release()

# 等待指定触发,同时会释放对锁的获取,直到被notify才重新占有琐。
cond.wait()

# 发送指定,触发执行
cond.notify()

举个网上一个比较趣的捉迷藏的例子来看看

import threading, time

class Hider(threading.Thread):
    def __init__(self, cond, name):
        super(Hider, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        time.sleep(1)  #确保先运行Seeker中的方法
        self.cond.acquire()

        print(self.name + ': 我已经把眼睛蒙上了')
        self.cond.notify()
        self.cond.wait()
        print(self.name + ': 我找到你了哦 ~_~')
        self.cond.notify() 

        self.cond.release()
        print(self.name + ': 我赢了')

class Seeker(threading.Thread):
    def __init__(self, cond, name):
        super(Seeker, self).__init__()
        self.cond = cond
        self.name = name

    def run(self):
        self.cond.acquire()
        self.cond.wait()
        print(self.name + ': 我已经藏好了,你快来找我吧')
        self.cond.notify()
        self.cond.wait()
        self.cond.release()
        print(self.name + ': 被你找到了,哎~~~')

cond = threading.Condition()
seeker = Seeker(cond, 'seeker')
hider = Hider(cond, 'hider')
seeker.start()
hider.start()

通过cond来通信,阻塞自己,并使对方执行。从而,达到有顺序的执行。
看下结果

hider:   我已经把眼睛蒙上了
seeker:  我已经藏好了,你快来找我吧
hider:   我找到你了 ~_~
hider:   我赢了
seeker:  被你找到了,哎~~~

3. Queue队列

最后一个,队列,它是本节的重点,因为它是我们日常开发中最使用频率最高的。

从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用put()get() 操作来向队列中发送和获取元素。

同样,对于Queue,我们也只需要掌握几个函数即可。

from queue import Queue
# maxsize默认为0,不受限
# 一旦>0,而消息数又达到限制,q.put()也将阻塞
q = Queue(maxsize=0)

# 默认阻塞程序,等待队列消息,可设置超时时间
q.get(block=True, timeout=None)

# 发送消息:默认会阻塞程序至队列中有空闲位置放入数据
q.put(item, block=True, timeout=None)

# 等待所有的消息都被消费完
q.join()


# 通知队列任务处理已经完成,当所有任务都处理完成时,join() 阻塞将会解除
q.task_done()

以下三个方法,知道就好,一般不需要使用

# 查询当前队列的消息个数
q.qsize()

# 队列消息是否都被消费完,返回 True/False
q.empty()

# 检测队列里消息是否已满
q.full()

函数会比之前的多一些,同时也从另一方面说明了其功能更加丰富。

我来举个老师点名的例子。

# coding=utf-8# /usr/bin/env python'''Author: wangbmEmail: wongbingming@163.comWechat: mrbensonwonBlog: python-online.cn公众号:Python编程时光date: 2020/9/20 下午7:30desc: '''__author__ = 'wangbm'from queue import Queuefrom threading import Threadimport timeclass Student:    def __init__(self, name):        self.name = name    def speak(self):        print("{}:到!".format(self.name))class Teacher:    def __init__(self, queue):        super().__init__()        self.queue=queue    def call(self, student_name):        if student_name == "exit":            print("点名结束,开始上课..")        else:            print("老师:{}来了没?".format(student_name))            # 发送消息,要点谁的名        self.queue.put(student_name)class CallManager(Thread):    def __init__(self, queue):        super().__init__()        self.students = {}        self.queue = queue    def put(self, student):        self.students.setdefault(student.name, student)    def run(self):        while True:            # 阻塞程序,时刻监听老师,接收消息            student_name = queue.get()            if student_name == "exit":                break            elif student_name in self.students:                self.students[student_name].speak()            else:                print("老师,咱班,没有 {} 这个人".format(student_name))queue = Queue()teacher = Teacher(queue=queue)s1 = Student(name="小明")s2 = Student(name="小亮")cm = CallManager(queue)cm.put(s1)cm.put(s2)cm.start()print('开始点名~')teacher.call('小明')time.sleep(1)teacher.call('小亮')time.sleep(1)teacher.call("exit")

运行结果如下

开始点名~老师:小明来了没?小明:到!老师:小亮来了没?小亮:到!点名结束,开始上课..

其实 queue 还有一个很重要的方法,Queue.task_done()

如果不明白它的原理,我们在写程序,就很有可能卡死。

当我们使用 Queue.get() 从队列取出数据后,这个数据有没有被正常消费,是很重要的。

如果数据没有被正常消费,那么Queue会认为这个任务还在执行中,此时你使用 Queue.join() 会一直阻塞,即使此时你的队列里已经没有消息了。

那么如何解决这种一直阻塞的问题呢?

就是在我们正常消费完数据后,记得调用一下 Queue.task_done(),说明队列这个任务已经结束了。

当队列内部的任务计数器归于零时,调用 Queue.join() 就不会再阻塞了。

要理解这个过程,请参考 http://python.iswbm.com/en/latest/c02/c02_06.html 里自定义线程池的的例子。

4. 消息队列的先进先出

消息队列可不是只有queue.Queue这一个类,除它之外,还有queue.LifoQueuequeue.PriorityQueue这两个类。

从名字上,对于他们之间的区别,你大概也能猜到一二吧。

queue.Queue:先进先出队列
queue.LifoQueue:后进先出队列
queue.PriorityQueue:优先级队列

先来看看,我们的老朋友,queue.Queue
所谓的先进先出(FIFO,First in First Out),就是先进入队列的消息,将优先被消费。
这和我们日常排队买菜是一样的,先排队的人肯定是先买到菜。

用代码来说明一下

import queueq = queue.Queue()for i in range(5):    q.put(i)while not q.empty():    print q.get()

看看输出,符合我们先进先出的预期。存入队列的顺序是01234,被消费的顺序也是01234

01234

再来看看Queue.LifoQueue,后进先出,就是后进入消息队列的,将优先被消费。

这和我们羽毛球筒是一样的,最后放进羽毛球筒的球,会被第一个取出使用。

用代码来看下

import queueq = queue.LifoQueue()for i in range(5):    q.put(i)while not q.empty():    print q.get()

来看看输出,符合我们后进后出的预期。存入队列的顺序是01234,被消费的顺序也是43210

43210

最后来看看Queue.PriorityQueue,优先级队列。
这和我们日常生活中的会员机制有些类似,办了金卡的人比银卡的服务优先,办了银卡的人比不办卡的人服务优先。

来用代码看一下

from queue import PriorityQueue# 重新定义一个类,继承自PriorityQueueclass MyPriorityQueue(PriorityQueue):    def __init__(self):        PriorityQueue.__init__(self)        self.counter = 0    def put(self, item, priority):        PriorityQueue.put(self, (priority, self.counter, item))        self.counter += 1    def get(self, *args, **kwargs):        _, _, item = PriorityQueue.get(self, *args, **kwargs)        return itemqueue = MyPriorityQueue()queue.put('item2', 2)queue.put('item5', 5)queue.put('item3', 3)queue.put('item4', 4)queue.put('item1', 1)while True:    print(queue.get())

来看看输出,符合我们的预期。我们存入入队列的顺序是25341,对应的优先级也是25341,可是被消费的顺序丝毫不受传入顺序的影响,而是根据指定的优先级来消费。

item1item2item3item4item5

5. 总结一下

学习了以上三种通信方法,我们很容易就能发现EventCondition 是threading模块原生提供的模块,原理简单,功能单一,它能发送 TrueFalse 的指令,所以只能适用于某些简单的场景中。

Queue则是比较高级的模块,它可能发送任何类型的消息,包括字符串、字典等。其内部实现其实也引用了Condition模块(譬如putget函数的阻塞),正是其对Condition进行了功能扩展,所以功能更加丰富,更能满足实际应用。

文章有帮助,请作者喝杯咖啡?

发表评论