Lib.core.ThreadPool
线程池 Created by BigCookie233
"""
Thread pool.

Created by BigCookie233

Provides one module-level ``ThreadPoolExecutor`` plus the ``async_task``
decorator that submits calls to it.
"""

import atexit
import functools
import sys
import traceback
from concurrent.futures import ThreadPoolExecutor

from Lib.core import ConfigManager
from Lib.core.ConfigManager import GlobalConfig
from Lib.utils.Logger import get_logger
from Lib.common import save_exc_dump

# The shared executor; ``None`` until init() runs and again after shutdown().
thread_pool = None
logger = get_logger()


def shutdown():
    """
    Shut down the shared thread pool if one is currently active.

    Does nothing when ``thread_pool`` is not a ``ThreadPoolExecutor``.
    Otherwise the executor's ``shutdown()`` is called with its default
    arguments and the module-level reference is reset to ``None``.
    """
    global thread_pool
    if isinstance(thread_pool, ThreadPoolExecutor):
        logger.debug("Closing Thread Pool")
        thread_pool.shutdown()
        thread_pool = None


def init():
    """
    Create the shared thread pool.

    The worker count is read from ``GlobalConfig().thread_pool.max_workers``.
    ``shutdown`` is registered with ``atexit`` so the pool is closed when the
    interpreter exits.

    Returns:
        None
    """
    global thread_pool
    thread_pool = ThreadPoolExecutor(max_workers=GlobalConfig().thread_pool.max_workers)
    atexit.register(shutdown)


def _wrapper(func, *args, **kwargs):
    """
    Call ``func(*args, **kwargs)`` and log any ``Exception`` instead of raising.

    Returns:
        The return value of ``func``, or ``None`` when ``func`` raised an
        ``Exception`` (the error is logged and swallowed).
    """
    try:
        return func(*args, **kwargs)
    except Exception as e:
        if ConfigManager.GlobalConfig().debug.save_dump:
            dump_path = save_exc_dump(f"Error in async task({func.__module__}.{func.__name__})")
        else:
            dump_path = None
        # Built separately so the message needs no nested same-quote f-strings
        # or backslashes inside a replacement field (Python 3.12+ only syntax).
        dump_note = f"\n已保存异常到 {dump_path}" if dump_path else ""
        # traceback.format_exc() already returns a single string.
        logger.error(
            f"Error in async task({func.__module__}.{func.__name__}): {repr(e)}\n"
            f"{traceback.format_exc()}"
            f"{dump_note}"
        )


def async_task(func):
    """
    Decorator that runs ``func`` on the shared thread pool.

    Args:
        func: The callable to run asynchronously.

    Returns:
        A wrapper carrying ``func``'s metadata. When the pool is initialized,
        calling it submits ``func`` through ``_wrapper`` and returns the
        resulting ``Future``. When the pool is not initialized, it logs a
        warning, runs ``func`` synchronously and returns its result directly.
    """
    @functools.wraps(func)  # keep __name__/__doc__/__wrapped__ of the task
    def wrapper(*args, **kwargs):
        if isinstance(thread_pool, ThreadPoolExecutor):
            return thread_pool.submit(_wrapper, func, *args, **kwargs)
        else:
            logger.warning("Thread Pool is not initialized. Please call init() before using it.")
            return func(*args, **kwargs)

    return wrapper
thread_pool =
None
logger =
<RootLogger root (INFO)>
def
shutdown():
def shutdown():
    """
    Shut down the shared thread pool if one is currently active.

    Does nothing when the module-level ``thread_pool`` is not a
    ``ThreadPoolExecutor``. Otherwise the executor's ``shutdown()`` is called
    with its default arguments and ``thread_pool`` is reset to ``None``.
    """
    global thread_pool
    pool = thread_pool
    # Guard clause: nothing to close when no executor has been created.
    if not isinstance(pool, ThreadPoolExecutor):
        return
    logger.debug("Closing Thread Pool")
    pool.shutdown()
    thread_pool = None
关闭线程池
def
init():
def init():
    """
    Create the shared thread pool.

    The worker count is read from ``GlobalConfig().thread_pool.max_workers``.
    ``shutdown`` is registered with ``atexit`` so the pool is closed when the
    interpreter exits.

    Returns:
        None
    """
    global thread_pool
    worker_limit = GlobalConfig().thread_pool.max_workers
    thread_pool = ThreadPoolExecutor(max_workers=worker_limit)
    atexit.register(shutdown)
初始化线程池
Returns:
None
def
async_task(func):
def async_task(func):
    """
    Decorator that runs ``func`` on the shared thread pool.

    Args:
        func: The callable to run asynchronously.

    Returns:
        A wrapper carrying ``func``'s metadata. When the pool is initialized,
        calling it submits ``func`` through ``_wrapper`` and returns the
        resulting ``Future``. When the pool is not initialized, it logs a
        warning, runs ``func`` synchronously and returns its result directly.
    """
    # Function-scope import: the module does not import functools itself.
    from functools import wraps

    @wraps(func)  # keep __name__/__doc__/__wrapped__ of the decorated task
    def wrapper(*args, **kwargs):
        if isinstance(thread_pool, ThreadPoolExecutor):
            return thread_pool.submit(_wrapper, func, *args, **kwargs)
        # No pool yet: warn and fall back to a synchronous call.
        logger.warning("Thread Pool is not initialized. Please call init() before using it.")
        return func(*args, **kwargs)

    return wrapper
异步任务装饰器