GUI Multithreading
개요
지난 파이썬 tkinter를 다루는 게시글엔 화면의 배율에 따라 위젯들의 크기가 동적으로 변하는 것에 대해 학습하였다. 이번 시간에는 특정 이벤트가 발생할 때마다 해당 내역을 업데이트 방법에 대한 내용을 다뤄보도록 하겠다.
본문
개념
본격적인 내용에 들어가기에 앞서, 동기/비동기, 동시성/병렬성, 그리고 멀티쓰레딩/멀티프로세싱의 개념을 알고 있으면 이해가 훨씬 수월하다. 필자는 이 개념들을 해당 블로그를 통해 참고했으며, 만약 아직 해당 개념에 대한 이해가 부족하다면, 블로그 등을 통해 먼저 확인하는 것을 추천한다. 다만, 이 개념들이 다소 복잡하고 어려울 수 있지만, 실무에서 이를 구현하는 데 있어 반드시 완벽한 이해가 필요하지는 않다.
오늘의 목표는 'CAN Message가 수신될 때마다, 메시지와 매핑된 라벨의 text를 업데이트'하는 것이다. 일반적으로 tkinter에서after()라는 함수를 이용하여 동시성을 제공한다. 즉, Tkinter의 이벤트 루프를 사용하여, 지정된 시간 이후에 함수 호출을 예약하는 메커니즘이다. 만약 정해진 시간마다 데이터가 수신된다면 해당 함수를 이용해도 되지만, CAN에서 수신되는 메시지는 '이벤트가 발생할 때마다' 수신되기 때문에 이를 이용하면 수신에 지연이 생긴다.
따라서 threading 모듈을 사용하여 수신 작업을 별도의 스레드에서 처리해야 한다. 기존의 after() 함수는 호출 시간을 명시적으로 지정해야 하는 단점이 있었지만, threading을 사용하면 이벤트가 발생할 때마다 CAN 메시지를 실시간으로 수신할 수 있다. 이를 통해 메시지 수신 후 즉시 해당 메시지에 맞는 라벨의 텍스트를 업데이트하도록 구현하는 것을 목표로 했다.
구현
class CanBus:
def __init__(self):
self.init_class_variable()
self.init()
def init_class_variable(self):
self.bus = None
self.id, self.signal = None, None
def init(self):
self.bus = can.interface.Bus(interface='slcan', channel=Common.find_serial_port(), bitrate=500000)
def shutdown(self):
if self.bus:
self.bus.shutdown()
self.bus = None
def receive(self):
message = self.bus.recv(timeout=0.1)
if message:
self.id = message.arbitration_id
self.signal = message.data
CAN bus를 slcan protocol로 연결해 주는 CanBus 클래스다. 메시지를 수신받으면 id와 signal로 분리시켜 놓았다.
class MainUI:
def __init__(self):
self.init_class_variable()
self.init_root_configure()
self.init_main_ui()
self.init_main_frame()
self.init_main_label()
def init_class_variable(self):
self.root = tk.Tk()
self.ratio = common.Common.get_display_ratio()
self.width, self.height = int(1200/self.ratio), int(500/self.ratio)
self.frames = {}
self.labels = []
def init_root_configure(self):
self.root.grid_columnconfigure(0, weight=1)
self.root.grid_columnconfigure(1, weight=1)
self.root.grid_rowconfigure(0, weight=1)
def init_main_ui(self):
self.root.title("example for multithreading and tkinter")
self.root.geometry(f"{self.width}x{self.height}")
self.root.resizable(False, False)
def init_main_frame(self):
for i in range(2):
frame = tk.Frame(self.root, bg='lightblue')
frame.grid(row=0, column=i, sticky="nsew")
self.frames[f"frame{i}"] = frame
def init_main_label(self):
for i in range(2):
for j in range(4):
label = tk.Label(
self.frames[f'frame{i}'], text=f"{j}번 째 라벨", font=("Helvetica", int(12/self.ratio)), width=50, height=2, anchor="w", padx=10, pady=5)
label.grid(row=j, column=0)
self.labels.append(label)
def run(self):
self.root.mainloop()
UI를 구현한 MainUI 클래스다. 저번 시간에 배웠던 배율에 맞게 반응형 GUI를 구현하였고, 라벨을 8개 배치해 두었다.
class Updater:
def __init__(self, ui: MainUI, bus : CanBus):
self.init_class_variable(ui, bus)
self.ui.root.protocol("WM_DELETE_WINDOW", self.stop)
def init_class_variable(self, ui, bus):
self.ui = ui
self.can = bus
self.running = True
self.stop_threading = threading.Event()
self.messages = {}
self.update_labels = {}
self.cnt = 0
def update_label(self, key):
if self.cnt < len(self.ui.labels):
if key not in self.update_labels:
self.update_labels[key] = self.ui.labels[self.cnt]
self.cnt += 1
def receive_thread(self):
while self.running and not self.stop_threading.is_set():
try:
self.can.receive()
id = self.can.id
signal = self.can.signal
if id and signal:
self.update_label(id)
if id in self.update_labels:
self.update_labels[id].config(
text=f"Signal: " + " ".join(f"{byte:02x}" for byte in signal)
)
except Exception as e:
print(f"Error receiving CAN message: {e}")
break
def run(self):
self.thread = threading.Thread(target=self.receive_thread)
self.thread.start()
self.ui.run()
def stop(self):
self.running = False
self.stop_threading.set()
self.can.shutdown()
if self.thread.is_alive():
self.thread.join()
self.ui.root.destroy()
if __name__ == "__main__":
bus = CanBus()
ui = MainUI()
main = Updater(ui, bus)
main.run()
UI를 업데이트시켜 주는 Updater 클래스다. 여기서 핵심적으로 봐야 할 부분은 receive_thread다. 대기 시간을 따로 설정하지 않고, 메시지가 수신될 때마다 update_label 함수를 호출하여 동일한 id를 가진 라벨이 있는지 확인한 후, 해당 라벨의 text config를 바꿔주는 식으로 구현하였다. 이때 멀티 스레드는 안전하게 종료하는 부분이 중요하므로, 스레드 종료 시 스레드에 할당된 이벤트를 종료하게 구현하였다.
총합본
import common
import tkinter as tk
import can
from common import Common
import threading
class CanBus:
def __init__(self):
self.init_class_variable()
self.init()
def init_class_variable(self):
self.bus = None
self.id, self.signal = None, None
def init(self):
self.bus = can.interface.Bus(interface='slcan', channel=Common.find_serial_port(), bitrate=500000)
def shutdown(self):
if self.bus:
self.bus.shutdown()
self.bus = None
def receive(self):
message = self.bus.recv(timeout=0.1)
if message:
self.id = message.arbitration_id
self.signal = message.data
class MainUI:
def __init__(self):
self.init_class_variable()
self.init_root_configure()
self.init_main_ui()
self.init_main_frame()
self.init_main_label()
def init_class_variable(self):
self.root = tk.Tk()
self.ratio = common.Common.get_display_ratio()
self.width, self.height = int(1200/self.ratio), int(500/self.ratio)
self.frames = {}
self.labels = []
def init_root_configure(self):
self.root.grid_columnconfigure(0, weight=1)
self.root.grid_columnconfigure(1, weight=1)
self.root.grid_rowconfigure(0, weight=1)
def init_main_ui(self):
self.root.title("example for multithreading and tkinter")
self.root.geometry(f"{self.width}x{self.height}")
self.root.resizable(False, False)
def init_main_frame(self):
for i in range(2):
frame = tk.Frame(self.root, bg='lightblue')
frame.grid(row=0, column=i, sticky="nsew")
self.frames[f"frame{i}"] = frame
def init_main_label(self):
for i in range(2):
for j in range(4):
label = tk.Label(
self.frames[f'frame{i}'], text=f"{j}번 째 라벨", font=("Helvetica", int(12/self.ratio)), width=50, height=2, anchor="w", padx=10, pady=5)
label.grid(row=j, column=0)
self.labels.append(label)
def run(self):
self.root.mainloop()
class Updater:
def __init__(self, ui: MainUI, bus : CanBus):
self.init_class_variable(ui, bus)
self.ui.root.protocol("WM_DELETE_WINDOW", self.stop)
def init_class_variable(self, ui, bus):
self.ui = ui
self.can = bus
self.running = True
self.stop_threading = threading.Event()
self.messages = {}
self.update_labels = {}
self.cnt = 0
def update_label(self, key):
if self.cnt < len(self.ui.labels):
if key not in self.update_labels:
self.update_labels[key] = self.ui.labels[self.cnt]
self.cnt += 1
def receive_thread(self):
while self.running and not self.stop_threading.is_set():
try:
self.can.receive()
id = self.can.id
signal = self.can.signal
if id and signal:
self.update_label(id)
if id in self.update_labels:
self.update_labels[id].config(
text=f"Signal: " + " ".join(f"{byte:02x}" for byte in signal)
)
except Exception as e:
print(f"Error receiving CAN message: {e}")
break
def run(self):
self.thread = threading.Thread(target=self.receive_thread)
self.thread.start()
self.ui.run()
def stop(self):
self.running = False
self.stop_threading.set()
self.can.shutdown()
if self.thread.is_alive():
self.thread.join()
self.ui.root.destroy()
if __name__ == "__main__":
bus = CanBus()
ui = MainUI()
main = Updater(ui, bus)
main.run()
기타
앞서 언급했지만 멀티 스레드는 복잡한 개념이다. 특히 파이썬의 경우 GIL(Global Interpreter Lock)이라는 규칙으로 인해 CPU 바운드 작업에서 멀티스레딩의 효율성이 떨어질 수 있다. 그러나 I/O 바운드 작업에서는 멀티스레딩이 효과적일 수 있다. 이러한 부분을 잘 인지하고 자신의 프로젝트 성격에 맞게 멀티스레딩을 구현하는 것이 중요하다. 이 게시글만으로 모든 내용을 다루기 힘들기 때문에 추가로 참고할 수 있는 자료를 소개한다.
GIL: Understanding the Python GIL (David Beazley)
tkinter threading: geeksforgeeks
'Language > Python' 카테고리의 다른 글
[파이썬] 에러 로깅 클래스 (0) | 2024.09.28 |
---|---|
[파이썬] 반응형 GUI (0) | 2024.09.23 |
[파이썬] tkinter (2) - PDF 페이지 추출 프로그램 (0) | 2024.06.29 |
[파이썬] tkinter (1) (0) | 2024.06.24 |
[Python] PDF 텍스트 추출 (0) | 2023.10.31 |