Files
InfoGenie/backend/modules/auth.py

417 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
用户认证模块
Created by: 神奇万事通
Date: 2025-09-02
"""
from flask import Blueprint, request, jsonify, session, current_app
from werkzeug.security import generate_password_hash, check_password_hash
import hashlib
import re
from datetime import datetime
from .email_service import send_verification_email, verify_code, is_qq_email, get_qq_avatar_url
auth_bp = Blueprint('auth', __name__)
def validate_qq_email(email):
"""验证QQ邮箱格式"""
return is_qq_email(email)
def validate_password(password):
"""验证密码格式6-20位"""
return 6 <= len(password) <= 20
@auth_bp.route('/send-verification', methods=['POST'])
def send_verification():
"""发送验证码邮件"""
try:
data = request.get_json()
email = data.get('email', '').strip()
verification_type = data.get('type', 'register') # register, login
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱qq.com、vip.qq.com、foxmail.com'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已注册
existing_user = users_collection.find_one({'邮箱': email})
if verification_type == 'register' and existing_user:
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
if verification_type == 'login' and not existing_user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 发送验证码
result = send_verification_email(email, verification_type)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 500
except Exception as e:
current_app.logger.error(f"发送验证码失败: {str(e)}")
return jsonify({
'success': False,
'message': '发送失败,请稍后重试'
}), 500
@auth_bp.route('/verify-code', methods=['POST'])
def verify_verification_code():
"""验证验证码"""
try:
data = request.get_json()
email = data.get('email', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email or not code:
return jsonify({
'success': False,
'message': '邮箱和验证码不能为空'
}), 400
# 验证码校验
result = verify_code(email, code)
if result['success']:
return jsonify(result), 200
else:
return jsonify(result), 400
except Exception as e:
current_app.logger.error(f"验证码校验失败: {str(e)}")
return jsonify({
'success': False,
'message': '验证失败,请稍后重试'
}), 500
@auth_bp.route('/register', methods=['POST'])
def register():
"""用户注册(需要先验证邮箱)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
username = data.get('username', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not all([email, username, password, code]):
return jsonify({
'success': False,
'message': '所有字段都不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱注册'
}), 400
if not validate_password(password):
return jsonify({
'success': False,
'message': '密码长度必须在6-20位之间'
}), 400
# 验证验证码
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'register':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 检查邮箱是否已被注册
if users_collection.find_one({'邮箱': email}):
return jsonify({
'success': False,
'message': '该邮箱已被注册'
}), 409
# 检查用户名是否已被使用
if users_collection.find_one({'用户名': username}):
return jsonify({
'success': False,
'message': '该用户名已被使用'
}), 409
# 获取QQ头像
avatar_url = get_qq_avatar_url(email)
# 创建新用户
password_hash = generate_password_hash(password)
user_data = {
'邮箱': email,
'用户名': username,
'密码': password_hash,
'头像': avatar_url,
'注册时间': datetime.now().isoformat(),
'最后登录': None,
'登录次数': 0,
'用户状态': 'active'
}
result = users_collection.insert_one(user_data)
if result.inserted_id:
return jsonify({
'success': True,
'message': '注册成功!',
'user': {
'email': email,
'username': username,
'avatar': avatar_url
}
}), 201
else:
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
except Exception as e:
current_app.logger.error(f"注册失败: {str(e)}")
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
if existing_user:
return jsonify({
'success': False,
'message': '该账号已被注册'
}), 409
# 创建新用户
password_hash = generate_password_hash(password)
user_data = {
'账号': account,
'密码': password_hash,
'注册时间': datetime.now().isoformat(),
'最后登录': None,
'登录次数': 0,
'用户状态': 'active'
}
result = users_collection.insert_one(user_data)
if result.inserted_id:
return jsonify({
'success': True,
'message': '注册成功!'
}), 201
else:
return jsonify({
'success': False,
'message': '注册失败,请稍后重试'
}), 500
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@auth_bp.route('/login', methods=['POST'])
def login():
"""用户登录(支持邮箱+验证码或邮箱+密码)"""
try:
data = request.get_json()
email = data.get('email', '').strip()
password = data.get('password', '').strip()
code = data.get('code', '').strip()
# 参数验证
if not email:
return jsonify({
'success': False,
'message': '邮箱地址不能为空'
}), 400
if not validate_qq_email(email):
return jsonify({
'success': False,
'message': '仅支持QQ邮箱登录'
}), 400
# 获取数据库集合
db = current_app.mongo.db
users_collection = db.userdata
# 查找用户
user = users_collection.find_one({'邮箱': email})
if not user:
return jsonify({
'success': False,
'message': '该邮箱尚未注册'
}), 404
# 检查用户状态
if user.get('用户状态') != 'active':
return jsonify({
'success': False,
'message': '账号已被禁用,请联系管理员'
}), 403
# 验证方式:验证码登录或密码登录
if code:
# 验证码登录
verify_result = verify_code(email, code)
if not verify_result['success'] or verify_result.get('type') != 'login':
return jsonify({
'success': False,
'message': '验证码无效或已过期'
}), 400
elif password:
# 密码登录
if not check_password_hash(user['密码'], password):
return jsonify({
'success': False,
'message': '密码错误'
}), 401
else:
return jsonify({
'success': False,
'message': '请输入密码或验证码'
}), 400
# 登录成功,更新用户信息
users_collection.update_one(
{'邮箱': email},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
# 设置会话
session['user_id'] = str(user['_id'])
session['email'] = email
session['username'] = user.get('用户名', '')
session.permanent = True
return jsonify({
'success': True,
'message': '登录成功!',
'user': {
'id': str(user['_id']),
'email': email,
'username': user.get('用户名', ''),
'avatar': user.get('头像', ''),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
current_app.logger.error(f"登录失败: {str(e)}")
return jsonify({
'success': False,
'message': '登录失败,请稍后重试'
}), 500
# 登录成功,创建会话
session['user_id'] = str(user['_id'])
session['account'] = user['账号']
session['logged_in'] = True
# 更新登录信息
users_collection.update_one(
{'_id': user['_id']},
{
'$set': {'最后登录': datetime.now().isoformat()},
'$inc': {'登录次数': 1}
}
)
return jsonify({
'success': True,
'message': '登录成功!',
'user': {
'account': user['账号'],
'last_login': user.get('最后登录'),
'login_count': user.get('登录次数', 0) + 1
}
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@auth_bp.route('/logout', methods=['POST'])
def logout():
"""用户登出"""
try:
if 'logged_in' in session:
session.clear()
return jsonify({
'success': True,
'message': '已成功登出'
}), 200
else:
return jsonify({
'success': False,
'message': '用户未登录'
}), 401
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500
@auth_bp.route('/check', methods=['GET'])
def check_login():
"""检查登录状态"""
try:
if session.get('logged_in'):
return jsonify({
'success': True,
'logged_in': True,
'user': {
'account': session.get('account'),
'user_id': session.get('user_id')
}
}), 200
else:
return jsonify({
'success': True,
'logged_in': False
}), 200
except Exception as e:
return jsonify({
'success': False,
'message': f'服务器错误: {str(e)}'
}), 500