import queue
import threading
import inspect
import ctypes
# 创建stop方法
def _async_raise(tid, exctype):
tid = ctypes.c_long(tid)
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
_async_raise(thread.ident, SystemExit)
print("====线程已关闭")
class Test():
def __init__(self) -> None:
print('init')
self._init_test()
self._is_quit = False
def __del__(self) -> None:
self.quit()
print('Del')
def _init_test(self):
self._test = queue.Queue()
def worker():
print('This is a test message')
while True:
item = self._test.get()
print(item)
self._test.task_done()
self._p = threading.Thread(target=worker, daemon=True)
self._p.start()
def test(self):
self._test.put('This is a test message')
def quit(self):
if self._is_quit:
return
self._is_quit = True
stop_thread(self._p)
self._p.join()
if __name__ == '__main__':
test_obj = Test()
test_obj.test()
test_obj.quit() # When comment this, process will not call __del__
当注释最后一行的
test_obj.quit()
就无法正常调用Test的析构函数
你的代码实现了一个带有线程的类Test,其中包含了一个队列和一个线程,线程会不断从队列中取出元素并打印。同时,类Test还实现了一个test方法,用于往队列中添加元素。在类Test的析构函数中,会调用quit方法来停止线程。quit方法中使用了stop_thread函数来停止线程。stop_thread函数使用了_async_raise函数来向线程发送SystemExit异常,从而停止线程。
在主程序中,创建了一个Test对象,并调用了test方法往队列中添加元素,然后调用了quit方法来停止线程。当注释掉quit方法时,程序不会调用类Test的析构函数,因此线程不会被停止。