Lib.core.ThreadPool

线程池 Created by BigCookie233

 1"""
 2线程池
 3Created by BigCookie233
 4"""
 5
 6import sys
 7import traceback
 8from concurrent.futures import ThreadPoolExecutor
 9
10from Lib.core import ConfigManager
11from Lib.core.ConfigManager import GlobalConfig
12from Lib.utils.Logger import get_logger
13from Lib.common import save_exc_dump
14
15import atexit
16
17thread_pool = None
18logger = get_logger()
19
20
21def shutdown():
22    """
23    关闭线程池
24    """
25    global thread_pool
26    if isinstance(thread_pool, ThreadPoolExecutor):
27        logger.debug("Closing Thread Pool")
28        thread_pool.shutdown()
29        thread_pool = None
30
31
32def init():
33    """
34    初始化线程池
35    Returns:
36        None
37    """
38    global thread_pool
39    thread_pool = ThreadPoolExecutor(max_workers=GlobalConfig().thread_pool.max_workers)
40    atexit.register(shutdown)
41
42
43def _wrapper(func, *args, **kwargs):
44    try:
45        return func(*args, **kwargs)
46    except Exception as e:
47        if ConfigManager.GlobalConfig().debug.save_dump:
48            dump_path = save_exc_dump(f"Error in async task({func.__module__}.{func.__name__})")
49        else:
50            dump_path = None
51        # 打印到日志中
52        logger.error(
53            f"Error in async task({func.__module__}.{func.__name__}): {repr(e)}\n"
54            f"{"".join(traceback.format_exc())}"
55            f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}"
56        )
57
58
59def async_task(func):
60    """
61    异步任务装饰器
62    """
63    def wrapper(*args, **kwargs):
64        if isinstance(thread_pool, ThreadPoolExecutor):
65            return thread_pool.submit(_wrapper, func, *args, **kwargs)
66        else:
67            logger.warning("Thread Pool is not initialized. Please call init() before using it.")
68            return func(*args, **kwargs)
69
70    return wrapper
thread_pool = None
logger = <RootLogger root (INFO)>
def shutdown():
22def shutdown():
23    """
24    关闭线程池
25    """
26    global thread_pool
27    if isinstance(thread_pool, ThreadPoolExecutor):
28        logger.debug("Closing Thread Pool")
29        thread_pool.shutdown()
30        thread_pool = None

关闭线程池

def init():
33def init():
34    """
35    初始化线程池
36    Returns:
37        None
38    """
39    global thread_pool
40    thread_pool = ThreadPoolExecutor(max_workers=GlobalConfig().thread_pool.max_workers)
41    atexit.register(shutdown)

初始化线程池

Returns:

None

def async_task(func):
60def async_task(func):
61    """
62    异步任务装饰器
63    """
64    def wrapper(*args, **kwargs):
65        if isinstance(thread_pool, ThreadPoolExecutor):
66            return thread_pool.submit(_wrapper, func, *args, **kwargs)
67        else:
68            logger.warning("Thread Pool is not initialized. Please call init() before using it.")
69            return func(*args, **kwargs)
70
71    return wrapper

异步任务装饰器