Files
NBATransfer/NBATransfer-backend/services/auth_service.py
2025-12-14 15:40:49 +08:00

291 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import current_app, jsonify
from flask_mail import Message
from flask_jwt_extended import create_access_token, create_refresh_token
from models import db, User, VerificationCode
from email_validator import validate_email, EmailNotValidError
from datetime import datetime, timedelta
import re
class AuthService:
@staticmethod
def send_verification_email(email, code, purpose='register'):
"""发送验证码邮件"""
try:
subject = '验证码' if purpose == 'register' else '密码重置验证码'
body = f"""
您好!
感谢您使用 Nano Banana API 转售平台。
您的验证码是:{code}
此验证码有效期为10分钟请勿泄露给他人。
如果不是您本人操作,请忽略此邮件。
---
Nano Banana API 平台
"""
msg = Message(
subject=f'[Nano Banana] {subject}',
recipients=[email],
body=body
)
# 这里使用 Flask-Mail 发送
from extensions import mail
mail.send(msg)
return True
except Exception as e:
print(f'发送邮件失败: {str(e)}')
return False
@staticmethod
def send_verification_code(data):
"""发送验证码逻辑"""
email = data.get('email', '').strip().lower()
purpose = data.get('purpose', 'register')
if not email:
return {'error': '邮箱不能为空'}, 400
# 验证邮箱格式
try:
validate_email(email, check_deliverability=False)
except EmailNotValidError:
return {'error': '邮箱格式不正确'}, 400
# 如果是注册,检查邮箱是否已存在
if purpose == 'register':
user = User.query.filter_by(email=email).first()
if user:
# 如果用户已存在且已验证邮箱,则提示已注册
if user.email_verified:
return {'error': '该邮箱已被注册'}, 400
# 如果用户存在但未验证,可以继续发送验证码(重发)
else:
# 创建临时用户记录用于验证码关联
user = User(email=email, username=email.split('@')[0])
user.set_password('temp_pending_registration')
db.session.add(user)
db.session.commit()
else:
# 密码重置:检查用户是否存在
user = User.query.filter_by(email=email).first()
if not user:
return {'error': '用户不存在'}, 404
# 生成验证码
code = VerificationCode.generate_code()
# 删除旧的未使用验证码
VerificationCode.query.filter_by(
email=email,
purpose=purpose,
used=False
).delete()
# 创建新的验证码
verification = VerificationCode(
user_id=user.id,
email=email,
code=code,
purpose=purpose,
expired_at=datetime.utcnow() + timedelta(minutes=10)
)
db.session.add(verification)
db.session.commit()
# 发送邮件
send_success = AuthService.send_verification_email(email, code, purpose)
if send_success:
return {
'message': '验证码已发送到您的邮箱',
'email': email,
'purpose': purpose
}, 200
else:
return {'error': '邮件发送失败,请检查邮箱配置'}, 500
@staticmethod
def register(data):
"""用户注册逻辑"""
if not data or not data.get('email') or not data.get('password') or not data.get('code'):
return {'error': '邮箱、密码和验证码不能为空'}, 400
email = data.get('email').strip().lower()
password = data.get('password')
code = data.get('code')
username = data.get('username', '').strip()
try:
validate_email(email, check_deliverability=False)
except EmailNotValidError:
return {'error': '邮箱格式不正确'}, 400
if len(password) < 6:
return {'error': '密码长度至少为6位'}, 400
verification = VerificationCode.query.filter_by(
email=email,
code=code,
purpose='register',
used=False
).first()
if not verification:
return {'error': '验证码不存在或已过期'}, 400
if not verification.is_valid():
return {'error': '验证码已过期'}, 400
existing_user = User.query.filter_by(email=email).filter(
User.id != verification.user_id
).first()
if existing_user:
return {'error': '该邮箱已被注册'}, 400
user = User.query.get(verification.user_id)
user.username = username or email.split('@')[0]
user.set_password(password)
user.email_verified = True
user.email_verified_at = datetime.utcnow()
user.is_active = True
verification.used = True
try:
db.session.commit()
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)
return {
'message': '注册成功',
'access_token': access_token,
'refresh_token': refresh_token,
'user': user.to_dict()
}, 201
except Exception as e:
db.session.rollback()
print(f'注册错误: {str(e)}')
return {'error': '注册失败,请稍后重试'}, 500
@staticmethod
def login(data):
"""用户登录逻辑"""
if not data or not data.get('email') or not data.get('password'):
return {'error': '邮箱和密码不能为空'}, 400
email = data.get('email').strip().lower()
password = data.get('password')
user = User.query.filter_by(email=email).first()
if not user or not user.check_password(password):
return {'error': '邮箱或密码错误'}, 401
if not user.is_active:
return {'error': '账户已被禁用'}, 403
# 如果用户未验证邮箱 (理论上注册流程保证了已验证,但为了兼容旧数据)
if not user.email_verified:
return {'error': '邮箱未验证,请先完成注册验证'}, 403
access_token = create_access_token(identity=user.id)
refresh_token = create_refresh_token(identity=user.id)
return {
'message': '登录成功',
'access_token': access_token,
'refresh_token': refresh_token,
'user': user.to_dict()
}, 200
@staticmethod
def change_password(user_id, data):
"""修改密码"""
user = User.query.get(user_id)
if not user:
return {'error': '用户不存在'}, 404
old_password = data.get('old_password')
new_password = data.get('new_password')
if not old_password or not new_password:
return {'error': '参数不能为空'}, 400
if not user.check_password(old_password):
return {'error': '原密码错误'}, 401
if len(new_password) < 6:
return {'error': '新密码长度至少为6位'}, 400
user.set_password(new_password)
db.session.commit()
return {'message': '密码修改成功'}, 200
@staticmethod
def reset_password(data):
"""重置密码"""
email = data.get('email', '').strip().lower()
code = data.get('code')
new_password = data.get('new_password')
if not email or not code or not new_password:
return {'error': '参数不能为空'}, 400
# 验证验证码
verification = VerificationCode.query.filter_by(
email=email,
code=code,
purpose='password_reset',
used=False
).first()
if not verification or not verification.is_valid():
return {'error': '验证码不存在或已过期'}, 400
user = User.query.get(verification.user_id)
if not user:
return {'error': '用户不存在'}, 404
if len(new_password) < 6:
return {'error': '密码长度至少为6位'}, 400
# 更新密码
user.set_password(new_password)
verification.used = True
db.session.commit()
return {'message': '密码重置成功'}, 200
@staticmethod
def verify_code(data):
"""验证验证码是否有效"""
email = data.get('email', '').strip().lower()
code = data.get('code')
purpose = data.get('purpose', 'register')
if not email or not code:
return {'error': '参数不能为空'}, 400
verification = VerificationCode.query.filter_by(
email=email,
code=code,
purpose=purpose,
used=False
).first()
if not verification:
return {'error': '验证码错误'}, 400
if not verification.is_valid():
return {'error': '验证码已过期'}, 400
return {'message': '验证码有效'}, 200