166 lines
4.9 KiB
Python
166 lines
4.9 KiB
Python
from models import db, User, APIKey
|
|
from datetime import datetime
|
|
|
|
class APIKeyService:
|
|
@staticmethod
|
|
def list_api_keys(user_id):
|
|
"""获取用户的所有 API Key"""
|
|
user = User.query.get(user_id)
|
|
|
|
if not user:
|
|
return {'error': '用户不存在'}, 404
|
|
|
|
keys = APIKey.query.filter_by(user_id=user_id).all()
|
|
|
|
return {
|
|
'total': len(keys),
|
|
'keys': [key.to_dict() for key in keys]
|
|
}, 200
|
|
|
|
@staticmethod
|
|
def create_api_key(user_id, data):
|
|
"""创建新的 API Key"""
|
|
user = User.query.get(user_id)
|
|
|
|
if not user:
|
|
return {'error': '用户不存在'}, 404
|
|
|
|
name = data.get('name', '').strip()
|
|
|
|
if not name:
|
|
return {'error': 'API Key 名称不能为空'}, 400
|
|
|
|
if len(name) > 100:
|
|
return {'error': 'API Key 名称长度不能超过100个字符'}, 400
|
|
|
|
# 生成 API Key
|
|
api_key = APIKey.generate_key()
|
|
|
|
# 创建数据库记录
|
|
new_key = APIKey(
|
|
user_id=user_id,
|
|
name=name,
|
|
api_key=api_key
|
|
)
|
|
|
|
try:
|
|
db.session.add(new_key)
|
|
db.session.commit()
|
|
|
|
return {
|
|
'message': 'API Key 创建成功',
|
|
'key': new_key.to_dict()
|
|
}, 201
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return {'error': '创建失败,请稍后重试'}, 500
|
|
|
|
@staticmethod
|
|
def get_api_key(user_id, key_id):
|
|
"""获取单个 API Key 详情"""
|
|
key = APIKey.query.filter_by(id=key_id, user_id=user_id).first()
|
|
|
|
if not key:
|
|
return {'error': 'API Key 不存在'}, 404
|
|
|
|
return key.to_dict(), 200
|
|
|
|
@staticmethod
|
|
def update_api_key(user_id, key_id, data):
|
|
"""更新 API Key 名称或状态"""
|
|
key = APIKey.query.filter_by(id=key_id, user_id=user_id).first()
|
|
|
|
if not key:
|
|
return {'error': 'API Key 不存在'}, 404
|
|
|
|
if 'name' in data:
|
|
name = data.get('name', '').strip()
|
|
if not name or len(name) > 100:
|
|
return {'error': 'API Key 名称无效'}, 400
|
|
key.name = name
|
|
|
|
if 'is_active' in data:
|
|
key.is_active = bool(data.get('is_active'))
|
|
|
|
try:
|
|
db.session.commit()
|
|
return {
|
|
'message': 'API Key 更新成功',
|
|
'key': key.to_dict()
|
|
}, 200
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return {'error': '更新失败,请稍后重试'}, 500
|
|
|
|
@staticmethod
|
|
def delete_api_key(user_id, key_id):
|
|
"""删除 API Key"""
|
|
key = APIKey.query.filter_by(id=key_id, user_id=user_id).first()
|
|
|
|
if not key:
|
|
return {'error': 'API Key 不存在'}, 404
|
|
|
|
try:
|
|
db.session.delete(key)
|
|
db.session.commit()
|
|
return {'message': 'API Key 已删除'}, 200
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return {'error': '删除失败,请稍后重试'}, 500
|
|
|
|
@staticmethod
|
|
def regenerate_api_key(user_id, key_id):
|
|
"""重置/轮换 API Key"""
|
|
key = APIKey.query.filter_by(id=key_id, user_id=user_id).first()
|
|
|
|
if not key:
|
|
return {'error': 'API Key 不存在'}, 404
|
|
|
|
# 生成新的 API Key
|
|
new_api_key = APIKey.generate_key()
|
|
key.api_key = new_api_key
|
|
|
|
try:
|
|
db.session.commit()
|
|
return {
|
|
'message': 'API Key 已重置',
|
|
'key': key.to_dict()
|
|
}, 200
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
return {'error': '重置失败,请稍后重试'}, 500
|
|
|
|
@staticmethod
|
|
def authenticate_api_key(auth_header):
|
|
"""验证 API Key 并返回用户"""
|
|
if not auth_header:
|
|
return None, "缺少 Authorization 头"
|
|
|
|
parts = auth_header.split()
|
|
if parts[0].lower() != "bearer":
|
|
return None, "Authorization 头格式错误"
|
|
|
|
if len(parts) == 1:
|
|
return None, "无效的 Token"
|
|
|
|
api_key_str = parts[1]
|
|
|
|
# 查找 API Key
|
|
api_key = APIKey.query.filter_by(api_key=api_key_str).first()
|
|
|
|
if not api_key:
|
|
return None, "无效的 API Key"
|
|
|
|
if not api_key.is_active:
|
|
return None, "API Key 已被禁用"
|
|
|
|
# 更新最后使用时间
|
|
api_key.last_used_at = datetime.utcnow()
|
|
db.session.commit()
|
|
|
|
user = User.query.get(api_key.user_id)
|
|
if not user or not user.is_active:
|
|
return None, "账户不存在或已被禁用"
|
|
|
|
return user, None
|