Files
2025-12-13 20:53:50 +08:00

455 lines
14 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
用户认证模块
Created by: 万象口袋
Date: 2025-09-02
"""
from flask import Blueprint, request, jsonify, current_app
from werkzeug.security import generate_password_hash, check_password_hash
import hashlib
import re
import jwt
from datetime import datetime, timedelta
from functools import wraps
from .email_service import send_verification_email, verify_code, is_qq_email, get_qq_avatar_url
auth_bp = Blueprint('auth', __name__)
#生成JWT token
def generate_token(user_data):
"""生成JWT token"""
payload = {
'user_id': user_data['user_id'],
'email': user_data['email'],
'username': user_data['username'],
'exp': datetime.utcnow() + timedelta(days=7), # 7天过期
'iat': datetime.utcnow()
}
return jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256')
#验证JWT token
def verify_token(token):
"""验证JWT token"""
try:
payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256'])
return {'success': True, 'data': payload}
except jwt.ExpiredSignatureError:
return {'success': False, 'message': 'Token已过期'}
except jwt.InvalidTokenError:
return {'success': False, 'message': 'Token无效'}
#JWT token验证装饰器
def token_required(f):
"""JWT token验证装饰器"""
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'success': False, 'message': '缺少认证token'}), 401
if token.startswith('Bearer '):
token = token[7:]
result = verify_token(token)
if not result['success']:
return jsonify({'success': False, 'message': result['message']}), 401
request.current_user = result['data']
return f(*args, **kwargs)
return decorated
#验证QQ邮箱格式
def validate_qq_email(email):
"""验证QQ邮箱格式"""
return is_qq_email(email)
#验证密码格式
def validate_password(password):
"""验证密码格式6-20位"""
return 6 <= len(password) <= 20
#==========================对外暴露的HTTP接口==========================
#发送验证码邮件
@auth_bp.route('/send-verification', methods=['POST'])
def send_verification():
"""发送验证码邮件"""
try:
data = request.get_json()
email = data.get('email', '').strip()
verification_type = data.get('type', 'register') # register, login
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱qq.com、vip.qq.com、foxmail.com'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已注册
existing_user = users_collection.find_one({'邮箱': email})
if verification_type == 'register' and existing_user:
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
if verification_type == 'login' and not existing_user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 发送验证码
result = send_verification_email(email, verification_type)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 500
except Exception as e:
current_app.logger.error(f"发送验证码失败: {str(e)}")
return jsonify({
'success': False,
'message': '发送失败,请稍后重试'
}), 500
#验证验证码
@auth_bp.route('/verify-code', methods=['POST'])
def verify_verification_code():
"""验证验证码"""
try:
data = request.get_json()
email = data.get('email', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email or not code:
return jsonify({
'success': False,
'message': '邮箱和验证码不能为空'
}), 400
# 验证码校验
result = verify_code(email, code)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 400
except Exception as e:
current_app.logger.error(f"验证码校验失败: {str(e)}")
return jsonify({
'success': False,
'message': '验证失败,请稍后重试'
}), 500
#用户注册
@auth_bp.route('/register', methods=['POST'])
def register():
"""用户注册(需要先验证邮箱)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
username = data.get('username', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not all([email, username, password, code]):
return jsonify({
'success': False,
'message': '所有字段都不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱注册'
}), 400
if not validate_password(password):
return jsonify({
'success': False,
'message': '密码长度必须在6-20位之间'
}), 400
# 验证验证码
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'register':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已被注册
if users_collection.find_one({'邮箱': email}):
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
# 检查用户名是否已被使用
if users_collection.find_one({'用户名': username}):
return jsonify({
'success': False,
'message': '该用户名已被使用'
}), 409
# 获取QQ头像
avatar_url = get_qq_avatar_url(email)
# 创建新用户
password_hash = generate_password_hash(password)
user_data = {
'邮箱': email,
'用户名': username,
'密码': password_hash,
'头像': avatar_url,
'注册时间': datetime.now().isoformat(),
'最后登录': None,
'登录次数': 0,
'用户状态': 'active',
'等级': 0,
'经验': 0,
'萌芽币': 0,
'签到系统': {
'连续签到天数': 0,
'今日是否已签到': False,
'签到时间': '2025-01-01'
}
}
result = users_collection.insert_one(user_data)
if result.inserted_id:
return jsonify({
'success': True,
'message': '注册成功!',
'user': {
'email': email,
'username': username,
'avatar': avatar_url
}
}), 201
else:
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
except Exception as e:
current_app.logger.error(f"注册失败: {str(e)}")
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
#用户登录
@auth_bp.route('/login', methods=['POST'])
def login():
"""用户登录(支持邮箱+验证码或邮箱+密码)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱登录'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 查找用户
user = users_collection.find_one({'邮箱': email})
if not user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 检查用户状态
if user.get('用户状态') != 'active':
return jsonify({
'success': False,
'message': '账号已被禁用,请联系管理员'
}), 403
# 验证方式:验证码登录或密码登录
if code:
# 验证码登录
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'login':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
elif password:
# 密码登录
if not check_password_hash(user['密码'], password):
return jsonify({
'success': False,
'message': '密码错误'
}), 401
else:
return jsonify({
'success': False,
'message': '请输入密码或验证码'
}), 400
# 登录成功,更新用户信息
users_collection.update_one(
{'邮箱': email},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
# 生成JWT token
user_data = {
'user_id': str(user['_id']),
'email': email,
'username': user.get('用户名', '')
}
token = generate_token(user_data)
return jsonify({
'success': True,
'message': '登录成功!',
'token': token,
'user': {
'id': str(user['_id']),
'email': email,
'username': user.get('用户名', ''),
'avatar': user.get('头像', ''),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
current_app.logger.error(f"登录失败: {str(e)}")
return jsonify({
'success': False,
'message': '登录失败,请稍后重试'
}), 500
# 登录成功,创建会话
hwt = getattr(request, 'hwt', {})
hwt['user_id'] = str(user['_id'])
hwt['account'] = user['账号']
hwt['logged_in'] = True
# 更新登录信息
users_collection.update_one(
{'_id': user['_id']},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
return jsonify({
'success': True,
'message': '登录成功!',
'user': {
'account': user['账号'],
'last_login': user.get('最后登录'),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
#用户登出
@auth_bp.route('/logout', methods=['POST'])
def logout():
"""用户登出"""
try:
# JWT是无状态的客户端删除token即可
return jsonify({
'success': True,
'message': '已成功登出'
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
#检查登录状态
@auth_bp.route('/check', methods=['GET'])
def check_login():
"""检查登录状态"""
try:
token = request.headers.get('Authorization')
if not token:
return jsonify({
'success': True,
'logged_in': False
}), 200
if token.startswith('Bearer '):
token = token[7:]
result = verify_token(token)
if result['success']:
user_data = result['data']
return jsonify({
'success': True,
'logged_in': True,
'user': {
'id': user_data['user_id'],
'email': user_data['email'],
'username': user_data['username']
}
}), 200
else:
return jsonify({
'success': True,
'logged_in': False
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
#==========================对外暴露的HTTP接口==========================