255 lines
8.9 KiB
Python
255 lines
8.9 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
WebSocket协议的服务器远程命令API
|
|
作者: AI Assistant
|
|
功能: 提供基于WebSocket的远程控制台命令执行功能
|
|
"""
|
|
|
|
import asyncio
|
|
import websockets
|
|
import json
|
|
import threading
|
|
import time
|
|
from typing import Dict, Any, Optional
|
|
#自定义包
|
|
from .ConsoleCommandsAPI import ConsoleCommandsAPI
|
|
|
|
class WSRemoteCmdApi:
|
|
"""WebSocket远程命令API服务器"""
|
|
|
|
def __init__(self, game_server, host="0.0.0.0", port=7071, auth_key="mengya2024"):
|
|
"""
|
|
初始化WebSocket远程命令API服务器
|
|
|
|
Args:
|
|
game_server: 游戏服务器实例
|
|
host: WebSocket服务器监听地址
|
|
port: WebSocket服务器监听端口
|
|
auth_key: 认证密钥
|
|
"""
|
|
self.game_server = game_server
|
|
self.host = host
|
|
self.port = port
|
|
self.auth_key = auth_key
|
|
self.server = None
|
|
self.clients = {} # 存储已连接的客户端
|
|
self.console_api = ConsoleCommandsAPI(game_server)
|
|
self.running = False
|
|
|
|
async def register_client(self, websocket, path=None):
|
|
"""注册新的客户端连接"""
|
|
client_id = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}_{int(time.time())}"
|
|
self.clients[client_id] = {
|
|
"websocket": websocket,
|
|
"authenticated": False,
|
|
"connect_time": time.time()
|
|
}
|
|
|
|
try:
|
|
# 发送欢迎消息
|
|
await self.send_message(websocket, {
|
|
"type": "welcome",
|
|
"message": "欢迎连接到萌芽农场远程控制台",
|
|
"server_version": getattr(self.game_server, 'server_version', '2.2.0'),
|
|
"require_auth": True
|
|
})
|
|
|
|
# 处理客户端消息
|
|
async for message in websocket:
|
|
await self.handle_message(client_id, message)
|
|
|
|
except websockets.exceptions.ConnectionClosed:
|
|
pass
|
|
except Exception as e:
|
|
print(f"❌ 客户端 {client_id} 连接处理出错: {str(e)}")
|
|
finally:
|
|
# 清理客户端连接
|
|
if client_id in self.clients:
|
|
del self.clients[client_id]
|
|
print(f"🔌 客户端 {client_id} 已断开连接")
|
|
|
|
async def handle_message(self, client_id: str, message: str):
|
|
"""处理客户端消息"""
|
|
try:
|
|
data = json.loads(message)
|
|
message_type = data.get("type", "")
|
|
|
|
if message_type == "auth":
|
|
await self.handle_auth(client_id, data)
|
|
elif message_type == "command":
|
|
await self.handle_command(client_id, data)
|
|
elif message_type == "ping":
|
|
await self.handle_ping(client_id, data)
|
|
else:
|
|
await self.send_error(client_id, f"未知消息类型: {message_type}")
|
|
|
|
except json.JSONDecodeError:
|
|
await self.send_error(client_id, "无效的JSON格式")
|
|
except Exception as e:
|
|
await self.send_error(client_id, f"处理消息时出错: {str(e)}")
|
|
|
|
async def handle_auth(self, client_id: str, data: Dict[str, Any]):
|
|
"""处理客户端认证"""
|
|
if client_id not in self.clients:
|
|
return
|
|
|
|
provided_key = data.get("auth_key", "")
|
|
|
|
if provided_key == self.auth_key:
|
|
self.clients[client_id]["authenticated"] = True
|
|
await self.send_message(self.clients[client_id]["websocket"], {
|
|
"type": "auth_result",
|
|
"success": True,
|
|
"message": "认证成功,欢迎使用远程控制台"
|
|
})
|
|
print(f"✅ 客户端 {client_id} 认证成功")
|
|
else:
|
|
await self.send_message(self.clients[client_id]["websocket"], {
|
|
"type": "auth_result",
|
|
"success": False,
|
|
"message": "认证失败,密钥错误"
|
|
})
|
|
print(f"❌ 客户端 {client_id} 认证失败")
|
|
|
|
async def handle_command(self, client_id: str, data: Dict[str, Any]):
|
|
"""处理控制台命令"""
|
|
if client_id not in self.clients:
|
|
return
|
|
|
|
# 检查是否已认证
|
|
if not self.clients[client_id]["authenticated"]:
|
|
await self.send_error(client_id, "请先进行认证")
|
|
return
|
|
|
|
command = data.get("command", "").strip()
|
|
if not command:
|
|
await self.send_error(client_id, "命令不能为空")
|
|
return
|
|
|
|
# 执行命令并捕获输出
|
|
try:
|
|
# 重定向标准输出来捕获命令执行结果
|
|
import io
|
|
import sys
|
|
|
|
old_stdout = sys.stdout
|
|
sys.stdout = captured_output = io.StringIO()
|
|
|
|
# 执行命令
|
|
success = self.console_api.process_command(command)
|
|
|
|
# 恢复标准输出
|
|
sys.stdout = old_stdout
|
|
output = captured_output.getvalue()
|
|
|
|
# 发送执行结果
|
|
await self.send_message(self.clients[client_id]["websocket"], {
|
|
"type": "command_result",
|
|
"command": command,
|
|
"success": success,
|
|
"output": output if output else ("命令执行成功" if success else "命令执行失败")
|
|
})
|
|
|
|
print(f"📝 客户端 {client_id} 执行命令: {command} - {'成功' if success else '失败'}")
|
|
|
|
except Exception as e:
|
|
await self.send_error(client_id, f"执行命令时出错: {str(e)}")
|
|
|
|
async def handle_ping(self, client_id: str, data: Dict[str, Any]):
|
|
"""处理ping请求"""
|
|
if client_id not in self.clients:
|
|
return
|
|
|
|
await self.send_message(self.clients[client_id]["websocket"], {
|
|
"type": "pong",
|
|
"timestamp": time.time()
|
|
})
|
|
|
|
async def send_message(self, websocket, data: Dict[str, Any]):
|
|
"""发送消息到客户端"""
|
|
try:
|
|
message = json.dumps(data, ensure_ascii=False)
|
|
await websocket.send(message)
|
|
except Exception as e:
|
|
print(f"❌ 发送消息失败: {str(e)}")
|
|
|
|
async def send_error(self, client_id: str, error_message: str):
|
|
"""发送错误消息到客户端"""
|
|
if client_id in self.clients:
|
|
await self.send_message(self.clients[client_id]["websocket"], {
|
|
"type": "error",
|
|
"message": error_message
|
|
})
|
|
|
|
def start_server(self):
|
|
"""启动WebSocket服务器"""
|
|
if self.running:
|
|
return
|
|
|
|
async def run_server_async():
|
|
try:
|
|
self.server = await websockets.serve(
|
|
self.register_client,
|
|
self.host,
|
|
self.port
|
|
)
|
|
|
|
self.running = True
|
|
print(f"🌐 WebSocket远程控制台服务器已启动: ws://{self.host}:{self.port}")
|
|
print(f"🔑 认证密钥: {self.auth_key}")
|
|
|
|
# 保持服务器运行
|
|
await self.server.wait_closed()
|
|
|
|
except Exception as e:
|
|
print(f"❌ WebSocket服务器启动失败: {str(e)}")
|
|
self.running = False
|
|
|
|
def run_server():
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
|
|
try:
|
|
loop.run_until_complete(run_server_async())
|
|
except Exception as e:
|
|
print(f"❌ WebSocket服务器线程异常: {str(e)}")
|
|
self.running = False
|
|
|
|
# 在新线程中运行WebSocket服务器
|
|
server_thread = threading.Thread(target=run_server, daemon=True)
|
|
server_thread.start()
|
|
|
|
def stop_server(self):
|
|
"""停止WebSocket服务器"""
|
|
if not self.running:
|
|
return
|
|
|
|
self.running = False
|
|
|
|
# 关闭所有客户端连接
|
|
for client_id, client_info in list(self.clients.items()):
|
|
try:
|
|
asyncio.create_task(client_info["websocket"].close())
|
|
except:
|
|
pass
|
|
|
|
self.clients.clear()
|
|
|
|
if self.server:
|
|
try:
|
|
self.server.close()
|
|
except:
|
|
pass
|
|
|
|
print("🌐 WebSocket远程控制台服务器已停止")
|
|
|
|
def get_status(self) -> Dict[str, Any]:
|
|
"""获取服务器状态"""
|
|
return {
|
|
"running": self.running,
|
|
"host": self.host,
|
|
"port": self.port,
|
|
"connected_clients": len(self.clients),
|
|
"authenticated_clients": len([c for c in self.clients.values() if c["authenticated"]])
|
|
} |