Lib.core.EventManager

事件管理器,用于管理事件与事件监听器

  1"""
  2事件管理器,用于管理事件与事件监听器
  3"""
  4import traceback
  5from typing import Any, TypeVar
  6
  7from collections.abc import Callable
  8from dataclasses import dataclass, field
  9
 10from Lib.core.ThreadPool import async_task
 11from Lib.core import ConfigManager
 12from Lib.utils import Logger
 13import inspect
 14
 15logger = Logger.get_logger()
 16
 17
 18class _Event:
 19    """
 20    请勿使用此事件类,使用Event继承以创建自定义事件
 21    """
 22    pass
 23
 24
 25class Hook(_Event):
 26    """
 27    钩子事件,用于在事件处理过程中跳过某些监听器
 28    """
 29    def __init__(self, event, listener):
 30        self.event = event
 31        self.listener = listener
 32
 33    def call(self):
 34        """
 35        按优先级顺序同步触发所有监听器
 36        """
 37        if self.__class__ in event_listeners:
 38            for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True):
 39                if not ConfigManager.GlobalConfig().debug.enable:
 40                    try:
 41                        res = listener.func(self, **listener.kwargs)
 42                    except Exception as e:
 43                        logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}")
 44                        continue
 45                else:
 46                    res = listener.func(self, **listener.kwargs)
 47                if res is True:
 48                    return True
 49            return False
 50
 51
 52T = TypeVar('T', bound='_Event')
 53
 54
 55# 定义事件监听器的数据类
 56@dataclass(order=True)
 57class EventListener:
 58    """
 59    事件监听器数据类
 60    """
 61    priority: int  # 优先级,默认为排序的依据
 62    func: Callable[[T, ...], Any]  # 监听器函数
 63    kwargs: dict[str, Any] = field(default_factory=dict)  # 附加参数
 64
 65    def __post_init__(self):
 66        # 确保监听器函数至少有一个参数
 67        assert len(inspect.signature(self.func).parameters) >= 1, "The listener takes at least 1 parameter"
 68
 69
 70# 定义监听器的类型和存储
 71event_listeners: dict[type[T], list[EventListener]] = {}
 72
 73
 74# 装饰器,用于注册监听器
 75def event_listener(event_class: type[T], priority: int = 0, **kwargs):
 76    """
 77    用于注册监听器
 78
 79    Args:
 80        event_class: 事件类型
 81        priority: 优先级,默认为0
 82        **kwargs: 附加参数
 83    """
 84    assert issubclass(event_class, _Event), "Event class must be a subclass of Event"
 85
 86    def wrapper(func: Callable[[T, ...], Any]):
 87        # 注册事件监听器
 88        listener = EventListener(priority=priority, func=func, kwargs=kwargs)
 89        event_listeners.setdefault(event_class, []).append(listener)
 90        return func
 91
 92    return wrapper
 93
 94
 95class Event(_Event):
 96    """
 97    基事件类,所有自定义事件均继承自此类,继承自此类以创建自定义事件
 98    """
 99
100    def _call_hook(self, listener):
101        return Hook(self, listener).call()
102
103    def call(self):
104        """
105        按优先级顺序同步触发所有监听器
106        """
107        if self.__class__ in event_listeners:
108            res_list = []
109            for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True):
110                if self._call_hook(listener):
111                    logger.debug(f"Skipped listener: {listener.func.__name__}")
112                    continue
113                if not ConfigManager.GlobalConfig().debug.enable:
114                    try:
115                        res = listener.func(self, **listener.kwargs)
116                    except Exception as e:
117                        logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}")
118                        continue
119                else:
120                    res = listener.func(self, **listener.kwargs)
121                res_list.append(res)
122
123    @async_task
124    def call_async(self):
125        """
126        无需等待的异步按优先级顺序触发所有监听器
127        """
128        self.call()
129
130
131if __name__ == "__main__":
132    # 示例:自定义事件
133    """
134    class MyEvent(Event):
135        def __init__(self, message):
136            self.message = message
137
138
139    # 监听器函数
140    @event_listener(MyEvent, priority=10, other_message="priority is 10")
141    @event_listener(MyEvent, priority=100, other_message="priority is 100")
142    @event_listener(MyEvent, other_message="I'm going to be skipped")
143    def on_my_event(event, other_message=""):
144        print(f"Received event: {event.message}!", other_message)
145
146
147    @event_listener(Hook)
148    def on_hook(event):
149        if event.event.__class__ == MyEvent and event.listener.kwargs["other_message"] == "I'm going to be skipped":
150            return True
151
152
153    # 触发事件
154    event = MyEvent("Hello, World")
155    event.call()
156"""
logger = <RootLogger root (INFO)>
class Hook(_Event):
26class Hook(_Event):
27    """
28    钩子事件,用于在事件处理过程中跳过某些监听器
29    """
30    def __init__(self, event, listener):
31        self.event = event
32        self.listener = listener
33
34    def call(self):
35        """
36        按优先级顺序同步触发所有监听器
37        """
38        if self.__class__ in event_listeners:
39            for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True):
40                if not ConfigManager.GlobalConfig().debug.enable:
41                    try:
42                        res = listener.func(self, **listener.kwargs)
43                    except Exception as e:
44                        logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}")
45                        continue
46                else:
47                    res = listener.func(self, **listener.kwargs)
48                if res is True:
49                    return True
50            return False

钩子事件,用于在事件处理过程中跳过某些监听器

Hook(event, listener)
30    def __init__(self, event, listener):
31        self.event = event
32        self.listener = listener
event
listener
def call(self):
34    def call(self):
35        """
36        按优先级顺序同步触发所有监听器
37        """
38        if self.__class__ in event_listeners:
39            for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True):
40                if not ConfigManager.GlobalConfig().debug.enable:
41                    try:
42                        res = listener.func(self, **listener.kwargs)
43                    except Exception as e:
44                        logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}")
45                        continue
46                else:
47                    res = listener.func(self, **listener.kwargs)
48                if res is True:
49                    return True
50            return False

按优先级顺序同步触发所有监听器

@dataclass(order=True)
class EventListener:
57@dataclass(order=True)
58class EventListener:
59    """
60    事件监听器数据类
61    """
62    priority: int  # 优先级,默认为排序的依据
63    func: Callable[[T, ...], Any]  # 监听器函数
64    kwargs: dict[str, Any] = field(default_factory=dict)  # 附加参数
65
66    def __post_init__(self):
67        # 确保监听器函数至少有一个参数
68        assert len(inspect.signature(self.func).parameters) >= 1, "The listener takes at least 1 parameter"

事件监听器数据类

EventListener( priority: int, func: Callable[[~T, ...], typing.Any], kwargs: dict[str, typing.Any] = <factory>)
priority: int
func: Callable[[~T, ...], typing.Any]
kwargs: dict[str, typing.Any]
event_listeners: dict[type[~T], list[EventListener]] = {<class 'Lib.core.ListenerServer.EscalationEvent'>: [EventListener(priority=0, func=<function run_plugin_main_wrapper>, kwargs={}), EventListener(priority=0, func=<function on_escalation>, kwargs={})], <class 'Lib.utils.EventClassifier.HeartbeatMetaEvent'>: [EventListener(priority=0, func=<function on_heartbeat>, kwargs={})]}
def event_listener(event_class: type[~T], priority: int = 0, **kwargs):
76def event_listener(event_class: type[T], priority: int = 0, **kwargs):
77    """
78    用于注册监听器
79
80    Args:
81        event_class: 事件类型
82        priority: 优先级,默认为0
83        **kwargs: 附加参数
84    """
85    assert issubclass(event_class, _Event), "Event class must be a subclass of Event"
86
87    def wrapper(func: Callable[[T, ...], Any]):
88        # 注册事件监听器
89        listener = EventListener(priority=priority, func=func, kwargs=kwargs)
90        event_listeners.setdefault(event_class, []).append(listener)
91        return func
92
93    return wrapper

用于注册监听器

Arguments:
  • event_class: 事件类型
  • priority: 优先级,默认为0
  • **kwargs: 附加参数
class Event(_Event):
 96class Event(_Event):
 97    """
 98    基事件类,所有自定义事件均继承自此类,继承自此类以创建自定义事件
 99    """
100
101    def _call_hook(self, listener):
102        return Hook(self, listener).call()
103
104    def call(self):
105        """
106        按优先级顺序同步触发所有监听器
107        """
108        if self.__class__ in event_listeners:
109            res_list = []
110            for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True):
111                if self._call_hook(listener):
112                    logger.debug(f"Skipped listener: {listener.func.__name__}")
113                    continue
114                if not ConfigManager.GlobalConfig().debug.enable:
115                    try:
116                        res = listener.func(self, **listener.kwargs)
117                    except Exception as e:
118                        logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}")
119                        continue
120                else:
121                    res = listener.func(self, **listener.kwargs)
122                res_list.append(res)
123
124    @async_task
125    def call_async(self):
126        """
127        无需等待的异步按优先级顺序触发所有监听器
128        """
129        self.call()

基事件类,所有自定义事件均继承自此类,继承自此类以创建自定义事件

def call(self):
104    def call(self):
105        """
106        按优先级顺序同步触发所有监听器
107        """
108        if self.__class__ in event_listeners:
109            res_list = []
110            for listener in sorted(event_listeners[self.__class__], key=lambda i: i.priority, reverse=True):
111                if self._call_hook(listener):
112                    logger.debug(f"Skipped listener: {listener.func.__name__}")
113                    continue
114                if not ConfigManager.GlobalConfig().debug.enable:
115                    try:
116                        res = listener.func(self, **listener.kwargs)
117                    except Exception as e:
118                        logger.error(f"Error occurred in listener: {repr(e)}\n{traceback.format_exc()}")
119                        continue
120                else:
121                    res = listener.func(self, **listener.kwargs)
122                res_list.append(res)

按优先级顺序同步触发所有监听器

def call_async(*args, **kwargs):
64    def wrapper(*args, **kwargs):
65        if isinstance(thread_pool, ThreadPoolExecutor):
66            return thread_pool.submit(_wrapper, func, *args, **kwargs)
67        else:
68            logger.warning("Thread Pool is not initialized. Please call init() before using it.")
69            return func(*args, **kwargs)

无需等待的异步按优先级顺序触发所有监听器