Lib.core.ListenerServer

监听服务器

  1"""
  2监听服务器
  3"""
  4
  5from ..utils import Logger
  6from .ConfigManager import GlobalConfig
  7from Lib.core import EventManager
  8
  9from concurrent.futures import ThreadPoolExecutor
 10from wsgiref.simple_server import WSGIServer
 11
 12from flask import Flask, request
 13import hmac
 14
 15logger = Logger.get_logger()
 16app = Flask(__name__)
 17
 18
 19class EscalationEvent(EventManager.Event):
 20    """
 21    上报事件
 22    """
 23
 24    def __init__(self, event_data):
 25        self.event_data = event_data
 26
 27
 28@app.route("/", methods=["POST"])
 29def post_data():
 30    """
 31    上报处理
 32    """
 33    if GlobalConfig().server.secret:
 34        sig = hmac.new(GlobalConfig().server.secret.encode("utf-8"), request.get_data(), 'sha1').hexdigest()
 35        try:
 36            received_sig = request.headers['X-Signature'][len('sha1='):]
 37        except KeyError:
 38            logger.warning("收到非法请求(缺少签名),拒绝访问")
 39            return "", 401
 40        if sig != received_sig:
 41            logger.warning("收到非法请求(签名不匹配),拒绝访问")
 42            return "", 401
 43    data = request.get_json()
 44    logger.debug("收到上报: %s" % data)
 45    if "self" in data and GlobalConfig().account.user_id != 0 and data.get("self") != GlobalConfig().account.user_id:
 46        logger.warning(f"收到来自其他bot的消息,忽略: {data}")
 47        return "ok", 204
 48    EscalationEvent(data).call_async()
 49
 50    return "ok", 204
 51
 52
 53config = GlobalConfig()
 54if config.server.server == "werkzeug":
 55    # 使用werkzeug服务器
 56    from werkzeug.serving import WSGIRequestHandler
 57
 58
 59    class ThreadPoolWSGIServer(WSGIServer):
 60        """
 61        线程池WSGI服务器
 62        """
 63
 64        def __init__(self, server_address, app=None, max_workers=10, passthrough_errors=False,
 65                     handler_class=WSGIRequestHandler, **kwargs):
 66            super().__init__(server_address, handler_class, **kwargs)
 67            self.executor = ThreadPoolExecutor(max_workers=max_workers)
 68            self.app = app
 69            self.ssl_context = None
 70            self.multithread = True
 71            self.multiprocess = False
 72            self.threaded = True
 73            self.passthrough_errors = passthrough_errors
 74
 75        def handle_request(self):
 76            """
 77            处理请求
 78            """
 79            request, client_address = self.get_request()
 80            if self.verify_request(request, client_address):
 81                self.executor.submit(self.process_request, request, client_address)
 82
 83        def serve_forever(self):
 84            """
 85            启动服务器
 86            """
 87            while True:
 88                self.handle_request()
 89
 90
 91    class ThreadPoolWSGIRequestHandler(WSGIRequestHandler):
 92        def handle(self):
 93            super().handle()
 94
 95
 96    server = ThreadPoolWSGIServer((config.server.host, config.server.port),
 97                                  app=app,
 98                                  max_workers=config.server.max_works)
 99    server.RequestHandlerClass = ThreadPoolWSGIRequestHandler
100    start_server = lambda: server.serve_forever()
101elif config.server.server == "waitress":
102    # 使用waitress服务器
103    from waitress import serve
104
105    start_server = lambda: serve(app, host=config.server.host, port=config.server.port, threads=config.server.max_works)
106else:
107    raise ValueError("服务器类型错误: 未知服务器类型")
logger = <RootLogger root (INFO)>
app = <Flask 'Lib.core.ListenerServer'>
class EscalationEvent(Lib.core.EventManager.Event):
20class EscalationEvent(EventManager.Event):
21    """
22    上报事件
23    """
24
25    def __init__(self, event_data):
26        self.event_data = event_data

上报事件

EscalationEvent(event_data)
25    def __init__(self, event_data):
26        self.event_data = event_data
event_data
@app.route('/', methods=['POST'])
def post_data():
29@app.route("/", methods=["POST"])
30def post_data():
31    """
32    上报处理
33    """
34    if GlobalConfig().server.secret:
35        sig = hmac.new(GlobalConfig().server.secret.encode("utf-8"), request.get_data(), 'sha1').hexdigest()
36        try:
37            received_sig = request.headers['X-Signature'][len('sha1='):]
38        except KeyError:
39            logger.warning("收到非法请求(缺少签名),拒绝访问")
40            return "", 401
41        if sig != received_sig:
42            logger.warning("收到非法请求(签名不匹配),拒绝访问")
43            return "", 401
44    data = request.get_json()
45    logger.debug("收到上报: %s" % data)
46    if "self" in data and GlobalConfig().account.user_id != 0 and data.get("self") != GlobalConfig().account.user_id:
47        logger.warning(f"收到来自其他bot的消息,忽略: {data}")
48        return "ok", 204
49    EscalationEvent(data).call_async()
50
51    return "ok", 204

上报处理

class ThreadPoolWSGIServer(wsgiref.simple_server.WSGIServer):
60    class ThreadPoolWSGIServer(WSGIServer):
61        """
62        线程池WSGI服务器
63        """
64
65        def __init__(self, server_address, app=None, max_workers=10, passthrough_errors=False,
66                     handler_class=WSGIRequestHandler, **kwargs):
67            super().__init__(server_address, handler_class, **kwargs)
68            self.executor = ThreadPoolExecutor(max_workers=max_workers)
69            self.app = app
70            self.ssl_context = None
71            self.multithread = True
72            self.multiprocess = False
73            self.threaded = True
74            self.passthrough_errors = passthrough_errors
75
76        def handle_request(self):
77            """
78            处理请求
79            """
80            request, client_address = self.get_request()
81            if self.verify_request(request, client_address):
82                self.executor.submit(self.process_request, request, client_address)
83
84        def serve_forever(self):
85            """
86            启动服务器
87            """
88            while True:
89                self.handle_request()

线程池WSGI服务器

ThreadPoolWSGIServer( server_address, app=None, max_workers=10, passthrough_errors=False, handler_class=<class 'werkzeug.serving.WSGIRequestHandler'>, **kwargs)
65        def __init__(self, server_address, app=None, max_workers=10, passthrough_errors=False,
66                     handler_class=WSGIRequestHandler, **kwargs):
67            super().__init__(server_address, handler_class, **kwargs)
68            self.executor = ThreadPoolExecutor(max_workers=max_workers)
69            self.app = app
70            self.ssl_context = None
71            self.multithread = True
72            self.multiprocess = False
73            self.threaded = True
74            self.passthrough_errors = passthrough_errors

Constructor. May be extended, do not override.

executor
app
ssl_context
multithread
multiprocess
threaded
passthrough_errors
def handle_request(self):
76        def handle_request(self):
77            """
78            处理请求
79            """
80            request, client_address = self.get_request()
81            if self.verify_request(request, client_address):
82                self.executor.submit(self.process_request, request, client_address)

处理请求

def serve_forever(self):
84        def serve_forever(self):
85            """
86            启动服务器
87            """
88            while True:
89                self.handle_request()

启动服务器

class ThreadPoolWSGIRequestHandler(werkzeug.serving.WSGIRequestHandler):
92    class ThreadPoolWSGIRequestHandler(WSGIRequestHandler):
93        def handle(self):
94            super().handle()

A request handler that implements WSGI dispatching.

def handle(self):
93        def handle(self):
94            super().handle()

Handles a request ignoring dropped connections.

server = <ThreadPoolWSGIServer object>
def start_server():
101    start_server = lambda: server.serve_forever()