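import logging
from datetime import datetime

from models import db, User, APIKey

logger = logging.getLogger(__name__)


class APIKeyService:
    """Service-layer operations for managing and authenticating API keys.

    The management methods return a ``(payload, http_status)`` tuple.
    ``authenticate_api_key`` returns ``(user, error_message)`` instead.

    Every lookup by ``key_id`` is also filtered by ``user_id``, so a caller
    can only read or change keys that belong to that user.

    NOTE(review): keys are looked up by equality on the ``api_key`` column,
    which suggests the raw secret is stored as-is. Storing only a digest and
    returning the raw value once at creation/rotation would limit the impact
    of a database leak -- confirm against the APIKey model.
    NOTE(review): what ``to_dict()`` exposes is decided in the model, which is
    not visible here -- confirm list/get responses do not echo the full key.
    """

    @staticmethod
    def _clean_name(data):
        """Return the stripped ``name`` from *data*, or '' if absent or not a string."""
        raw = data.get('name') if isinstance(data, dict) else None
        return raw.strip() if isinstance(raw, str) else ''

    @staticmethod
    def _log_db_failure(action, user_id, exc):
        """Record a failed database write without logging the exception text.

        Only the exception type is logged: database error messages can embed
        the statement parameters, which here may include the API key itself.
        """
        logger.error(
            "API key %s failed for user_id=%s (%s)",
            action, user_id, type(exc).__name__,
        )

    @staticmethod
    def list_api_keys(user_id):
        """Return all API keys owned by *user_id*, or a 404 if the user is unknown."""
        user = User.query.get(user_id)
        if not user:
            return {'error': '用户不存在'}, 404

        keys = APIKey.query.filter_by(user_id=user_id).all()
        return {
            'total': len(keys),
            'keys': [key.to_dict() for key in keys]
        }, 200

    @staticmethod
    def create_api_key(user_id, data):
        """Create a new API key named ``data['name']`` for *user_id*.

        Returns 404 for an unknown user, 400 for a missing, non-string or
        over-long name, 500 if the database write fails, and 201 on success.
        """
        user = User.query.get(user_id)
        if not user:
            return {'error': '用户不存在'}, 404

        # A missing body or a non-string name is treated as an empty name so
        # that malformed client input yields a 400 instead of an exception.
        name = APIKeyService._clean_name(data)
        if not name:
            return {'error': 'API Key 名称不能为空'}, 400
        if len(name) > 100:
            return {'error': 'API Key 名称长度不能超过100个字符'}, 400

        new_key = APIKey(
            user_id=user_id,
            name=name,
            api_key=APIKey.generate_key()
        )

        try:
            db.session.add(new_key)
            db.session.commit()
        except Exception as exc:
            db.session.rollback()
            APIKeyService._log_db_failure('creation', user_id, exc)
            return {'error': '创建失败,请稍后重试'}, 500

        return {
            'message': 'API Key 创建成功',
            'key': new_key.to_dict()
        }, 201

    @staticmethod
    def get_api_key(user_id, key_id):
        """Return one API key owned by *user_id*, or a 404 if there is no such key."""
        key = APIKey.query.filter_by(id=key_id, user_id=user_id).first()
        if not key:
            return {'error': 'API Key 不存在'}, 404
        return key.to_dict(), 200

    @staticmethod
    def update_api_key(user_id, key_id, data):
        """Update the name and/or active flag of a key owned by *user_id*.

        ``is_active`` must be a boolean (0/1 are also accepted). Other values
        are rejected: with plain ``bool()`` coercion the string "false" is
        truthy, so a request meant to disable a key would leave it enabled.
        """
        key = APIKey.query.filter_by(id=key_id, user_id=user_id).first()
        if not key:
            return {'error': 'API Key 不存在'}, 404

        if not isinstance(data, dict):
            data = {}

        # Validate every field before touching the model so that a rejected
        # request leaves no half-applied changes in the session.
        changes = {}
        if 'name' in data:
            name = APIKeyService._clean_name(data)
            if not name or len(name) > 100:
                return {'error': 'API Key 名称无效'}, 400
            changes['name'] = name

        if 'is_active' in data:
            flag = data['is_active']
            if not isinstance(flag, (bool, int)) or flag not in (0, 1):
                return {'error': 'is_active 必须为布尔值'}, 400
            changes['is_active'] = bool(flag)

        for field, value in changes.items():
            setattr(key, field, value)

        try:
            db.session.commit()
        except Exception as exc:
            db.session.rollback()
            APIKeyService._log_db_failure('update', user_id, exc)
            return {'error': '更新失败,请稍后重试'}, 500

        return {
            'message': 'API Key 更新成功',
            'key': key.to_dict()
        }, 200

    @staticmethod
    def delete_api_key(user_id, key_id):
        """Delete a key owned by *user_id*; 404 if there is no such key."""
        key = APIKey.query.filter_by(id=key_id, user_id=user_id).first()
        if not key:
            return {'error': 'API Key 不存在'}, 404

        try:
            db.session.delete(key)
            db.session.commit()
        except Exception as exc:
            db.session.rollback()
            APIKeyService._log_db_failure('deletion', user_id, exc)
            return {'error': '删除失败,请稍后重试'}, 500

        return {'message': 'API Key 已删除'}, 200

    @staticmethod
    def regenerate_api_key(user_id, key_id):
        """Rotate a key owned by *user_id* by replacing its secret value.

        The old value is overwritten in the same record, so once the commit
        succeeds it no longer matches any row.
        """
        key = APIKey.query.filter_by(id=key_id, user_id=user_id).first()
        if not key:
            return {'error': 'API Key 不存在'}, 404

        key.api_key = APIKey.generate_key()

        try:
            db.session.commit()
        except Exception as exc:
            db.session.rollback()
            APIKeyService._log_db_failure('rotation', user_id, exc)
            return {'error': '重置失败,请稍后重试'}, 500

        return {
            'message': 'API Key 已重置',
            'key': key.to_dict()
        }, 200

    @staticmethod
    def authenticate_api_key(auth_header):
        """Resolve an ``Authorization: Bearer <key>`` header to its user.

        Returns ``(user, None)`` on success and ``(None, reason)`` otherwise.
        """
        if not auth_header:
            return None, "缺少 Authorization 头"

        parts = auth_header.split()
        # A whitespace-only header splits to an empty list; guard before
        # indexing so it is reported as malformed rather than raising.
        if not parts or parts[0].lower() != "bearer":
            return None, "Authorization 头格式错误"
        if len(parts) == 1:
            return None, "无效的 Token"
        # Trailing fields after the token are rejected rather than ignored.
        if len(parts) > 2:
            return None, "Authorization 头格式错误"

        api_key = APIKey.query.filter_by(api_key=parts[1]).first()
        if not api_key:
            return None, "无效的 API Key"
        if not api_key.is_active:
            return None, "API Key 已被禁用"

        owner_id = api_key.user_id

        # The last-used timestamp is bookkeeping only. If the write fails,
        # roll back so the session stays usable and continue with the
        # authentication decision, which does not depend on it.
        api_key.last_used_at = datetime.utcnow()
        try:
            db.session.commit()
        except Exception as exc:
            db.session.rollback()
            APIKeyService._log_db_failure('last-used update', owner_id, exc)

        user = User.query.get(owner_id)
        if not user or not user.is_active:
            return None, "账户不存在或已被禁用"

        return user, None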