Lib.utils.EventHandlers

事件处理器

  1"""
  2事件处理器
  3"""
  4import copy
  5import traceback
  6
  7from Lib.common import save_exc_dump
  8from Lib.core import EventManager, ConfigManager, PluginManager
  9from Lib.utils import EventClassifier, Logger, QQRichText, StateManager
 10
 11import inspect
 12from typing import Literal, Callable, Any, Type
 13
 14logger = Logger.get_logger()
 15
 16
 17class Rule:
 18    """
 19    Rule基类,请勿直接使用
 20    """
 21
 22    def match(self, event_data: EventClassifier.Event):
 23        """
 24        匹配事件
 25        Args:
 26            event_data: 事件数据
 27        Returns:
 28            是否匹配到事件
 29        """
 30        pass
 31
 32
 33class AnyRule(Rule):
 34    """
 35    输入n个rule,若匹配其中任意一个则返回True
 36    """
 37
 38    def __init__(self, *rules: Rule):
 39        self.rules = rules
 40
 41    def match(self, event_data: EventClassifier.Event):
 42        return any(rule.match(event_data) for rule in self.rules)
 43
 44
 45class AllRule(Rule):
 46    """
 47    输入n个rule,若匹配所有则返回True
 48    """
 49
 50    def __init__(self, *rules: Rule):
 51        self.rules = rules
 52
 53    def match(self, event_data: EventClassifier.Event):
 54        return all(rule.match(event_data) for rule in self.rules)
 55
 56
 57class KeyValueRule(Rule):
 58    """
 59    键值规则
 60    检测event data中的某个键的值是否满足要求
 61    """
 62
 63    def __init__(self, key, value, model: Literal["eq", "ne", "in", "not in", "func"],
 64                 func: Callable[[Any, Any], bool] = None):
 65        """
 66        Args:
 67            key: 键
 68            value: 值
 69            model: 匹配模式(可选: eq, ne, in, not in, func)
 70            func: 函数(仅在 model 为 func 时有效,输入为 (event_data.get(key), value),返回 bool)
 71        """
 72        self.key = key
 73        self.value = value
 74        self.model = model
 75        if model == "func" and func is None:
 76            raise ValueError("if model is func, func must be a callable")
 77        self.func = func
 78
 79    def match(self, event_data: EventClassifier.Event):
 80        try:
 81            match self.model:
 82                case "eq":
 83                    return event_data.get(self.key) == self.value
 84                case "ne":
 85                    return event_data.get(self.key) != self.value
 86                case "in":
 87                    return self.value in event_data.get(self.key)
 88                case "not in":
 89                    return self.value not in event_data.get(self.key)
 90                case "func":
 91                    return self.func(event_data.get(self.key), self.value)
 92        except Exception as e:
 93            if ConfigManager.GlobalConfig().debug.save_dump:
 94                dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}")
 95            else:
 96                dump_path = None
 97            logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n"
 98                         f"{traceback.format_exc()}"
 99                         f"{f"已保存异常到 {dump_path}" if dump_path else ""}")
100            return False
101
102
103class FuncRule(Rule):
104    """
105    函数规则
106    检测event data是否满足函数
107    """
108
109    def __init__(self, func: Callable[[Any], bool]):
110        """
111        Args:
112            func: 用于检测函数(输入为 event_data, 返回 bool)
113        """
114        self.func = func
115
116    def match(self, event_data: EventClassifier.Event):
117        try:
118            return self.func(event_data)
119        except Exception as e:
120            if ConfigManager.GlobalConfig().debug.save_dump:
121                dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}")
122            else:
123                dump_path = None
124
125            logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n"
126                         f"{traceback.format_exc()}"
127                         f"{f"已保存异常到 {dump_path}" if dump_path else ""}"
128                         )
129            return False
130
131
132class CommandRule(Rule):
133    """
134    命令规则
135    用于匹配命令
136
137    默认匹配:命令起始符 + 命令 和 命令起始符 + 别名。
138    若消息前带有 @bot 时,可直接匹配 命令本身 和 别名,无需命令起始符。
139
140    会自动移除消息中的 @bot 和命令起始符,同时会自动将 别名 替换为 命令本身,以简化插件处理逻辑。
141    """
142
143    def __init__(
144            self,
145            command: str,
146            aliases: set[str] = None,
147            command_start: list[str] = None,
148            reply: bool = False,
149            no_args: bool = False,
150    ):
151        """
152        Args:
153            command: 命令
154            aliases: 命令别名
155            command_start: 命令起始符(不填写默认为配置文件中的command_start)
156            reply: 是否可包含回复(默认否)
157            no_args: 是否不需要命令参数(即消息只能完全匹配命令,不包含其他的内容)
158        """
159        if aliases is None:
160            aliases = set()
161        if command_start is None:
162            command_start = ConfigManager.GlobalConfig().command.command_start
163        if any(_ in command and _ for _ in ['[', ']'] + command_start):
164            raise ValueError("command cannot contain [ or ]")
165        if command in aliases:
166            raise ValueError("command cannot be an alias")
167
168        self.command = command
169        self.aliases = aliases
170        self.command_start = command_start
171        self.reply = reply
172        self.no_args = no_args
173
174    def match(self, event_data: EventClassifier.MessageEvent):
175        # 检查是否是消息事件
176        if not isinstance(event_data, EventClassifier.MessageEvent):
177            logger.warning(f"event {event_data} is not a MessageEvent, cannot match command")
178            return False
179
180        # 复制一份消息段
181        segments = copy.deepcopy(event_data.message.rich_array)
182
183        # 初始化是否@了机器人以及回复消息段的变量
184        is_at = False
185        reply_segment = None
186
187        # 检查消息是否以回复形式开始
188        if (
189                self.reply and
190                len(segments) > 0 and
191                isinstance(segments[0], QQRichText.Reply)
192        ):
193            reply_segment = segments[0]
194            segments = segments[1:]
195
196        # 检查消息是否以@机器人开始
197        if (
198                len(segments) > 0 and
199                isinstance(segments[0], QQRichText.At) and
200                int(segments[0].data.get("qq")) == event_data.self_id
201        ):
202            segments = segments[1:]
203            is_at = True
204
205        # 将消息段转换为字符串消息,并去除前导空格
206        message = str(QQRichText.QQRichText(segments))
207        while len(message) > 0 and message[0] == " ":
208            message = message[1:]
209
210        # 重新将处理后的消息转换为QQRichText对象,并获取其字符串表示
211        string_message = str(QQRichText.QQRichText(message))
212
213        # 生成所有可能的命令前缀组合,包括命令起始符和别名
214        commands = [_ + self.command for _ in self.command_start]
215        if is_at:
216            # 如果消息前面有at,则不需要命令起始符
217            commands += [self.command] + [alias for alias in self.aliases]
218
219        # 添加所有别名的命令前缀组合
220        commands += [_ + alias for alias in self.aliases for _ in self.command_start]
221
222        if self.no_args:
223            # 检查消息是否以任何预设命令前缀开始
224            if any(string_message == _ for _ in commands):
225                # 移除命令前缀
226                for start in self.command_start:
227                    if string_message.startswith(start):
228                        string_message = string_message[len(start):]
229                        break
230                # 替换别名为主命令
231                for alias in self.aliases:
232                    if string_message == alias:
233                        string_message = self.command + string_message[len(alias):]
234                        break
235            else:
236                return False
237
238        else:
239            # 检查消息是否以任何预设命令前缀开始
240            if any(string_message.startswith(_) for _ in commands):
241                # 移除命令前缀
242                for start in self.command_start:
243                    if string_message.startswith(start):
244                        string_message = string_message[len(start):]
245                        break
246                # 替换别名为主命令
247                for alias in self.aliases:
248                    if string_message.startswith(alias):
249                        string_message = self.command + string_message[len(alias):]
250                        break
251            else:
252                return False
253
254        # 更新消息对象
255        message = QQRichText.QQRichText(string_message)
256
257        # 将回复消息段添加到消息段列表中(如果有)
258        if reply_segment is not None:
259            message.rich_array.insert(0, reply_segment)
260
261        event_data.message = message
262        event_data.raw_message = string_message
263        return True
264
265
266def _to_me(event_data: EventClassifier.MessageEvent):
267    """
268    判断是否是@自己或是私聊
269    Args:
270        event_data: 事件数据
271    Returns:
272        是否是@自己或是私聊
273    """
274    if not isinstance(event_data, EventClassifier.MessageEvent):
275        logger.warning(f"event {event_data} is not a MessageEvent, cannot match to_me")
276        return False
277    if isinstance(event_data, EventClassifier.PrivateMessageEvent):
278        return True
279    if isinstance(event_data, EventClassifier.GroupMessageEvent):
280        for rich in event_data.message.rich_array:
281            if isinstance(rich, QQRichText.At) and int(rich.data.get("qq")) == event_data.self_id:
282                return True
283    return False
284
285
286to_me = FuncRule(_to_me)
287
288
289class Matcher:
290    """
291    事件处理器
292    """
293
294    def __init__(self):
295        self.handlers = []
296
297    def register_handler(self, priority: int = 0, rules: list[Rule] = None, *args, **kwargs):
298        """
299        注册事件处理器
300        如果注册的处理器返回True,则事件传播将被阻断
301        Args:
302            priority: 事件优先级
303            rules: 匹配规则
304        """
305        if rules is None:
306            rules = []
307        if any(not isinstance(rule, Rule) for rule in rules):
308            raise TypeError("rules must be a list of Rule")
309
310        def wrapper(func):
311            self.handlers.append((priority, rules, func, args, kwargs))
312            return func
313
314        return wrapper
315
316    def match(self, event_data: EventClassifier.Event, plugin_data: dict):
317        """
318        匹配事件处理器
319        Args:
320            event_data: 事件数据
321            plugin_data: 插件数据
322        """
323        for priority, rules, handler, args, kwargs in sorted(self.handlers, key=lambda x: x[0], reverse=True):
324            try:
325                if not all(rule.match(event_data) for rule in rules):
326                    continue
327
328                # 检测依赖注入
329                handler_kwargs = kwargs.copy()  # 复制静态 kwargs
330
331                sig = inspect.signature(handler)
332
333                for name, param in sig.parameters.items():
334                    if name == "state":
335                        if (isinstance(event_data, EventClassifier.MessageEvent) or
336                                isinstance(event_data, EventClassifier.PrivateMessageEvent)):
337                            state_id = f"u{event_data.user_id}"
338                        elif isinstance(event_data, EventClassifier.GroupMessageEvent):
339                            state_id = f"g{event_data.group_id}_u{event_data.user_id}"
340                        else:
341                            raise TypeError("event_data must be a MessageEvent")
342                        handler_kwargs[name] = StateManager.get_state(state_id, plugin_data)
343                    elif name == "user_state":
344                        if isinstance(event_data, EventClassifier.MessageEvent):
345                            state_id = f"u{event_data.user_id}"
346                        else:
347                            raise TypeError("event_data must be a MessageEvent")
348                        handler_kwargs[name] = StateManager.get_state(state_id, plugin_data)
349                    elif name == "group_state":
350                        if isinstance(event_data, EventClassifier.GroupMessageEvent):
351                            state_id = f"g{event_data.group_id}"
352                        else:
353                            raise TypeError("event_data must be a MessageEvent")
354                        handler_kwargs[name] = StateManager.get_state(state_id, plugin_data)
355
356                result = handler(event_data, *args, **handler_kwargs)
357
358                if result is True:
359                    logger.debug(f"处理器 {handler.__name__} 阻断了事件 {event_data} 的传播")
360                    return  # 阻断同一 Matcher 内的传播
361            except Exception as e:
362                if ConfigManager.GlobalConfig().debug.save_dump:
363                    dump_path = save_exc_dump(f"执行匹配事件或执行处理器时出错 {event_data}")
364                else:
365                    dump_path = None
366                logger.error(
367                    f"执行匹配事件或执行处理器时出错 {event_data}: {repr(e)}\n"
368                    f"{traceback.format_exc()}"
369                    f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}"
370                )
371
372
373events_matchers: dict[str, dict[Type[EventClassifier.Event], list[tuple[int, list[Rule], Matcher]]]] = {}
374
375
376def _on_event(event_data, path, event_type, plugin_data):
377    matchers = events_matchers[path][event_type]
378    for priority, rules, matcher in sorted(matchers, key=lambda x: x[0], reverse=True):
379        matcher_event_data = event_data.__class__(event_data.event_data)
380        if all(rule.match(matcher_event_data) for rule in rules):
381            matcher.match(matcher_event_data, plugin_data)
382
383
384def on_event(event: Type[EventClassifier.Event], priority: int = 0, rules: list[Rule] = None):
385    """
386    注册事件处理器
387    Args:
388        event: 事件类型
389        priority: 事件优先级
390        rules: 匹配规则
391    Returns:
392        事件处理器
393    """
394    if rules is None:
395        rules = []
396    if any(not isinstance(rule, Rule) for rule in rules):
397        raise TypeError("rules must be a list of Rule")
398    if not issubclass(event, EventClassifier.Event):
399        raise TypeError("event must be an instance of EventClassifier.Event")
400    plugin_data = PluginManager.get_caller_plugin_data()
401    path = plugin_data["path"]
402    if path not in events_matchers:
403        events_matchers[path] = {}
404    if event not in events_matchers[path]:
405        events_matchers[path][event] = []
406        EventManager.event_listener(event, path=path, event_type=event, plugin_data=plugin_data)(_on_event)
407    events_matcher = Matcher()
408    events_matchers[path][event].append((priority, rules, events_matcher))
409    return events_matcher
logger = <RootLogger root (INFO)>
class Rule:
18class Rule:
19    """
20    Rule基类,请勿直接使用
21    """
22
23    def match(self, event_data: EventClassifier.Event):
24        """
25        匹配事件
26        Args:
27            event_data: 事件数据
28        Returns:
29            是否匹配到事件
30        """
31        pass

Rule基类,请勿直接使用

def match(self, event_data: Lib.utils.EventClassifier.Event):
23    def match(self, event_data: EventClassifier.Event):
24        """
25        匹配事件
26        Args:
27            event_data: 事件数据
28        Returns:
29            是否匹配到事件
30        """
31        pass

匹配事件

Arguments:
  • event_data: 事件数据
Returns:

是否匹配到事件

class AnyRule(Rule):
34class AnyRule(Rule):
35    """
36    输入n个rule,若匹配其中任意一个则返回True
37    """
38
39    def __init__(self, *rules: Rule):
40        self.rules = rules
41
42    def match(self, event_data: EventClassifier.Event):
43        return any(rule.match(event_data) for rule in self.rules)

输入n个rule,若匹配其中任意一个则返回True

AnyRule(*rules: Rule)
39    def __init__(self, *rules: Rule):
40        self.rules = rules
rules
def match(self, event_data: Lib.utils.EventClassifier.Event):
42    def match(self, event_data: EventClassifier.Event):
43        return any(rule.match(event_data) for rule in self.rules)

匹配事件

Arguments:
  • event_data: 事件数据
Returns:

是否匹配到事件

class AllRule(Rule):
46class AllRule(Rule):
47    """
48    输入n个rule,若匹配所有则返回True
49    """
50
51    def __init__(self, *rules: Rule):
52        self.rules = rules
53
54    def match(self, event_data: EventClassifier.Event):
55        return all(rule.match(event_data) for rule in self.rules)

输入n个rule,若匹配所有则返回True

AllRule(*rules: Rule)
51    def __init__(self, *rules: Rule):
52        self.rules = rules
rules
def match(self, event_data: Lib.utils.EventClassifier.Event):
54    def match(self, event_data: EventClassifier.Event):
55        return all(rule.match(event_data) for rule in self.rules)

匹配事件

Arguments:
  • event_data: 事件数据
Returns:

是否匹配到事件

class KeyValueRule(Rule):
 58class KeyValueRule(Rule):
 59    """
 60    键值规则
 61    检测event data中的某个键的值是否满足要求
 62    """
 63
 64    def __init__(self, key, value, model: Literal["eq", "ne", "in", "not in", "func"],
 65                 func: Callable[[Any, Any], bool] = None):
 66        """
 67        Args:
 68            key: 键
 69            value: 值
 70            model: 匹配模式(可选: eq, ne, in, not in, func)
 71            func: 函数(仅在 model 为 func 时有效,输入为 (event_data.get(key), value),返回 bool)
 72        """
 73        self.key = key
 74        self.value = value
 75        self.model = model
 76        if model == "func" and func is None:
 77            raise ValueError("if model is func, func must be a callable")
 78        self.func = func
 79
 80    def match(self, event_data: EventClassifier.Event):
 81        try:
 82            match self.model:
 83                case "eq":
 84                    return event_data.get(self.key) == self.value
 85                case "ne":
 86                    return event_data.get(self.key) != self.value
 87                case "in":
 88                    return self.value in event_data.get(self.key)
 89                case "not in":
 90                    return self.value not in event_data.get(self.key)
 91                case "func":
 92                    return self.func(event_data.get(self.key), self.value)
 93        except Exception as e:
 94            if ConfigManager.GlobalConfig().debug.save_dump:
 95                dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}")
 96            else:
 97                dump_path = None
 98            logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n"
 99                         f"{traceback.format_exc()}"
100                         f"{f"已保存异常到 {dump_path}" if dump_path else ""}")
101            return False

键值规则 检测event data中的某个键的值是否满足要求

KeyValueRule( key, value, model: Literal['eq', 'ne', 'in', 'not in', 'func'], func: Callable[[Any, Any], bool] = None)
64    def __init__(self, key, value, model: Literal["eq", "ne", "in", "not in", "func"],
65                 func: Callable[[Any, Any], bool] = None):
66        """
67        Args:
68            key: 键
69            value: 值
70            model: 匹配模式(可选: eq, ne, in, not in, func)
71            func: 函数(仅在 model 为 func 时有效,输入为 (event_data.get(key), value),返回 bool)
72        """
73        self.key = key
74        self.value = value
75        self.model = model
76        if model == "func" and func is None:
77            raise ValueError("if model is func, func must be a callable")
78        self.func = func
Arguments:
  • key:
  • value:
  • model: 匹配模式(可选: eq, ne, in, not in, func)
  • func: 函数(仅在 model 为 func 时有效,输入为 (event_data.get(key), value),返回 bool)
key
value
model
func
def match(self, event_data: Lib.utils.EventClassifier.Event):
 80    def match(self, event_data: EventClassifier.Event):
 81        try:
 82            match self.model:
 83                case "eq":
 84                    return event_data.get(self.key) == self.value
 85                case "ne":
 86                    return event_data.get(self.key) != self.value
 87                case "in":
 88                    return self.value in event_data.get(self.key)
 89                case "not in":
 90                    return self.value not in event_data.get(self.key)
 91                case "func":
 92                    return self.func(event_data.get(self.key), self.value)
 93        except Exception as e:
 94            if ConfigManager.GlobalConfig().debug.save_dump:
 95                dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}")
 96            else:
 97                dump_path = None
 98            logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n"
 99                         f"{traceback.format_exc()}"
100                         f"{f"已保存异常到 {dump_path}" if dump_path else ""}")
101            return False

匹配事件

Arguments:
  • event_data: 事件数据
Returns:

是否匹配到事件

class FuncRule(Rule):
104class FuncRule(Rule):
105    """
106    函数规则
107    检测event data是否满足函数
108    """
109
110    def __init__(self, func: Callable[[Any], bool]):
111        """
112        Args:
113            func: 用于检测函数(输入为 event_data, 返回 bool)
114        """
115        self.func = func
116
117    def match(self, event_data: EventClassifier.Event):
118        try:
119            return self.func(event_data)
120        except Exception as e:
121            if ConfigManager.GlobalConfig().debug.save_dump:
122                dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}")
123            else:
124                dump_path = None
125
126            logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n"
127                         f"{traceback.format_exc()}"
128                         f"{f"已保存异常到 {dump_path}" if dump_path else ""}"
129                         )
130            return False

函数规则 检测event data是否满足函数

FuncRule(func: Callable[[Any], bool])
110    def __init__(self, func: Callable[[Any], bool]):
111        """
112        Args:
113            func: 用于检测函数(输入为 event_data, 返回 bool)
114        """
115        self.func = func
Arguments:
  • func: 用于检测函数(输入为 event_data, 返回 bool)
func
def match(self, event_data: Lib.utils.EventClassifier.Event):
117    def match(self, event_data: EventClassifier.Event):
118        try:
119            return self.func(event_data)
120        except Exception as e:
121            if ConfigManager.GlobalConfig().debug.save_dump:
122                dump_path = save_exc_dump(f"执行匹配事件器时出错 {event_data}")
123            else:
124                dump_path = None
125
126            logger.error(f"执行匹配事件器时出错 {event_data}: {repr(e)}\n"
127                         f"{traceback.format_exc()}"
128                         f"{f"已保存异常到 {dump_path}" if dump_path else ""}"
129                         )
130            return False

匹配事件

Arguments:
  • event_data: 事件数据
Returns:

是否匹配到事件

class CommandRule(Rule):
133class CommandRule(Rule):
134    """
135    命令规则
136    用于匹配命令
137
138    默认匹配:命令起始符 + 命令 和 命令起始符 + 别名。
139    若消息前带有 @bot 时,可直接匹配 命令本身 和 别名,无需命令起始符。
140
141    会自动移除消息中的 @bot 和命令起始符,同时会自动将 别名 替换为 命令本身,以简化插件处理逻辑。
142    """
143
144    def __init__(
145            self,
146            command: str,
147            aliases: set[str] = None,
148            command_start: list[str] = None,
149            reply: bool = False,
150            no_args: bool = False,
151    ):
152        """
153        Args:
154            command: 命令
155            aliases: 命令别名
156            command_start: 命令起始符(不填写默认为配置文件中的command_start)
157            reply: 是否可包含回复(默认否)
158            no_args: 是否不需要命令参数(即消息只能完全匹配命令,不包含其他的内容)
159        """
160        if aliases is None:
161            aliases = set()
162        if command_start is None:
163            command_start = ConfigManager.GlobalConfig().command.command_start
164        if any(_ in command and _ for _ in ['[', ']'] + command_start):
165            raise ValueError("command cannot contain [ or ]")
166        if command in aliases:
167            raise ValueError("command cannot be an alias")
168
169        self.command = command
170        self.aliases = aliases
171        self.command_start = command_start
172        self.reply = reply
173        self.no_args = no_args
174
175    def match(self, event_data: EventClassifier.MessageEvent):
176        # 检查是否是消息事件
177        if not isinstance(event_data, EventClassifier.MessageEvent):
178            logger.warning(f"event {event_data} is not a MessageEvent, cannot match command")
179            return False
180
181        # 复制一份消息段
182        segments = copy.deepcopy(event_data.message.rich_array)
183
184        # 初始化是否@了机器人以及回复消息段的变量
185        is_at = False
186        reply_segment = None
187
188        # 检查消息是否以回复形式开始
189        if (
190                self.reply and
191                len(segments) > 0 and
192                isinstance(segments[0], QQRichText.Reply)
193        ):
194            reply_segment = segments[0]
195            segments = segments[1:]
196
197        # 检查消息是否以@机器人开始
198        if (
199                len(segments) > 0 and
200                isinstance(segments[0], QQRichText.At) and
201                int(segments[0].data.get("qq")) == event_data.self_id
202        ):
203            segments = segments[1:]
204            is_at = True
205
206        # 将消息段转换为字符串消息,并去除前导空格
207        message = str(QQRichText.QQRichText(segments))
208        while len(message) > 0 and message[0] == " ":
209            message = message[1:]
210
211        # 重新将处理后的消息转换为QQRichText对象,并获取其字符串表示
212        string_message = str(QQRichText.QQRichText(message))
213
214        # 生成所有可能的命令前缀组合,包括命令起始符和别名
215        commands = [_ + self.command for _ in self.command_start]
216        if is_at:
217            # 如果消息前面有at,则不需要命令起始符
218            commands += [self.command] + [alias for alias in self.aliases]
219
220        # 添加所有别名的命令前缀组合
221        commands += [_ + alias for alias in self.aliases for _ in self.command_start]
222
223        if self.no_args:
224            # 检查消息是否以任何预设命令前缀开始
225            if any(string_message == _ for _ in commands):
226                # 移除命令前缀
227                for start in self.command_start:
228                    if string_message.startswith(start):
229                        string_message = string_message[len(start):]
230                        break
231                # 替换别名为主命令
232                for alias in self.aliases:
233                    if string_message == alias:
234                        string_message = self.command + string_message[len(alias):]
235                        break
236            else:
237                return False
238
239        else:
240            # 检查消息是否以任何预设命令前缀开始
241            if any(string_message.startswith(_) for _ in commands):
242                # 移除命令前缀
243                for start in self.command_start:
244                    if string_message.startswith(start):
245                        string_message = string_message[len(start):]
246                        break
247                # 替换别名为主命令
248                for alias in self.aliases:
249                    if string_message.startswith(alias):
250                        string_message = self.command + string_message[len(alias):]
251                        break
252            else:
253                return False
254
255        # 更新消息对象
256        message = QQRichText.QQRichText(string_message)
257
258        # 将回复消息段添加到消息段列表中(如果有)
259        if reply_segment is not None:
260            message.rich_array.insert(0, reply_segment)
261
262        event_data.message = message
263        event_data.raw_message = string_message
264        return True

命令规则 用于匹配命令

默认匹配:命令起始符 + 命令 和 命令起始符 + 别名。 若消息前带有 @bot 时,可直接匹配 命令本身 和 别名,无需命令起始符。

会自动移除消息中的 @bot 和命令起始符,同时会自动将 别名 替换为 命令本身,以简化插件处理逻辑。

CommandRule( command: str, aliases: set[str] = None, command_start: list[str] = None, reply: bool = False, no_args: bool = False)
144    def __init__(
145            self,
146            command: str,
147            aliases: set[str] = None,
148            command_start: list[str] = None,
149            reply: bool = False,
150            no_args: bool = False,
151    ):
152        """
153        Args:
154            command: 命令
155            aliases: 命令别名
156            command_start: 命令起始符(不填写默认为配置文件中的command_start)
157            reply: 是否可包含回复(默认否)
158            no_args: 是否不需要命令参数(即消息只能完全匹配命令,不包含其他的内容)
159        """
160        if aliases is None:
161            aliases = set()
162        if command_start is None:
163            command_start = ConfigManager.GlobalConfig().command.command_start
164        if any(_ in command and _ for _ in ['[', ']'] + command_start):
165            raise ValueError("command cannot contain [ or ]")
166        if command in aliases:
167            raise ValueError("command cannot be an alias")
168
169        self.command = command
170        self.aliases = aliases
171        self.command_start = command_start
172        self.reply = reply
173        self.no_args = no_args
Arguments:
  • command: 命令
  • aliases: 命令别名
  • command_start: 命令起始符(不填写默认为配置文件中的command_start)
  • reply: 是否可包含回复(默认否)
  • no_args: 是否不需要命令参数(即消息只能完全匹配命令,不包含其他的内容)
command
aliases
command_start
reply
no_args
def match(self, event_data: Lib.utils.EventClassifier.MessageEvent):
175    def match(self, event_data: EventClassifier.MessageEvent):
176        # 检查是否是消息事件
177        if not isinstance(event_data, EventClassifier.MessageEvent):
178            logger.warning(f"event {event_data} is not a MessageEvent, cannot match command")
179            return False
180
181        # 复制一份消息段
182        segments = copy.deepcopy(event_data.message.rich_array)
183
184        # 初始化是否@了机器人以及回复消息段的变量
185        is_at = False
186        reply_segment = None
187
188        # 检查消息是否以回复形式开始
189        if (
190                self.reply and
191                len(segments) > 0 and
192                isinstance(segments[0], QQRichText.Reply)
193        ):
194            reply_segment = segments[0]
195            segments = segments[1:]
196
197        # 检查消息是否以@机器人开始
198        if (
199                len(segments) > 0 and
200                isinstance(segments[0], QQRichText.At) and
201                int(segments[0].data.get("qq")) == event_data.self_id
202        ):
203            segments = segments[1:]
204            is_at = True
205
206        # 将消息段转换为字符串消息,并去除前导空格
207        message = str(QQRichText.QQRichText(segments))
208        while len(message) > 0 and message[0] == " ":
209            message = message[1:]
210
211        # 重新将处理后的消息转换为QQRichText对象,并获取其字符串表示
212        string_message = str(QQRichText.QQRichText(message))
213
214        # 生成所有可能的命令前缀组合,包括命令起始符和别名
215        commands = [_ + self.command for _ in self.command_start]
216        if is_at:
217            # 如果消息前面有at,则不需要命令起始符
218            commands += [self.command] + [alias for alias in self.aliases]
219
220        # 添加所有别名的命令前缀组合
221        commands += [_ + alias for alias in self.aliases for _ in self.command_start]
222
223        if self.no_args:
224            # 检查消息是否以任何预设命令前缀开始
225            if any(string_message == _ for _ in commands):
226                # 移除命令前缀
227                for start in self.command_start:
228                    if string_message.startswith(start):
229                        string_message = string_message[len(start):]
230                        break
231                # 替换别名为主命令
232                for alias in self.aliases:
233                    if string_message == alias:
234                        string_message = self.command + string_message[len(alias):]
235                        break
236            else:
237                return False
238
239        else:
240            # 检查消息是否以任何预设命令前缀开始
241            if any(string_message.startswith(_) for _ in commands):
242                # 移除命令前缀
243                for start in self.command_start:
244                    if string_message.startswith(start):
245                        string_message = string_message[len(start):]
246                        break
247                # 替换别名为主命令
248                for alias in self.aliases:
249                    if string_message.startswith(alias):
250                        string_message = self.command + string_message[len(alias):]
251                        break
252            else:
253                return False
254
255        # 更新消息对象
256        message = QQRichText.QQRichText(string_message)
257
258        # 将回复消息段添加到消息段列表中(如果有)
259        if reply_segment is not None:
260            message.rich_array.insert(0, reply_segment)
261
262        event_data.message = message
263        event_data.raw_message = string_message
264        return True

匹配事件

Arguments:
  • event_data: 事件数据
Returns:

是否匹配到事件

to_me = <FuncRule object>
class Matcher:
290class Matcher:
291    """
292    事件处理器
293    """
294
295    def __init__(self):
296        self.handlers = []
297
298    def register_handler(self, priority: int = 0, rules: list[Rule] = None, *args, **kwargs):
299        """
300        注册事件处理器
301        如果注册的处理器返回True,则事件传播将被阻断
302        Args:
303            priority: 事件优先级
304            rules: 匹配规则
305        """
306        if rules is None:
307            rules = []
308        if any(not isinstance(rule, Rule) for rule in rules):
309            raise TypeError("rules must be a list of Rule")
310
311        def wrapper(func):
312            self.handlers.append((priority, rules, func, args, kwargs))
313            return func
314
315        return wrapper
316
317    def match(self, event_data: EventClassifier.Event, plugin_data: dict):
318        """
319        匹配事件处理器
320        Args:
321            event_data: 事件数据
322            plugin_data: 插件数据
323        """
324        for priority, rules, handler, args, kwargs in sorted(self.handlers, key=lambda x: x[0], reverse=True):
325            try:
326                if not all(rule.match(event_data) for rule in rules):
327                    continue
328
329                # 检测依赖注入
330                handler_kwargs = kwargs.copy()  # 复制静态 kwargs
331
332                sig = inspect.signature(handler)
333
334                for name, param in sig.parameters.items():
335                    if name == "state":
336                        if (isinstance(event_data, EventClassifier.MessageEvent) or
337                                isinstance(event_data, EventClassifier.PrivateMessageEvent)):
338                            state_id = f"u{event_data.user_id}"
339                        elif isinstance(event_data, EventClassifier.GroupMessageEvent):
340                            state_id = f"g{event_data.group_id}_u{event_data.user_id}"
341                        else:
342                            raise TypeError("event_data must be a MessageEvent")
343                        handler_kwargs[name] = StateManager.get_state(state_id, plugin_data)
344                    elif name == "user_state":
345                        if isinstance(event_data, EventClassifier.MessageEvent):
346                            state_id = f"u{event_data.user_id}"
347                        else:
348                            raise TypeError("event_data must be a MessageEvent")
349                        handler_kwargs[name] = StateManager.get_state(state_id, plugin_data)
350                    elif name == "group_state":
351                        if isinstance(event_data, EventClassifier.GroupMessageEvent):
352                            state_id = f"g{event_data.group_id}"
353                        else:
354                            raise TypeError("event_data must be a MessageEvent")
355                        handler_kwargs[name] = StateManager.get_state(state_id, plugin_data)
356
357                result = handler(event_data, *args, **handler_kwargs)
358
359                if result is True:
360                    logger.debug(f"处理器 {handler.__name__} 阻断了事件 {event_data} 的传播")
361                    return  # 阻断同一 Matcher 内的传播
362            except Exception as e:
363                if ConfigManager.GlobalConfig().debug.save_dump:
364                    dump_path = save_exc_dump(f"执行匹配事件或执行处理器时出错 {event_data}")
365                else:
366                    dump_path = None
367                logger.error(
368                    f"执行匹配事件或执行处理器时出错 {event_data}: {repr(e)}\n"
369                    f"{traceback.format_exc()}"
370                    f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}"
371                )

事件处理器

handlers
def register_handler( self, priority: int = 0, rules: list[Rule] = None, *args, **kwargs):
298    def register_handler(self, priority: int = 0, rules: list[Rule] = None, *args, **kwargs):
299        """
300        注册事件处理器
301        如果注册的处理器返回True,则事件传播将被阻断
302        Args:
303            priority: 事件优先级
304            rules: 匹配规则
305        """
306        if rules is None:
307            rules = []
308        if any(not isinstance(rule, Rule) for rule in rules):
309            raise TypeError("rules must be a list of Rule")
310
311        def wrapper(func):
312            self.handlers.append((priority, rules, func, args, kwargs))
313            return func
314
315        return wrapper

注册事件处理器 如果注册的处理器返回True,则事件传播将被阻断

Arguments:
  • priority: 事件优先级
  • rules: 匹配规则
def match(self, event_data: Lib.utils.EventClassifier.Event, plugin_data: dict):
317    def match(self, event_data: EventClassifier.Event, plugin_data: dict):
318        """
319        匹配事件处理器
320        Args:
321            event_data: 事件数据
322            plugin_data: 插件数据
323        """
324        for priority, rules, handler, args, kwargs in sorted(self.handlers, key=lambda x: x[0], reverse=True):
325            try:
326                if not all(rule.match(event_data) for rule in rules):
327                    continue
328
329                # 检测依赖注入
330                handler_kwargs = kwargs.copy()  # 复制静态 kwargs
331
332                sig = inspect.signature(handler)
333
334                for name, param in sig.parameters.items():
335                    if name == "state":
336                        if (isinstance(event_data, EventClassifier.MessageEvent) or
337                                isinstance(event_data, EventClassifier.PrivateMessageEvent)):
338                            state_id = f"u{event_data.user_id}"
339                        elif isinstance(event_data, EventClassifier.GroupMessageEvent):
340                            state_id = f"g{event_data.group_id}_u{event_data.user_id}"
341                        else:
342                            raise TypeError("event_data must be a MessageEvent")
343                        handler_kwargs[name] = StateManager.get_state(state_id, plugin_data)
344                    elif name == "user_state":
345                        if isinstance(event_data, EventClassifier.MessageEvent):
346                            state_id = f"u{event_data.user_id}"
347                        else:
348                            raise TypeError("event_data must be a MessageEvent")
349                        handler_kwargs[name] = StateManager.get_state(state_id, plugin_data)
350                    elif name == "group_state":
351                        if isinstance(event_data, EventClassifier.GroupMessageEvent):
352                            state_id = f"g{event_data.group_id}"
353                        else:
354                            raise TypeError("event_data must be a MessageEvent")
355                        handler_kwargs[name] = StateManager.get_state(state_id, plugin_data)
356
357                result = handler(event_data, *args, **handler_kwargs)
358
359                if result is True:
360                    logger.debug(f"处理器 {handler.__name__} 阻断了事件 {event_data} 的传播")
361                    return  # 阻断同一 Matcher 内的传播
362            except Exception as e:
363                if ConfigManager.GlobalConfig().debug.save_dump:
364                    dump_path = save_exc_dump(f"执行匹配事件或执行处理器时出错 {event_data}")
365                else:
366                    dump_path = None
367                logger.error(
368                    f"执行匹配事件或执行处理器时出错 {event_data}: {repr(e)}\n"
369                    f"{traceback.format_exc()}"
370                    f"{f"\n已保存异常到 {dump_path}" if dump_path else ""}"
371                )

匹配事件处理器

Arguments:
  • event_data: 事件数据
  • plugin_data: 插件数据
events_matchers: dict[str, dict[typing.Type[Lib.utils.EventClassifier.Event], list[tuple[int, list[Rule], Matcher]]]] = {}
def on_event( event: Type[Lib.utils.EventClassifier.Event], priority: int = 0, rules: list[Rule] = None):
385def on_event(event: Type[EventClassifier.Event], priority: int = 0, rules: list[Rule] = None):
386    """
387    注册事件处理器
388    Args:
389        event: 事件类型
390        priority: 事件优先级
391        rules: 匹配规则
392    Returns:
393        事件处理器
394    """
395    if rules is None:
396        rules = []
397    if any(not isinstance(rule, Rule) for rule in rules):
398        raise TypeError("rules must be a list of Rule")
399    if not issubclass(event, EventClassifier.Event):
400        raise TypeError("event must be an instance of EventClassifier.Event")
401    plugin_data = PluginManager.get_caller_plugin_data()
402    path = plugin_data["path"]
403    if path not in events_matchers:
404        events_matchers[path] = {}
405    if event not in events_matchers[path]:
406        events_matchers[path][event] = []
407        EventManager.event_listener(event, path=path, event_type=event, plugin_data=plugin_data)(_on_event)
408    events_matcher = Matcher()
409    events_matchers[path][event].append((priority, rules, events_matcher))
410    return events_matcher

注册事件处理器

Arguments:
  • event: 事件类型
  • priority: 事件优先级
  • rules: 匹配规则
Returns:

事件处理器