Lib.common
工具
"""
Utilities shared across the bot: bounded caching, file download helpers,
cache cleanup, process restart and exception dumps.
"""
import functools
import os.path
import shutil
import sys
import threading
import time
import traceback
import uuid
from collections import OrderedDict
from io import BytesIO
from urllib.parse import urlsplit

import requests

from .constants import *
from .utils import Logger

logger = Logger.get_logger()


class LimitedSizeDict(OrderedDict):
    """
    Insertion-ordered dictionary that holds at most ``max_size`` entries.

    Assigning a new key to a full dictionary evicts the oldest entry first.
    Re-assigning an existing key moves it to the newest position.
    A ``max_size`` of zero or less makes the dictionary store nothing.
    """

    def __init__(self, max_size):
        """
        Args:
            max_size: Maximum number of entries kept at any time.
        """
        self._max_size = max_size
        super().__init__()

    def __setitem__(self, key, value):
        if self._max_size <= 0:
            # No capacity: drop the write instead of failing while trying to
            # evict from an empty dictionary.
            return
        if key in self:
            # Delete first so the re-inserted key lands at the newest position.
            del self[key]
        elif len(self) >= self._max_size:
            self.popitem(last=False)
        super().__setitem__(key, value)


def restart() -> None:
    """
    Restart MRB2 by replacing the current process with a fresh interpreter.

    The new process uses the same interpreter (``sys.executable``) and the
    same command line (``sys.argv``). On success this function never
    returns. If the exec call fails with ``OSError`` the failure is logged
    and the process exits through ``sys.exit()``.
    """
    interpreter = sys.executable

    # exec replaces the process image immediately and does not flush open
    # file objects, so push out buffered console output first.
    for console in (sys.stdout, sys.stderr):
        try:
            console.flush()
        except (AttributeError, OSError, ValueError):
            # Best effort: the stream may be missing or already closed.
            pass

    try:
        os.execl(interpreter, interpreter, *sys.argv)
    except OSError as e:
        logger.error(f"重启失败: {e!r}")
        sys.exit()


def download_file_to_cache(url: str, headers=None, file_name: str = "",
                           download_path: str | None = None, stream=False, fake_headers: bool = True,
                           timeout: float | tuple[float, float] | None = None) -> str | None:
    """
    Download ``url`` into a local file and return the file path.

    Args:
        url: Address to download.
        headers: Extra request headers. The mapping is copied, never modified.
        file_name: Target file name; a random ``<uuid>.cache`` name is used
            when empty.
        download_path: Target directory; ``CACHE_PATH`` is used when None.
            The directory is created if it does not exist.
        stream: Write the body to disk in 64 KiB chunks instead of loading
            it into memory first.
        fake_headers: Add a fixed set of browser-like headers. These take
            precedence over same-named entries in ``headers``.
        timeout: Passed to ``requests.get``; None (the default) means no
            timeout.
    Returns:
        Path of the downloaded file, or None when the request failed
        (including HTTP error status codes).
    """
    # Copy so the caller's dict is not changed by the header injection below.
    headers = dict(headers) if headers else {}

    if fake_headers:
        headers.update({
            "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.42"),
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,da;q=0.7,ko;q=0.6",
            # NOTE(review): "br" is advertised here; confirm a Brotli decoder
            # is available to requests, otherwise bodies may be saved compressed.
            "Accept-Encoding": "gzip, deflate, br",
            "Accept": ("text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,"
                       "application/signed-exchange;v=b3;q=0.7"),
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Cache-Control": "max-age=0",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Sec-Ch-Ua": '"Chromium";v="113", "Not-A.Brand";v="24", "Microsoft Edge";v="113"',
            "Sec-Ch-Ua-Mobile": "?0",
            "Sec-Ch-Ua-Platform": '"Windows"',
        })
        # Parse the host instead of splitting on "/" so URLs without a scheme
        # or without a path do not crash or yield a bogus Host value.
        host = urlsplit(url).netloc
        if host:
            headers["Host"] = host

    if file_name == "":
        file_name = uuid.uuid4().hex + ".cache"

    target_dir = CACHE_PATH if download_path is None else download_path
    file_path = os.path.join(target_dir, file_name)

    # Create the directory the file is actually written to.
    parent_dir = os.path.dirname(file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    try:
        if stream:
            with requests.get(url, stream=True, headers=headers, timeout=timeout) as res:
                # Treat 4xx/5xx as a failed download rather than saving the error page.
                res.raise_for_status()
                with open(file_path, "wb") as f:
                    for chunk in res.iter_content(chunk_size=64 * 1024):
                        if chunk:
                            f.write(chunk)
        else:
            res = requests.get(url, headers=headers, timeout=timeout)
            res.raise_for_status()
            with open(file_path, "wb") as f:
                f.write(res.content)
    except requests.exceptions.RequestException as e:
        logger.warning(f"下载文件失败: {e}")
        # Do not leave a partial file behind.
        if os.path.exists(file_path):
            os.remove(file_path)
        return None

    return file_path


def clean_cache() -> None:
    """
    Delete the cache directory ``CACHE_PATH`` and everything inside it.

    Removal is best effort: entries that cannot be deleted are skipped.
    A warning is logged when the directory still exists afterwards.
    Does nothing when the directory does not exist.
    """
    if not os.path.exists(CACHE_PATH):
        return

    try:
        shutil.rmtree(CACHE_PATH, ignore_errors=True)
    except Exception as e:
        logger.warning("删除缓存时报错,报错信息: %s" % repr(e))
        return

    # ignore_errors=True hides individual failures, so check for leftovers.
    if os.path.exists(CACHE_PATH):
        logger.warning(f"缓存目录未能完全删除: {CACHE_PATH}")


def function_cache(max_size: int, expiration_time: int = -1):
    """
    Build a decorator that caches a function's return values.

    The cache key is the function name plus the string form of the
    positional and keyword arguments, so unhashable arguments are accepted,
    but calls whose arguments have the same string form share an entry.

    Args:
        max_size: Maximum number of cached results; the oldest entry is
            evicted first.
        expiration_time: Lifetime of an entry in seconds; -1 means entries
            never expire.
    Returns:
        A decorator. The decorated function gains three attributes:
        ``clear_cache()``, ``get_cache()`` and ``original_func(*args, **kwargs)``.
    """
    cache = LimitedSizeDict(max_size)

    def cache_decorator(func):
        """
        Wrap ``func`` so repeated calls are answered from the cache.
        """

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = str(func.__name__) + str(args) + str(kwargs)
            entry = cache.get(key)
            if entry is not None and (expiration_time == -1 or time.time() - entry[1] < expiration_time):
                return entry[0]
            result = func(*args, **kwargs)
            # Store the result together with the time it was computed.
            cache[key] = (result, time.time())
            return result

        def clear_cache():
            """Remove every cached result."""
            cache.clear()

        def get_cache():
            """Return a plain-dict copy of the cache."""
            return dict(cache)

        def original_func(*args, **kwargs):
            """Call the undecorated function, bypassing the cache."""
            return func(*args, **kwargs)

        wrapper.clear_cache = clear_cache
        wrapper.get_cache = get_cache
        wrapper.original_func = original_func
        return wrapper

    return cache_decorator


def thread_lock(func):
    """
    Decorator that serialises calls to ``func`` with a per-function lock.

    Each decorated function gets its own ``threading.Lock``. The lock is
    not reentrant, so a decorated function must not call itself.
    """
    lock = threading.Lock()

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with lock:
            return func(*args, **kwargs)

    return wrapper


def finalize_and_cleanup():
    """
    Log the shutdown, delete the cache and log the farewell messages.
    """
    logger.info("MuRainBot即将关闭,正在删除缓存")

    clean_cache()

    logger.warning("MuRainBot结束运行!")
    logger.info("再见!\n")


@thread_lock
def save_exc_dump(description: str = None, path: str = None) -> str | None:
    """
    Save a coredumpy dump of the exception currently being handled.

    Must be called while an exception is active (inside ``except``).

    Args:
        description: Description stored with the dump; omitted when empty.
        path: Dump file path; when empty a unique name under ``DUMPS_PATH``
            is generated from the timestamp and the failing function name.
    Returns:
        Path of the dump, or None when coredumpy is missing or saving failed.
    """
    try:
        import coredumpy
    except ImportError:
        logger.warning("coredumpy未安装,无法保存异常堆栈")
        return None

    try:
        exc_traceback = sys.exc_info()[2]
        if not exc_traceback:
            raise Exception("No traceback found")

        # Walk the traceback chain to the last frame, where the exception
        # was originally raised.
        current_tb = exc_traceback
        frame = current_tb.tb_frame
        while current_tb:
            frame = current_tb.tb_frame
            current_tb = current_tb.tb_next

        kwargs = {"frame": frame}
        if path:
            kwargs["path"] = path
        else:
            # Sanitise the name before the existence check so the checked
            # path is the one that is written (names such as "<module>"
            # contain characters that are stripped).
            func_name = frame.f_code.co_name
            for forbidden in ('?', '*', '"', '<', '>'):
                func_name = func_name.replace(forbidden, "")
            timestamp = time.strftime('%Y%m%d%H%M%S')

            i = 0
            while True:
                suffix = f"_{i}" if i > 0 else ""
                candidate = os.path.join(DUMPS_PATH, f"coredumpy_{timestamp}_{func_name}{suffix}.dump")
                if not os.path.exists(candidate):
                    break
                i += 1
            kwargs["path"] = os.path.normpath(candidate)

        if description:
            kwargs["description"] = description

        coredumpy.dump(**kwargs)
    except Exception as e:
        logger.error(f"保存异常堆栈时发生错误: {repr(e)}\n"
                     f"{traceback.format_exc()}")
        return None

    return kwargs["path"]


def bytes_io_to_file(
        io_bytes: BytesIO,
        file_name: str | None = None,
        file_type: str | None = None,
        save_dir: str = CACHE_PATH
) -> str:
    """
    Write the contents of a BytesIO object to a file and return its path.

    Args:
        io_bytes: Buffer to save.
        file_name: File name to use; when missing or empty a random
            ``<uuid>.<file_type>`` name is generated.
        file_type: Extension for the generated name, with or without a
            leading dot; defaults to ``cache``.
        save_dir: Directory to write into; created if it does not exist.
    Returns:
        Path of the written file.
    Raises:
        TypeError: ``io_bytes`` is not a BytesIO instance.
    """
    if not isinstance(io_bytes, BytesIO):
        raise TypeError("bytes_io_to_file: 输入类型错误")

    if not file_name:
        extension = (file_type or "").lstrip(".") or "cache"
        file_name = uuid.uuid4().hex + "." + extension

    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    file_path = os.path.join(save_dir, file_name)
    with open(file_path, "wb") as f:
        f.write(io_bytes.getvalue())
    return file_path
logger =
<RootLogger root (INFO)>
class
LimitedSizeDict(collections.OrderedDict):
class LimitedSizeDict(OrderedDict):
    """
    Insertion-ordered dictionary that holds at most ``max_size`` entries.

    Assigning a new key to a full dictionary evicts the oldest entry first.
    Re-assigning an existing key moves it to the newest position.
    A ``max_size`` of zero or less makes the dictionary store nothing.
    """

    def __init__(self, max_size):
        """
        Args:
            max_size: Maximum number of entries kept at any time.
        """
        self._max_size = max_size
        super().__init__()

    def __setitem__(self, key, value):
        if self._max_size <= 0:
            # No capacity: drop the write instead of failing while trying to
            # evict from an empty dictionary.
            return
        if key in self:
            # Delete first so the re-inserted key lands at the newest position.
            del self[key]
        elif len(self) >= self._max_size:
            self.popitem(last=False)
        super().__setitem__(key, value)
带有限制大小的字典
def
restart() -> None:
def restart() -> None:
    """
    Restart MRB2 by replacing the current process with a fresh interpreter.

    The new process uses the same interpreter (``sys.executable``) and the
    same command line (``sys.argv``). On success this function never
    returns. If the exec call fails with ``OSError`` the failure is logged
    and the process exits through ``sys.exit()``.
    """
    interpreter = sys.executable

    # exec replaces the process image immediately and does not flush open
    # file objects, so push out buffered console output first.
    for console in (sys.stdout, sys.stderr):
        try:
            console.flush()
        except (AttributeError, OSError, ValueError):
            # Best effort: the stream may be missing or already closed.
            pass

    try:
        os.execl(interpreter, interpreter, *sys.argv)
    except OSError as e:
        logger.error(f"重启失败: {e!r}")
        sys.exit()
MRB2重启
Returns:
None
def
download_file_to_cache( url: str, headers=None, file_name: str = '', download_path: str = None, stream=False, fake_headers: bool = True) -> str | None:
def download_file_to_cache(url: str, headers=None, file_name: str = "",
                           download_path: str | None = None, stream=False, fake_headers: bool = True,
                           timeout: float | tuple[float, float] | None = None) -> str | None:
    """
    Download ``url`` into a local file and return the file path.

    Args:
        url: Address to download.
        headers: Extra request headers. The mapping is copied, never modified.
        file_name: Target file name; a random ``<uuid>.cache`` name is used
            when empty.
        download_path: Target directory; ``CACHE_PATH`` is used when None.
            The directory is created if it does not exist.
        stream: Write the body to disk in 64 KiB chunks instead of loading
            it into memory first.
        fake_headers: Add a fixed set of browser-like headers. These take
            precedence over same-named entries in ``headers``.
        timeout: Passed to ``requests.get``; None (the default) means no
            timeout.
    Returns:
        Path of the downloaded file, or None when the request failed
        (including HTTP error status codes).
    """
    # Imported locally so this function does not depend on a new file-level import.
    from urllib.parse import urlsplit

    # Copy so the caller's dict is not changed by the header injection below.
    headers = dict(headers) if headers else {}

    if fake_headers:
        headers.update({
            "User-Agent": ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                           "Chrome/113.0.0.0 Safari/537.36 Edg/113.0.1774.42"),
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,da;q=0.7,ko;q=0.6",
            # NOTE(review): "br" is advertised here; confirm a Brotli decoder
            # is available to requests, otherwise bodies may be saved compressed.
            "Accept-Encoding": "gzip, deflate, br",
            "Accept": ("text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,"
                       "application/signed-exchange;v=b3;q=0.7"),
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Cache-Control": "max-age=0",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Sec-Ch-Ua": '"Chromium";v="113", "Not-A.Brand";v="24", "Microsoft Edge";v="113"',
            "Sec-Ch-Ua-Mobile": "?0",
            "Sec-Ch-Ua-Platform": '"Windows"',
        })
        # Parse the host instead of splitting on "/" so URLs without a scheme
        # or without a path do not crash or yield a bogus Host value.
        host = urlsplit(url).netloc
        if host:
            headers["Host"] = host

    if file_name == "":
        file_name = uuid.uuid4().hex + ".cache"

    target_dir = CACHE_PATH if download_path is None else download_path
    file_path = os.path.join(target_dir, file_name)

    # Create the directory the file is actually written to.
    parent_dir = os.path.dirname(file_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    try:
        if stream:
            with requests.get(url, stream=True, headers=headers, timeout=timeout) as res:
                # Treat 4xx/5xx as a failed download rather than saving the error page.
                res.raise_for_status()
                with open(file_path, "wb") as f:
                    for chunk in res.iter_content(chunk_size=64 * 1024):
                        if chunk:
                            f.write(chunk)
        else:
            res = requests.get(url, headers=headers, timeout=timeout)
            res.raise_for_status()
            with open(file_path, "wb") as f:
                f.write(res.content)
    except requests.exceptions.RequestException as e:
        logger.warning(f"下载文件失败: {e}")
        # Do not leave a partial file behind.
        if os.path.exists(file_path):
            os.remove(file_path)
        return None

    return file_path
下载文件到缓存
Arguments:
- url: 下载的url
- headers: 下载请求的请求头
- file_name: 文件名
- download_path: 下载路径
- stream: 是否使用流式传输
- fake_headers: 是否使用自动生成的假请求头
Returns:
文件路径
def
clean_cache() -> None:
def clean_cache() -> None:
    """
    Delete the cache directory ``CACHE_PATH`` and everything inside it.

    Removal is best effort: entries that cannot be deleted are skipped.
    A warning is logged when the directory still exists afterwards.
    Does nothing when the directory does not exist.
    """
    if not os.path.exists(CACHE_PATH):
        return

    try:
        shutil.rmtree(CACHE_PATH, ignore_errors=True)
    except Exception as e:
        logger.warning("删除缓存时报错,报错信息: %s" % repr(e))
        return

    # ignore_errors=True hides individual failures, so check for leftovers.
    if os.path.exists(CACHE_PATH):
        logger.warning(f"缓存目录未能完全删除: {CACHE_PATH}")
清理缓存
Returns:
None
def
function_cache(max_size: int, expiration_time: int = -1):
def function_cache(max_size: int, expiration_time: int = -1):
    """
    Build a decorator that caches a function's return values.

    The cache key is the function name plus the string form of the
    positional and keyword arguments, so unhashable arguments are accepted,
    but calls whose arguments have the same string form share an entry.

    Args:
        max_size: Maximum number of cached results; the oldest entry is
            evicted first.
        expiration_time: Lifetime of an entry in seconds; -1 means entries
            never expire.
    Returns:
        A decorator. The decorated function gains three attributes:
        ``clear_cache()``, ``get_cache()`` and ``original_func(*args, **kwargs)``.
    """
    # Imported locally so this function does not depend on a new file-level import.
    from functools import wraps

    cache = LimitedSizeDict(max_size)

    def cache_decorator(func):
        """
        Wrap ``func`` so repeated calls are answered from the cache.
        """

        @wraps(func)
        def wrapper(*args, **kwargs):
            key = str(func.__name__) + str(args) + str(kwargs)
            entry = cache.get(key)
            if entry is not None and (expiration_time == -1 or time.time() - entry[1] < expiration_time):
                return entry[0]
            result = func(*args, **kwargs)
            # Store the result together with the time it was computed.
            cache[key] = (result, time.time())
            return result

        def clear_cache():
            """Remove every cached result."""
            cache.clear()

        def get_cache():
            """Return a plain-dict copy of the cache."""
            return dict(cache)

        def original_func(*args, **kwargs):
            """Call the undecorated function, bypassing the cache."""
            return func(*args, **kwargs)

        wrapper.clear_cache = clear_cache
        wrapper.get_cache = get_cache
        wrapper.original_func = original_func
        return wrapper

    return cache_decorator
函数缓存
Arguments:
- max_size: 最大大小
- expiration_time: 过期时间
Returns:
装饰器:用于包装目标函数并缓存其返回值
def
thread_lock(func):
def thread_lock(func):
    """
    Decorator that serialises calls to ``func`` with a per-function lock.

    Each decorated function gets its own ``threading.Lock``. The lock is
    not reentrant, so a decorated function must not call itself.
    The wrapper keeps the name, docstring and signature metadata of ``func``.
    """
    # Imported locally so this function does not depend on a new file-level import.
    from functools import wraps

    lock = threading.Lock()

    @wraps(func)
    def wrapper(*args, **kwargs):
        with lock:
            return func(*args, **kwargs)

    return wrapper
线程锁装饰器
def
finalize_and_cleanup():
def finalize_and_cleanup():
    """
    Log the shutdown, delete the cache and log the farewell messages.
    """
    # Announce the shutdown before touching the cache.
    logger.info("MuRainBot即将关闭,正在删除缓存")
    clean_cache()

    # Closing messages once cleanup has been attempted.
    logger.warning("MuRainBot结束运行!")
    logger.info("再见!\n")
结束运行:删除缓存并输出关闭日志
def
save_exc_dump(description: str = None, path: str = None):
保存异常堆栈
Arguments:
- description: 保存的dump描述,为空则默认
- path: 保存的路径,为空则自动根据错误生成
def
bytes_io_to_file( io_bytes: _io.BytesIO, file_name: str | None = None, file_type: str | None = None, save_dir: str = '/home/MuRainBot2/data/cache'):
def bytes_io_to_file(
        io_bytes: BytesIO,
        file_name: str | None = None,
        file_type: str | None = None,
        save_dir: str = CACHE_PATH
) -> str:
    """
    Write the contents of a BytesIO object to a file and return its path.

    Args:
        io_bytes: Buffer to save.
        file_name: File name to use; when missing or empty a random
            ``<uuid>.<file_type>`` name is generated.
        file_type: Extension for the generated name, with or without a
            leading dot; defaults to ``cache``.
        save_dir: Directory to write into; created if it does not exist.
    Returns:
        Path of the written file.
    Raises:
        TypeError: ``io_bytes`` is not a BytesIO instance.
    """
    if not isinstance(io_bytes, BytesIO):
        raise TypeError("bytes_io_to_file: 输入类型错误")

    if not file_name:
        # Accept both "png" and ".png" without producing a double dot.
        extension = (file_type or "").lstrip(".") or "cache"
        file_name = uuid.uuid4().hex + "." + extension

    if save_dir:
        # exist_ok avoids a failure when another thread creates the directory
        # between a check and the creation.
        os.makedirs(save_dir, exist_ok=True)

    file_path = os.path.join(save_dir, file_name)
    with open(file_path, "wb") as f:
        f.write(io_bytes.getvalue())
    return file_path
将BytesIO对象保存成文件,并返回路径
Arguments:
- io_bytes: BytesIO对象
- file_name: 要保存的文件名,与file_type选一个填即可
- file_type: 文件类型(扩展名),与file_name选一个填即可
- save_dir: 保存的文件夹
Returns:
保存的文件路径