60sapi接口搭建完毕,数据库连接测试成功,登录注册部分简单完成

This commit is contained in:
2025-09-02 19:45:50 +08:00
parent b139fb14d9
commit e1f8885c6c
150 changed files with 53045 additions and 8 deletions

127
backend/app.py Normal file
View File

@@ -0,0 +1,127 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
InfoGenie 后端主应用入口
Created by: 神奇万事通
Date: 2025-09-02
"""
from flask import Flask, jsonify, request, session, send_from_directory
from flask_cors import CORS
from flask_pymongo import PyMongo
import os
from datetime import datetime, timedelta
import hashlib
import secrets
# 导入模块
from modules.auth import auth_bp
from modules.api_60s import api_60s_bp
from modules.user_management import user_bp
from modules.email_service import init_mail
from config import Config
def create_app():
"""创建Flask应用实例"""
app = Flask(__name__)
# 加载配置
app.config.from_object(Config)
# 启用CORS跨域支持
CORS(app, supports_credentials=True)
# 初始化MongoDB
mongo = PyMongo(app)
app.mongo = mongo
# 初始化邮件服务
init_mail(app)
# 注册蓝图
app.register_blueprint(auth_bp, url_prefix='/api/auth')
app.register_blueprint(api_60s_bp, url_prefix='/api/60s')
app.register_blueprint(user_bp, url_prefix='/api/user')
# 基础路由
@app.route('/')
def index():
"""API根路径"""
return jsonify({
'message': '✨ 神奇万事通 API 服务运行中 ✨',
'version': '1.0.0',
'timestamp': datetime.now().isoformat(),
'endpoints': {
'auth': '/api/auth',
'60s_api': '/api/60s',
'user': '/api/user'
}
})
@app.route('/api/health')
def health_check():
"""健康检查接口"""
try:
# 检查数据库连接
mongo.db.command('ping')
db_status = 'connected'
except Exception as e:
db_status = f'error: {str(e)}'
return jsonify({
'status': 'running',
'database': db_status,
'timestamp': datetime.now().isoformat()
})
# 60sapi静态文件服务
@app.route('/60sapi/<path:filename>')
def serve_60sapi_files(filename):
"""提供60sapi目录下的静态文件服务"""
try:
# 获取项目根目录
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
api_directory = os.path.join(project_root, 'frontend', '60sapi')
# 安全检查:确保文件路径在允许的目录内
full_path = os.path.join(api_directory, filename)
if not os.path.commonpath([api_directory, full_path]) == api_directory:
return jsonify({'error': '非法文件路径'}), 403
# 检查文件是否存在
if not os.path.exists(full_path):
return jsonify({'error': '文件不存在'}), 404
# 获取文件目录和文件名
directory = os.path.dirname(full_path)
file_name = os.path.basename(full_path)
return send_from_directory(directory, file_name)
except Exception as e:
return jsonify({'error': f'文件服务错误: {str(e)}'}), 500
# 错误处理
@app.errorhandler(404)
def not_found(error):
return jsonify({
'error': 'API接口不存在',
'message': '请检查请求路径是否正确'
}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({
'error': '服务器内部错误',
'message': '请稍后重试或联系管理员'
}), 500
return app
if __name__ == '__main__':
app = create_app()
print("🚀 启动 InfoGenie 后端服务...")
print("📡 API地址: http://localhost:5000")
print("📚 文档地址: http://localhost:5000/api/health")
app.run(debug=True, host='0.0.0.0', port=5000)

87
backend/config.py Normal file
View File

@@ -0,0 +1,87 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
InfoGenie 配置文件
Created by: 神奇万事通
Date: 2025-09-02
"""
import os
from datetime import timedelta
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
class Config:
"""应用配置类"""
# 基础配置
SECRET_KEY = os.environ.get('SECRET_KEY') or 'infogenie-secret-key-2025'
# MongoDB 配置
MONGO_URI = os.environ.get('MONGO_URI') or 'mongodb://localhost:27017/InfoGenie'
# Session 配置
PERMANENT_SESSION_LIFETIME = timedelta(days=7) # 会话持续7天
SESSION_COOKIE_SECURE = False # 开发环境设为False生产环境设为True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
# 邮件配置
MAIL_SERVER = 'smtp.qq.com'
MAIL_PORT = 465
MAIL_USE_SSL = True
MAIL_USE_TLS = False
MAIL_USERNAME = os.environ.get('MAIL_USERNAME') or 'your-email@qq.com'
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD') or 'your-app-password'
MAIL_DEFAULT_SENDER = ('InfoGenie 神奇万事通', os.environ.get('MAIL_USERNAME') or 'your-email@qq.com')
# API 配置
API_RATE_LIMIT = '100 per hour' # API调用频率限制
# 外部API配置
EXTERNAL_APIS = {
'60s': [
'https://60s.api.shumengya.top',
'https://60s-cf.viki.moe',
'https://60s.viki.moe',
'https://60s.b23.run',
'https://60s.114128.xyz',
'https://60s-cf.114128.xyz'
]
}
# 应用信息
APP_INFO = {
'name': '✨ 神奇万事通 ✨',
'description': '🎨 一个多功能的聚合软件应用 💬',
'author': '👨‍💻 by-神奇万事通',
'version': '1.0.0',
'icp': '📄 蜀ICP备2025151694号'
}
class DevelopmentConfig(Config):
"""开发环境配置"""
DEBUG = True
TESTING = False
class ProductionConfig(Config):
"""生产环境配置"""
DEBUG = False
TESTING = False
SESSION_COOKIE_SECURE = True
class TestingConfig(Config):
"""测试环境配置"""
DEBUG = True
TESTING = True
MONGO_URI = 'mongodb://localhost:27017/InfoGenie_Test'
# 配置字典
config = {
'development': DevelopmentConfig,
'production': ProductionConfig,
'testing': TestingConfig,
'default': DevelopmentConfig
}

View File

@@ -0,0 +1,92 @@
# InfoGenie 邮件服务修复说明
## 修复内容
### 问题描述
原始的 `email_service.py` 中的邮件发送功能存在问题,无法正常发送验证码邮件。
### 修复方案
参考成功的 `QQEmailSendAPI.py` 实现,对 `email_service.py` 进行了以下修复:
1. **SMTP连接方式优化**
-`with smtplib.SMTP_SSL()` 改为直接使用 `smtplib.SMTP_SSL()`
- 显式调用 `smtp_obj.quit()` 关闭连接
2. **邮件头设置优化**
- 确保 `From` 字段直接使用邮箱地址,不使用 `Header` 包装
- 保持与成功实现的一致性
3. **错误处理增强**
- 添加了针对 `SMTPAuthenticationError` 的专门处理
- 添加了针对 `SMTPConnectError` 的专门处理
- 提供更详细的错误信息
4. **调试信息优化**
- 添加了适量的日志输出用于问题诊断
- 移除了生产环境不安全的验证码返回
## 配置要求
### 环境变量
确保设置以下环境变量:
```bash
MAIL_USERNAME=your-qq-email@qq.com
MAIL_PASSWORD=your-qq-auth-code
```
### QQ邮箱授权码
1. 登录QQ邮箱
2. 进入设置 -> 账户
3. 开启SMTP服务
4. 获取授权码不是QQ密码
## 使用方法
### 发送验证码
```python
from modules.email_service import send_verification_email
# 发送注册验证码
result = send_verification_email('user@qq.com', 'register')
# 发送登录验证码
result = send_verification_email('user@qq.com', 'login')
```
### 验证验证码
```python
from modules.email_service import verify_code
# 验证用户输入的验证码
result = verify_code('user@qq.com', '123456')
```
## 测试
运行测试脚本验证功能:
```bash
cd backend
python test/test_email_fix.py
```
## 支持的邮箱
目前仅支持QQ邮箱系列
- @qq.com
- @vip.qq.com
- @foxmail.com
## 注意事项
1. **安全性**验证码不会在API响应中返回仅通过邮件发送
2. **有效期**验证码有效期为5分钟
3. **尝试次数**每个验证码最多可尝试验证3次
4. **频率限制**:建议添加发送频率限制防止滥用
## 修复文件
- `backend/modules/email_service.py` - 主要修复文件
- `backend/test/test_email_fix.py` - 测试脚本
- `backend/邮件服务修复说明.md` - 本说明文档
修复完成后,邮件发送功能已正常工作,可以成功发送注册和登录验证码邮件。

419
backend/modules/api_60s.py Normal file
View File

@@ -0,0 +1,419 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
60s API模块 - 提供各种实时数据接口
Created by: 神奇万事通
Date: 2025-09-02
"""
from flask import Blueprint, jsonify, request
import requests
import json
from datetime import datetime, timedelta
import random
import time
api_60s_bp = Blueprint('api_60s', __name__)
# API配置
API_ENDPOINTS = {
'抖音热搜': {
'urls': [
'https://api.vvhan.com/api/hotlist?type=douyin',
'https://tenapi.cn/v2/douyinhot',
'https://api.oioweb.cn/api/common/tebie/dyhot'
],
'cache_time': 600 # 10分钟缓存
},
'微博热搜': {
'urls': [
'https://api.vvhan.com/api/hotlist?type=weibo',
'https://tenapi.cn/v2/wbhot',
'https://api.oioweb.cn/api/common/tebie/wbhot'
],
'cache_time': 300 # 5分钟缓存
},
'猫眼票房': {
'urls': [
'https://api.vvhan.com/api/hotlist?type=maoyan',
'https://tenapi.cn/v2/maoyan'
],
'cache_time': 3600 # 1小时缓存
},
'网易云音乐': {
'urls': [
'https://api.vvhan.com/api/hotlist?type=netease',
'https://tenapi.cn/v2/music'
],
'cache_time': 1800 # 30分钟缓存
},
'HackerNews': {
'urls': [
'https://api.vvhan.com/api/hotlist?type=hackernews',
'https://hacker-news.firebaseio.com/v0/topstories.json'
],
'cache_time': 1800 # 30分钟缓存
}
}
# 内存缓存
cache = {}
def fetch_data_with_fallback(urls, timeout=10):
"""使用备用URL获取数据"""
for url in urls:
try:
response = requests.get(url, timeout=timeout, headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
if response.status_code == 200:
return response.json()
except Exception as e:
print(f"URL {url} 失败: {str(e)}")
continue
return None
def get_cached_data(key, cache_time):
"""获取缓存数据"""
if key in cache:
cached_time, data = cache[key]
if datetime.now() - cached_time < timedelta(seconds=cache_time):
return data
return None
def set_cache_data(key, data):
"""设置缓存数据"""
cache[key] = (datetime.now(), data)
@api_60s_bp.route('/douyin', methods=['GET'])
def get_douyin_hot():
"""获取抖音热搜榜"""
try:
# 检查缓存
cached = get_cached_data('douyin', API_ENDPOINTS['抖音热搜']['cache_time'])
if cached:
return jsonify({
'success': True,
'data': cached,
'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'from_cache': True
})
# 获取新数据
data = fetch_data_with_fallback(API_ENDPOINTS['抖音热搜']['urls'])
if data:
# 标准化数据格式
if 'data' in data:
hot_list = data['data']
elif isinstance(data, list):
hot_list = data
else:
hot_list = []
result = {
'title': '抖音热搜榜',
'subtitle': '实时热门话题 · 紧跟潮流趋势',
'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'total': len(hot_list),
'list': hot_list[:50] # 最多返回50条
}
# 设置缓存
set_cache_data('douyin', result)
return jsonify({
'success': True,
'data': result,
'from_cache': False
})
else:
return jsonify({
'success': False,
'message': '获取数据失败,所有数据源暂时不可用'
}), 503
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@api_60s_bp.route('/weibo', methods=['GET'])
def get_weibo_hot():
"""获取微博热搜榜"""
try:
# 检查缓存
cached = get_cached_data('weibo', API_ENDPOINTS['微博热搜']['cache_time'])
if cached:
return jsonify({
'success': True,
'data': cached,
'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'from_cache': True
})
# 获取新数据
data = fetch_data_with_fallback(API_ENDPOINTS['微博热搜']['urls'])
if data:
if 'data' in data:
hot_list = data['data']
elif isinstance(data, list):
hot_list = data
else:
hot_list = []
result = {
'title': '微博热搜榜',
'subtitle': '热门话题 · 实时更新',
'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'total': len(hot_list),
'list': hot_list[:50]
}
set_cache_data('weibo', result)
return jsonify({
'success': True,
'data': result,
'from_cache': False
})
else:
return jsonify({
'success': False,
'message': '获取数据失败,所有数据源暂时不可用'
}), 503
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@api_60s_bp.route('/maoyan', methods=['GET'])
def get_maoyan_box_office():
"""获取猫眼票房排行榜"""
try:
cached = get_cached_data('maoyan', API_ENDPOINTS['猫眼票房']['cache_time'])
if cached:
return jsonify({
'success': True,
'data': cached,
'from_cache': True
})
data = fetch_data_with_fallback(API_ENDPOINTS['猫眼票房']['urls'])
if data:
if 'data' in data:
box_office_list = data['data']
elif isinstance(data, list):
box_office_list = data
else:
box_office_list = []
result = {
'title': '猫眼票房排行榜',
'subtitle': '实时票房数据',
'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'total': len(box_office_list),
'list': box_office_list[:20]
}
set_cache_data('maoyan', result)
return jsonify({
'success': True,
'data': result,
'from_cache': False
})
else:
return jsonify({
'success': False,
'message': '获取数据失败'
}), 503
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@api_60s_bp.route('/60s', methods=['GET'])
def get_60s_news():
"""获取每天60秒读懂世界"""
try:
urls = [
'https://60s-cf.viki.moe',
'https://60s.viki.moe',
'https://60s.b23.run'
]
data = fetch_data_with_fallback(urls)
if data:
return jsonify({
'success': True,
'data': {
'title': '每天60秒读懂世界',
'content': data,
'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
})
else:
return jsonify({
'success': False,
'message': '获取数据失败'
}), 503
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@api_60s_bp.route('/bing-wallpaper', methods=['GET'])
def get_bing_wallpaper():
"""获取必应每日壁纸"""
try:
url = 'https://api.vvhan.com/api/bing'
response = requests.get(url, timeout=10)
if response.status_code == 200:
return jsonify({
'success': True,
'data': {
'title': '必应每日壁纸',
'image_url': response.url,
'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
})
else:
return jsonify({
'success': False,
'message': '获取壁纸失败'
}), 503
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@api_60s_bp.route('/weather', methods=['GET'])
def get_weather():
"""获取天气信息"""
try:
city = request.args.get('city', '北京')
url = f'https://api.vvhan.com/api/weather?city={city}'
response = requests.get(url, timeout=10)
if response.status_code == 200:
data = response.json()
return jsonify({
'success': True,
'data': data
})
else:
return jsonify({
'success': False,
'message': '获取天气信息失败'
}), 503
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@api_60s_bp.route('/scan-directories', methods=['GET'])
def scan_directories():
"""扫描60sapi目录结构"""
try:
import os
# 获取项目根目录
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
api_directory = os.path.join(project_root, 'frontend', '60sapi')
if not os.path.exists(api_directory):
return jsonify({
'success': False,
'message': '60sapi目录不存在'
}), 404
categories = []
# 定义分类配置
category_config = {
'热搜榜单': {'color': '#66bb6a'},
'日更资讯': {'color': '#4caf50'},
'实用功能': {'color': '#388e3c'},
'娱乐消遣': {'color': '#66bb6a'}
}
# 颜色渐变配置
gradient_colors = [
'linear-gradient(135deg, #81c784 0%, #66bb6a 100%)',
'linear-gradient(135deg, #a5d6a7 0%, #81c784 100%)',
'linear-gradient(135deg, #c8e6c9 0%, #a5d6a7 100%)',
'linear-gradient(135deg, #66bb6a 0%, #4caf50 100%)',
'linear-gradient(135deg, #4caf50 0%, #388e3c 100%)'
]
# 扫描目录
for category_name in os.listdir(api_directory):
category_path = os.path.join(api_directory, category_name)
if os.path.isdir(category_path) and category_name in category_config:
apis = []
# 扫描分类下的模块
for i, module_name in enumerate(os.listdir(category_path)):
module_path = os.path.join(category_path, module_name)
index_path = os.path.join(module_path, 'index.html')
if os.path.isdir(module_path) and os.path.exists(index_path):
# 读取HTML文件获取标题
try:
with open(index_path, 'r', encoding='utf-8') as f:
html_content = f.read()
title_match = html_content.find('<title>')
if title_match != -1:
title_end = html_content.find('</title>', title_match)
if title_end != -1:
title = html_content[title_match + 7:title_end].strip()
else:
title = module_name
else:
title = module_name
except:
title = module_name
apis.append({
'title': title,
'description': f'{module_name}相关功能',
'link': f'/60sapi/{category_name}/{module_name}/index.html',
'status': 'active',
'color': gradient_colors[i % len(gradient_colors)]
})
if apis:
categories.append({
'title': category_name,
'color': category_config[category_name]['color'],
'apis': apis
})
return jsonify({
'success': True,
'categories': categories
})
except Exception as e:
return jsonify({
'success': False,
'message': f'扫描目录时出错: {str(e)}'
}), 500

416
backend/modules/auth.py Normal file
View File

@@ -0,0 +1,416 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
用户认证模块
Created by: 神奇万事通
Date: 2025-09-02
"""
from flask import Blueprint, request, jsonify, session, current_app
from werkzeug.security import generate_password_hash, check_password_hash
import hashlib
import re
from datetime import datetime
from .email_service import send_verification_email, verify_code, is_qq_email, get_qq_avatar_url
auth_bp = Blueprint('auth', __name__)
def validate_qq_email(email):
"""验证QQ邮箱格式"""
return is_qq_email(email)
def validate_password(password):
"""验证密码格式6-20位"""
return 6 <= len(password) <= 20
@auth_bp.route('/send-verification', methods=['POST'])
def send_verification():
"""发送验证码邮件"""
try:
data = request.get_json()
email = data.get('email', '').strip()
verification_type = data.get('type', 'register') # register, login
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱qq.com、vip.qq.com、foxmail.com'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已注册
existing_user = users_collection.find_one({'邮箱': email})
if verification_type == 'register' and existing_user:
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
if verification_type == 'login' and not existing_user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 发送验证码
result = send_verification_email(email, verification_type)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 500
except Exception as e:
current_app.logger.error(f"发送验证码失败: {str(e)}")
return jsonify({
'success': False,
'message': '发送失败,请稍后重试'
}), 500
@auth_bp.route('/verify-code', methods=['POST'])
def verify_verification_code():
"""验证验证码"""
try:
data = request.get_json()
email = data.get('email', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email or not code:
return jsonify({
'success': False,
'message': '邮箱和验证码不能为空'
}), 400
# 验证码校验
result = verify_code(email, code)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 400
except Exception as e:
current_app.logger.error(f"验证码校验失败: {str(e)}")
return jsonify({
'success': False,
'message': '验证失败,请稍后重试'
}), 500
@auth_bp.route('/register', methods=['POST'])
def register():
"""用户注册(需要先验证邮箱)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
username = data.get('username', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not all([email, username, password, code]):
return jsonify({
'success': False,
'message': '所有字段都不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱注册'
}), 400
if not validate_password(password):
return jsonify({
'success': False,
'message': '密码长度必须在6-20位之间'
}), 400
# 验证验证码
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'register':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已被注册
if users_collection.find_one({'邮箱': email}):
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
# 检查用户名是否已被使用
if users_collection.find_one({'用户名': username}):
return jsonify({
'success': False,
'message': '该用户名已被使用'
}), 409
# 获取QQ头像
avatar_url = get_qq_avatar_url(email)
# 创建新用户
password_hash = generate_password_hash(password)
user_data = {
'邮箱': email,
'用户名': username,
'密码': password_hash,
'头像': avatar_url,
'注册时间': datetime.now().isoformat(),
'最后登录': None,
'登录次数': 0,
'用户状态': 'active'
}
result = users_collection.insert_one(user_data)
if result.inserted_id:
return jsonify({
'success': True,
'message': '注册成功!',
'user': {
'email': email,
'username': username,
'avatar': avatar_url
}
}), 201
else:
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
except Exception as e:
current_app.logger.error(f"注册失败: {str(e)}")
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
if existing_user:
return jsonify({
'success': False,
'message': '该账号已被注册'
}), 409
# 创建新用户
password_hash = generate_password_hash(password)
user_data = {
'账号': account,
'密码': password_hash,
'注册时间': datetime.now().isoformat(),
'最后登录': None,
'登录次数': 0,
'用户状态': 'active'
}
result = users_collection.insert_one(user_data)
if result.inserted_id:
return jsonify({
'success': True,
'message': '注册成功!'
}), 201
else:
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@auth_bp.route('/login', methods=['POST'])
def login():
"""用户登录(支持邮箱+验证码或邮箱+密码)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱登录'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 查找用户
user = users_collection.find_one({'邮箱': email})
if not user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 检查用户状态
if user.get('用户状态') != 'active':
return jsonify({
'success': False,
'message': '账号已被禁用,请联系管理员'
}), 403
# 验证方式:验证码登录或密码登录
if code:
# 验证码登录
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'login':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
elif password:
# 密码登录
if not check_password_hash(user['密码'], password):
return jsonify({
'success': False,
'message': '密码错误'
}), 401
else:
return jsonify({
'success': False,
'message': '请输入密码或验证码'
}), 400
# 登录成功,更新用户信息
users_collection.update_one(
{'邮箱': email},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
# 设置会话
session['user_id'] = str(user['_id'])
session['email'] = email
session['username'] = user.get('用户名', '')
session.permanent = True
return jsonify({
'success': True,
'message': '登录成功!',
'user': {
'id': str(user['_id']),
'email': email,
'username': user.get('用户名', ''),
'avatar': user.get('头像', ''),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
current_app.logger.error(f"登录失败: {str(e)}")
return jsonify({
'success': False,
'message': '登录失败,请稍后重试'
}), 500
# 登录成功,创建会话
session['user_id'] = str(user['_id'])
session['account'] = user['账号']
session['logged_in'] = True
# 更新登录信息
users_collection.update_one(
{'_id': user['_id']},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
return jsonify({
'success': True,
'message': '登录成功!',
'user': {
'account': user['账号'],
'last_login': user.get('最后登录'),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@auth_bp.route('/logout', methods=['POST'])
def logout():
"""用户登出"""
try:
if 'logged_in' in session:
session.clear()
return jsonify({
'success': True,
'message': '已成功登出'
}), 200
else:
return jsonify({
'success': False,
'message': '用户未登录'
}), 401
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@auth_bp.route('/check', methods=['GET'])
def check_login():
"""检查登录状态"""
try:
if session.get('logged_in'):
return jsonify({
'success': True,
'logged_in': True,
'user': {
'account': session.get('account'),
'user_id': session.get('user_id')
}
}), 200
else:
return jsonify({
'success': True,
'logged_in': False
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500

View File

@@ -0,0 +1,276 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
邮件发送模块
负责处理用户注册、登录验证邮件
"""
import random
import string
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.header import Header
from flask import current_app
import logging
import os
# 验证码存储生产环境建议使用Redis
verification_codes = {}
def init_mail(app):
"""初始化邮件配置"""
# 使用smtplib直接发送不需要Flask-Mail
pass
def generate_verification_code(length=6):
"""生成验证码"""
return ''.join(random.choices(string.digits, k=length))
def send_verification_email(email, verification_type='register'):
"""
发送验证邮件
Args:
email: 收件人邮箱
verification_type: 验证类型 ('register', 'login', 'reset_password')
Returns:
dict: 发送结果
"""
try:
# 验证QQ邮箱格式
if not is_qq_email(email):
return {
'success': False,
'message': '仅支持QQ邮箱注册登录'
}
# 生成验证码
code = generate_verification_code()
# 存储验证码5分钟有效期
verification_codes[email] = {
'code': code,
'type': verification_type,
'expires_at': datetime.now() + timedelta(minutes=5),
'attempts': 0
}
# 获取邮件配置 - 使用与QQEmailSendAPI相同的配置
sender_email = os.environ.get('MAIL_USERNAME', '3205788256@qq.com')
sender_password = os.environ.get('MAIL_PASSWORD', 'szcaxvbftusqddhi')
# 邮件模板
if verification_type == 'register':
subject = '【InfoGenie】注册验证码'
html_content = f'''
<html>
<body>
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="text-align: center; margin-bottom: 30px;">
<h1 style="color: #66bb6a; margin: 0;">InfoGenie 神奇万事通</h1>
<p style="color: #666; font-size: 14px; margin: 5px 0;">欢迎注册InfoGenie</p>
</div>
<div style="background: linear-gradient(135deg, #a8e6cf 0%, #dcedc1 100%); padding: 30px; border-radius: 15px; text-align: center;">
<h2 style="color: #2e7d32; margin-bottom: 20px;">验证码</h2>
<div style="background: white; padding: 20px; border-radius: 10px; margin: 20px 0;">
<span style="font-size: 32px; font-weight: bold; color: #66bb6a; letter-spacing: 5px;">{code}</span>
</div>
<p style="color: #4a4a4a; margin: 15px 0;">请在5分钟内输入此验证码完成注册</p>
</div>
<div style="margin-top: 30px; padding: 20px; background: #f8f9fa; border-radius: 10px;">
<p style="color: #666; font-size: 12px; margin: 0; text-align: center;">
如果您没有申请注册,请忽略此邮件<br>
此验证码5分钟内有效请勿泄露给他人
</p>
</div>
</div>
</body>
</html>
'''
else: # login
subject = '【InfoGenie】登录验证码'
html_content = f'''
<html>
<body>
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="text-align: center; margin-bottom: 30px;">
<h1 style="color: #66bb6a; margin: 0;">InfoGenie 神奇万事通</h1>
<p style="color: #666; font-size: 14px; margin: 5px 0;">安全登录验证</p>
</div>
<div style="background: linear-gradient(135deg, #81c784 0%, #66bb6a 100%); padding: 30px; border-radius: 15px; text-align: center;">
<h2 style="color: white; margin-bottom: 20px;">登录验证码</h2>
<div style="background: white; padding: 20px; border-radius: 10px; margin: 20px 0;">
<span style="font-size: 32px; font-weight: bold; color: #66bb6a; letter-spacing: 5px;">{code}</span>
</div>
<p style="color: white; margin: 15px 0;">请在5分钟内输入此验证码完成登录</p>
</div>
<div style="margin-top: 30px; padding: 20px; background: #f8f9fa; border-radius: 10px;">
<p style="color: #666; font-size: 12px; margin: 0; text-align: center;">
如果不是您本人操作,请检查账户安全<br>
此验证码5分钟内有效请勿泄露给他人
</p>
</div>
</div>
</body>
</html>
'''
# 创建邮件 - 使用与QQEmailSendAPI相同的方式
message = MIMEText(html_content, 'html', 'utf-8')
message['From'] = sender_email # 直接使用邮箱地址不使用Header包装
message['To'] = email
message['Subject'] = Header(subject, 'utf-8')
# 发送邮件 - 使用SSL端口465
try:
# 使用与QQEmailSendAPI相同的连接方式
smtp_obj = smtplib.SMTP_SSL('smtp.qq.com', 465)
smtp_obj.login(sender_email, sender_password)
smtp_obj.sendmail(sender_email, [email], message.as_string())
smtp_obj.quit()
print(f"验证码邮件发送成功: {email}")
return {
'success': True,
'message': '验证码已发送到您的邮箱',
'email': email
}
except smtplib.SMTPAuthenticationError as auth_error:
print(f"SMTP认证失败: {str(auth_error)}")
return {
'success': False,
'message': 'SMTP认证失败请检查邮箱配置'
}
except smtplib.SMTPConnectError as conn_error:
print(f"SMTP连接失败: {str(conn_error)}")
return {
'success': False,
'message': 'SMTP服务器连接失败'
}
except Exception as smtp_error:
print(f"SMTP发送失败: {str(smtp_error)}")
return {
'success': False,
'message': f'邮件发送失败: {str(smtp_error)}'
}
except Exception as e:
print(f"邮件发送失败: {str(e)}")
return {
'success': False,
'message': '邮件发送失败,请稍后重试'
}
def verify_code(email, code):
"""
验证验证码
Args:
email: 邮箱地址
code: 验证码
Returns:
dict: 验证结果
"""
if email not in verification_codes:
return {
'success': False,
'message': '验证码不存在或已过期'
}
stored_info = verification_codes[email]
# 检查过期时间
if datetime.now() > stored_info['expires_at']:
del verification_codes[email]
return {
'success': False,
'message': '验证码已过期,请重新获取'
}
# 检查尝试次数
if stored_info['attempts'] >= 3:
del verification_codes[email]
return {
'success': False,
'message': '验证码输入错误次数过多,请重新获取'
}
# 验证码校验
if stored_info['code'] != code:
stored_info['attempts'] += 1
return {
'success': False,
'message': f'验证码错误,还可尝试{3 - stored_info["attempts"]}'
}
# 验证成功,删除验证码
verification_type = stored_info['type']
del verification_codes[email]
return {
'success': True,
'message': '验证码验证成功',
'type': verification_type
}
def is_qq_email(email):
"""
验证是否为QQ邮箱
Args:
email: 邮箱地址
Returns:
bool: 是否为QQ邮箱
"""
if not email or '@' not in email:
return False
domain = email.split('@')[1].lower()
qq_domains = ['qq.com', 'vip.qq.com', 'foxmail.com']
return domain in qq_domains
def get_qq_avatar_url(email):
"""
根据QQ邮箱获取QQ头像URL
Args:
email: QQ邮箱地址
Returns:
str: QQ头像URL
"""
if not is_qq_email(email):
return None
# 提取QQ号码
qq_number = email.split('@')[0]
# 验证是否为纯数字QQ号
if not qq_number.isdigit():
return None
# 返回QQ头像API URL
return f"http://q1.qlogo.cn/g?b=qq&nk={qq_number}&s=100"
def cleanup_expired_codes():
"""清理过期的验证码"""
current_time = datetime.now()
expired_emails = [
email for email, info in verification_codes.items()
if current_time > info['expires_at']
]
for email in expired_emails:
del verification_codes[email]
return len(expired_emails)

View File

@@ -0,0 +1,211 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
用户管理模块
Created by: 神奇万事通
Date: 2025-09-02
"""
from flask import Blueprint, request, jsonify, session, current_app
from datetime import datetime
from bson import ObjectId
user_bp = Blueprint('user', __name__)
def login_required(f):
"""登录验证装饰器"""
def decorated_function(*args, **kwargs):
if not session.get('logged_in'):
return jsonify({
'success': False,
'message': '请先登录'
}), 401
return f(*args, **kwargs)
decorated_function.__name__ = f.__name__
return decorated_function
@user_bp.route('/profile', methods=['GET'])
@login_required
def get_profile():
"""获取用户资料"""
try:
user_id = session.get('user_id')
users_collection = current_app.mongo.db.userdata
user = users_collection.find_one({'_id': ObjectId(user_id)})
if not user:
return jsonify({
'success': False,
'message': '用户不存在'
}), 404
# 返回用户信息(不包含密码)
profile = {
'account': user['账号'],
'register_time': user.get('注册时间'),
'last_login': user.get('最后登录'),
'login_count': user.get('登录次数', 0),
'status': user.get('用户状态', 'active')
}
return jsonify({
'success': True,
'data': profile
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@user_bp.route('/change-password', methods=['POST'])
@login_required
def change_password():
"""修改密码"""
try:
data = request.get_json()
old_password = data.get('old_password', '').strip()
new_password = data.get('new_password', '').strip()
if not old_password or not new_password:
return jsonify({
'success': False,
'message': '旧密码和新密码不能为空'
}), 400
if len(new_password) < 6 or len(new_password) > 20:
return jsonify({
'success': False,
'message': '新密码长度必须在6-20位之间'
}), 400
user_id = session.get('user_id')
users_collection = current_app.mongo.db.userdata
user = users_collection.find_one({'_id': ObjectId(user_id)})
if not user:
return jsonify({
'success': False,
'message': '用户不存在'
}), 404
from werkzeug.security import check_password_hash, generate_password_hash
# 验证旧密码
if not check_password_hash(user['密码'], old_password):
return jsonify({
'success': False,
'message': '原密码错误'
}), 401
# 更新密码
new_password_hash = generate_password_hash(new_password)
result = users_collection.update_one(
{'_id': ObjectId(user_id)},
{'$set': {'密码': new_password_hash}}
)
if result.modified_count > 0:
return jsonify({
'success': True,
'message': '密码修改成功'
}), 200
else:
return jsonify({
'success': False,
'message': '密码修改失败'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@user_bp.route('/stats', methods=['GET'])
@login_required
def get_user_stats():
"""获取用户统计信息"""
try:
user_id = session.get('user_id')
# 这里可以添加更多统计信息比如API调用次数等
stats = {
'login_today': 1, # 今日登录次数
'api_calls_today': 0, # 今日API调用次数
'total_api_calls': 0, # 总API调用次数
'join_days': 1, # 加入天数
'last_activity': datetime.now().isoformat()
}
return jsonify({
'success': True,
'data': stats
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@user_bp.route('/delete', methods=['POST'])
@login_required
def delete_account():
"""删除账户"""
try:
data = request.get_json()
password = data.get('password', '').strip()
if not password:
return jsonify({
'success': False,
'message': '请输入密码确认删除'
}), 400
user_id = session.get('user_id')
users_collection = current_app.mongo.db.userdata
user = users_collection.find_one({'_id': ObjectId(user_id)})
if not user:
return jsonify({
'success': False,
'message': '用户不存在'
}), 404
from werkzeug.security import check_password_hash
# 验证密码
if not check_password_hash(user['密码'], password):
return jsonify({
'success': False,
'message': '密码错误'
}), 401
# 删除用户
result = users_collection.delete_one({'_id': ObjectId(user_id)})
if result.deleted_count > 0:
# 清除会话
session.clear()
return jsonify({
'success': True,
'message': '账户已成功删除'
}), 200
else:
return jsonify({
'success': False,
'message': '删除失败'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500

26
backend/requirements.txt Normal file
View File

@@ -0,0 +1,26 @@
# InfoGenie 后端依赖包
# Web框架
Flask==2.3.3
Flask-CORS==4.0.0
# 数据库
Flask-PyMongo==2.3.0
pymongo==4.5.0
# 密码加密
Werkzeug==2.3.7
# HTTP请求
requests==2.31.0
# 邮件发送
Flask-Mail==0.9.1
# 数据处理
python-dateutil==2.8.2
# 环境变量
python-dotenv==1.0.0
# 开发工具
flask-limiter==3.5.0 # API限流

View File

@@ -0,0 +1,34 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试注册邮件发送
"""
import requests
import json
def test_send_verification_email():
"""测试发送验证码邮件"""
url = "http://localhost:5000/api/auth/send-verification"
test_data = {
"email": "3205788256@qq.com", # 使用配置的邮箱
"type": "register"
}
try:
response = requests.post(url, json=test_data)
print(f"状态码: {response.status_code}")
print(f"响应: {response.json()}")
if response.status_code == 200:
print("\n✅ 邮件发送成功!请检查邮箱")
else:
print(f"\n❌ 邮件发送失败: {response.json().get('message', '未知错误')}")
except Exception as e:
print(f"❌ 请求失败: {str(e)}")
if __name__ == "__main__":
print("📧 测试注册邮件发送...")
test_send_verification_email()

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MongoDB连接测试
"""
from pymongo import MongoClient
def test_connection():
# 测试不同的连接配置
configs = [
"mongodb://shumengya:tyh%4019900420@192.168.1.233:27017/InfoGenie",
"mongodb://shumengya:tyh%4019900420@192.168.1.233:27017/InfoGenie?authSource=admin",
"mongodb://shumengya:tyh%4019900420@192.168.1.233:27017/InfoGenie?authSource=InfoGenie",
"mongodb://shumengya:tyh%4019900420@192.168.1.233:27017/?authSource=admin",
]
for i, uri in enumerate(configs):
print(f"\n测试配置 {i+1}: {uri}")
try:
client = MongoClient(uri, serverSelectionTimeoutMS=5000)
client.admin.command('ping')
print("✅ 连接成功!")
# 测试InfoGenie数据库
db = client.InfoGenie
collections = db.list_collection_names()
print(f"数据库集合: {collections}")
# 测试userdata集合
if 'userdata' in collections:
count = db.userdata.count_documents({})
print(f"userdata集合文档数: {count}")
client.close()
return uri
except Exception as e:
print(f"❌ 连接失败: {str(e)}")
return None
if __name__ == "__main__":
print("🔧 测试MongoDB连接...")
success_uri = test_connection()
if success_uri:
print(f"\n✅ 成功的连接字符串: {success_uri}")
else:
print("\n❌ 所有连接尝试都失败了")

View File

@@ -0,0 +1,35 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试邮件发送功能
"""
import requests
import json
def test_send_verification():
"""测试发送验证码"""
url = "http://localhost:5000/api/auth/send-verification"
# 测试数据
test_data = {
"email": "3205788256@qq.com", # 使用配置中的测试邮箱
"type": "register"
}
try:
response = requests.post(url, json=test_data)
print(f"状态码: {response.status_code}")
print(f"响应内容: {response.json()}")
if response.status_code == 200:
print("✅ 邮件发送成功!")
else:
print("❌ 邮件发送失败")
except Exception as e:
print(f"❌ 请求失败: {str(e)}")
if __name__ == "__main__":
print("📧 测试邮件发送功能...")
test_send_verification()

View File

@@ -0,0 +1,81 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试修复后的邮件发送功能
"""
import sys
import os
# 添加父目录到路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from modules.email_service import send_verification_email, verify_code
def test_email_sending():
"""
测试邮件发送功能
"""
print("=== 测试邮件发送功能 ===")
# 测试邮箱请替换为你的QQ邮箱
test_email = "3205788256@qq.com" # 替换为实际的测试邮箱
print(f"正在向 {test_email} 发送注册验证码...")
# 发送注册验证码
result = send_verification_email(test_email, 'register')
print(f"发送结果: {result}")
if result['success']:
print("✅ 邮件发送成功!")
if 'code' in result:
print(f"验证码: {result['code']}")
# 测试验证码验证
print("\n=== 测试验证码验证 ===")
verify_result = verify_code(test_email, result['code'])
print(f"验证结果: {verify_result}")
if verify_result['success']:
print("✅ 验证码验证成功!")
else:
print("❌ 验证码验证失败!")
else:
print("❌ 邮件发送失败!")
print(f"错误信息: {result['message']}")
def test_login_email():
"""
测试登录验证码邮件
"""
print("\n=== 测试登录验证码邮件 ===")
test_email = "3205788256@qq.com" # 替换为实际的测试邮箱
print(f"正在向 {test_email} 发送登录验证码...")
result = send_verification_email(test_email, 'login')
print(f"发送结果: {result}")
if result['success']:
print("✅ 登录验证码邮件发送成功!")
if 'code' in result:
print(f"验证码: {result['code']}")
else:
print("❌ 登录验证码邮件发送失败!")
print(f"错误信息: {result['message']}")
if __name__ == '__main__':
print("InfoGenie 邮件服务测试")
print("=" * 50)
# 测试注册验证码
test_email_sending()
# 测试登录验证码
test_login_email()
print("\n测试完成!")

View File

@@ -0,0 +1,70 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
测试MongoDB连接
"""
import os
from pymongo import MongoClient
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
def test_mongodb_connection():
"""测试MongoDB连接"""
try:
# 获取连接字符串
mongo_uri = os.environ.get('MONGO_URI')
print(f"连接字符串: {mongo_uri}")
# 创建连接
client = MongoClient(mongo_uri)
# 测试连接
client.admin.command('ping')
print("✅ MongoDB连接成功")
# 获取数据库
db = client.InfoGenie
print(f"数据库: {db.name}")
# 测试集合访问
userdata_collection = db.userdata
print(f"用户集合: {userdata_collection.name}")
# 测试查询(计算文档数量)
count = userdata_collection.count_documents({})
print(f"用户数据集合中有 {count} 个文档")
# 关闭连接
client.close()
except Exception as e:
print(f"❌ MongoDB连接失败: {str(e)}")
# 尝试其他认证数据库
print("\n尝试使用不同的认证配置...")
try:
# 尝试不指定认证数据库
uri_without_auth = "mongodb://shumengya:tyh%4019900420@192.168.1.233:27017/InfoGenie"
client2 = MongoClient(uri_without_auth)
client2.admin.command('ping')
print("✅ 不使用authSource连接成功")
client2.close()
except Exception as e2:
print(f"❌ 无authSource也失败: {str(e2)}")
# 尝试使用InfoGenie作为认证数据库
try:
uri_with_infogenie_auth = "mongodb://shumengya:tyh%4019900420@192.168.1.233:27017/InfoGenie?authSource=InfoGenie"
client3 = MongoClient(uri_with_infogenie_auth)
client3.admin.command('ping')
print("✅ 使用InfoGenie作为authSource连接成功")
client3.close()
except Exception as e3:
print(f"❌ InfoGenie authSource也失败: {str(e3)}")
if __name__ == "__main__":
print("🔧 测试MongoDB连接...")
test_mongodb_connection()