优化结果

This commit is contained in:
2025-09-15 19:08:47 +08:00
parent 72084a8782
commit dcfa89e63c
357 changed files with 16156 additions and 1589 deletions

View File

@@ -0,0 +1,765 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
AI模型应用服务模块
Created by: 神奇万事通
Date: 2025-01-15
"""
from flask import Blueprint, request, jsonify
import requests
import json
import os
from datetime import datetime
# 创建蓝图
aimodelapp_bp = Blueprint('aimodelapp', __name__)
#加载AI配置文件
def load_ai_config():
"""加载AI配置文件"""
try:
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'ai_config.json')
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"加载AI配置失败: {e}")
return None
#调用DeepSeek API带重试机制
def call_deepseek_api(messages, model="deepseek-chat", max_retries=3):
"""调用DeepSeek API带重试机制"""
config = load_ai_config()
if not config or 'deepseek' not in config:
return None, "AI配置加载失败"
deepseek_config = config['deepseek']
headers = {
'Authorization': f'Bearer {deepseek_config["api_key"]}',
'Content-Type': 'application/json'
}
data = {
'model': model,
'messages': messages,
'temperature': 0.7,
'max_tokens': 2000
}
import time
for attempt in range(max_retries):
try:
# 增加超时时间到90秒
response = requests.post(
f"{deepseek_config['api_base']}/chat/completions",
headers=headers,
json=data,
timeout=90
)
if response.status_code == 200:
result = response.json()
return result['choices'][0]['message']['content'], None
else:
error_msg = f"API调用失败: {response.status_code} - {response.text}"
if attempt < max_retries - 1:
print(f"{attempt + 1}次尝试失败,等待重试: {error_msg}")
time.sleep(2 ** attempt) # 指数退避
continue
return None, error_msg
except requests.exceptions.Timeout:
error_msg = "API请求超时"
if attempt < max_retries - 1:
print(f"{attempt + 1}次尝试超时,等待重试")
time.sleep(2 ** attempt) # 指数退避
continue
return None, f"{error_msg}(已重试{max_retries}次)"
except Exception as e:
error_msg = f"API调用异常: {str(e)}"
if attempt < max_retries - 1:
print(f"{attempt + 1}次尝试异常,等待重试: {error_msg}")
time.sleep(2 ** attempt) # 指数退避
continue
return None, f"{error_msg}(已重试{max_retries}次)"
#调用Kimi API
def call_kimi_api(messages, model="kimi-k2-0905-preview"):
"""调用Kimi API"""
config = load_ai_config()
if not config or 'kimi' not in config:
return None, "AI配置加载失败"
kimi_config = config['kimi']
headers = {
'Authorization': f'Bearer {kimi_config["api_key"]}',
'Content-Type': 'application/json'
}
data = {
'model': model,
'messages': messages,
'temperature': 0.7,
'max_tokens': 2000
}
try:
response = requests.post(
f"{kimi_config['api_base']}/v1/chat/completions",
headers=headers,
json=data,
timeout=30
)
if response.status_code == 200:
result = response.json()
return result['choices'][0]['message']['content'], None
else:
return None, f"API调用失败: {response.status_code} - {response.text}"
except Exception as e:
return None, f"API调用异常: {str(e)}"
#统一的AI聊天接口
@aimodelapp_bp.route('/chat', methods=['POST'])
def ai_chat():
"""统一的AI聊天接口"""
try:
data = request.get_json()
if not data:
return jsonify({'error': '请求数据为空'}), 400
# 获取请求参数
messages = data.get('messages', [])
model_provider = data.get('provider', 'deepseek') # 默认使用deepseek
model_name = data.get('model', 'deepseek-chat') # 默认模型
if not messages:
return jsonify({'error': '消息内容不能为空'}), 400
# 根据提供商调用对应的API
if model_provider == 'deepseek':
content, error = call_deepseek_api(messages, model_name)
elif model_provider == 'kimi':
content, error = call_kimi_api(messages, model_name)
else:
return jsonify({'error': f'不支持的AI提供商: {model_provider}'}), 400
if error:
return jsonify({'error': error}), 500
return jsonify({
'success': True,
'content': content,
'provider': model_provider,
'model': model_name,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': f'服务器错误: {str(e)}'}), 500
#姓名分析专用接口
@aimodelapp_bp.route('/name-analysis', methods=['POST'])
def name_analysis():
"""姓名分析专用接口"""
try:
data = request.get_json()
name = data.get('name', '').strip()
if not name:
return jsonify({'error': '姓名不能为空'}), 400
# 构建姓名分析的专业提示词
prompt = f"""你是一位专业的姓名学专家和语言学家,请对输入的姓名进行全面分析。请直接输出分析结果,不要包含任何思考过程或<think>标签。
姓名:{name}
请按照以下格式严格输出分析结果:
【稀有度评分】
评分X%
评价:[对稀有度的详细说明,包括姓氏和名字的常见程度分析]
【音韵评价】
评分X%
评价:[对音韵美感的分析,包括声调搭配、读音流畅度、音律和谐度等]
【含义解读】
[详细分析姓名的寓意内涵,包括:
1. 姓氏的历史渊源和文化背景
2. 名字各字的含义和象征
3. 整体姓名的寓意组合
4. 可能体现的父母期望或文化内涵
5. 与传统文化、诗词典故的关联等]
要求:
1. 评分必须是1-100的整数百分比要有明显区分度避免雷同
2. 分析要专业、客观、有依据,评分要根据实际情况有所差异
3. 含义解读要详细深入至少150字
4. 严格按照上述格式输出,不要添加思考过程、<think>标签或其他内容
5. 如果是生僻字或罕见姓名,要特别说明
6. 直接输出最终结果,不要显示推理过程"""
messages = [
{"role": "user", "content": prompt}
]
# 使用DeepSeek进行分析
content, error = call_deepseek_api(messages)
if error:
return jsonify({'error': error}), 500
return jsonify({
'success': True,
'analysis': content,
'name': name,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': f'姓名分析失败: {str(e)}'}), 500
#变量命名助手接口
@aimodelapp_bp.route('/variable-naming', methods=['POST'])
def variable_naming():
"""变量命名助手接口"""
try:
data = request.get_json()
description = data.get('description', '').strip()
language = data.get('language', 'javascript').lower()
if not description:
return jsonify({'error': '变量描述不能为空'}), 400
# 构建变量命名的提示词
prompt = f"""你是一个专业的变量命名助手。请根据以下描述为变量生成合适的名称:
描述:{description}
请为每种命名规范生成3个变量名建议
1. camelCase (驼峰命名法)
2. PascalCase (帕斯卡命名法)
3. snake_case (下划线命名法)
4. kebab-case (短横线命名法)
5. CONSTANT_CASE (常量命名法)
要求:
- 变量名要准确反映功能和用途
- 严格遵循各自的命名规范
- 避免使用缩写,除非是广泛认知的缩写
- 名称要简洁但具有描述性
- 考虑代码的可读性和维护性
请按以下JSON格式返回
{{
"suggestions": {{
"camelCase": [
{{"name": "变量名1", "description": "解释说明1"}},
{{"name": "变量名2", "description": "解释说明2"}},
{{"name": "变量名3", "description": "解释说明3"}}
],
"PascalCase": [
{{"name": "变量名1", "description": "解释说明1"}},
{{"name": "变量名2", "description": "解释说明2"}},
{{"name": "变量名3", "description": "解释说明3"}}
],
"snake_case": [
{{"name": "变量名1", "description": "解释说明1"}},
{{"name": "变量名2", "description": "解释说明2"}},
{{"name": "变量名3", "description": "解释说明3"}}
],
"kebab-case": [
{{"name": "变量名1", "description": "解释说明1"}},
{{"name": "变量名2", "description": "解释说明2"}},
{{"name": "变量名3", "description": "解释说明3"}}
],
"CONSTANT_CASE": [
{{"name": "变量名1", "description": "解释说明1"}},
{{"name": "变量名2", "description": "解释说明2"}},
{{"name": "变量名3", "description": "解释说明3"}}
]
}}
}}
只返回JSON格式的结果不要包含其他文字。"""
messages = [
{"role": "user", "content": prompt}
]
# 使用DeepSeek进行分析
content, error = call_deepseek_api(messages)
if error:
return jsonify({'error': error}), 500
# 解析AI返回的JSON格式数据
try:
# 尝试直接解析JSON
ai_response = json.loads(content)
suggestions = ai_response.get('suggestions', {})
except json.JSONDecodeError:
# 如果直接解析失败尝试提取JSON部分
import re
json_match = re.search(r'\{[\s\S]*\}', content)
if json_match:
try:
ai_response = json.loads(json_match.group())
suggestions = ai_response.get('suggestions', {})
except json.JSONDecodeError:
return jsonify({'error': 'AI返回的数据格式无法解析'}), 500
else:
return jsonify({'error': 'AI返回的数据中未找到有效的JSON格式'}), 500
return jsonify({
'success': True,
'suggestions': suggestions,
'description': description,
'language': language,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': f'变量命名失败: {str(e)}'}), 500
@aimodelapp_bp.route('/poetry', methods=['POST'])
def poetry_assistant():
"""AI写诗助手接口"""
try:
data = request.get_json()
theme = data.get('theme', '').strip()
style = data.get('style', '现代诗').strip()
mood = data.get('mood', '').strip()
if not theme:
return jsonify({'error': '诗歌主题不能为空'}), 400
# 构建写诗的提示词
prompt = f"""你是一位才华横溢的诗人,请根据以下要求创作一首诗歌。
主题:{theme}
风格:{style}
情感基调:{mood if mood else '自由发挥'}
创作要求:
1. 紧扣主题,情感真挚
2. 语言优美,意境深远
3. 符合指定的诗歌风格
4. 长度适中,朗朗上口
5. 如果是古体诗,注意平仄和韵律
请直接输出诗歌作品,不需要额外的解释或分析。"""
messages = [
{"role": "user", "content": prompt}
]
# 使用DeepSeek进行创作
content, error = call_deepseek_api(messages)
if error:
return jsonify({'error': error}), 500
return jsonify({
'success': True,
'poem': content,
'theme': theme,
'style': style,
'mood': mood,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': f'诗歌创作失败: {str(e)}'}), 500
@aimodelapp_bp.route('/translation', methods=['POST'])
def translation():
"""AI语言翻译接口"""
try:
data = request.get_json()
source_text = data.get('source_text', '').strip()
target_language = data.get('target_language', 'zh-CN').strip()
if not source_text:
return jsonify({'error': '翻译内容不能为空'}), 400
# 语言映射
language_map = {
'zh-CN': '中文(简体)',
'zh-TW': '中文(繁体)',
'en': '英语',
'ja': '日语',
'ko': '韩语',
'fr': '法语',
'de': '德语',
'es': '西班牙语',
'it': '意大利语',
'pt': '葡萄牙语',
'ru': '俄语',
'ar': '阿拉伯语',
'hi': '印地语',
'th': '泰语',
'vi': '越南语'
}
target_language_name = language_map.get(target_language, target_language)
# 构建翻译的专业提示词
prompt = f"""你是一位专业的翻译专家,精通多种语言的翻译工作。请将以下文本翻译成{target_language_name}
原文:{source_text}
翻译要求:
1. 【信】- 忠实原文,准确传达原意,不遗漏、不添加、不歪曲
2. 【达】- 译文通顺流畅,符合目标语言的表达习惯和语法规范
3. 【雅】- 用词优美得体,风格与原文相符,具有良好的可读性
特别注意:
- 自动检测源语言,无需用户指定
- 保持原文的语气、情感色彩和文体风格
- 对于专业术语,提供准确的对应翻译
- 对于文化特色词汇,在保持原意的基础上进行适当的本土化处理
- 如果是单词或短语,提供多个常用含义的翻译
- 如果是句子,确保语法正确、表达自然
请按以下JSON格式返回翻译结果
{{
"detected_language": "检测到的源语言名称",
"target_language": "{target_language_name}",
"translation": "翻译结果",
"alternative_translations": [
"备选翻译1",
"备选翻译2",
"备选翻译3"
],
"explanation": "翻译说明(包括语境、用法、注意事项等)",
"pronunciation": "目标语言的发音指导(如适用)"
}}
只返回JSON格式的结果不要包含其他文字。"""
messages = [
{"role": "user", "content": prompt}
]
# 使用DeepSeek进行翻译
content, error = call_deepseek_api(messages)
if error:
return jsonify({'error': error}), 500
return jsonify({
'success': True,
'translation_result': content,
'source_text': source_text,
'target_language': target_language,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': f'翻译失败: {str(e)}'}), 500
#现代文转文言文接口
@aimodelapp_bp.route('/classical_conversion', methods=['POST'])
def classical_conversion():
"""现代文转文言文接口"""
try:
data = request.get_json()
modern_text = data.get('modern_text', '').strip()
style = data.get('style', '古雅').strip()
article_type = data.get('article_type', '散文').strip()
if not modern_text:
return jsonify({'error': '现代文内容不能为空'}), 400
# 构建文言文转换的专业提示词
prompt = f"""你是一位精通古代文言文的文学大师,擅长将现代文转换为优美的文言文。请将以下现代文转换为文言文。
现代文:{modern_text}
转换要求:
1. 风格:{style}
2. 文体:{article_type}
3. 保持原文的核心意思和情感色彩
4. 使用恰当的文言文语法和词汇
5. 注重音韵美感和文字的雅致
6. 根据不同风格调整用词和句式
风格说明:
- 古雅:典雅庄重,用词考究,句式工整
- 简洁:言简意赅,删繁就简,朴实无华
- 华丽:辞藻华美,对仗工整,音韵和谐
- 朴实:平实自然,通俗易懂,贴近生活
文体特点:
- 散文:行文自由,情理并茂
- 诗歌:讲究韵律,意境深远
- 议论文:逻辑严密,论证有力
- 记叙文:叙事生动,描写细腻
- 书信:格式规范,情真意切
- 公文:庄重严肃,用词准确
请按以下JSON格式返回转换结果
{{
"classical_text": "转换后的文言文",
"translation_notes": "转换说明,包括重要词汇的选择理由和语法特点",
"style_analysis": "风格分析,说明如何体现所选风格特点",
"difficulty_level": "难度等级(初级/中级/高级)",
"key_phrases": [
{{
"modern": "现代词汇",
"classical": "对应文言文词汇",
"explanation": "转换说明"
}}
],
"cultural_elements": "文化内涵说明,包含的典故、意象等"
}}
只返回JSON格式的结果不要包含其他文字。"""
messages = [
{"role": "user", "content": prompt}
]
# 使用DeepSeek进行文言文转换
content, error = call_deepseek_api(messages)
if error:
return jsonify({'error': error}), 500
return jsonify({
'success': True,
'conversion_result': content,
'modern_text': modern_text,
'style': style,
'article_type': article_type,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': f'文言文转换失败: {str(e)}'}), 500
#AI表情制作器接口
@aimodelapp_bp.route('/expression-maker', methods=['POST'])
def expression_maker():
"""AI表情制作器接口"""
try:
data = request.get_json()
text = data.get('text', '').strip()
style = data.get('style', 'mixed').strip()
if not text:
return jsonify({'error': '文字内容不能为空'}), 400
# 风格映射
style_prompts = {
'mixed': '混合使用Emoji表情和颜文字',
'emoji': '仅使用Emoji表情符号',
'kaomoji': '仅使用颜文字(日式表情符号)',
'cute': '使用可爱风格的表情符号',
'cool': '使用酷炫风格的表情符号'
}
style_description = style_prompts.get(style, style_prompts['mixed'])
# 构建表情制作的提示词
prompt = f"""你是一个专业的表情符号专家,擅长为文字内容生成合适的表情符号。请根据以下文字内容生成相应的表情符号:
文字内容:{text}
表情风格:{style_description}
请为这个文字内容生成表情符号,要求:
1. 准确表达文字的情感和含义
2. 符合指定的表情风格
3. 提供多样化的选择
4. 包含使用场景说明
请按以下分类生成表情符号:
1. Emoji表情使用Unicode表情符号
2. 颜文字使用ASCII字符组成的表情
3. 组合表情(多个符号组合使用)
每个分类提供5个不同的表情选项每个选项包含
- 表情符号本身
- 适用场景说明
- 情感强度(轻微/中等/强烈)
请按以下JSON格式返回
{{
"expressions": {{
"emoji": [
{{
"symbol": "😊",
"description": "适用场景和情感说明",
"intensity": "中等",
"usage": "使用建议"
}}
],
"kaomoji": [
{{
"symbol": "(^_^)",
"description": "适用场景和情感说明",
"intensity": "轻微",
"usage": "使用建议"
}}
],
"combination": [
{{
"symbol": "🎉✨",
"description": "适用场景和情感说明",
"intensity": "强烈",
"usage": "使用建议"
}}
]
}},
"summary": {{
"emotion_analysis": "对输入文字的情感分析",
"recommended_usage": "推荐的使用场景",
"style_notes": "风格特点说明"
}}
}}
只返回JSON格式的结果不要包含其他文字。"""
messages = [
{"role": "user", "content": prompt}
]
# 使用DeepSeek进行分析
content, error = call_deepseek_api(messages)
if error:
return jsonify({'error': error}), 500
# 解析AI返回的JSON格式数据
try:
# 尝试直接解析JSON
ai_response = json.loads(content)
expressions = ai_response.get('expressions', {})
summary = ai_response.get('summary', {})
except json.JSONDecodeError:
# 如果直接解析失败尝试提取JSON部分
import re
json_match = re.search(r'\{[\s\S]*\}', content)
if json_match:
try:
ai_response = json.loads(json_match.group())
expressions = ai_response.get('expressions', {})
summary = ai_response.get('summary', {})
except json.JSONDecodeError:
return jsonify({'error': 'AI返回的数据格式无法解析'}), 500
else:
return jsonify({'error': 'AI返回的数据中未找到有效的JSON格式'}), 500
return jsonify({
'success': True,
'expressions': expressions,
'summary': summary,
'text': text,
'style': style,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': f'表情制作失败: {str(e)}'}), 500
#Linux命令生成接口
@aimodelapp_bp.route('/linux-command', methods=['POST'])
def linux_command_generator():
"""Linux命令生成接口"""
try:
data = request.get_json()
task_description = data.get('task_description', '').strip()
difficulty_level = data.get('difficulty_level', 'beginner').strip()
if not task_description:
return jsonify({'error': '任务描述不能为空'}), 400
# 构建Linux命令生成的专业提示词
prompt = f"""你是一位Linux系统专家请根据用户的任务描述生成相应的Linux命令。
任务描述:{task_description}
用户水平:{difficulty_level}
请为这个任务生成合适的Linux命令要求
1. 命令准确可用符合Linux标准
2. 根据用户水平提供适当的复杂度
3. 提供多种实现方式(如果有的话)
4. 包含安全提示和注意事项
5. 解释每个命令的作用和参数
用户水平说明:
- beginner初学者提供基础命令详细解释
- intermediate中级提供常用命令和选项
- advanced高级提供高效命令和高级用法
请按以下JSON格式返回
{{
"commands": [
{{
"command": "具体的Linux命令",
"description": "命令的详细说明",
"safety_level": "safe/caution/dangerous",
"explanation": "命令各部分的解释",
"example_output": "预期的命令输出示例",
"alternatives": ["替代命令1", "替代命令2"]
}}
],
"safety_warnings": ["安全提示1", "安全提示2"],
"prerequisites": ["前置条件1", "前置条件2"],
"related_concepts": ["相关概念1", "相关概念2"]
}}
只返回JSON格式的结果不要包含其他文字。"""
messages = [
{"role": "user", "content": prompt}
]
# 使用DeepSeek进行命令生成
content, error = call_deepseek_api(messages)
if error:
return jsonify({'error': error}), 500
return jsonify({
'success': True,
'command_result': content,
'task_description': task_description,
'difficulty_level': difficulty_level,
'timestamp': datetime.now().isoformat()
})
except Exception as e:
return jsonify({'error': f'Linux命令生成失败: {str(e)}'}), 500
#获取可用的AI模型列表
@aimodelapp_bp.route('/models', methods=['GET'])
def get_available_models():
"""获取可用的AI模型列表"""
try:
config = load_ai_config()
if not config:
return jsonify({'error': 'AI配置加载失败'}), 500
models = {}
for provider, provider_config in config.items():
if 'model' in provider_config:
models[provider] = provider_config['model']
return jsonify({
'success': True,
'models': models,
'default_provider': 'deepseek',
'default_model': 'deepseek-chat'
})
except Exception as e:
return jsonify({'error': f'获取模型列表失败: {str(e)}'}), 500

View File

@@ -0,0 +1,476 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
用户认证模块
Created by: 神奇万事通
Date: 2025-09-02
"""
from flask import Blueprint, request, jsonify, session, current_app
from werkzeug.security import generate_password_hash, check_password_hash
import hashlib
import re
import jwt
from datetime import datetime, timedelta
from functools import wraps
from .email_service import send_verification_email, verify_code, is_qq_email, get_qq_avatar_url
auth_bp = Blueprint('auth', __name__)
def generate_token(user_data):
"""生成JWT token"""
payload = {
'user_id': user_data['user_id'],
'email': user_data['email'],
'username': user_data['username'],
'exp': datetime.utcnow() + timedelta(days=7), # 7天过期
'iat': datetime.utcnow()
}
return jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
def verify_token(token):
"""验证JWT token"""
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
return {'success': True, 'data': payload}
except jwt.ExpiredSignatureError:
return {'success': False, 'message': 'Token已过期'}
except jwt.InvalidTokenError:
return {'success': False, 'message': 'Token无效'}
def token_required(f):
"""JWT token验证装饰器"""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'success': False, 'message': '缺少认证token'}), 401
if token.startswith('Bearer '):
token = token[7:]
result = verify_token(token)
if not result['success']:
return jsonify({'success': False, 'message': result['message']}), 401
request.current_user = result['data']
return f(*args, **kwargs)
return decorated
def validate_qq_email(email):
"""验证QQ邮箱格式"""
return is_qq_email(email)
def validate_password(password):
"""验证密码格式6-20位"""
return 6 <= len(password) <= 20
@auth_bp.route('/send-verification', methods=['POST'])
def send_verification():
"""发送验证码邮件"""
try:
data = request.get_json()
email = data.get('email', '').strip()
verification_type = data.get('type', 'register') # register, login
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱qq.com、vip.qq.com、foxmail.com'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已注册
existing_user = users_collection.find_one({'邮箱': email})
if verification_type == 'register' and existing_user:
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
if verification_type == 'login' and not existing_user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 发送验证码
result = send_verification_email(email, verification_type)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 500
except Exception as e:
current_app.logger.error(f"发送验证码失败: {str(e)}")
return jsonify({
'success': False,
'message': '发送失败,请稍后重试'
}), 500
@auth_bp.route('/verify-code', methods=['POST'])
def verify_verification_code():
"""验证验证码"""
try:
data = request.get_json()
email = data.get('email', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email or not code:
return jsonify({
'success': False,
'message': '邮箱和验证码不能为空'
}), 400
# 验证码校验
result = verify_code(email, code)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 400
except Exception as e:
current_app.logger.error(f"验证码校验失败: {str(e)}")
return jsonify({
'success': False,
'message': '验证失败,请稍后重试'
}), 500
@auth_bp.route('/register', methods=['POST'])
def register():
"""用户注册(需要先验证邮箱)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
username = data.get('username', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not all([email, username, password, code]):
return jsonify({
'success': False,
'message': '所有字段都不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱注册'
}), 400
if not validate_password(password):
return jsonify({
'success': False,
'message': '密码长度必须在6-20位之间'
}), 400
# 验证验证码
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'register':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已被注册
if users_collection.find_one({'邮箱': email}):
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
# 检查用户名是否已被使用
if users_collection.find_one({'用户名': username}):
return jsonify({
'success': False,
'message': '该用户名已被使用'
}), 409
# 获取QQ头像
avatar_url = get_qq_avatar_url(email)
# 创建新用户
password_hash = generate_password_hash(password)
user_data = {
'邮箱': email,
'用户名': username,
'密码': password_hash,
'头像': avatar_url,
'注册时间': datetime.now().isoformat(),
'最后登录': None,
'登录次数': 0,
'用户状态': 'active',
'等级': 0,
'经验': 0,
'萌芽币': 0,
'签到系统': {
'连续签到天数': 0,
'今日是否已签到': False,
'签到时间': '2025-01-01'
}
}
result = users_collection.insert_one(user_data)
if result.inserted_id:
return jsonify({
'success': True,
'message': '注册成功!',
'user': {
'email': email,
'username': username,
'avatar': avatar_url
}
}), 201
else:
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
except Exception as e:
current_app.logger.error(f"注册失败: {str(e)}")
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
if existing_user:
return jsonify({
'success': False,
'message': '该账号已被注册'
}), 409
# 创建新用户
password_hash = generate_password_hash(password)
user_data = {
'账号': account,
'密码': password_hash,
'注册时间': datetime.now().isoformat(),
'最后登录': None,
'登录次数': 0,
'用户状态': 'active'
}
result = users_collection.insert_one(user_data)
if result.inserted_id:
return jsonify({
'success': True,
'message': '注册成功!'
}), 201
else:
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@auth_bp.route('/login', methods=['POST'])
def login():
"""用户登录(支持邮箱+验证码或邮箱+密码)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱登录'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 查找用户
user = users_collection.find_one({'邮箱': email})
if not user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 检查用户状态
if user.get('用户状态') != 'active':
return jsonify({
'success': False,
'message': '账号已被禁用,请联系管理员'
}), 403
# 验证方式:验证码登录或密码登录
if code:
# 验证码登录
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'login':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
elif password:
# 密码登录
if not check_password_hash(user['密码'], password):
return jsonify({
'success': False,
'message': '密码错误'
}), 401
else:
return jsonify({
'success': False,
'message': '请输入密码或验证码'
}), 400
# 登录成功,更新用户信息
users_collection.update_one(
{'邮箱': email},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
# 生成JWT token
user_data = {
'user_id': str(user['_id']),
'email': email,
'username': user.get('用户名', '')
}
token = generate_token(user_data)
return jsonify({
'success': True,
'message': '登录成功!',
'token': token,
'user': {
'id': str(user['_id']),
'email': email,
'username': user.get('用户名', ''),
'avatar': user.get('头像', ''),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
current_app.logger.error(f"登录失败: {str(e)}")
return jsonify({
'success': False,
'message': '登录失败,请稍后重试'
}), 500
# 登录成功,创建会话
session['user_id'] = str(user['_id'])
session['account'] = user['账号']
session['logged_in'] = True
# 更新登录信息
users_collection.update_one(
{'_id': user['_id']},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
return jsonify({
'success': True,
'message': '登录成功!',
'user': {
'account': user['账号'],
'last_login': user.get('最后登录'),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@auth_bp.route('/logout', methods=['POST'])
def logout():
"""用户登出"""
try:
# JWT是无状态的客户端删除token即可
return jsonify({
'success': True,
'message': '已成功登出'
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@auth_bp.route('/check', methods=['GET'])
def check_login():
"""检查登录状态"""
try:
token = request.headers.get('Authorization')
if not token:
return jsonify({
'success': True,
'logged_in': False
}), 200
if token.startswith('Bearer '):
token = token[7:]
result = verify_token(token)
if result['success']:
user_data = result['data']
return jsonify({
'success': True,
'logged_in': True,
'user': {
'id': user_data['user_id'],
'email': user_data['email'],
'username': user_data['username']
}
}), 200
else:
return jsonify({
'success': True,
'logged_in': False
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500

View File

@@ -0,0 +1,276 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
邮件发送模块
负责处理用户注册、登录验证邮件
"""
import random
import string
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.header import Header
from flask import current_app
import logging
import os
# 验证码存储生产环境建议使用Redis
verification_codes = {}
def init_mail(app):
"""初始化邮件配置"""
# 使用smtplib直接发送不需要Flask-Mail
pass
def generate_verification_code(length=6):
"""生成验证码"""
return ''.join(random.choices(string.digits, k=length))
def send_verification_email(email, verification_type='register'):
"""
发送验证邮件
Args:
email: 收件人邮箱
verification_type: 验证类型 ('register', 'login', 'reset_password')
Returns:
dict: 发送结果
"""
try:
# 验证QQ邮箱格式
if not is_qq_email(email):
return {
'success': False,
'message': '仅支持QQ邮箱注册登录'
}
# 生成验证码
code = generate_verification_code()
# 存储验证码5分钟有效期
verification_codes[email] = {
'code': code,
'type': verification_type,
'expires_at': datetime.now() + timedelta(minutes=5),
'attempts': 0
}
# 获取邮件配置 - 使用与QQEmailSendAPI相同的配置
sender_email = os.environ.get('MAIL_USERNAME', '3205788256@qq.com')
sender_password = os.environ.get('MAIL_PASSWORD', 'szcaxvbftusqddhi')
# 邮件模板
if verification_type == 'register':
subject = '【InfoGenie】注册验证码'
html_content = f'''
<html>
<body>
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="text-align: center; margin-bottom: 30px;">
<h1 style="color: #66bb6a; margin: 0;">InfoGenie 神奇万事通</h1>
<p style="color: #666; font-size: 14px; margin: 5px 0;">欢迎注册InfoGenie</p>
</div>
<div style="background: linear-gradient(135deg, #a8e6cf 0%, #dcedc1 100%); padding: 30px; border-radius: 15px; text-align: center;">
<h2 style="color: #2e7d32; margin-bottom: 20px;">验证码</h2>
<div style="background: white; padding: 20px; border-radius: 10px; margin: 20px 0;">
<span style="font-size: 32px; font-weight: bold; color: #66bb6a; letter-spacing: 5px;">{code}</span>
</div>
<p style="color: #4a4a4a; margin: 15px 0;">请在5分钟内输入此验证码完成注册</p>
</div>
<div style="margin-top: 30px; padding: 20px; background: #f8f9fa; border-radius: 10px;">
<p style="color: #666; font-size: 12px; margin: 0; text-align: center;">
如果您没有申请注册,请忽略此邮件<br>
此验证码5分钟内有效请勿泄露给他人
</p>
</div>
</div>
</body>
</html>
'''
else: # login
subject = '【InfoGenie】登录验证码'
html_content = f'''
<html>
<body>
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="text-align: center; margin-bottom: 30px;">
<h1 style="color: #66bb6a; margin: 0;">InfoGenie 神奇万事通</h1>
<p style="color: #666; font-size: 14px; margin: 5px 0;">安全登录验证</p>
</div>
<div style="background: linear-gradient(135deg, #81c784 0%, #66bb6a 100%); padding: 30px; border-radius: 15px; text-align: center;">
<h2 style="color: white; margin-bottom: 20px;">登录验证码</h2>
<div style="background: white; padding: 20px; border-radius: 10px; margin: 20px 0;">
<span style="font-size: 32px; font-weight: bold; color: #66bb6a; letter-spacing: 5px;">{code}</span>
</div>
<p style="color: white; margin: 15px 0;">请在5分钟内输入此验证码完成登录</p>
</div>
<div style="margin-top: 30px; padding: 20px; background: #f8f9fa; border-radius: 10px;">
<p style="color: #666; font-size: 12px; margin: 0; text-align: center;">
如果不是您本人操作,请检查账户安全<br>
此验证码5分钟内有效请勿泄露给他人
</p>
</div>
</div>
</body>
</html>
'''
# 创建邮件 - 使用与QQEmailSendAPI相同的方式
message = MIMEText(html_content, 'html', 'utf-8')
message['From'] = sender_email # 直接使用邮箱地址不使用Header包装
message['To'] = email
message['Subject'] = Header(subject, 'utf-8')
# 发送邮件 - 使用SSL端口465
try:
# 使用与QQEmailSendAPI相同的连接方式
smtp_obj = smtplib.SMTP_SSL('smtp.qq.com', 465)
smtp_obj.login(sender_email, sender_password)
smtp_obj.sendmail(sender_email, [email], message.as_string())
smtp_obj.quit()
print(f"验证码邮件发送成功: {email}")
return {
'success': True,
'message': '验证码已发送到您的邮箱',
'email': email
}
except smtplib.SMTPAuthenticationError as auth_error:
print(f"SMTP认证失败: {str(auth_error)}")
return {
'success': False,
'message': 'SMTP认证失败请检查邮箱配置'
}
except smtplib.SMTPConnectError as conn_error:
print(f"SMTP连接失败: {str(conn_error)}")
return {
'success': False,
'message': 'SMTP服务器连接失败'
}
except Exception as smtp_error:
print(f"SMTP发送失败: {str(smtp_error)}")
return {
'success': False,
'message': f'邮件发送失败: {str(smtp_error)}'
}
except Exception as e:
print(f"邮件发送失败: {str(e)}")
return {
'success': False,
'message': '邮件发送失败,请稍后重试'
}
def verify_code(email, code):
"""
验证验证码
Args:
email: 邮箱地址
code: 验证码
Returns:
dict: 验证结果
"""
if email not in verification_codes:
return {
'success': False,
'message': '验证码不存在或已过期'
}
stored_info = verification_codes[email]
# 检查过期时间
if datetime.now() > stored_info['expires_at']:
del verification_codes[email]
return {
'success': False,
'message': '验证码已过期,请重新获取'
}
# 检查尝试次数
if stored_info['attempts'] >= 3:
del verification_codes[email]
return {
'success': False,
'message': '验证码输入错误次数过多,请重新获取'
}
# 验证码校验
if stored_info['code'] != code:
stored_info['attempts'] += 1
return {
'success': False,
'message': f'验证码错误,还可尝试{3 - stored_info["attempts"]}'
}
# 验证成功,删除验证码
verification_type = stored_info['type']
del verification_codes[email]
return {
'success': True,
'message': '验证码验证成功',
'type': verification_type
}
def is_qq_email(email):
"""
验证是否为QQ邮箱
Args:
email: 邮箱地址
Returns:
bool: 是否为QQ邮箱
"""
if not email or '@' not in email:
return False
domain = email.split('@')[1].lower()
qq_domains = ['qq.com', 'vip.qq.com', 'foxmail.com']
return domain in qq_domains
def get_qq_avatar_url(email):
"""
根据QQ邮箱获取QQ头像URL
Args:
email: QQ邮箱地址
Returns:
str: QQ头像URL
"""
if not is_qq_email(email):
return None
# 提取QQ号码
qq_number = email.split('@')[0]
# 验证是否为纯数字QQ号
if not qq_number.isdigit():
return None
# 返回QQ头像API URL
return f"http://q1.qlogo.cn/g?b=qq&nk={qq_number}&s=100"
def cleanup_expired_codes():
"""清理过期的验证码"""
current_time = datetime.now()
expired_emails = [
email for email, info in verification_codes.items()
if current_time > info['expires_at']
]
for email in expired_emails:
del verification_codes[email]
return len(expired_emails)

View File

@@ -0,0 +1,408 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
用户管理模块
Created by: 神奇万事通
Date: 2025-09-02
"""
from flask import Blueprint, request, jsonify, session, current_app
from datetime import datetime
from bson import ObjectId
import jwt
from functools import wraps
user_bp = Blueprint('user', __name__)
def verify_token(token):
"""验证JWT token"""
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
return {'success': True, 'data': payload}
except jwt.ExpiredSignatureError:
return {'success': False, 'message': 'Token已过期'}
except jwt.InvalidTokenError:
return {'success': False, 'message': 'Token无效'}
def login_required(f):
"""登录验证装饰器支持JWT token和session"""
@wraps(f)
def decorated_function(*args, **kwargs):
# 优先检查JWT token
token = request.headers.get('Authorization')
if token:
if token.startswith('Bearer '):
token = token[7:]
result = verify_token(token)
if result['success']:
request.current_user = result['data']
return f(*args, **kwargs)
# 回退到session验证
if not session.get('logged_in'):
return jsonify({
'success': False,
'message': '请先登录'
}), 401
return f(*args, **kwargs)
return decorated_function
@user_bp.route('/profile', methods=['GET'])
@login_required
def get_profile():
"""获取用户资料"""
try:
user_id = session.get('user_id')
users_collection = current_app.mongo.db.userdata
user = users_collection.find_one({'_id': ObjectId(user_id)})
if not user:
return jsonify({
'success': False,
'message': '用户不存在'
}), 404
# 返回用户信息(不包含密码)
profile = {
'account': user['账号'],
'register_time': user.get('注册时间'),
'last_login': user.get('最后登录'),
'login_count': user.get('登录次数', 0),
'status': user.get('用户状态', 'active')
}
return jsonify({
'success': True,
'data': profile
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@user_bp.route('/change-password', methods=['POST'])
@login_required
def change_password():
"""修改密码"""
try:
data = request.get_json()
old_password = data.get('old_password', '').strip()
new_password = data.get('new_password', '').strip()
if not old_password or not new_password:
return jsonify({
'success': False,
'message': '旧密码和新密码不能为空'
}), 400
if len(new_password) < 6 or len(new_password) > 20:
return jsonify({
'success': False,
'message': '新密码长度必须在6-20位之间'
}), 400
user_id = session.get('user_id')
users_collection = current_app.mongo.db.userdata
user = users_collection.find_one({'_id': ObjectId(user_id)})
if not user:
return jsonify({
'success': False,
'message': '用户不存在'
}), 404
from werkzeug.security import check_password_hash, generate_password_hash
# 验证旧密码
if not check_password_hash(user['密码'], old_password):
return jsonify({
'success': False,
'message': '原密码错误'
}), 401
# 更新密码
new_password_hash = generate_password_hash(new_password)
result = users_collection.update_one(
{'_id': ObjectId(user_id)},
{'$set': {'密码': new_password_hash}}
)
if result.modified_count > 0:
return jsonify({
'success': True,
'message': '密码修改成功'
}), 200
else:
return jsonify({
'success': False,
'message': '密码修改失败'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@user_bp.route('/stats', methods=['GET'])
@login_required
def get_user_stats():
"""获取用户统计信息"""
try:
user_id = session.get('user_id')
# 这里可以添加更多统计信息比如API调用次数等
stats = {
'login_today': 1, # 今日登录次数
'api_calls_today': 0, # 今日API调用次数
'total_api_calls': 0, # 总API调用次数
'join_days': 1, # 加入天数
'last_activity': datetime.now().isoformat()
}
return jsonify({
'success': True,
'data': stats
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@user_bp.route('/game-data', methods=['GET'])
@login_required
def get_user_game_data():
"""获取用户游戏数据"""
try:
# 优先从JWT token获取用户ID
if hasattr(request, 'current_user'):
user_id = request.current_user['user_id']
else:
user_id = session.get('user_id')
users_collection = current_app.mongo.db.userdata
user = users_collection.find_one({'_id': ObjectId(user_id)})
if not user:
return jsonify({
'success': False,
'message': '用户不存在'
}), 404
# 返回用户游戏数据
game_data = {
'level': user.get('等级', 0),
'experience': user.get('经验', 0),
'coins': user.get('萌芽币', 0),
'checkin_system': user.get('签到系统', {
'连续签到天数': 0,
'今日是否已签到': False,
'签到时间': '2025-01-01'
})
}
return jsonify({
'success': True,
'data': game_data
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@user_bp.route('/checkin', methods=['POST'])
@login_required
def daily_checkin():
"""每日签到"""
try:
# 优先从JWT token获取用户ID
if hasattr(request, 'current_user'):
user_id = request.current_user['user_id']
else:
user_id = session.get('user_id')
users_collection = current_app.mongo.db.userdata
user = users_collection.find_one({'_id': ObjectId(user_id)})
if not user:
return jsonify({
'success': False,
'message': '用户不存在'
}), 404
# 获取当前日期
today = datetime.now().strftime('%Y-%m-%d')
# 获取签到系统数据
checkin_system = user.get('签到系统', {
'连续签到天数': 0,
'今日是否已签到': False,
'签到时间': '2025-01-01'
})
# 检查今日是否已签到
if checkin_system.get('今日是否已签到', False) and checkin_system.get('签到时间') == today:
return jsonify({
'success': False,
'message': '今日已签到,请明天再来!'
}), 400
# 计算连续签到天数
last_checkin_date = checkin_system.get('签到时间', '2025-01-01')
consecutive_days = checkin_system.get('连续签到天数', 0)
# 检查是否连续签到
if last_checkin_date:
try:
last_date = datetime.strptime(last_checkin_date, '%Y-%m-%d')
today_date = datetime.strptime(today, '%Y-%m-%d')
days_diff = (today_date - last_date).days
if days_diff == 1:
# 连续签到
consecutive_days += 1
elif days_diff > 1:
# 断签,重新开始
consecutive_days = 1
else:
# 同一天,不应该发生
consecutive_days = consecutive_days
except:
consecutive_days = 1
else:
consecutive_days = 1
# 签到奖励
coin_reward = 300
exp_reward = 200
# 获取当前用户数据
current_coins = user.get('萌芽币', 0)
current_exp = user.get('经验', 0)
current_level = user.get('等级', 0)
# 计算新的经验和等级
new_exp = current_exp + exp_reward
new_level = current_level
# 等级升级逻辑100 × 1.2^(等级)
while True:
exp_needed = int(100 * (1.2 ** new_level))
if new_exp >= exp_needed:
new_exp -= exp_needed
new_level += 1
else:
break
# 更新用户数据
update_data = {
'萌芽币': current_coins + coin_reward,
'经验': new_exp,
'等级': new_level,
'签到系统': {
'连续签到天数': consecutive_days,
'今日是否已签到': True,
'签到时间': today
}
}
result = users_collection.update_one(
{'_id': ObjectId(user_id)},
{'$set': update_data}
)
if result.modified_count > 0:
level_up = new_level > current_level
return jsonify({
'success': True,
'message': '签到成功!',
'data': {
'coin_reward': coin_reward,
'exp_reward': exp_reward,
'consecutive_days': consecutive_days,
'level_up': level_up,
'new_level': new_level,
'new_coins': current_coins + coin_reward,
'new_exp': new_exp
}
}), 200
else:
return jsonify({
'success': False,
'message': '签到失败,请稍后重试'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@user_bp.route('/delete', methods=['POST'])
@login_required
def delete_account():
"""删除账户"""
try:
data = request.get_json()
password = data.get('password', '').strip()
if not password:
return jsonify({
'success': False,
'message': '请输入密码确认删除'
}), 400
user_id = session.get('user_id')
users_collection = current_app.mongo.db.userdata
user = users_collection.find_one({'_id': ObjectId(user_id)})
if not user:
return jsonify({
'success': False,
'message': '用户不存在'
}), 404
from werkzeug.security import check_password_hash
# 验证密码
if not check_password_hash(user['密码'], password):
return jsonify({
'success': False,
'message': '密码错误'
}), 401
# 删除用户
result = users_collection.delete_one({'_id': ObjectId(user_id)})
if result.deleted_count > 0:
# 清除会话
session.clear()
return jsonify({
'success': True,
'message': '账户已成功删除'
}), 200
else:
return jsonify({
'success': False,
'message': '删除失败'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500