import socket import threading import json import time import sys import logging import colorama from datetime import datetime # 初始化colorama以支持跨平台彩色终端输出 colorama.init() # 自定义日志格式化器,带有颜色和分类 class MinecraftStyleFormatter(logging.Formatter): """Minecraft风格的日志格式化器,带有颜色和分类""" # ANSI颜色代码 COLORS = { 'RESET': colorama.Fore.RESET, 'BLACK': colorama.Fore.BLACK, 'RED': colorama.Fore.RED, 'GREEN': colorama.Fore.GREEN, 'YELLOW': colorama.Fore.YELLOW, 'BLUE': colorama.Fore.BLUE, 'MAGENTA': colorama.Fore.MAGENTA, 'CYAN': colorama.Fore.CYAN, 'WHITE': colorama.Fore.WHITE, 'BRIGHT_BLACK': colorama.Fore.LIGHTBLACK_EX, 'BRIGHT_RED': colorama.Fore.LIGHTRED_EX, 'BRIGHT_GREEN': colorama.Fore.LIGHTGREEN_EX, 'BRIGHT_YELLOW': colorama.Fore.LIGHTYELLOW_EX, 'BRIGHT_BLUE': colorama.Fore.LIGHTBLUE_EX, 'BRIGHT_MAGENTA': colorama.Fore.LIGHTMAGENTA_EX, 'BRIGHT_CYAN': colorama.Fore.LIGHTCYAN_EX, 'BRIGHT_WHITE': colorama.Fore.LIGHTWHITE_EX, } # 日志级别颜色(类似于Minecraft) LEVEL_COLORS = { 'DEBUG': COLORS['BRIGHT_BLACK'], 'INFO': COLORS['WHITE'], 'WARNING': COLORS['YELLOW'], 'ERROR': COLORS['RED'], 'CRITICAL': COLORS['BRIGHT_RED'], } # 类别及其颜色 CATEGORIES = { 'SERVER': COLORS['BRIGHT_CYAN'], 'NETWORK': COLORS['BRIGHT_GREEN'], 'CLIENT': COLORS['BRIGHT_YELLOW'], 'SYSTEM': COLORS['BRIGHT_MAGENTA'], } def format(self, record): # 获取日志级别颜色 level_color = self.LEVEL_COLORS.get(record.levelname, self.COLORS['WHITE']) # 从记录名称中确定类别,默认为SERVER category_name = getattr(record, 'category', 'SERVER') category_color = self.CATEGORIES.get(category_name, self.COLORS['WHITE']) # 格式化时间戳,类似于Minecraft:[HH:MM:SS] timestamp = datetime.now().strftime('%H:%M:%S') # 格式化消息 formatted_message = f"{self.COLORS['BRIGHT_BLACK']}[{timestamp}] {category_color}[{category_name}] {level_color}{record.levelname}: {record.getMessage()}{self.COLORS['RESET']}" return formatted_message class TCPServer: def __init__(self, host='127.0.0.1', port=9000, buffer_size=4096): """初始化TCP服务器""" self.host = host self.port = port self.buffer_size = buffer_size self.socket = None self.clients = {} # 存储客户端连接 {client_id: (socket, address)} self.running = False self.client_buffers = {} # 每个客户端的消息缓冲区 # 配置日志 self._setup_logging() def _setup_logging(self): """设置Minecraft风格的日志系统""" # 创建日志器 self.logger = logging.getLogger('TCPServer') self.logger.setLevel(logging.INFO) # 清除任何现有的处理器 if self.logger.handlers: self.logger.handlers.clear() # 创建控制台处理器 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) # 设置格式化器 formatter = MinecraftStyleFormatter() console_handler.setFormatter(formatter) # 添加处理器到日志器 self.logger.addHandler(console_handler) def log(self, level, message, category='SERVER'): """使用指定的分类和级别记录日志""" record = logging.LogRecord( name=self.logger.name, level=getattr(logging, level), pathname='', lineno=0, msg=message, args=(), exc_info=None ) record.category = category # 检查是否存在控制台输入锁,如果存在则使用锁来避免打乱命令输入 if hasattr(self, '_console_input_lock'): with self._console_input_lock: self.logger.handle(record) else: self.logger.handle(record) def start(self): """启动服务器""" try: # 创建TCP套接字 self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # 禁用Nagle算法 # 绑定地址和监听 self.socket.bind((self.host, self.port)) self.socket.listen(5) self.running = True self.log('INFO', f"服务器启动,监听 {self.host}:{self.port}", 'SERVER') # 接受客户端连接的主循环 self._accept_clients() except Exception as e: self.log('ERROR', f"服务器启动错误: {e}", 'SYSTEM') self.stop() def _accept_clients(self): """接受客户端连接的循环""" while self.running: try: # 接受新的客户端连接 client_socket, address = self.socket.accept() client_id = f"{address[0]}:{address[1]}" self.log('INFO', f"新客户端连接: {client_id}", 'NETWORK') # 存储客户端信息 self.clients[client_id] = (client_socket, address) self.client_buffers[client_id] = "" # 创建处理线程 client_thread = threading.Thread( target=self._handle_client, args=(client_id,) ) client_thread.daemon = True client_thread.start() # 通知客户端连接成功 self.send_data(client_id, {"type": "connection_status", "status": "connected"}) except KeyboardInterrupt: self.log('INFO', "收到中断信号,服务器停止中...", 'SYSTEM') break except Exception as e: self.log('ERROR', f"接受连接时出错: {e}", 'NETWORK') time.sleep(1) # 避免CPU过度使用 def _handle_client(self, client_id): """处理客户端消息的线程""" client_socket, _ = self.clients.get(client_id, (None, None)) if not client_socket: return # 设置超时,用于定期检查连接状态 client_socket.settimeout(30) while self.running and client_id in self.clients: try: # 接收数据 data = client_socket.recv(self.buffer_size) if not data: # 客户端断开连接 self.log('INFO', f"客户端 {client_id} 断开连接", 'CLIENT') self._remove_client(client_id) break # 处理接收的数据 self._process_data(client_id, data) except socket.timeout: # 发送保活消息 try: self.send_data(client_id, {"type": "ping"}) except: self.log('INFO', f"客户端 {client_id} 连接超时", 'CLIENT') self._remove_client(client_id) break except Exception as e: self.log('ERROR', f"处理客户端 {client_id} 数据时出错: {e}", 'CLIENT') self._remove_client(client_id) break def _process_data(self, client_id, data): """处理从客户端接收的数据""" # 将接收的字节添加到缓冲区 try: decoded_data = data.decode('utf-8') self.client_buffers[client_id] += decoded_data # 处理可能包含多条JSON消息的缓冲区 self._process_buffer(client_id) except UnicodeDecodeError as e: self.log('ERROR', f"解码客户端 {client_id} 数据出错: {e}", 'CLIENT') def _process_buffer(self, client_id): """处理客户端消息缓冲区""" buffer = self.client_buffers.get(client_id, "") # 按换行符分割消息 while '\n' in buffer: message_end = buffer.find('\n') message_text = buffer[:message_end].strip() buffer = buffer[message_end + 1:] # 处理非空消息 if message_text: try: # 解析JSON消息 message = json.loads(message_text) self.log('INFO', f"从客户端 {client_id} 接收JSON: {message}", 'CLIENT') # 处理消息 - 实现自定义逻辑 self._handle_message(client_id, message) except json.JSONDecodeError: # 非JSON格式,作为原始文本处理 self.log('INFO', f"从客户端 {client_id} 接收文本: {message_text}", 'CLIENT') self._handle_raw_message(client_id, message_text) # 更新缓冲区 self.client_buffers[client_id] = buffer def _handle_message(self, client_id, message): """处理JSON消息 - 可被子类覆盖以实现自定义逻辑""" # 默认实现:简单回显 response = { "type": "response", "original": message, "timestamp": time.time() } self.send_data(client_id, response) def _handle_raw_message(self, client_id, message): """处理原始文本消息 - 可被子类覆盖以实现自定义逻辑""" # 默认实现:简单回显 response = { "type": "text_response", "content": f"收到: {message}", "timestamp": time.time() } self.send_data(client_id, response) def send_data(self, client_id, data): """向指定客户端发送JSON数据""" if client_id not in self.clients: self.log('WARNING', f"客户端 {client_id} 不存在,无法发送数据", 'NETWORK') return False client_socket, _ = self.clients[client_id] try: # 转换为JSON字符串,添加换行符 if isinstance(data, (dict, list)): message = json.dumps(data) + '\n' else: message = str(data) + '\n' # 发送数据 client_socket.sendall(message.encode('utf-8')) return True except Exception as e: self.log('ERROR', f"向客户端 {client_id} 发送数据时出错: {e}", 'NETWORK') self._remove_client(client_id) return False def broadcast(self, data, exclude=None): """向所有客户端广播消息,可选排除特定客户端""" exclude = exclude or [] for client_id in list(self.clients.keys()): if client_id not in exclude: self.send_data(client_id, data) def _remove_client(self, client_id): """断开并移除客户端连接""" if client_id in self.clients: client_socket, _ = self.clients[client_id] try: client_socket.close() except: pass del self.clients[client_id] if client_id in self.client_buffers: del self.client_buffers[client_id] self.log('INFO', f"客户端 {client_id} 已移除", 'CLIENT') def stop(self): """停止服务器""" self.running = False # 关闭所有客户端连接 for client_id in list(self.clients.keys()): self._remove_client(client_id) # 关闭服务器套接字 if self.socket: try: self.socket.close() except: pass self.log('INFO', "服务器已停止", 'SERVER') # 使用示例 if __name__ == "__main__": try: # 创建并启动服务器 server = TCPServer() # 以阻塞方式启动服务器 server_thread = threading.Thread(target=server.start) server_thread.daemon = True server_thread.start() # 运行直到按Ctrl+C print("服务器运行中,按Ctrl+C停止...") while True: time.sleep(1) except KeyboardInterrupt: print("\n程序被用户中断") if 'server' in locals(): server.stop() sys.exit(0)