昨天花了一整节来讨论一些关键问题:
- 多线程
- 多线程通信
- PySimpleGUI 异步模式
光说不练假把式,让我们行动起来。这节我们一鼓作气实现 6.2 提到的第一步:web 服务接收数据并显示在视窗里。
一个关键约束
前面提到过,这里再单独提示一下:PySimpleGUI 所有调用都必须在主线程里,不能在子线程里调用。
利用多线程同时跑 2 个死循环
先看一下如何使用 threading 库启动子线程。这是一个简单的多线程例子,今天会在此基础上不断增加功能:
import threading
import time
def thread_work(host, port):
# 子线程里的死循环
while True:
print("Web server runs on http://{}:{}".format(host, port))
# 休眠 3s
time.sleep(3)
HOST = "localhost"
PORT = 8080
worker = threading.Thread(target=thread_work, args = [HOST, PORT])
worker.start()
# 主线程里的死循环
while True:
print("Main thread running~")
# 休眠 2s
time.sleep(1)
我们用 threading.Thread 创建一个线程对象,保存在 worker 中,其中 threading.Thread 接受两个参数:
- target:在子线程中执行的函数;
- args:子线程运行函数的参数。
当线程对象被创建的时候,它还没有启动,需要调用 worker.start 来启动线程。
让我们运行起来看一下:
- 主线程每 1s 输出一次 Main thread running~
- 子线程每 3s 输出一次 Web server runs on http://localhost:8080
可以看到同时运行着 2 个死循环,并且各自独立的向屏幕输出数据。thread_work 函数运行在 threading 库创建的新线程中,相对于原来的线程,它被认为是子线程;原来的线程则认为是主线程,或者子线程的父线程。
红色箭头指示的就是执行流分叉的位置。执行流从线程 start 开始分支,子线程脱离主线程独立运行,以蓝色框表示;主线程则继承最开始的衣钵,继续执行,途中橙色线框所示。
子线程运行 web 服务
由于视窗线程只能是主线程,bottle 库所在的 web 服务线程就只能是子线程了。
将上一节的 web 服务代码转移到子线程的 thread_work 函数内部,在子线程接收客户端数据,代码调整如下,注意新增注释:
import threading
import time
# 导入 bottle 库
from bottle import run, request, post
# Post 请求处理函数定义不用在 thread_work 里定义
@post('/msg')
def index():
print(request.body.read().decode("utf-8"))
return "<h1>OK</h1>"
# bottle 真正的死循环在 run 函数中,将它转移到 thread_work 里
# run(host='localhost', port=8080)
def thread_work(host, port):
# 子线程里的死循环
#while True:
# print("Web server runs on http://{}:{}".format(host, port))
# # 休眠 3s
# time.sleep(3)
# run 函数本身就是死循环,上面的代码就不需要了
run(host=host, port=port)
HOST = "localhost"
PORT = 8080
worker = threading.Thread(target=thread_work, args = [HOST, PORT])
worker.start()
# 主线程里的死循环
while True:
print("Main thread running~")
# 休眠 2s
time.sleep(1)
这里用我昨天写的 HTTP 调试工具配合调试。在 HTTP 调试工具中,填写 Url 为 localhost:8080/msg,在 Body 框中填写要 Post 给 web 服务的数据,点击 Submit 按钮,就会输出中将调试工具打印出收到的消息。
- 首先观察输出,bottle 已经提示 web 服务运行在 localhost:8080;
- 同时看到主线程每 1s 输出一次 Main thread running~;
- 任何时候 Post 数据都会被立刻输出;
- 子线程的 web 服务接收、输出数据没有影响主线程打印 Main thread running~。
显然,程序正在以多线程并行处理无疑。
主线程运行视窗
我们能且只能在主线程中运行 PySimpleGUI 视窗,这是在其文档中写的很清楚了!
还记得 PySimpleGUI 视窗编程三部曲吧:
- 定义蓝图 layout;
- 创建 Window 对象
- 在事件循环中 read,并处理 event, values
这块不再赘述,直接贴上新增代码:
# 在代码开始的位置导入 PySimpleGUI 库
import PySimpleGUI as sg
# 就显示一个多行文本框,大小是 50 * 5,单位是应为字符,并通过 disabled 禁止用户编辑内容,使其只读
layout = [ [sg.Multiline( size = (50, 5), key = "-MSG_SCREEN-", do_not_clear=True, disabled=True)]]
window = sg.Window("Msg App", layout = layout)
# 主线程里的死循环
while True:
# 使用 read 函数异步模式,100 毫秒超时,通过 timeout_key 自定义了超时 event 的名称
event, values = window.read(timeout = 100, timeout_key = "-TIMEOUT-")
if event is None :
break
if event == "-TIMEOUT-":
# 目前什么都不用做,不要在这里 print,会产生大量输出
continue
完整代码请参看结尾源码。
可以清楚的看到,显示视窗的同时,web 服务线程也能正常接收 HTTP 调试工具发来的数据。
web 服务线程向视窗线程发消息
我们会用到 queue 库,创建一个 “先进先出” 队列,充当 web 服务线程与视窗线程之间的数据管道。
先看一下 queue 库的基本用法:
import queue
# 创建队列,可以指定一个 int 参数,限制队列最大长度,避免挤压过多数据,由于视窗每 100 毫秒就检查一次,就不限制队列长度了。
q = queue.Queue()
# 将 ‘a’ 从一端放入队列,除了 string,其他各种对象都可以放入
q.put('a')
# 从另一端取出, chart_ 此时是 ’a‘,如果 q 是空的,这里会阻塞,直到 q 中有新数据
char_ = q.get()
# 我们使用非阻塞模式,如果 q 是空的,会触发 queue.Empty 异常,注意捕获异常。
char_ = q.get_nowait()
再回顾上一次的图:
我们要在:
- web 服务线程中,收到消息就 put 到队列中;
- 主线程超时事件中,使用 get_nowait() 方法尝试获取队列中的数据。
思路非常清晰了。
首先在启动子线程之前,创建全局的 Queue 对象 que,否则线程中可能会访问到未定义的变量。
# 在代码开头导入 queue
import queue
# 创建队列,这一行在 worker.start() 之前
que = queue.Queue()
在 web 服务线程,我们假定客户端用 Json 格式 Post 数据,其中包含:
- from: 消息发送者姓名
- msg: 消息正文
比如:
{
"from" : "Jiangchuan",
"msg" : "I Love Python!"
}
直接修改 web 服务中 Post 请求处理函数:
# 在代码开头导入 json 库
import json
# Post 请求处理函数定义不用在 thread_work 里定义
@post('/msg')
def index():
# 下一行代码不需要了,注释掉
# print(request.body.read().decode("utf-8"))
# 使用全局变量 que,也就是前面定义的队列
global que
# 解析客户端发来的 json 数据,转换为 Python dict
# json.load 可以自动读取 ByteIO 对象,如果从 string 读取,要用 json.loads
msg = json.load(request.body)
# 打印调试信息,确保真的收到需要的数据了。
print("msg from {}:{}".format(msg["from"], msg["msg"]))
# 将 msg 对象放入队列,等会视窗线程会从队列另一端取出
que.put(msg)
return "<h1>OK</h1>"
这样,只要 web 服务收到数据,就按照 Json 解析,放入队列中。
TIPS
客户端未必一定会发 Json 格式数据过来,所以最好捕获 json.load 函数的异常。 不过我这里省略掉了,暂时假定用户是诚实可爱的吧。
下一步,在主线程的超时事件处理中,从队列中取出消息,并更新到多行文本组件。
逻辑非常简单,直接上代码:
if event == "-TIMEOUT-":
try:
msg = que.get_nowait()
except Exception:
# 可能捕获 queue.Empty 异常,no new msg
continue
else:
# que 中有新消息
# 获取当前显示内容
old_sreen = values["-MSG_SCREEN-"]
# 获得消息正文、来源姓名
new_msg = msg["msg"]
from_ = msg["from"]
# 按格式,拼接一行消息
new_msg_line = "{}: {}".format(from_, new_msg)
# 将新消息追加到多行文本结尾
new_screen = "{}{}".format(old_sreen, new_msg_line) if old_sreen != "" else new_msg_line
# 将新数据更新到视窗
window["-MSG_SCREEN-"].update(new_screen)
检验成果
运行完整代码,发现可以接收客户端发来的 Json 数据,并且实时更新在视窗中。
太棒了,只要客户端发送符合要求的 Json 数据,我们就能实时在视窗中看到消息,而且还有发送者姓名!是不是很像那么回事。
但是你有没有发现,这个程序无法通过点击视窗右上角的 X 退出,这是怎么回事? 这是因为子线程,就是那个运行 thread_work 的线程内部的死循环永远不会结束,退出视窗线程只会结束主线程,但是子线程却还活的好好的。我们需要主线程退出的时候,子线程也退出。
达成这样的目标有很多办法,比如在子线程中发起异常等等。比较简单的方式是让子线程成为 Deamon 模式。你可以在 worker.start 之前调用:
# 设置子线程为 Deamon 模式
worker.setDaemon(True)
worker.start()
就可以了!然后就可以通过 X 退出程序!
总结
总算拿下第一步了,现在我们的聊天软件已经可以接收数据,并且实时展示在视窗上的多行文本组件里。
如何才能像一个真正的聊天软件一样,能收能发呢?想想看?
TIPS
一点提示:如过把 HTTP 调试工具和今天的作品融合到一起呢?
其实聊天程序已经完成了最关键的功能,剩下的需要只是一点创意。
明天继续咯,我觉得明天可以完成这个作品!