Lib.common

工具

  1"""
  2工具
  3"""
  4import os.path
  5import shutil
  6import sys
  7import threading
  8import time
  9import traceback
 10import uuid
 11from collections import OrderedDict
 12from io import BytesIO
 13
 14import requests
 15
 16from .constants import *
 17from .utils import Logger
 18
 19logger = Logger.get_logger()
 20
 21
 22class LimitedSizeDict(OrderedDict):
 23    """
 24    带有限制大小的字典
 25    """
 26
 27    def __init__(self, max_size):
 28        self._max_size = max_size
 29        super().__init__()
 30
 31    def __setitem__(self, key, value):
 32        if key in self:
 33            del self[key]
 34        elif len(self) >= self._max_size:
 35            oldest_key = next(iter(self))
 36            del self[oldest_key]
 37        super().__setitem__(key, value)
 38
 39
 40def restart() -> None:
 41    """
 42    MRB2重启
 43    Returns:
 44        None
 45    """
 46    # 获取当前解释器路径
 47    p = sys.executable
 48    try:
 49        # 启动新程序(解释器路径, 当前程序)
 50        os.execl(p, p, *sys.argv)
 51    except OSError:
 52        # 关闭当前程序
 53        sys.exit()
 54
 55
 56def download_file_to_cache(url: str, headers=None, file_name: str = "",
 57                           download_path: str = None, stream=False, fake_headers: bool = True) -> str | None:
 58    """
 59    下载文件到缓存
 60    Args:
 61        url: 下载的url
 62        headers: 下载请求的请求头
 63        file_name: 文件名
 64        download_path: 下载路径
 65        stream: 是否使用流式传输
 66        fake_headers: 是否使用自动生成的假请求头
 67    Returns:
 68        文件路径
 69    """
 70    if headers is None:
 71        headers = {}
 72
 73    if fake_headers:
 74        headers["User-Agent"] = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
 75                                 "Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.42")
 76        headers["Accept-Language"] = "zh-CN,zh;q=0.9,en;q=0.8,da;q=0.7,ko;q=0.6"
 77        headers["Accept-Encoding"] = "gzip, deflate, br"
 78        headers["Accept"] = ("text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,"
 79                             "application/signed-exchange;v=b3;q=0.7")
 80        headers["Connection"] = "keep-alive"
 81        headers["Upgrade-Insecure-Requests"] = "1"
 82        headers["Cache-Control"] = "max-age=0"
 83        headers["Sec-Fetch-Dest"] = "document"
 84        headers["Sec-Fetch-Mode"] = "navigate"
 85        headers["Sec-Fetch-Site"] = "none"
 86        headers["Sec-Fetch-User"] = "?1"
 87        headers["Sec-Ch-Ua"] = "\"Chromium\";v=\"113\", \"Not-A.Brand\";v=\"24\", \"Microsoft Edge\";v=\"113\""
 88        headers["Sec-Ch-Ua-Mobile"] = "?0"
 89        headers["Sec-Ch-Ua-Platform"] = "\"Windows\""
 90        headers["Host"] = url.split("/")[2]
 91
 92    # 路径拼接
 93    if file_name == "":
 94        file_name = uuid.uuid4().hex + ".cache"
 95
 96    if download_path is None:
 97        file_path = os.path.join(CACHE_PATH, file_name)
 98    else:
 99        file_path = os.path.join(download_path, file_name)
100
101    # 路径不存在特判
102    if not os.path.exists(CACHE_PATH):
103        os.makedirs(CACHE_PATH)
104
105    try:
106        # 下载
107        if stream:
108            with open(file_path, "wb") as f, requests.get(url, stream=True, headers=headers) as res:
109                for chunk in res.iter_content(chunk_size=64 * 1024):
110                    if not chunk:
111                        break
112                    f.write(chunk)
113        else:
114            # 不使用流式传输
115            res = requests.get(url, headers=headers)
116
117            with open(file_path, "wb") as f:
118                f.write(res.content)
119    except requests.exceptions.RequestException as e:
120        logger.warning(f"下载文件失败: {e}")
121        if os.path.exists(file_path):
122            os.remove(file_path)
123        return None
124
125    return file_path
126
127
128# 删除缓存文件
129def clean_cache() -> None:
130    """
131    清理缓存
132    Returns:
133        None
134    """
135    if os.path.exists(CACHE_PATH):
136        try:
137            shutil.rmtree(CACHE_PATH, ignore_errors=True)
138        except Exception as e:
139            logger.warning("删除缓存时报错,报错信息: %s" % repr(e))
140
141
142# 函数缓存
143def function_cache(max_size: int, expiration_time: int = -1):
144    """
145    函数缓存
146    Args:
147        max_size: 最大大小
148        expiration_time: 过期时间
149    Returns:
150        None
151    """
152    cache = LimitedSizeDict(max_size)
153
154    def cache_decorator(func):
155        """
156        缓存装饰器
157        Args:
158            @param func:
159        Returns:
160            None
161        """
162
163        def wrapper(*args, **kwargs):
164            key = str(func.__name__) + str(args) + str(kwargs)
165            if key in cache and (expiration_time == -1 or time.time() - cache[key][1] < expiration_time):
166                return cache[key][0]
167            result = func(*args, **kwargs)
168            cache[key] = (result, time.time())
169            return result
170
171        def clear_cache():
172            """清理缓存"""
173            cache.clear()
174
175        def get_cache():
176            """获取缓存"""
177            return dict(cache)
178
179        def original_func(*args, **kwargs):
180            """调用原函数"""
181            return func(*args, **kwargs)
182
183        wrapper.clear_cache = clear_cache
184        wrapper.get_cache = get_cache
185        wrapper.original_func = original_func
186        return wrapper
187
188    return cache_decorator
189
190
191def thread_lock(func):
192    """
193    线程锁装饰器
194    """
195    thread_lock = threading.Lock()
196
197    def wrapper(*args, **kwargs):
198        with thread_lock:
199            return func(*args, **kwargs)
200
201    return wrapper
202
203
204def finalize_and_cleanup():
205    """
206    结束运行
207    @return:
208    """
209    logger.info("MuRainBot即将关闭,正在删除缓存")
210
211    clean_cache()
212
213    logger.warning("MuRainBot结束运行!")
214    logger.info("再见!\n")
215
216
217@thread_lock
218def save_exc_dump(description: str = None, path: str = None):
219    """
220    保存异常堆栈
221    Args:
222        description: 保存的dump描述,为空则默认
223        path: 保存的路径,为空则自动根据错误生成
224    """
225    try:
226        import coredumpy
227    except ImportError:
228        logger.warning("coredumpy未安装,无法保存异常堆栈")
229        return
230
231    try:
232        exc_type, exc_value, exc_traceback = sys.exc_info()
233        if not exc_traceback:
234            raise Exception("No traceback found")
235
236        # 遍历 traceback 链表,找到最后一个 frame (异常最初发生的位置)
237        current_tb = exc_traceback
238        frame = current_tb.tb_frame
239        while current_tb:
240            frame = current_tb.tb_frame
241            current_tb = current_tb.tb_next
242
243        i = 0
244        while True:
245            if i > 0:
246                path_ = os.path.join(DUMPS_PATH,
247                                     f"coredumpy_"
248                                     f"{time.strftime('%Y%m%d%H%M%S')}_"
249                                     f"{frame.f_code.co_name}_{i}.dump")
250            else:
251                path_ = os.path.join(DUMPS_PATH,
252                                     f"coredumpy_"
253                                     f"{time.strftime('%Y%m%d%H%M%S')}_"
254                                     f"{frame.f_code.co_name}.dump")
255            if not os.path.exists(path_):
256                break
257            i += 1
258
259        for _ in ['?', '*', '"', '<', '>']:
260            path_ = path_.replace(_, "")
261
262        kwargs = {
263            "frame": frame,
264            "path": os.path.normpath(path_)
265        }
266        if description:
267            kwargs["description"] = description
268        if path:
269            kwargs["path"] = path
270
271        coredumpy.dump(**kwargs)
272    except Exception as e:
273        logger.error(f"保存异常堆栈时发生错误: {repr(e)}\n"
274                     f"{traceback.format_exc()}")
275        return None
276
277    return kwargs["path"]
278
279
280def bytes_io_to_file(
281        io_bytes: BytesIO,
282        file_name: str | None = None,
283        file_type: str | None = None,
284        save_dir: str = CACHE_PATH
285):
286    """
287    将BytesIO对象保存成文件,并返回路径
288    Args:
289        io_bytes: BytesIO对象
290        file_name: 要保存的文件名,与file_type选一个填即可
291        file_type: 文件类型(扩展名),与file_name选一个填即可
292        save_dir: 保存的文件夹
293
294    Returns:
295        保存的文件路径
296    """
297    if not isinstance(io_bytes, BytesIO):
298        raise TypeError("bytes_io_to_file: 输入类型错误")
299    if file_name is None:
300        if file_type is None:
301            file_type = "cache"
302        file_name = uuid.uuid4().hex + "." + file_type
303    if not os.path.exists(save_dir):
304        os.makedirs(save_dir)
305
306    with open(os.path.join(save_dir, file_name), "wb") as f:
307        f.write(io_bytes.getvalue())
308    return os.path.join(save_dir, file_name)
logger = <RootLogger root (INFO)>
class LimitedSizeDict(collections.OrderedDict):
23class LimitedSizeDict(OrderedDict):
24    """
25    带有限制大小的字典
26    """
27
28    def __init__(self, max_size):
29        self._max_size = max_size
30        super().__init__()
31
32    def __setitem__(self, key, value):
33        if key in self:
34            del self[key]
35        elif len(self) >= self._max_size:
36            oldest_key = next(iter(self))
37            del self[oldest_key]
38        super().__setitem__(key, value)

带有限制大小的字典

def restart() -> None:
41def restart() -> None:
42    """
43    MRB2重启
44    Returns:
45        None
46    """
47    # 获取当前解释器路径
48    p = sys.executable
49    try:
50        # 启动新程序(解释器路径, 当前程序)
51        os.execl(p, p, *sys.argv)
52    except OSError:
53        # 关闭当前程序
54        sys.exit()

MRB2重启

Returns:

None

def download_file_to_cache( url: str, headers=None, file_name: str = '', download_path: str = None, stream=False, fake_headers: bool = True) -> str | None:
 57def download_file_to_cache(url: str, headers=None, file_name: str = "",
 58                           download_path: str = None, stream=False, fake_headers: bool = True) -> str | None:
 59    """
 60    下载文件到缓存
 61    Args:
 62        url: 下载的url
 63        headers: 下载请求的请求头
 64        file_name: 文件名
 65        download_path: 下载路径
 66        stream: 是否使用流式传输
 67        fake_headers: 是否使用自动生成的假请求头
 68    Returns:
 69        文件路径
 70    """
 71    if headers is None:
 72        headers = {}
 73
 74    if fake_headers:
 75        headers["User-Agent"] = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
 76                                 "Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.42")
 77        headers["Accept-Language"] = "zh-CN,zh;q=0.9,en;q=0.8,da;q=0.7,ko;q=0.6"
 78        headers["Accept-Encoding"] = "gzip, deflate, br"
 79        headers["Accept"] = ("text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,"
 80                             "application/signed-exchange;v=b3;q=0.7")
 81        headers["Connection"] = "keep-alive"
 82        headers["Upgrade-Insecure-Requests"] = "1"
 83        headers["Cache-Control"] = "max-age=0"
 84        headers["Sec-Fetch-Dest"] = "document"
 85        headers["Sec-Fetch-Mode"] = "navigate"
 86        headers["Sec-Fetch-Site"] = "none"
 87        headers["Sec-Fetch-User"] = "?1"
 88        headers["Sec-Ch-Ua"] = "\"Chromium\";v=\"113\", \"Not-A.Brand\";v=\"24\", \"Microsoft Edge\";v=\"113\""
 89        headers["Sec-Ch-Ua-Mobile"] = "?0"
 90        headers["Sec-Ch-Ua-Platform"] = "\"Windows\""
 91        headers["Host"] = url.split("/")[2]
 92
 93    # 路径拼接
 94    if file_name == "":
 95        file_name = uuid.uuid4().hex + ".cache"
 96
 97    if download_path is None:
 98        file_path = os.path.join(CACHE_PATH, file_name)
 99    else:
100        file_path = os.path.join(download_path, file_name)
101
102    # 路径不存在特判
103    if not os.path.exists(CACHE_PATH):
104        os.makedirs(CACHE_PATH)
105
106    try:
107        # 下载
108        if stream:
109            with open(file_path, "wb") as f, requests.get(url, stream=True, headers=headers) as res:
110                for chunk in res.iter_content(chunk_size=64 * 1024):
111                    if not chunk:
112                        break
113                    f.write(chunk)
114        else:
115            # 不使用流式传输
116            res = requests.get(url, headers=headers)
117
118            with open(file_path, "wb") as f:
119                f.write(res.content)
120    except requests.exceptions.RequestException as e:
121        logger.warning(f"下载文件失败: {e}")
122        if os.path.exists(file_path):
123            os.remove(file_path)
124        return None
125
126    return file_path

下载文件到缓存

Arguments:
  • url: 下载的url
  • headers: 下载请求的请求头
  • file_name: 文件名
  • download_path: 下载路径
  • stream: 是否使用流式传输
  • fake_headers: 是否使用自动生成的假请求头
Returns:

文件路径

def clean_cache() -> None:
130def clean_cache() -> None:
131    """
132    清理缓存
133    Returns:
134        None
135    """
136    if os.path.exists(CACHE_PATH):
137        try:
138            shutil.rmtree(CACHE_PATH, ignore_errors=True)
139        except Exception as e:
140            logger.warning("删除缓存时报错,报错信息: %s" % repr(e))

清理缓存

Returns:

None

def function_cache(max_size: int, expiration_time: int = -1):
144def function_cache(max_size: int, expiration_time: int = -1):
145    """
146    函数缓存
147    Args:
148        max_size: 最大大小
149        expiration_time: 过期时间
150    Returns:
151        None
152    """
153    cache = LimitedSizeDict(max_size)
154
155    def cache_decorator(func):
156        """
157        缓存装饰器
158        Args:
159            @param func:
160        Returns:
161            None
162        """
163
164        def wrapper(*args, **kwargs):
165            key = str(func.__name__) + str(args) + str(kwargs)
166            if key in cache and (expiration_time == -1 or time.time() - cache[key][1] < expiration_time):
167                return cache[key][0]
168            result = func(*args, **kwargs)
169            cache[key] = (result, time.time())
170            return result
171
172        def clear_cache():
173            """清理缓存"""
174            cache.clear()
175
176        def get_cache():
177            """获取缓存"""
178            return dict(cache)
179
180        def original_func(*args, **kwargs):
181            """调用原函数"""
182            return func(*args, **kwargs)
183
184        wrapper.clear_cache = clear_cache
185        wrapper.get_cache = get_cache
186        wrapper.original_func = original_func
187        return wrapper
188
189    return cache_decorator

函数缓存

Arguments:
  • max_size: 最大大小
  • expiration_time: 过期时间
Returns:

None

def thread_lock(func):
192def thread_lock(func):
193    """
194    线程锁装饰器
195    """
196    thread_lock = threading.Lock()
197
198    def wrapper(*args, **kwargs):
199        with thread_lock:
200            return func(*args, **kwargs)
201
202    return wrapper

线程锁装饰器

def finalize_and_cleanup():
205def finalize_and_cleanup():
206    """
207    结束运行
208    @return:
209    """
210    logger.info("MuRainBot即将关闭,正在删除缓存")
211
212    clean_cache()
213
214    logger.warning("MuRainBot结束运行!")
215    logger.info("再见!\n")

结束运行 @return:

def save_exc_dump(*args, **kwargs):
198    def wrapper(*args, **kwargs):
199        with thread_lock:
200            return func(*args, **kwargs)

保存异常堆栈

Arguments:
  • description: 保存的dump描述,为空则默认
  • path: 保存的路径,为空则自动根据错误生成
def bytes_io_to_file( io_bytes: _io.BytesIO, file_name: str | None = None, file_type: str | None = None, save_dir: str = '/home/MuRainBot2/data/cache'):
281def bytes_io_to_file(
282        io_bytes: BytesIO,
283        file_name: str | None = None,
284        file_type: str | None = None,
285        save_dir: str = CACHE_PATH
286):
287    """
288    将BytesIO对象保存成文件,并返回路径
289    Args:
290        io_bytes: BytesIO对象
291        file_name: 要保存的文件名,与file_type选一个填即可
292        file_type: 文件类型(扩展名),与file_name选一个填即可
293        save_dir: 保存的文件夹
294
295    Returns:
296        保存的文件路径
297    """
298    if not isinstance(io_bytes, BytesIO):
299        raise TypeError("bytes_io_to_file: 输入类型错误")
300    if file_name is None:
301        if file_type is None:
302            file_type = "cache"
303        file_name = uuid.uuid4().hex + "." + file_type
304    if not os.path.exists(save_dir):
305        os.makedirs(save_dir)
306
307    with open(os.path.join(save_dir, file_name), "wb") as f:
308        f.write(io_bytes.getvalue())
309    return os.path.join(save_dir, file_name)

将BytesIO对象保存成文件,并返回路径

Arguments:
  • io_bytes: BytesIO对象
  • file_name: 要保存的文件名,与file_type选一个填即可
  • file_type: 文件类型(扩展名),与file_name选一个填即可
  • save_dir: 保存的文件夹
Returns:

保存的文件路径