chore: sync local changes (2026-03-12)

This commit is contained in:
2026-03-12 18:58:26 +08:00
parent 04a4cb962a
commit 939442e061
348 changed files with 91638 additions and 92091 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,455 +1,455 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
用户认证模块
Created by: 万象口袋
Date: 2025-09-02
"""
from flask import Blueprint, request, jsonify, current_app
from werkzeug.security import generate_password_hash, check_password_hash
import hashlib
import re
import jwt
from datetime import datetime, timedelta
from functools import wraps
from .email_service import send_verification_email, verify_code, is_qq_email, get_qq_avatar_url
auth_bp = Blueprint('auth', __name__)
#生成JWT token
def generate_token(user_data):
"""生成JWT token"""
payload = {
'user_id': user_data['user_id'],
'email': user_data['email'],
'username': user_data['username'],
'exp': datetime.utcnow() + timedelta(days=7), # 7天过期
'iat': datetime.utcnow()
}
return jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
#验证JWT token
def verify_token(token):
"""验证JWT token"""
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
return {'success': True, 'data': payload}
except jwt.ExpiredSignatureError:
return {'success': False, 'message': 'Token已过期'}
except jwt.InvalidTokenError:
return {'success': False, 'message': 'Token无效'}
#JWT token验证装饰器
def token_required(f):
"""JWT token验证装饰器"""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'success': False, 'message': '缺少认证token'}), 401
if token.startswith('Bearer '):
token = token[7:]
result = verify_token(token)
if not result['success']:
return jsonify({'success': False, 'message': result['message']}), 401
request.current_user = result['data']
return f(*args, **kwargs)
return decorated
#验证QQ邮箱格式
def validate_qq_email(email):
"""验证QQ邮箱格式"""
return is_qq_email(email)
#验证密码格式
def validate_password(password):
"""验证密码格式6-20位"""
return 6 <= len(password) <= 20
#==========================对外暴露的HTTP接口==========================
#发送验证码邮件
@auth_bp.route('/send-verification', methods=['POST'])
def send_verification():
"""发送验证码邮件"""
try:
data = request.get_json()
email = data.get('email', '').strip()
verification_type = data.get('type', 'register') # register, login
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱qq.com、vip.qq.com、foxmail.com'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已注册
existing_user = users_collection.find_one({'邮箱': email})
if verification_type == 'register' and existing_user:
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
if verification_type == 'login' and not existing_user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 发送验证码
result = send_verification_email(email, verification_type)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 500
except Exception as e:
current_app.logger.error(f"发送验证码失败: {str(e)}")
return jsonify({
'success': False,
'message': '发送失败,请稍后重试'
}), 500
#验证验证码
@auth_bp.route('/verify-code', methods=['POST'])
def verify_verification_code():
"""验证验证码"""
try:
data = request.get_json()
email = data.get('email', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email or not code:
return jsonify({
'success': False,
'message': '邮箱和验证码不能为空'
}), 400
# 验证码校验
result = verify_code(email, code)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 400
except Exception as e:
current_app.logger.error(f"验证码校验失败: {str(e)}")
return jsonify({
'success': False,
'message': '验证失败,请稍后重试'
}), 500
#用户注册
@auth_bp.route('/register', methods=['POST'])
def register():
"""用户注册(需要先验证邮箱)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
username = data.get('username', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not all([email, username, password, code]):
return jsonify({
'success': False,
'message': '所有字段都不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱注册'
}), 400
if not validate_password(password):
return jsonify({
'success': False,
'message': '密码长度必须在6-20位之间'
}), 400
# 验证验证码
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'register':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已被注册
if users_collection.find_one({'邮箱': email}):
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
# 检查用户名是否已被使用
if users_collection.find_one({'用户名': username}):
return jsonify({
'success': False,
'message': '该用户名已被使用'
}), 409
# 获取QQ头像
avatar_url = get_qq_avatar_url(email)
# 创建新用户
password_hash = generate_password_hash(password)
user_data = {
'邮箱': email,
'用户名': username,
'密码': password_hash,
'头像': avatar_url,
'注册时间': datetime.now().isoformat(),
'最后登录': None,
'登录次数': 0,
'用户状态': 'active',
'等级': 0,
'经验': 0,
'萌芽币': 0,
'签到系统': {
'连续签到天数': 0,
'今日是否已签到': False,
'签到时间': '2025-01-01'
}
}
result = users_collection.insert_one(user_data)
if result.inserted_id:
return jsonify({
'success': True,
'message': '注册成功!',
'user': {
'email': email,
'username': username,
'avatar': avatar_url
}
}), 201
else:
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
except Exception as e:
current_app.logger.error(f"注册失败: {str(e)}")
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
#用户登录
@auth_bp.route('/login', methods=['POST'])
def login():
"""用户登录(支持邮箱+验证码或邮箱+密码)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱登录'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 查找用户
user = users_collection.find_one({'邮箱': email})
if not user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 检查用户状态
if user.get('用户状态') != 'active':
return jsonify({
'success': False,
'message': '账号已被禁用,请联系管理员'
}), 403
# 验证方式:验证码登录或密码登录
if code:
# 验证码登录
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'login':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
elif password:
# 密码登录
if not check_password_hash(user['密码'], password):
return jsonify({
'success': False,
'message': '密码错误'
}), 401
else:
return jsonify({
'success': False,
'message': '请输入密码或验证码'
}), 400
# 登录成功,更新用户信息
users_collection.update_one(
{'邮箱': email},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
# 生成JWT token
user_data = {
'user_id': str(user['_id']),
'email': email,
'username': user.get('用户名', '')
}
token = generate_token(user_data)
return jsonify({
'success': True,
'message': '登录成功!',
'token': token,
'user': {
'id': str(user['_id']),
'email': email,
'username': user.get('用户名', ''),
'avatar': user.get('头像', ''),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
current_app.logger.error(f"登录失败: {str(e)}")
return jsonify({
'success': False,
'message': '登录失败,请稍后重试'
}), 500
# 登录成功,创建会话
hwt = getattr(request, 'hwt', {})
hwt['user_id'] = str(user['_id'])
hwt['account'] = user['账号']
hwt['logged_in'] = True
# 更新登录信息
users_collection.update_one(
{'_id': user['_id']},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
return jsonify({
'success': True,
'message': '登录成功!',
'user': {
'account': user['账号'],
'last_login': user.get('最后登录'),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
#用户登出
@auth_bp.route('/logout', methods=['POST'])
def logout():
"""用户登出"""
try:
# JWT是无状态的客户端删除token即可
return jsonify({
'success': True,
'message': '已成功登出'
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
#检查登录状态
@auth_bp.route('/check', methods=['GET'])
def check_login():
"""检查登录状态"""
try:
token = request.headers.get('Authorization')
if not token:
return jsonify({
'success': True,
'logged_in': False
}), 200
if token.startswith('Bearer '):
token = token[7:]
result = verify_token(token)
if result['success']:
user_data = result['data']
return jsonify({
'success': True,
'logged_in': True,
'user': {
'id': user_data['user_id'],
'email': user_data['email'],
'username': user_data['username']
}
}), 200
else:
return jsonify({
'success': True,
'logged_in': False
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
用户认证模块
Created by: 万象口袋
Date: 2025-09-02
"""
from flask import Blueprint, request, jsonify, current_app
from werkzeug.security import generate_password_hash, check_password_hash
import hashlib
import re
import jwt
from datetime import datetime, timedelta
from functools import wraps
from .email_service import send_verification_email, verify_code, is_qq_email, get_qq_avatar_url
auth_bp = Blueprint('auth', __name__)
#生成JWT token
def generate_token(user_data):
"""生成JWT token"""
payload = {
'user_id': user_data['user_id'],
'email': user_data['email'],
'username': user_data['username'],
'exp': datetime.utcnow() + timedelta(days=7), # 7天过期
'iat': datetime.utcnow()
}
return jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
#验证JWT token
def verify_token(token):
"""验证JWT token"""
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
return {'success': True, 'data': payload}
except jwt.ExpiredSignatureError:
return {'success': False, 'message': 'Token已过期'}
except jwt.InvalidTokenError:
return {'success': False, 'message': 'Token无效'}
#JWT token验证装饰器
def token_required(f):
"""JWT token验证装饰器"""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'success': False, 'message': '缺少认证token'}), 401
if token.startswith('Bearer '):
token = token[7:]
result = verify_token(token)
if not result['success']:
return jsonify({'success': False, 'message': result['message']}), 401
request.current_user = result['data']
return f(*args, **kwargs)
return decorated
#验证QQ邮箱格式
def validate_qq_email(email):
"""验证QQ邮箱格式"""
return is_qq_email(email)
#验证密码格式
def validate_password(password):
"""验证密码格式6-20位"""
return 6 <= len(password) <= 20
#==========================对外暴露的HTTP接口==========================
#发送验证码邮件
@auth_bp.route('/send-verification', methods=['POST'])
def send_verification():
"""发送验证码邮件"""
try:
data = request.get_json()
email = data.get('email', '').strip()
verification_type = data.get('type', 'register') # register, login
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱qq.com、vip.qq.com、foxmail.com'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已注册
existing_user = users_collection.find_one({'邮箱': email})
if verification_type == 'register' and existing_user:
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
if verification_type == 'login' and not existing_user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 发送验证码
result = send_verification_email(email, verification_type)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 500
except Exception as e:
current_app.logger.error(f"发送验证码失败: {str(e)}")
return jsonify({
'success': False,
'message': '发送失败,请稍后重试'
}), 500
#验证验证码
@auth_bp.route('/verify-code', methods=['POST'])
def verify_verification_code():
"""验证验证码"""
try:
data = request.get_json()
email = data.get('email', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email or not code:
return jsonify({
'success': False,
'message': '邮箱和验证码不能为空'
}), 400
# 验证码校验
result = verify_code(email, code)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 400
except Exception as e:
current_app.logger.error(f"验证码校验失败: {str(e)}")
return jsonify({
'success': False,
'message': '验证失败,请稍后重试'
}), 500
#用户注册
@auth_bp.route('/register', methods=['POST'])
def register():
"""用户注册(需要先验证邮箱)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
username = data.get('username', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not all([email, username, password, code]):
return jsonify({
'success': False,
'message': '所有字段都不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱注册'
}), 400
if not validate_password(password):
return jsonify({
'success': False,
'message': '密码长度必须在6-20位之间'
}), 400
# 验证验证码
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'register':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已被注册
if users_collection.find_one({'邮箱': email}):
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
# 检查用户名是否已被使用
if users_collection.find_one({'用户名': username}):
return jsonify({
'success': False,
'message': '该用户名已被使用'
}), 409
# 获取QQ头像
avatar_url = get_qq_avatar_url(email)
# 创建新用户
password_hash = generate_password_hash(password)
user_data = {
'邮箱': email,
'用户名': username,
'密码': password_hash,
'头像': avatar_url,
'注册时间': datetime.now().isoformat(),
'最后登录': None,
'登录次数': 0,
'用户状态': 'active',
'等级': 0,
'经验': 0,
'萌芽币': 0,
'签到系统': {
'连续签到天数': 0,
'今日是否已签到': False,
'签到时间': '2025-01-01'
}
}
result = users_collection.insert_one(user_data)
if result.inserted_id:
return jsonify({
'success': True,
'message': '注册成功!',
'user': {
'email': email,
'username': username,
'avatar': avatar_url
}
}), 201
else:
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
except Exception as e:
current_app.logger.error(f"注册失败: {str(e)}")
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
#用户登录
@auth_bp.route('/login', methods=['POST'])
def login():
"""用户登录(支持邮箱+验证码或邮箱+密码)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱登录'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 查找用户
user = users_collection.find_one({'邮箱': email})
if not user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 检查用户状态
if user.get('用户状态') != 'active':
return jsonify({
'success': False,
'message': '账号已被禁用,请联系管理员'
}), 403
# 验证方式:验证码登录或密码登录
if code:
# 验证码登录
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'login':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
elif password:
# 密码登录
if not check_password_hash(user['密码'], password):
return jsonify({
'success': False,
'message': '密码错误'
}), 401
else:
return jsonify({
'success': False,
'message': '请输入密码或验证码'
}), 400
# 登录成功,更新用户信息
users_collection.update_one(
{'邮箱': email},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
# 生成JWT token
user_data = {
'user_id': str(user['_id']),
'email': email,
'username': user.get('用户名', '')
}
token = generate_token(user_data)
return jsonify({
'success': True,
'message': '登录成功!',
'token': token,
'user': {
'id': str(user['_id']),
'email': email,
'username': user.get('用户名', ''),
'avatar': user.get('头像', ''),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
current_app.logger.error(f"登录失败: {str(e)}")
return jsonify({
'success': False,
'message': '登录失败,请稍后重试'
}), 500
# 登录成功,创建会话
hwt = getattr(request, 'hwt', {})
hwt['user_id'] = str(user['_id'])
hwt['account'] = user['账号']
hwt['logged_in'] = True
# 更新登录信息
users_collection.update_one(
{'_id': user['_id']},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
return jsonify({
'success': True,
'message': '登录成功!',
'user': {
'account': user['账号'],
'last_login': user.get('最后登录'),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
#用户登出
@auth_bp.route('/logout', methods=['POST'])
def logout():
"""用户登出"""
try:
# JWT是无状态的客户端删除token即可
return jsonify({
'success': True,
'message': '已成功登出'
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
#检查登录状态
@auth_bp.route('/check', methods=['GET'])
def check_login():
"""检查登录状态"""
try:
token = request.headers.get('Authorization')
if not token:
return jsonify({
'success': True,
'logged_in': False
}), 200
if token.startswith('Bearer '):
token = token[7:]
result = verify_token(token)
if result['success']:
user_data = result['data']
return jsonify({
'success': True,
'logged_in': True,
'user': {
'id': user_data['user_id'],
'email': user_data['email'],
'username': user_data['username']
}
}), 200
else:
return jsonify({
'success': True,
'logged_in': False
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
#==========================对外暴露的HTTP接口==========================

View File

@@ -1,283 +1,283 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
邮件发送模块
负责处理用户注册、登录验证邮件
"""
import random
import string
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.header import Header
from flask import current_app
import logging
import os
# 验证码存储生产环境建议使用Redis
verification_codes = {}
# 初始化日志
def init_mail(app):
"""初始化邮件配置"""
# 使用smtplib直接发送不需要Flask-Mail
pass
# 生成验证码
def generate_verification_code(length=6):
"""生成验证码"""
return ''.join(random.choices(string.digits, k=length))
# 发送验证邮件
def send_verification_email(email, verification_type='register'):
"""
发送验证邮件
Args:
email: 收件人邮箱
verification_type: 验证类型 ('register', 'login', 'reset_password')
Returns:
dict: 发送结果
"""
try:
# 验证QQ邮箱格式
if not is_qq_email(email):
return {
'success': False,
'message': '仅支持QQ邮箱注册登录'
}
# 生成验证码
code = generate_verification_code()
# 存储验证码5分钟有效期
verification_codes[email] = {
'code': code,
'type': verification_type,
'expires_at': datetime.now() + timedelta(minutes=5),
'attempts': 0
}
# 获取邮件配置 - 使用与QQEmailSendAPI相同的配置
sender_email = os.environ.get('MAIL_USERNAME', '3205788256@qq.com')
sender_password = os.environ.get('MAIL_PASSWORD', 'szcaxvbftusqddhi')
# 邮件模板
if verification_type == 'register':
subject = '【万象口袋】注册验证码'
html_content = f'''
<html>
<body>
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="text-align: center; margin-bottom: 30px;">
<h1 style="color: #66bb6a; margin: 0;">万象口袋</h1>
<p style="color: #666; font-size: 14px; margin: 5px 0;">欢迎注册万象口袋</p>
</div>
<div style="background: linear-gradient(135deg, #a8e6cf 0%, #dcedc1 100%); padding: 30px; border-radius: 15px; text-align: center;">
<h2 style="color: #2e7d32; margin-bottom: 20px;">验证码</h2>
<div style="background: white; padding: 20px; border-radius: 10px; margin: 20px 0;">
<span style="font-size: 32px; font-weight: bold; color: #66bb6a; letter-spacing: 5px;">{code}</span>
</div>
<p style="color: #4a4a4a; margin: 15px 0;">请在5分钟内输入此验证码完成注册</p>
</div>
<div style="margin-top: 30px; padding: 20px; background: #f8f9fa; border-radius: 10px;">
<p style="color: #666; font-size: 12px; margin: 0; text-align: center;">
如果您没有申请注册,请忽略此邮件<br>
此验证码5分钟内有效请勿泄露给他人
</p>
</div>
</div>
</body>
</html>
'''
else: # login
subject = '【InfoGenie】登录验证码'
html_content = f'''
<html>
<body>
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="text-align: center; margin-bottom: 30px;">
<h1 style="color: #66bb6a; margin: 0;">InfoGenie 万象口袋</h1>
<p style="color: #666; font-size: 14px; margin: 5px 0;">安全登录验证</p>
</div>
<div style="background: linear-gradient(135deg, #81c784 0%, #66bb6a 100%); padding: 30px; border-radius: 15px; text-align: center;">
<h2 style="color: white; margin-bottom: 20px;">登录验证码</h2>
<div style="background: white; padding: 20px; border-radius: 10px; margin: 20px 0;">
<span style="font-size: 32px; font-weight: bold; color: #66bb6a; letter-spacing: 5px;">{code}</span>
</div>
<p style="color: white; margin: 15px 0;">请在5分钟内输入此验证码完成登录</p>
</div>
<div style="margin-top: 30px; padding: 20px; background: #f8f9fa; border-radius: 10px;">
<p style="color: #666; font-size: 12px; margin: 0; text-align: center;">
如果不是您本人操作,请检查账户安全<br>
此验证码5分钟内有效请勿泄露给他人
</p>
</div>
</div>
</body>
</html>
'''
# 创建邮件 - 使用与QQEmailSendAPI相同的方式
message = MIMEText(html_content, 'html', 'utf-8')
message['From'] = sender_email # 直接使用邮箱地址不使用Header包装
message['To'] = email
message['Subject'] = Header(subject, 'utf-8')
# 发送邮件 - 使用SSL端口465
try:
# 使用与QQEmailSendAPI相同的连接方式
smtp_obj = smtplib.SMTP_SSL('smtp.qq.com', 465)
smtp_obj.login(sender_email, sender_password)
smtp_obj.sendmail(sender_email, [email], message.as_string())
smtp_obj.quit()
print(f"验证码邮件发送成功: {email}")
return {
'success': True,
'message': '验证码已发送到您的邮箱',
'email': email
}
except smtplib.SMTPAuthenticationError as auth_error:
print(f"SMTP认证失败: {str(auth_error)}")
return {
'success': False,
'message': 'SMTP认证失败请检查邮箱配置'
}
except smtplib.SMTPConnectError as conn_error:
print(f"SMTP连接失败: {str(conn_error)}")
return {
'success': False,
'message': 'SMTP服务器连接失败'
}
except Exception as smtp_error:
print(f"SMTP发送失败: {str(smtp_error)}")
return {
'success': False,
'message': f'邮件发送失败: {str(smtp_error)}'
}
except Exception as e:
print(f"邮件发送失败: {str(e)}")
return {
'success': False,
'message': '邮件发送失败,请稍后重试'
}
# 验证验证码
def verify_code(email, code):
"""
验证验证码
Args:
email: 邮箱地址
code: 验证码
Returns:
dict: 验证结果
"""
if email not in verification_codes:
return {
'success': False,
'message': '验证码不存在或已过期'
}
stored_info = verification_codes[email]
# 检查过期时间
if datetime.now() > stored_info['expires_at']:
del verification_codes[email]
return {
'success': False,
'message': '验证码已过期,请重新获取'
}
# 检查尝试次数
if stored_info['attempts'] >= 3:
del verification_codes[email]
return {
'success': False,
'message': '验证码输入错误次数过多,请重新获取'
}
# 验证码校验
if stored_info['code'] != code:
stored_info['attempts'] += 1
return {
'success': False,
'message': f'验证码错误,还可尝试{3 - stored_info["attempts"]}'
}
# 验证成功,删除验证码
verification_type = stored_info['type']
del verification_codes[email]
return {
'success': True,
'message': '验证码验证成功',
'type': verification_type
}
# 验证QQ邮箱格式
def is_qq_email(email):
"""
验证是否为QQ邮箱
Args:
email: 邮箱地址
Returns:
bool: 是否为QQ邮箱
"""
if not email or '@' not in email:
return False
domain = email.split('@')[1].lower()
qq_domains = ['qq.com', 'vip.qq.com', 'foxmail.com']
return domain in qq_domains
# 获取QQ头像URL
def get_qq_avatar_url(email):
"""
根据QQ邮箱获取QQ头像URL
Args:
email: QQ邮箱地址
Returns:
str: QQ头像URL
"""
if not is_qq_email(email):
return None
# 提取QQ号码
qq_number = email.split('@')[0]
# 验证是否为纯数字QQ号
if not qq_number.isdigit():
return None
# 返回QQ头像API URL
return f"http://q1.qlogo.cn/g?b=qq&nk={qq_number}&s=100"
# 清理过期验证码
def cleanup_expired_codes():
"""清理过期的验证码"""
current_time = datetime.now()
expired_emails = [
email for email, info in verification_codes.items()
if current_time > info['expires_at']
]
for email in expired_emails:
del verification_codes[email]
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
邮件发送模块
负责处理用户注册、登录验证邮件
"""
import random
import string
import smtplib
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.header import Header
from flask import current_app
import logging
import os
# 验证码存储生产环境建议使用Redis
verification_codes = {}
# 初始化日志
def init_mail(app):
"""初始化邮件配置"""
# 使用smtplib直接发送不需要Flask-Mail
pass
# 生成验证码
def generate_verification_code(length=6):
"""生成验证码"""
return ''.join(random.choices(string.digits, k=length))
# 发送验证邮件
def send_verification_email(email, verification_type='register'):
"""
发送验证邮件
Args:
email: 收件人邮箱
verification_type: 验证类型 ('register', 'login', 'reset_password')
Returns:
dict: 发送结果
"""
try:
# 验证QQ邮箱格式
if not is_qq_email(email):
return {
'success': False,
'message': '仅支持QQ邮箱注册登录'
}
# 生成验证码
code = generate_verification_code()
# 存储验证码5分钟有效期
verification_codes[email] = {
'code': code,
'type': verification_type,
'expires_at': datetime.now() + timedelta(minutes=5),
'attempts': 0
}
# 获取邮件配置 - 使用与QQEmailSendAPI相同的配置
sender_email = os.environ.get('MAIL_USERNAME', '3205788256@qq.com')
sender_password = os.environ.get('MAIL_PASSWORD', 'szcaxvbftusqddhi')
# 邮件模板
if verification_type == 'register':
subject = '【万象口袋】注册验证码'
html_content = f'''
<html>
<body>
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="text-align: center; margin-bottom: 30px;">
<h1 style="color: #66bb6a; margin: 0;">万象口袋</h1>
<p style="color: #666; font-size: 14px; margin: 5px 0;">欢迎注册万象口袋</p>
</div>
<div style="background: linear-gradient(135deg, #a8e6cf 0%, #dcedc1 100%); padding: 30px; border-radius: 15px; text-align: center;">
<h2 style="color: #2e7d32; margin-bottom: 20px;">验证码</h2>
<div style="background: white; padding: 20px; border-radius: 10px; margin: 20px 0;">
<span style="font-size: 32px; font-weight: bold; color: #66bb6a; letter-spacing: 5px;">{code}</span>
</div>
<p style="color: #4a4a4a; margin: 15px 0;">请在5分钟内输入此验证码完成注册</p>
</div>
<div style="margin-top: 30px; padding: 20px; background: #f8f9fa; border-radius: 10px;">
<p style="color: #666; font-size: 12px; margin: 0; text-align: center;">
如果您没有申请注册,请忽略此邮件<br>
此验证码5分钟内有效请勿泄露给他人
</p>
</div>
</div>
</body>
</html>
'''
else: # login
subject = '【InfoGenie】登录验证码'
html_content = f'''
<html>
<body>
<div style="font-family: Arial, sans-serif; max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="text-align: center; margin-bottom: 30px;">
<h1 style="color: #66bb6a; margin: 0;">InfoGenie 万象口袋</h1>
<p style="color: #666; font-size: 14px; margin: 5px 0;">安全登录验证</p>
</div>
<div style="background: linear-gradient(135deg, #81c784 0%, #66bb6a 100%); padding: 30px; border-radius: 15px; text-align: center;">
<h2 style="color: white; margin-bottom: 20px;">登录验证码</h2>
<div style="background: white; padding: 20px; border-radius: 10px; margin: 20px 0;">
<span style="font-size: 32px; font-weight: bold; color: #66bb6a; letter-spacing: 5px;">{code}</span>
</div>
<p style="color: white; margin: 15px 0;">请在5分钟内输入此验证码完成登录</p>
</div>
<div style="margin-top: 30px; padding: 20px; background: #f8f9fa; border-radius: 10px;">
<p style="color: #666; font-size: 12px; margin: 0; text-align: center;">
如果不是您本人操作,请检查账户安全<br>
此验证码5分钟内有效请勿泄露给他人
</p>
</div>
</div>
</body>
</html>
'''
# 创建邮件 - 使用与QQEmailSendAPI相同的方式
message = MIMEText(html_content, 'html', 'utf-8')
message['From'] = sender_email # 直接使用邮箱地址不使用Header包装
message['To'] = email
message['Subject'] = Header(subject, 'utf-8')
# 发送邮件 - 使用SSL端口465
try:
# 使用与QQEmailSendAPI相同的连接方式
smtp_obj = smtplib.SMTP_SSL('smtp.qq.com', 465)
smtp_obj.login(sender_email, sender_password)
smtp_obj.sendmail(sender_email, [email], message.as_string())
smtp_obj.quit()
print(f"验证码邮件发送成功: {email}")
return {
'success': True,
'message': '验证码已发送到您的邮箱',
'email': email
}
except smtplib.SMTPAuthenticationError as auth_error:
print(f"SMTP认证失败: {str(auth_error)}")
return {
'success': False,
'message': 'SMTP认证失败请检查邮箱配置'
}
except smtplib.SMTPConnectError as conn_error:
print(f"SMTP连接失败: {str(conn_error)}")
return {
'success': False,
'message': 'SMTP服务器连接失败'
}
except Exception as smtp_error:
print(f"SMTP发送失败: {str(smtp_error)}")
return {
'success': False,
'message': f'邮件发送失败: {str(smtp_error)}'
}
except Exception as e:
print(f"邮件发送失败: {str(e)}")
return {
'success': False,
'message': '邮件发送失败,请稍后重试'
}
# 验证验证码
def verify_code(email, code):
"""
验证验证码
Args:
email: 邮箱地址
code: 验证码
Returns:
dict: 验证结果
"""
if email not in verification_codes:
return {
'success': False,
'message': '验证码不存在或已过期'
}
stored_info = verification_codes[email]
# 检查过期时间
if datetime.now() > stored_info['expires_at']:
del verification_codes[email]
return {
'success': False,
'message': '验证码已过期,请重新获取'
}
# 检查尝试次数
if stored_info['attempts'] >= 3:
del verification_codes[email]
return {
'success': False,
'message': '验证码输入错误次数过多,请重新获取'
}
# 验证码校验
if stored_info['code'] != code:
stored_info['attempts'] += 1
return {
'success': False,
'message': f'验证码错误,还可尝试{3 - stored_info["attempts"]}'
}
# 验证成功,删除验证码
verification_type = stored_info['type']
del verification_codes[email]
return {
'success': True,
'message': '验证码验证成功',
'type': verification_type
}
# 验证QQ邮箱格式
def is_qq_email(email):
"""
验证是否为QQ邮箱
Args:
email: 邮箱地址
Returns:
bool: 是否为QQ邮箱
"""
if not email or '@' not in email:
return False
domain = email.split('@')[1].lower()
qq_domains = ['qq.com', 'vip.qq.com', 'foxmail.com']
return domain in qq_domains
# 获取QQ头像URL
def get_qq_avatar_url(email):
"""
根据QQ邮箱获取QQ头像URL
Args:
email: QQ邮箱地址
Returns:
str: QQ头像URL
"""
if not is_qq_email(email):
return None
# 提取QQ号码
qq_number = email.split('@')[0]
# 验证是否为纯数字QQ号
if not qq_number.isdigit():
return None
# 返回QQ头像API URL
return f"https://q1.qlogo.cn/g?b=qq&nk={qq_number}&s=100"
# 清理过期验证码
def cleanup_expired_codes():
"""清理过期的验证码"""
current_time = datetime.now()
expired_emails = [
email for email, info in verification_codes.items()
if current_time > info['expires_at']
]
for email in expired_emails:
del verification_codes[email]
return len(expired_emails)

File diff suppressed because it is too large Load Diff