Lib.utils.EventHandlers
事件处理器
1""" 2事件处理器 3""" 4import copy 5import traceback 6 7from Lib.common import save_exc_dump 8from Lib.core import EventManager, ConfigManager, PluginManager 9from Lib.utils import EventClassifier, Logger, QQRichText, StateManager 10 11import inspect 12from typing import Literal, Callable, Any, Type 13 14logger = Logger.get_logger() 15 16 17class Rule: 18 """ 19 Rule基类,请勿直接使用 20 """ 21 22 def match(self, event_data: EventClassifier.Event): 23 """ 24 匹配事件 25 Args: 26 event_data: 事件数据 27 Returns: 28 是否匹配到事件 29 """ 30 pass 31 32 33class AnyRule(Rule): 34 """ 35 输入n个rule,若匹配其中任意一个则返回True 36 """ 37 38 def __init__(self, *rules: Rule): 39 self.rules = rules 40 41 def match(self, event_data: EventClassifier.Event): 42 return any(rule.match(event_data) for rule in self.rules) 43 44 45class AllRule(Rule): 46 """ 47 输入n个rule,若匹配所有则返回True 48 """ 49 50 def __init__(self, *rules: Rule): 51 self.rules = rules 52 53 def match(self, event_data: EventClassifier.Event): 54 return all(rule.match(event_data) for rule in self.rules) 55 56 57class KeyValueRule(Rule): 58 """ 59 键值规则 60 检测event data中的某个键的值是否满足要求 61 """ 62 63 def __init__(self, key, value, model: Literal["eq", "ne", "in", "not in", "func"], 64 func: Callable[[Any, Any], bool] = None): 65 """ 66 Args: 67 key: 键 68 value: 值 69 model: 匹配模式(可选: eq, ne, in, not in, func) 70 func: 函数(仅在 model 为 func 时有效,输入为 (event_data.get(key), value),返回 bool) 71 """ 72 self.key = key 73 self.value = value 74 self.model = model 75 if model == "func" and func is None: 76 raise ValueError("if model is func, func must be a callable") 77 self.func = func 78 79 def match(self, event_data: EventClassifier.Event): 80 try: 81 match self.model: 82 case "eq": 83 return event_data.get(self.key) == self.value 84 case "ne": 85 return event_data.get(self.key) != self.value 86 case "in": 87 return self.value in event_data.get(self.key) 88 case "not in": 89 return self.value not in event_data.get(self.key) 90 case "func": 91 return self.func(event_data.get(self.key), self.value) 92 except Exception as e: 93 if ConfigManager.GlobalConfig().debug.save_dump: 94 dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}") 95 else: 96 dump_path = None 97 logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n" 98 f"{traceback.format_exc()}" 99 f"{f"已保存异常到 {dump_path}" if dump_path else ""}") 100 return False 101 102 103class FuncRule(Rule): 104 """ 105 函数规则 106 检测event data是否满足函数 107 """ 108 109 def __init__(self, func: Callable[[Any], bool]): 110 """ 111 Args: 112 func: 用于检测函数(输入为 event_data, 返回 bool) 113 """ 114 self.func = func 115 116 def match(self, event_data: EventClassifier.Event): 117 try: 118 return self.func(event_data) 119 except Exception as e: 120 if ConfigManager.GlobalConfig().debug.save_dump: 121 dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}") 122 else: 123 dump_path = None 124 125 logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n" 126 f"{traceback.format_exc()}" 127 f"{f"已保存异常到 {dump_path}" if dump_path else ""}" 128 ) 129 return False 130 131 132class CommandRule(Rule): 133 """ 134 命令规则 135 用于匹配命令 136 137 默认匹配:命令起始符 + 命令 和 命令起始符 + 别名。 138 若消息前带有 @bot 时,可直接匹配 命令本身 和 别名,无需命令起始符。 139 140 会自动移除消息中的 @bot 和命令起始符,同时会自动将 别名 替换为 命令本身,以简化插件处理逻辑。 141 """ 142 143 def __init__( 144 self, 145 command: str, 146 aliases: set[str] = None, 147 command_start: list[str] = None, 148 reply: bool = False, 149 no_args: bool = False, 150 ): 151 """ 152 Args: 153 command: 命令 154 aliases: 命令别名 155 command_start: 命令起始符(不填写默认为配置文件中的command_start) 156 reply: 是否可包含回复(默认否) 157 no_args: 是否不需要命令参数(即消息只能完全匹配命令,不包含其他的内容) 158 """ 159 if aliases is None: 160 aliases = set() 161 if command_start is None: 162 command_start = ConfigManager.GlobalConfig().command.command_start 163 if any(_ in command and _ for _ in ['[', ']'] + command_start): 164 raise ValueError("command cannot contain [ or ]") 165 if command in aliases: 166 raise ValueError("command cannot be an alias") 167 168 self.command = command 169 self.aliases = aliases 170 self.command_start = command_start 171 self.reply = reply 172 self.no_args = no_args 173 174 def match(self, event_data: EventClassifier.MessageEvent): 175 # 检查是否是消息事件 176 if not isinstance(event_data, EventClassifier.MessageEvent): 177 logger.warning(f"event {event_data} is not a MessageEvent, cannot match command") 178 return False 179 180 # 复制一份消息段 181 segments = copy.deepcopy(event_data.message.rich_array) 182 183 # 初始化是否@了机器人以及回复消息段的变量 184 is_at = False 185 reply_segment = None 186 187 # 检查消息是否以回复形式开始 188 if ( 189 self.reply and 190 len(segments) > 0 and 191 isinstance(segments[0], QQRichText.Reply) 192 ): 193 reply_segment = segments[0] 194 segments = segments[1:] 195 196 # 检查消息是否以@机器人开始 197 if ( 198 len(segments) > 0 and 199 isinstance(segments[0], QQRichText.At) and 200 int(segments[0].data.get("qq")) == event_data.self_id 201 ): 202 segments = segments[1:] 203 is_at = True 204 205 # 将消息段转换为字符串消息,并去除前导空格 206 message = str(QQRichText.QQRichText(segments)) 207 while len(message) > 0 and message[0] == " ": 208 message = message[1:] 209 210 # 重新将处理后的消息转换为QQRichText对象,并获取其字符串表示 211 string_message = str(QQRichText.QQRichText(message)) 212 213 # 生成所有可能的命令前缀组合,包括命令起始符和别名 214 commands = [_ + self.command for _ in self.command_start] 215 if is_at: 216 # 如果消息前面有at,则不需要命令起始符 217 commands += [self.command] + [alias for alias in self.aliases] 218 219 # 添加所有别名的命令前缀组合 220 commands += [_ + alias for alias in self.aliases for _ in self.command_start] 221 222 if self.no_args: 223 # 检查消息是否以任何预设命令前缀开始 224 if any(string_message == _ for _ in commands): 225 # 移除命令前缀 226 for start in self.command_start: 227 if string_message.startswith(start): 228 string_message = string_message[len(start):] 229 break 230 # 替换别名为主命令 231 for alias in self.aliases: 232 if string_message == alias: 233 string_message = self.command + string_message[len(alias):] 234 break 235 else: 236 return False 237 238 else: 239 # 检查消息是否以任何预设命令前缀开始 240 if any(string_message.startswith(_) for _ in commands): 241 # 移除命令前缀 242 for start in self.command_start: 243 if string_message.startswith(start): 244 string_message = string_message[len(start):] 245 break 246 # 替换别名为主命令 247 for alias in self.aliases: 248 if string_message.startswith(alias): 249 string_message = self.command + string_message[len(alias):] 250 break 251 else: 252 return False 253 254 # 更新消息对象 255 message = QQRichText.QQRichText(string_message) 256 257 # 将回复消息段添加到消息段列表中(如果有) 258 if reply_segment is not None: 259 message.rich_array.insert(0, reply_segment) 260 261 event_data.message = message 262 event_data.raw_message = string_message 263 return True 264 265 266def _to_me(event_data: EventClassifier.MessageEvent): 267 """ 268 判断是否是@自己或是私聊 269 Args: 270 event_data: 事件数据 271 Returns: 272 是否是@自己或是私聊 273 """ 274 if not isinstance(event_data, EventClassifier.MessageEvent): 275 logger.warning(f"event {event_data} is not a MessageEvent, cannot match to_me") 276 return False 277 if isinstance(event_data, EventClassifier.PrivateMessageEvent): 278 return True 279 if isinstance(event_data, EventClassifier.GroupMessageEvent): 280 for rich in event_data.message.rich_array: 281 if isinstance(rich, QQRichText.At) and int(rich.data.get("qq")) == event_data.self_id: 282 return True 283 return False 284 285 286to_me = FuncRule(_to_me) 287 288 289class Matcher: 290 """ 291 事件处理器 292 """ 293 294 def __init__(self): 295 self.handlers = [] 296 297 def register_handler(self, priority: int = 0, rules: list[Rule] = None, *args, **kwargs): 298 """ 299 注册事件处理器 300 如果注册的处理器返回True,则事件传播将被阻断 301 Args: 302 priority: 事件优先级 303 rules: 匹配规则 304 """ 305 if rules is None: 306 rules = [] 307 if any(not isinstance(rule, Rule) for rule in rules): 308 raise TypeError("rules must be a list of Rule") 309 310 def wrapper(func): 311 self.handlers.append((priority, rules, func, args, kwargs)) 312 return func 313 314 return wrapper 315 316 def match(self, event_data: EventClassifier.Event, plugin_data: dict): 317 """ 318 匹配事件处理器 319 Args: 320 event_data: 事件数据 321 plugin_data: 插件数据 322 """ 323 for priority, rules, handler, args, kwargs in sorted(self.handlers, key=lambda x: x[0], reverse=True): 324 try: 325 if not all(rule.match(event_data) for rule in rules): 326 continue 327 328 # 检测依赖注入 329 handler_kwargs = kwargs.copy() # 复制静态 kwargs 330 331 sig = inspect.signature(handler) 332 333 for name, param in sig.parameters.items(): 334 if name == "state": 335 if (isinstance(event_data, EventClassifier.MessageEvent) or 336 isinstance(event_data, EventClassifier.PrivateMessageEvent)): 337 state_id = f"u{event_data.user_id}" 338 elif isinstance(event_data, EventClassifier.GroupMessageEvent): 339 state_id = f"g{event_data.group_id}_u{event_data.user_id}" 340 else: 341 raise TypeError("event_data must be a MessageEvent") 342 handler_kwargs[name] = StateManager.get_state(state_id, plugin_data) 343 elif name == "user_state": 344 if isinstance(event_data, EventClassifier.MessageEvent): 345 state_id = f"u{event_data.user_id}" 346 else: 347 raise TypeError("event_data must be a MessageEvent") 348 handler_kwargs[name] = StateManager.get_state(state_id, plugin_data) 349 elif name == "group_state": 350 if isinstance(event_data, EventClassifier.GroupMessageEvent): 351 state_id = f"g{event_data.group_id}" 352 else: 353 raise TypeError("event_data must be a MessageEvent") 354 handler_kwargs[name] = StateManager.get_state(state_id, plugin_data) 355 356 result = handler(event_data, *args, **handler_kwargs) 357 358 if result is True: 359 logger.debug(f"处理器 {handler.__name__} 阻断了事件 {event_data} 的传播") 360 return # 阻断同一 Matcher 内的传播 361 except Exception as e: 362 if ConfigManager.GlobalConfig().debug.save_dump: 363 dump_path = save_exc_dump(f"执行匹配事件或执行处理器时出错 {event_data}") 364 else: 365 dump_path = None 366 logger.error( 367 f"执行匹配事件或执行处理器时出错 {event_data}: {repr(e)}\n" 368 f"{traceback.format_exc()}" 369 f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}" 370 ) 371 372 373events_matchers: dict[str, dict[Type[EventClassifier.Event], list[tuple[int, list[Rule], Matcher]]]] = {} 374 375 376def _on_event(event_data, path, event_type, plugin_data): 377 matchers = events_matchers[path][event_type] 378 for priority, rules, matcher in sorted(matchers, key=lambda x: x[0], reverse=True): 379 matcher_event_data = event_data.__class__(event_data.event_data) 380 if all(rule.match(matcher_event_data) for rule in rules): 381 matcher.match(matcher_event_data, plugin_data) 382 383 384def on_event(event: Type[EventClassifier.Event], priority: int = 0, rules: list[Rule] = None): 385 """ 386 注册事件处理器 387 Args: 388 event: 事件类型 389 priority: 事件优先级 390 rules: 匹配规则 391 Returns: 392 事件处理器 393 """ 394 if rules is None: 395 rules = [] 396 if any(not isinstance(rule, Rule) for rule in rules): 397 raise TypeError("rules must be a list of Rule") 398 if not issubclass(event, EventClassifier.Event): 399 raise TypeError("event must be an instance of EventClassifier.Event") 400 plugin_data = PluginManager.get_caller_plugin_data() 401 path = plugin_data["path"] 402 if path not in events_matchers: 403 events_matchers[path] = {} 404 if event not in events_matchers[path]: 405 events_matchers[path][event] = [] 406 EventManager.event_listener(event, path=path, event_type=event, plugin_data=plugin_data)(_on_event) 407 events_matcher = Matcher() 408 events_matchers[path][event].append((priority, rules, events_matcher)) 409 return events_matcher
logger =
<RootLogger root (INFO)>
class
Rule:
18class Rule: 19 """ 20 Rule基类,请勿直接使用 21 """ 22 23 def match(self, event_data: EventClassifier.Event): 24 """ 25 匹配事件 26 Args: 27 event_data: 事件数据 28 Returns: 29 是否匹配到事件 30 """ 31 pass
Rule基类,请勿直接使用
34class AnyRule(Rule): 35 """ 36 输入n个rule,若匹配其中任意一个则返回True 37 """ 38 39 def __init__(self, *rules: Rule): 40 self.rules = rules 41 42 def match(self, event_data: EventClassifier.Event): 43 return any(rule.match(event_data) for rule in self.rules)
输入n个rule,若匹配其中任意一个则返回True
46class AllRule(Rule): 47 """ 48 输入n个rule,若匹配所有则返回True 49 """ 50 51 def __init__(self, *rules: Rule): 52 self.rules = rules 53 54 def match(self, event_data: EventClassifier.Event): 55 return all(rule.match(event_data) for rule in self.rules)
输入n个rule,若匹配所有则返回True
58class KeyValueRule(Rule): 59 """ 60 键值规则 61 检测event data中的某个键的值是否满足要求 62 """ 63 64 def __init__(self, key, value, model: Literal["eq", "ne", "in", "not in", "func"], 65 func: Callable[[Any, Any], bool] = None): 66 """ 67 Args: 68 key: 键 69 value: 值 70 model: 匹配模式(可选: eq, ne, in, not in, func) 71 func: 函数(仅在 model 为 func 时有效,输入为 (event_data.get(key), value),返回 bool) 72 """ 73 self.key = key 74 self.value = value 75 self.model = model 76 if model == "func" and func is None: 77 raise ValueError("if model is func, func must be a callable") 78 self.func = func 79 80 def match(self, event_data: EventClassifier.Event): 81 try: 82 match self.model: 83 case "eq": 84 return event_data.get(self.key) == self.value 85 case "ne": 86 return event_data.get(self.key) != self.value 87 case "in": 88 return self.value in event_data.get(self.key) 89 case "not in": 90 return self.value not in event_data.get(self.key) 91 case "func": 92 return self.func(event_data.get(self.key), self.value) 93 except Exception as e: 94 if ConfigManager.GlobalConfig().debug.save_dump: 95 dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}") 96 else: 97 dump_path = None 98 logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n" 99 f"{traceback.format_exc()}" 100 f"{f"已保存异常到 {dump_path}" if dump_path else ""}") 101 return False
键值规则 检测event data中的某个键的值是否满足要求
KeyValueRule( key, value, model: Literal['eq', 'ne', 'in', 'not in', 'func'], func: Callable[[Any, Any], bool] = None)
64 def __init__(self, key, value, model: Literal["eq", "ne", "in", "not in", "func"], 65 func: Callable[[Any, Any], bool] = None): 66 """ 67 Args: 68 key: 键 69 value: 值 70 model: 匹配模式(可选: eq, ne, in, not in, func) 71 func: 函数(仅在 model 为 func 时有效,输入为 (event_data.get(key), value),返回 bool) 72 """ 73 self.key = key 74 self.value = value 75 self.model = model 76 if model == "func" and func is None: 77 raise ValueError("if model is func, func must be a callable") 78 self.func = func
Arguments:
- key: 键
- value: 值
- model: 匹配模式(可选: eq, ne, in, not in, func)
- func: 函数(仅在 model 为 func 时有效,输入为 (event_data.get(key), value),返回 bool)
80 def match(self, event_data: EventClassifier.Event): 81 try: 82 match self.model: 83 case "eq": 84 return event_data.get(self.key) == self.value 85 case "ne": 86 return event_data.get(self.key) != self.value 87 case "in": 88 return self.value in event_data.get(self.key) 89 case "not in": 90 return self.value not in event_data.get(self.key) 91 case "func": 92 return self.func(event_data.get(self.key), self.value) 93 except Exception as e: 94 if ConfigManager.GlobalConfig().debug.save_dump: 95 dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}") 96 else: 97 dump_path = None 98 logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n" 99 f"{traceback.format_exc()}" 100 f"{f"已保存异常到 {dump_path}" if dump_path else ""}") 101 return False
匹配事件
Arguments:
- event_data: 事件数据
Returns:
是否匹配到事件
104class FuncRule(Rule): 105 """ 106 函数规则 107 检测event data是否满足函数 108 """ 109 110 def __init__(self, func: Callable[[Any], bool]): 111 """ 112 Args: 113 func: 用于检测函数(输入为 event_data, 返回 bool) 114 """ 115 self.func = func 116 117 def match(self, event_data: EventClassifier.Event): 118 try: 119 return self.func(event_data) 120 except Exception as e: 121 if ConfigManager.GlobalConfig().debug.save_dump: 122 dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}") 123 else: 124 dump_path = None 125 126 logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n" 127 f"{traceback.format_exc()}" 128 f"{f"已保存异常到 {dump_path}" if dump_path else ""}" 129 ) 130 return False
函数规则 检测event data是否满足函数
FuncRule(func: Callable[[Any], bool])
110 def __init__(self, func: Callable[[Any], bool]): 111 """ 112 Args: 113 func: 用于检测函数(输入为 event_data, 返回 bool) 114 """ 115 self.func = func
Arguments:
- func: 用于检测函数(输入为 event_data, 返回 bool)
117 def match(self, event_data: EventClassifier.Event): 118 try: 119 return self.func(event_data) 120 except Exception as e: 121 if ConfigManager.GlobalConfig().debug.save_dump: 122 dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}") 123 else: 124 dump_path = None 125 126 logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n" 127 f"{traceback.format_exc()}" 128 f"{f"已保存异常到 {dump_path}" if dump_path else ""}" 129 ) 130 return False
匹配事件
Arguments:
- event_data: 事件数据
Returns:
是否匹配到事件
133class CommandRule(Rule): 134 """ 135 命令规则 136 用于匹配命令 137 138 默认匹配:命令起始符 + 命令 和 命令起始符 + 别名。 139 若消息前带有 @bot 时,可直接匹配 命令本身 和 别名,无需命令起始符。 140 141 会自动移除消息中的 @bot 和命令起始符,同时会自动将 别名 替换为 命令本身,以简化插件处理逻辑。 142 """ 143 144 def __init__( 145 self, 146 command: str, 147 aliases: set[str] = None, 148 command_start: list[str] = None, 149 reply: bool = False, 150 no_args: bool = False, 151 ): 152 """ 153 Args: 154 command: 命令 155 aliases: 命令别名 156 command_start: 命令起始符(不填写默认为配置文件中的command_start) 157 reply: 是否可包含回复(默认否) 158 no_args: 是否不需要命令参数(即消息只能完全匹配命令,不包含其他的内容) 159 """ 160 if aliases is None: 161 aliases = set() 162 if command_start is None: 163 command_start = ConfigManager.GlobalConfig().command.command_start 164 if any(_ in command and _ for _ in ['[', ']'] + command_start): 165 raise ValueError("command cannot contain [ or ]") 166 if command in aliases: 167 raise ValueError("command cannot be an alias") 168 169 self.command = command 170 self.aliases = aliases 171 self.command_start = command_start 172 self.reply = reply 173 self.no_args = no_args 174 175 def match(self, event_data: EventClassifier.MessageEvent): 176 # 检查是否是消息事件 177 if not isinstance(event_data, EventClassifier.MessageEvent): 178 logger.warning(f"event {event_data} is not a MessageEvent, cannot match command") 179 return False 180 181 # 复制一份消息段 182 segments = copy.deepcopy(event_data.message.rich_array) 183 184 # 初始化是否@了机器人以及回复消息段的变量 185 is_at = False 186 reply_segment = None 187 188 # 检查消息是否以回复形式开始 189 if ( 190 self.reply and 191 len(segments) > 0 and 192 isinstance(segments[0], QQRichText.Reply) 193 ): 194 reply_segment = segments[0] 195 segments = segments[1:] 196 197 # 检查消息是否以@机器人开始 198 if ( 199 len(segments) > 0 and 200 isinstance(segments[0], QQRichText.At) and 201 int(segments[0].data.get("qq")) == event_data.self_id 202 ): 203 segments = segments[1:] 204 is_at = True 205 206 # 将消息段转换为字符串消息,并去除前导空格 207 message = str(QQRichText.QQRichText(segments)) 208 while len(message) > 0 and message[0] == " ": 209 message = message[1:] 210 211 # 重新将处理后的消息转换为QQRichText对象,并获取其字符串表示 212 string_message = str(QQRichText.QQRichText(message)) 213 214 # 生成所有可能的命令前缀组合,包括命令起始符和别名 215 commands = [_ + self.command for _ in self.command_start] 216 if is_at: 217 # 如果消息前面有at,则不需要命令起始符 218 commands += [self.command] + [alias for alias in self.aliases] 219 220 # 添加所有别名的命令前缀组合 221 commands += [_ + alias for alias in self.aliases for _ in self.command_start] 222 223 if self.no_args: 224 # 检查消息是否以任何预设命令前缀开始 225 if any(string_message == _ for _ in commands): 226 # 移除命令前缀 227 for start in self.command_start: 228 if string_message.startswith(start): 229 string_message = string_message[len(start):] 230 break 231 # 替换别名为主命令 232 for alias in self.aliases: 233 if string_message == alias: 234 string_message = self.command + string_message[len(alias):] 235 break 236 else: 237 return False 238 239 else: 240 # 检查消息是否以任何预设命令前缀开始 241 if any(string_message.startswith(_) for _ in commands): 242 # 移除命令前缀 243 for start in self.command_start: 244 if string_message.startswith(start): 245 string_message = string_message[len(start):] 246 break 247 # 替换别名为主命令 248 for alias in self.aliases: 249 if string_message.startswith(alias): 250 string_message = self.command + string_message[len(alias):] 251 break 252 else: 253 return False 254 255 # 更新消息对象 256 message = QQRichText.QQRichText(string_message) 257 258 # 将回复消息段添加到消息段列表中(如果有) 259 if reply_segment is not None: 260 message.rich_array.insert(0, reply_segment) 261 262 event_data.message = message 263 event_data.raw_message = string_message 264 return True
命令规则 用于匹配命令
默认匹配:命令起始符 + 命令 和 命令起始符 + 别名。 若消息前带有 @bot 时,可直接匹配 命令本身 和 别名,无需命令起始符。
会自动移除消息中的 @bot 和命令起始符,同时会自动将 别名 替换为 命令本身,以简化插件处理逻辑。
CommandRule( command: str, aliases: set[str] = None, command_start: list[str] = None, reply: bool = False, no_args: bool = False)
144 def __init__( 145 self, 146 command: str, 147 aliases: set[str] = None, 148 command_start: list[str] = None, 149 reply: bool = False, 150 no_args: bool = False, 151 ): 152 """ 153 Args: 154 command: 命令 155 aliases: 命令别名 156 command_start: 命令起始符(不填写默认为配置文件中的command_start) 157 reply: 是否可包含回复(默认否) 158 no_args: 是否不需要命令参数(即消息只能完全匹配命令,不包含其他的内容) 159 """ 160 if aliases is None: 161 aliases = set() 162 if command_start is None: 163 command_start = ConfigManager.GlobalConfig().command.command_start 164 if any(_ in command and _ for _ in ['[', ']'] + command_start): 165 raise ValueError("command cannot contain [ or ]") 166 if command in aliases: 167 raise ValueError("command cannot be an alias") 168 169 self.command = command 170 self.aliases = aliases 171 self.command_start = command_start 172 self.reply = reply 173 self.no_args = no_args
Arguments:
- command: 命令
- aliases: 命令别名
- command_start: 命令起始符(不填写默认为配置文件中的command_start)
- reply: 是否可包含回复(默认否)
- no_args: 是否不需要命令参数(即消息只能完全匹配命令,不包含其他的内容)
175 def match(self, event_data: EventClassifier.MessageEvent): 176 # 检查是否是消息事件 177 if not isinstance(event_data, EventClassifier.MessageEvent): 178 logger.warning(f"event {event_data} is not a MessageEvent, cannot match command") 179 return False 180 181 # 复制一份消息段 182 segments = copy.deepcopy(event_data.message.rich_array) 183 184 # 初始化是否@了机器人以及回复消息段的变量 185 is_at = False 186 reply_segment = None 187 188 # 检查消息是否以回复形式开始 189 if ( 190 self.reply and 191 len(segments) > 0 and 192 isinstance(segments[0], QQRichText.Reply) 193 ): 194 reply_segment = segments[0] 195 segments = segments[1:] 196 197 # 检查消息是否以@机器人开始 198 if ( 199 len(segments) > 0 and 200 isinstance(segments[0], QQRichText.At) and 201 int(segments[0].data.get("qq")) == event_data.self_id 202 ): 203 segments = segments[1:] 204 is_at = True 205 206 # 将消息段转换为字符串消息,并去除前导空格 207 message = str(QQRichText.QQRichText(segments)) 208 while len(message) > 0 and message[0] == " ": 209 message = message[1:] 210 211 # 重新将处理后的消息转换为QQRichText对象,并获取其字符串表示 212 string_message = str(QQRichText.QQRichText(message)) 213 214 # 生成所有可能的命令前缀组合,包括命令起始符和别名 215 commands = [_ + self.command for _ in self.command_start] 216 if is_at: 217 # 如果消息前面有at,则不需要命令起始符 218 commands += [self.command] + [alias for alias in self.aliases] 219 220 # 添加所有别名的命令前缀组合 221 commands += [_ + alias for alias in self.aliases for _ in self.command_start] 222 223 if self.no_args: 224 # 检查消息是否以任何预设命令前缀开始 225 if any(string_message == _ for _ in commands): 226 # 移除命令前缀 227 for start in self.command_start: 228 if string_message.startswith(start): 229 string_message = string_message[len(start):] 230 break 231 # 替换别名为主命令 232 for alias in self.aliases: 233 if string_message == alias: 234 string_message = self.command + string_message[len(alias):] 235 break 236 else: 237 return False 238 239 else: 240 # 检查消息是否以任何预设命令前缀开始 241 if any(string_message.startswith(_) for _ in commands): 242 # 移除命令前缀 243 for start in self.command_start: 244 if string_message.startswith(start): 245 string_message = string_message[len(start):] 246 break 247 # 替换别名为主命令 248 for alias in self.aliases: 249 if string_message.startswith(alias): 250 string_message = self.command + string_message[len(alias):] 251 break 252 else: 253 return False 254 255 # 更新消息对象 256 message = QQRichText.QQRichText(string_message) 257 258 # 将回复消息段添加到消息段列表中(如果有) 259 if reply_segment is not None: 260 message.rich_array.insert(0, reply_segment) 261 262 event_data.message = message 263 event_data.raw_message = string_message 264 return True
匹配事件
Arguments:
- event_data: 事件数据
Returns:
是否匹配到事件
to_me =
<FuncRule object>
class
Matcher:
290class Matcher: 291 """ 292 事件处理器 293 """ 294 295 def __init__(self): 296 self.handlers = [] 297 298 def register_handler(self, priority: int = 0, rules: list[Rule] = None, *args, **kwargs): 299 """ 300 注册事件处理器 301 如果注册的处理器返回True,则事件传播将被阻断 302 Args: 303 priority: 事件优先级 304 rules: 匹配规则 305 """ 306 if rules is None: 307 rules = [] 308 if any(not isinstance(rule, Rule) for rule in rules): 309 raise TypeError("rules must be a list of Rule") 310 311 def wrapper(func): 312 self.handlers.append((priority, rules, func, args, kwargs)) 313 return func 314 315 return wrapper 316 317 def match(self, event_data: EventClassifier.Event, plugin_data: dict): 318 """ 319 匹配事件处理器 320 Args: 321 event_data: 事件数据 322 plugin_data: 插件数据 323 """ 324 for priority, rules, handler, args, kwargs in sorted(self.handlers, key=lambda x: x[0], reverse=True): 325 try: 326 if not all(rule.match(event_data) for rule in rules): 327 continue 328 329 # 检测依赖注入 330 handler_kwargs = kwargs.copy() # 复制静态 kwargs 331 332 sig = inspect.signature(handler) 333 334 for name, param in sig.parameters.items(): 335 if name == "state": 336 if (isinstance(event_data, EventClassifier.MessageEvent) or 337 isinstance(event_data, EventClassifier.PrivateMessageEvent)): 338 state_id = f"u{event_data.user_id}" 339 elif isinstance(event_data, EventClassifier.GroupMessageEvent): 340 state_id = f"g{event_data.group_id}_u{event_data.user_id}" 341 else: 342 raise TypeError("event_data must be a MessageEvent") 343 handler_kwargs[name] = StateManager.get_state(state_id, plugin_data) 344 elif name == "user_state": 345 if isinstance(event_data, EventClassifier.MessageEvent): 346 state_id = f"u{event_data.user_id}" 347 else: 348 raise TypeError("event_data must be a MessageEvent") 349 handler_kwargs[name] = StateManager.get_state(state_id, plugin_data) 350 elif name == "group_state": 351 if isinstance(event_data, EventClassifier.GroupMessageEvent): 352 state_id = f"g{event_data.group_id}" 353 else: 354 raise TypeError("event_data must be a MessageEvent") 355 handler_kwargs[name] = StateManager.get_state(state_id, plugin_data) 356 357 result = handler(event_data, *args, **handler_kwargs) 358 359 if result is True: 360 logger.debug(f"处理器 {handler.__name__} 阻断了事件 {event_data} 的传播") 361 return # 阻断同一 Matcher 内的传播 362 except Exception as e: 363 if ConfigManager.GlobalConfig().debug.save_dump: 364 dump_path = save_exc_dump(f"执行匹配事件或执行处理器时出错 {event_data}") 365 else: 366 dump_path = None 367 logger.error( 368 f"执行匹配事件或执行处理器时出错 {event_data}: {repr(e)}\n" 369 f"{traceback.format_exc()}" 370 f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}" 371 )
事件处理器
298 def register_handler(self, priority: int = 0, rules: list[Rule] = None, *args, **kwargs): 299 """ 300 注册事件处理器 301 如果注册的处理器返回True,则事件传播将被阻断 302 Args: 303 priority: 事件优先级 304 rules: 匹配规则 305 """ 306 if rules is None: 307 rules = [] 308 if any(not isinstance(rule, Rule) for rule in rules): 309 raise TypeError("rules must be a list of Rule") 310 311 def wrapper(func): 312 self.handlers.append((priority, rules, func, args, kwargs)) 313 return func 314 315 return wrapper
注册事件处理器 如果注册的处理器返回True,则事件传播将被阻断
Arguments:
- priority: 事件优先级
- rules: 匹配规则
317 def match(self, event_data: EventClassifier.Event, plugin_data: dict): 318 """ 319 匹配事件处理器 320 Args: 321 event_data: 事件数据 322 plugin_data: 插件数据 323 """ 324 for priority, rules, handler, args, kwargs in sorted(self.handlers, key=lambda x: x[0], reverse=True): 325 try: 326 if not all(rule.match(event_data) for rule in rules): 327 continue 328 329 # 检测依赖注入 330 handler_kwargs = kwargs.copy() # 复制静态 kwargs 331 332 sig = inspect.signature(handler) 333 334 for name, param in sig.parameters.items(): 335 if name == "state": 336 if (isinstance(event_data, EventClassifier.MessageEvent) or 337 isinstance(event_data, EventClassifier.PrivateMessageEvent)): 338 state_id = f"u{event_data.user_id}" 339 elif isinstance(event_data, EventClassifier.GroupMessageEvent): 340 state_id = f"g{event_data.group_id}_u{event_data.user_id}" 341 else: 342 raise TypeError("event_data must be a MessageEvent") 343 handler_kwargs[name] = StateManager.get_state(state_id, plugin_data) 344 elif name == "user_state": 345 if isinstance(event_data, EventClassifier.MessageEvent): 346 state_id = f"u{event_data.user_id}" 347 else: 348 raise TypeError("event_data must be a MessageEvent") 349 handler_kwargs[name] = StateManager.get_state(state_id, plugin_data) 350 elif name == "group_state": 351 if isinstance(event_data, EventClassifier.GroupMessageEvent): 352 state_id = f"g{event_data.group_id}" 353 else: 354 raise TypeError("event_data must be a MessageEvent") 355 handler_kwargs[name] = StateManager.get_state(state_id, plugin_data) 356 357 result = handler(event_data, *args, **handler_kwargs) 358 359 if result is True: 360 logger.debug(f"处理器 {handler.__name__} 阻断了事件 {event_data} 的传播") 361 return # 阻断同一 Matcher 内的传播 362 except Exception as e: 363 if ConfigManager.GlobalConfig().debug.save_dump: 364 dump_path = save_exc_dump(f"执行匹配事件或执行处理器时出错 {event_data}") 365 else: 366 dump_path = None 367 logger.error( 368 f"执行匹配事件或执行处理器时出错 {event_data}: {repr(e)}\n" 369 f"{traceback.format_exc()}" 370 f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}" 371 )
匹配事件处理器
Arguments:
- event_data: 事件数据
- plugin_data: 插件数据
events_matchers: dict[str, dict[typing.Type[Lib.utils.EventClassifier.Event], list[tuple[int, list[Rule], Matcher]]]] =
{}
def
on_event( event: Type[Lib.utils.EventClassifier.Event], priority: int = 0, rules: list[Rule] = None):
385def on_event(event: Type[EventClassifier.Event], priority: int = 0, rules: list[Rule] = None): 386 """ 387 注册事件处理器 388 Args: 389 event: 事件类型 390 priority: 事件优先级 391 rules: 匹配规则 392 Returns: 393 事件处理器 394 """ 395 if rules is None: 396 rules = [] 397 if any(not isinstance(rule, Rule) for rule in rules): 398 raise TypeError("rules must be a list of Rule") 399 if not issubclass(event, EventClassifier.Event): 400 raise TypeError("event must be an instance of EventClassifier.Event") 401 plugin_data = PluginManager.get_caller_plugin_data() 402 path = plugin_data["path"] 403 if path not in events_matchers: 404 events_matchers[path] = {} 405 if event not in events_matchers[path]: 406 events_matchers[path][event] = [] 407 EventManager.event_listener(event, path=path, event_type=event, plugin_data=plugin_data)(_on_event) 408 events_matcher = Matcher() 409 events_matchers[path][event].append((priority, rules, events_matcher)) 410 return events_matcher
注册事件处理器
Arguments:
- event: 事件类型
- priority: 事件优先级
- rules: 匹配规则
Returns:
事件处理器