Lib.core.EventManager
事件管理器,用于管理事件与事件监听器
1""" 2事件管理器,用于管理事件与事件监听器 3""" 4import traceback 5from typing import Any, TypeVar 6 7from collections.abc import Callable 8from dataclasses import dataclass, field 9 10from Lib.core.ThreadPool import async_task 11from Lib.core import ConfigManager 12from Lib.utils import Logger 13import inspect 14 15logger = Logger.get_logger() 16 17 18class _Event: 19 """ 20 请勿使用此事件类,使用Event继承以创建自定义事件 21 """ 22 pass 23 24 25class Hook(_Event): 26 """ 27 钩子事件,用于在事件处理过程中跳过某些监听器 28 """ 29 def __init__(self, event, listener): 30 self.event = event 31 self.listener = listener 32 33 def call(self): 34 """ 35 按优先级顺序同步触发所有监听器 36 """ 37 if self.__class__ in event_listeners: 38 for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True): 39 if not ConfigManager.GlobalConfig().debug.enable: 40 try: 41 res = listener.func(self, **listener.kwargs) 42 except Exception as e: 43 logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}") 44 continue 45 else: 46 res = listener.func(self, **listener.kwargs) 47 if res is True: 48 return True 49 return False 50 51 52T = TypeVar('T', bound='_Event') 53 54 55# 定义事件监听器的数据类 56@dataclass(order=True) 57class EventListener: 58 """ 59 事件监听器数据类 60 """ 61 priority: int # 优先级,默认为排序的依据 62 func: Callable[[T, ...], Any] # 监听器函数 63 kwargs: dict[str, Any] = field(default_factory=dict) # 附加参数 64 65 def __post_init__(self): 66 # 确保监听器函数至少有一个参数 67 assert len(inspect.signature(self.func).parameters) >= 1, "The listener takes at least 1 parameter" 68 69 70# 定义监听器的类型和存储 71event_listeners: dict[type[T], list[EventListener]] = {} 72 73 74# 装饰器,用于注册监听器 75def event_listener(event_class: type[T], priority: int = 0, **kwargs): 76 """ 77 用于注册监听器 78 79 Args: 80 event_class: 事件类型 81 priority: 优先级,默认为0 82 **kwargs: 附加参数 83 """ 84 assert issubclass(event_class, _Event), "Event class must be a subclass of Event" 85 86 def wrapper(func: Callable[[T, ...], Any]): 87 # 注册事件监听器 88 listener = EventListener(priority=priority, func=func, kwargs=kwargs) 89 event_listeners.setdefault(event_class, []).append(listener) 90 return func 91 92 return wrapper 93 94 95class Event(_Event): 96 """ 97 基事件类,所有自定义事件均继承自此类,继承自此类以创建自定义事件 98 """ 99 100 def _call_hook(self, listener): 101 return Hook(self, listener).call() 102 103 def call(self): 104 """ 105 按优先级顺序同步触发所有监听器 106 """ 107 if self.__class__ in event_listeners: 108 res_list = [] 109 for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True): 110 if self._call_hook(listener): 111 logger.debug(f"Skipped listener: {listener.func.__name__}") 112 continue 113 if not ConfigManager.GlobalConfig().debug.enable: 114 try: 115 res = listener.func(self, **listener.kwargs) 116 except Exception as e: 117 logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}") 118 continue 119 else: 120 res = listener.func(self, **listener.kwargs) 121 res_list.append(res) 122 123 @async_task 124 def call_async(self): 125 """ 126 无需等待的异步按优先级顺序触发所有监听器 127 """ 128 self.call() 129 130 131if __name__ == "__main__": 132 # 示例:自定义事件 133 """ 134 class MyEvent(Event): 135 def __init__(self, message): 136 self.message = message 137 138 139 # 监听器函数 140 @event_listener(MyEvent, priority=10, other_message="priority is 10") 141 @event_listener(MyEvent, priority=100, other_message="priority is 100") 142 @event_listener(MyEvent, other_message="I'm going to be skipped") 143 def on_my_event(event, other_message=""): 144 print(f"Received event: {event.message}!", other_message) 145 146 147 @event_listener(Hook) 148 def on_hook(event): 149 if event.event.__class__ == MyEvent and event.listener.kwargs["other_message"] == "I'm going to be skipped": 150 return True 151 152 153 # 触发事件 154 event = MyEvent("Hello, World") 155 event.call() 156"""
logger =
<RootLogger root (INFO)>
26class Hook(_Event): 27 """ 28 钩子事件,用于在事件处理过程中跳过某些监听器 29 """ 30 def __init__(self, event, listener): 31 self.event = event 32 self.listener = listener 33 34 def call(self): 35 """ 36 按优先级顺序同步触发所有监听器 37 """ 38 if self.__class__ in event_listeners: 39 for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True): 40 if not ConfigManager.GlobalConfig().debug.enable: 41 try: 42 res = listener.func(self, **listener.kwargs) 43 except Exception as e: 44 logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}") 45 continue 46 else: 47 res = listener.func(self, **listener.kwargs) 48 if res is True: 49 return True 50 return False
钩子事件,用于在事件处理过程中跳过某些监听器
def
call(self):
34 def call(self): 35 """ 36 按优先级顺序同步触发所有监听器 37 """ 38 if self.__class__ in event_listeners: 39 for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True): 40 if not ConfigManager.GlobalConfig().debug.enable: 41 try: 42 res = listener.func(self, **listener.kwargs) 43 except Exception as e: 44 logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}") 45 continue 46 else: 47 res = listener.func(self, **listener.kwargs) 48 if res is True: 49 return True 50 return False
按优先级顺序同步触发所有监听器
@dataclass(order=True)
class
EventListener:
57@dataclass(order=True) 58class EventListener: 59 """ 60 事件监听器数据类 61 """ 62 priority: int # 优先级,默认为排序的依据 63 func: Callable[[T, ...], Any] # 监听器函数 64 kwargs: dict[str, Any] = field(default_factory=dict) # 附加参数 65 66 def __post_init__(self): 67 # 确保监听器函数至少有一个参数 68 assert len(inspect.signature(self.func).parameters) >= 1, "The listener takes at least 1 parameter"
事件监听器数据类
event_listeners: dict[type[~T], list[EventListener]] =
{<class 'Lib.core.ListenerServer.EscalationEvent'>: [EventListener(priority=0, func=<function run_plugin_main_wrapper>, kwargs={}), EventListener(priority=0, func=<function on_escalation>, kwargs={})], <class 'Lib.utils.EventClassifier.HeartbeatMetaEvent'>: [EventListener(priority=0, func=<function on_heartbeat>, kwargs={})]}
def
event_listener(event_class: type[~T], priority: int = 0, **kwargs):
76def event_listener(event_class: type[T], priority: int = 0, **kwargs): 77 """ 78 用于注册监听器 79 80 Args: 81 event_class: 事件类型 82 priority: 优先级,默认为0 83 **kwargs: 附加参数 84 """ 85 assert issubclass(event_class, _Event), "Event class must be a subclass of Event" 86 87 def wrapper(func: Callable[[T, ...], Any]): 88 # 注册事件监听器 89 listener = EventListener(priority=priority, func=func, kwargs=kwargs) 90 event_listeners.setdefault(event_class, []).append(listener) 91 return func 92 93 return wrapper
用于注册监听器
Arguments:
- event_class: 事件类型
- priority: 优先级,默认为0
- **kwargs: 附加参数
96class Event(_Event): 97 """ 98 基事件类,所有自定义事件均继承自此类,继承自此类以创建自定义事件 99 """ 100 101 def _call_hook(self, listener): 102 return Hook(self, listener).call() 103 104 def call(self): 105 """ 106 按优先级顺序同步触发所有监听器 107 """ 108 if self.__class__ in event_listeners: 109 res_list = [] 110 for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True): 111 if self._call_hook(listener): 112 logger.debug(f"Skipped listener: {listener.func.__name__}") 113 continue 114 if not ConfigManager.GlobalConfig().debug.enable: 115 try: 116 res = listener.func(self, **listener.kwargs) 117 except Exception as e: 118 logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}") 119 continue 120 else: 121 res = listener.func(self, **listener.kwargs) 122 res_list.append(res) 123 124 @async_task 125 def call_async(self): 126 """ 127 无需等待的异步按优先级顺序触发所有监听器 128 """ 129 self.call()
基事件类,所有自定义事件均继承自此类,继承自此类以创建自定义事件
def
call(self):
104 def call(self): 105 """ 106 按优先级顺序同步触发所有监听器 107 """ 108 if self.__class__ in event_listeners: 109 res_list = [] 110 for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True): 111 if self._call_hook(listener): 112 logger.debug(f"Skipped listener: {listener.func.__name__}") 113 continue 114 if not ConfigManager.GlobalConfig().debug.enable: 115 try: 116 res = listener.func(self, **listener.kwargs) 117 except Exception as e: 118 logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}") 119 continue 120 else: 121 res = listener.func(self, **listener.kwargs) 122 res_list.append(res)
按优先级顺序同步触发所有监听器
def
call_async(*args, **kwargs):
64 def wrapper(*args, **kwargs): 65 if isinstance(thread_pool, ThreadPoolExecutor): 66 return thread_pool.submit(_wrapper, func, *args, **kwargs) 67 else: 68 logger.warning("Thread Pool is not initialized. Please call init() before using it.") 69 return func(*args, **kwargs)
无需等待的异步按优先级顺序触发所有监听器