继续迁移游戏配置到数据库

This commit is contained in:
2025-07-20 22:16:57 +08:00
parent ec94e10fda
commit e3741ed9d4
6 changed files with 850 additions and 332 deletions

View File

@@ -140,36 +140,6 @@ class SMYMongoDBAPI:
except Exception as e:
self.logger.error(f"获取游戏配置失败 [{config_type}]: {e}")
return None
def get_daily_checkin_config(self) -> Optional[Dict[str, Any]]:
"""
获取每日签到配置
Returns:
Dict: 每日签到配置数据
"""
try:
collection = self.get_collection("gameconfig")
# 使用已知的文档ID查找
object_id = ObjectId("687cce278e77ba00a7414ba2")
result = collection.find_one({"_id": object_id})
if result:
# 移除MongoDB的_id字段
if "_id" in result:
del result["_id"]
self.logger.info("成功获取每日签到配置")
return result
else:
self.logger.warning("未找到每日签到配置")
return None
except Exception as e:
self.logger.error(f"获取每日签到配置失败: {e}")
return None
def set_game_config(self, config_type: str, config_data: Dict[str, Any]) -> bool:
"""
设置游戏配置
@@ -205,6 +175,37 @@ class SMYMongoDBAPI:
except Exception as e:
self.logger.error(f"设置游戏配置异常 [{config_type}]: {e}")
return False
#=====================每日签到系统======================
def get_daily_checkin_config(self) -> Optional[Dict[str, Any]]:
"""
获取每日签到配置
Returns:
Dict: 每日签到配置数据
"""
try:
collection = self.get_collection("gameconfig")
# 使用已知的文档ID查找
object_id = ObjectId("687cce278e77ba00a7414ba2")
result = collection.find_one({"_id": object_id})
if result:
# 移除MongoDB的_id字段
if "_id" in result:
del result["_id"]
self.logger.info("成功获取每日签到配置")
return result
else:
self.logger.warning("未找到每日签到配置")
return None
except Exception as e:
self.logger.error(f"获取每日签到配置失败: {e}")
return None
def update_daily_checkin_config(self, config_data: Dict[str, Any]) -> bool:
"""
@@ -240,7 +241,10 @@ class SMYMongoDBAPI:
except Exception as e:
self.logger.error(f"更新每日签到配置异常: {e}")
return False
#=====================每日签到系统======================
#=====================幸运抽奖系统======================
def get_lucky_draw_config(self) -> Optional[Dict[str, Any]]:
"""
获取幸运抽奖配置
@@ -306,7 +310,10 @@ class SMYMongoDBAPI:
except Exception as e:
self.logger.error(f"更新幸运抽奖配置异常: {e}")
return False
#=====================幸运抽奖系统======================
#=====================新手大礼包系统======================
def get_new_player_config(self) -> Optional[Dict[str, Any]]:
"""
获取新手大礼包配置
@@ -372,7 +379,10 @@ class SMYMongoDBAPI:
except Exception as e:
self.logger.error(f"更新新手大礼包配置异常: {e}")
return False
#=====================新手大礼包系统======================
#=====================智慧树系统======================
def get_wisdom_tree_config(self) -> Optional[Dict[str, Any]]:
"""
获取智慧树配置
@@ -438,7 +448,79 @@ class SMYMongoDBAPI:
except Exception as e:
self.logger.error(f"更新智慧树配置异常: {e}")
return False
#=====================智慧树系统======================
#=====================稻草人系统======================
def get_scare_crow_config(self) -> Optional[Dict[str, Any]]:
"""
获取稻草人配置
Returns:
Dict: 稻草人配置数据
"""
try:
collection = self.get_collection("gameconfig")
# 使用已知的文档ID查找
object_id = ObjectId("687cea258e77ba00a7414ba8")
result = collection.find_one({"_id": object_id})
if result:
# 移除MongoDB的_id字段和updated_at字段
if "_id" in result:
del result["_id"]
if "updated_at" in result:
del result["updated_at"]
self.logger.info("成功获取稻草人配置")
return result
else:
self.logger.warning("未找到稻草人配置")
return None
except Exception as e:
self.logger.error(f"获取稻草人配置失败: {e}")
return None
def update_scare_crow_config(self, config_data: Dict[str, Any]) -> bool:
"""
更新稻草人配置
Args:
config_data: 配置数据
Returns:
bool: 是否成功
"""
try:
collection = self.get_collection("gameconfig")
# 使用已知的文档ID更新
object_id = ObjectId("687cea258e77ba00a7414ba8")
# 添加更新时间
update_data = {
"updated_at": datetime.now(),
**config_data
}
result = collection.replace_one({"_id": object_id}, update_data)
if result.acknowledged and result.matched_count > 0:
self.logger.info("成功更新稻草人配置")
return True
else:
self.logger.error("更新稻草人配置失败")
return False
except Exception as e:
self.logger.error(f"更新稻草人配置异常: {e}")
return False
#=====================稻草人系统======================
#=====================在线礼包系统======================
def get_online_gift_config(self) -> Optional[Dict[str, Any]]:
"""
获取在线礼包配置
@@ -504,7 +586,216 @@ class SMYMongoDBAPI:
except Exception as e:
self.logger.error(f"更新在线礼包配置异常: {e}")
return False
#=====================在线礼包系统======================
#=====================道具配置系统======================
def get_item_config(self) -> Optional[Dict[str, Any]]:
"""
获取道具配置
Returns:
Dict: 道具配置数据
"""
try:
collection = self.get_collection("gameconfig")
# 使用已知的文档ID查找
object_id = ObjectId("687cf17c8e77ba00a7414baa")
result = collection.find_one({"_id": object_id})
if result:
# 移除MongoDB的_id字段和updated_at字段
if "_id" in result:
del result["_id"]
if "updated_at" in result:
del result["updated_at"]
self.logger.info("成功获取道具配置")
return result
else:
self.logger.warning("未找到道具配置")
return None
except Exception as e:
self.logger.error(f"获取道具配置失败: {e}")
return None
def update_item_config(self, config_data: Dict[str, Any]) -> bool:
"""
更新道具配置
Args:
config_data: 配置数据
Returns:
bool: 是否成功
"""
try:
collection = self.get_collection("gameconfig")
# 使用已知的文档ID更新
object_id = ObjectId("687cf17c8e77ba00a7414baa")
# 添加更新时间
update_data = {
"updated_at": datetime.now(),
**config_data
}
result = collection.replace_one({"_id": object_id}, update_data)
if result.acknowledged and result.matched_count > 0:
self.logger.info("成功更新道具配置")
return True
else:
self.logger.error("更新道具配置失败")
return False
except Exception as e:
self.logger.error(f"更新道具配置异常: {e}")
return False
#=====================道具配置系统======================
#=====================宠物配置系统======================
def get_pet_config(self) -> Optional[Dict[str, Any]]:
"""
获取宠物配置
Returns:
Dict: 宠物配置数据
"""
try:
collection = self.get_collection("gameconfig")
# 使用已知的文档ID查找
object_id = ObjectId("687cf59b8e77ba00a7414bab")
result = collection.find_one({"_id": object_id})
if result:
# 移除MongoDB的_id字段和updated_at字段
if "_id" in result:
del result["_id"]
if "updated_at" in result:
del result["updated_at"]
self.logger.info("成功获取宠物配置")
return result
else:
self.logger.warning("未找到宠物配置")
return None
except Exception as e:
self.logger.error(f"获取宠物配置失败: {e}")
return None
def update_pet_config(self, config_data: Dict[str, Any]) -> bool:
"""
更新宠物配置
Args:
config_data: 配置数据
Returns:
bool: 是否成功
"""
try:
collection = self.get_collection("gameconfig")
# 使用已知的文档ID更新
object_id = ObjectId("687cf59b8e77ba00a7414bab")
# 添加更新时间
update_data = {
"updated_at": datetime.now(),
**config_data
}
result = collection.replace_one({"_id": object_id}, update_data)
if result.acknowledged and result.matched_count > 0:
self.logger.info("成功更新宠物配置")
return True
else:
self.logger.error("更新宠物配置失败")
return False
except Exception as e:
self.logger.error(f"更新宠物配置异常: {e}")
return False
#=====================宠物配置系统======================
#=====================体力系统======================
def get_stamina_config(self) -> Optional[Dict[str, Any]]:
"""
获取体力系统配置
Returns:
Dict: 体力系统配置数据
"""
try:
collection = self.get_collection("gameconfig")
# 使用已知的文档ID查找
object_id = ObjectId("687cefba8e77ba00a7414ba9")
result = collection.find_one({"_id": object_id})
if result:
# 移除MongoDB的_id字段和updated_at字段
if "_id" in result:
del result["_id"]
if "updated_at" in result:
del result["updated_at"]
self.logger.info("成功获取体力系统配置")
return result
else:
self.logger.warning("未找到体力系统配置")
return None
except Exception as e:
self.logger.error(f"获取体力系统配置失败: {e}")
return None
def update_stamina_config(self, config_data: Dict[str, Any]) -> bool:
"""
更新体力系统配置
Args:
config_data: 配置数据
Returns:
bool: 是否成功
"""
try:
collection = self.get_collection("gameconfig")
# 使用已知的文档ID更新
object_id = ObjectId("687cefba8e77ba00a7414ba9")
# 添加更新时间
update_data = {
"updated_at": datetime.now(),
**config_data
}
result = collection.replace_one({"_id": object_id}, update_data)
if result.acknowledged and result.matched_count > 0:
self.logger.info("成功更新体力系统配置")
return True
else:
self.logger.error("更新体力系统配置失败")
return False
except Exception as e:
self.logger.error(f"更新体力系统配置异常: {e}")
return False
#=====================体力系统======================
# ========================= 通用数据库操作 =========================
def insert_document(self, collection_name: str, document: Dict[str, Any]) -> Optional[str]:
@@ -682,8 +973,33 @@ def test_api():
else:
print("✗ 获取在线礼包配置失败")
# 测试获取稻草人配置
print("\n6. 测试获取稻草人配置:")
scare_crow_config = api.get_scare_crow_config()
if scare_crow_config:
print("✓ 成功获取稻草人配置")
scare_crow_types = scare_crow_config.get("稻草人类型", {})
modify_cost = scare_crow_config.get("修改稻草人配置花费", "N/A")
print(f"稻草人类型数量: {len(scare_crow_types)}")
print(f"修改费用: {modify_cost}金币")
else:
print("✗ 获取稻草人配置失败")
# 测试获取宠物配置
print("\n7. 测试获取宠物配置:")
pet_config = api.get_pet_config()
if pet_config:
print("✓ 成功获取宠物配置")
pets = pet_config.get("宠物", {})
print(f"宠物类型数量: {len(pets)}")
if pets:
first_pet = list(pets.values())[0]
print(f"示例宠物属性: {list(first_pet.keys())[:3]}...")
else:
print("✗ 获取宠物配置失败")
# 测试查找所有游戏配置
print("\n6. 测试查找游戏配置集合:")
print("\n8. 测试查找游戏配置集合:")
try:
configs = api.find_documents("gameconfig")
print(f"找到 {len(configs)} 个配置文档")