import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication from email.header import Header import random import string import json import os # 邮件发送配置 SENDER_EMAIL = '3205788256@qq.com' # 发件人邮箱 SENDER_AUTH_CODE = 'szcaxvbftusqddhi' # 授权码 SMTP_SERVER = 'smtp.qq.com' # QQ邮箱SMTP服务器 SMTP_PORT = 465 # QQ邮箱SSL端口 # 验证码缓存文件 VERIFICATION_CACHE_FILE = os.path.join("config", "verification_codes.json") class QQMailAPI: """QQ邮箱发送邮件API类""" def __init__(self, sender_email, authorization_code): """ 初始化邮箱配置 :param sender_email: 发送方QQ邮箱地址 :param authorization_code: QQ邮箱授权码 """ self.sender_email = sender_email self.authorization_code = authorization_code self.smtp_server = 'smtp.qq.com' self.smtp_port = 465 # SSL端口 def send_text_email(self, receiver_email, subject, content, cc_emails=None): """ 发送纯文本邮件 :param receiver_email: 接收方邮箱地址(单个) :param subject: 邮件主题 :param content: 邮件正文内容 :param cc_emails: 抄送邮箱列表 :return: 发送成功返回True,失败返回False """ try: # 创建邮件对象 message = MIMEText(content, 'plain', 'utf-8') message['From'] = Header(self.sender_email, 'utf-8') message['To'] = Header(receiver_email, 'utf-8') message['Subject'] = Header(subject, 'utf-8') # 添加抄送 if cc_emails: message['Cc'] = Header(",".join(cc_emails), 'utf-8') all_receivers = [receiver_email] + cc_emails else: all_receivers = [receiver_email] # 连接SMTP服务器并发送邮件 with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server: server.login(self.sender_email, self.authorization_code) server.sendmail(self.sender_email, all_receivers, message.as_string()) print(f"邮件发送成功:主题='{subject}', 收件人='{receiver_email}'") return True except Exception as e: print(f"邮件发送失败:{str(e)}") return False def send_html_email(self, receiver_email, subject, html_content, cc_emails=None, attachments=None): """ 发送HTML格式邮件,可带附件 :param receiver_email: 接收方邮箱地址(单个) :param subject: 邮件主题 :param html_content: HTML格式的邮件正文 :param cc_emails: 抄送邮箱列表 :param attachments: 附件文件路径列表 :return: 发送成功返回True,失败返回False """ try: # 创建带附件的邮件对象 message = MIMEMultipart() message['From'] = Header(self.sender_email, 'utf-8') message['To'] = Header(receiver_email, 'utf-8') message['Subject'] = Header(subject, 'utf-8') # 添加抄送 if cc_emails: message['Cc'] = Header(",".join(cc_emails), 'utf-8') all_receivers = [receiver_email] + cc_emails else: all_receivers = [receiver_email] # 添加HTML正文 message.attach(MIMEText(html_content, 'html', 'utf-8')) # 添加附件 if attachments: for file_path in attachments: try: with open(file_path, 'rb') as file: attachment = MIMEApplication(file.read(), _subtype="octet-stream") attachment.add_header('Content-Disposition', 'attachment', filename=file_path.split("/")[-1]) message.attach(attachment) except Exception as e: print(f"添加附件失败 {file_path}: {str(e)}") # 连接SMTP服务器并发送邮件 with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server: server.login(self.sender_email, self.authorization_code) server.sendmail(self.sender_email, all_receivers, message.as_string()) print(f"HTML邮件发送成功:主题='{subject}', 收件人='{receiver_email}'") return True except Exception as e: print(f"HTML邮件发送失败:{str(e)}") return False class EmailVerification: @staticmethod def generate_verification_code(length=6): """ 生成指定长度的随机验证码 参数: length (int): 验证码长度,默认6位 返回: str: 生成的验证码 """ # 生成包含大写字母和数字的验证码 chars = string.ascii_uppercase + string.digits return ''.join(random.choice(chars) for _ in range(length)) @staticmethod def send_verification_email(qq_number, verification_code, email_type="register"): """ 发送验证码邮件到QQ邮箱 参数: qq_number (str): 接收者QQ号 verification_code (str): 验证码 email_type (str): 邮件类型,"register" 或 "reset_password" 返回: bool: 发送成功返回True,否则返回False str: 成功或错误信息 """ receiver_email = f"{qq_number}@qq.com" # 根据邮件类型设置不同的内容 if email_type == "reset_password": email_title = "【萌芽农场】密码重置验证码" email_purpose = "重置萌芽农场游戏账号密码" email_color = "#FF6B35" # 橙红色,表示警告性操作 else: email_title = "【萌芽农场】注册验证码" email_purpose = "注册萌芽农场游戏账号" email_color = "#4CAF50" # 绿色,表示正常操作 # 创建邮件内容 message = MIMEText(f'''

萌芽农场 - 邮箱验证码

亲爱的玩家,您好!

您正在{email_purpose},您的验证码是:

{verification_code}

该验证码有效期为5分钟,请勿泄露给他人。

如果这不是您本人的操作,请忽略此邮件。

本邮件由系统自动发送,请勿直接回复。

''', 'html', 'utf-8') # 修正From头格式,符合QQ邮箱的要求 message['From'] = SENDER_EMAIL message['To'] = receiver_email message['Subject'] = Header(email_title, 'utf-8') try: # 使用SSL/TLS连接而不是STARTTLS smtp_obj = smtplib.SMTP_SSL(SMTP_SERVER, 465) smtp_obj.login(SENDER_EMAIL, SENDER_AUTH_CODE) smtp_obj.sendmail(SENDER_EMAIL, [receiver_email], message.as_string()) smtp_obj.quit() return True, "验证码发送成功" except Exception as e: return False, f"发送验证码失败: {str(e)}" @staticmethod def save_verification_code(qq_number, verification_code, expiry_time=300, code_type="register"): """ 保存验证码到缓存文件 参数: qq_number (str): QQ号 verification_code (str): 验证码 expiry_time (int): 过期时间(秒),默认5分钟 code_type (str): 验证码类型,"register" 或 "reset_password" 返回: bool: 保存成功返回True,否则返回False """ import time # 创建目录(如果不存在) os.makedirs(os.path.dirname(VERIFICATION_CACHE_FILE), exist_ok=True) # 读取现有的验证码数据 verification_data = {} if os.path.exists(VERIFICATION_CACHE_FILE): try: with open(VERIFICATION_CACHE_FILE, 'r', encoding='utf-8') as file: verification_data = json.load(file) except Exception as e: print(f"读取验证码文件失败: {str(e)}") verification_data = {} # 添加新的验证码 expire_at = time.time() + expiry_time current_time = time.time() # 创建验证码记录,包含更多信息用于调试 verification_data[qq_number] = { "code": verification_code, "expire_at": expire_at, "code_type": code_type, "created_at": current_time, "used": False # 新增:标记验证码是否已使用 } # 保存到文件 try: with open(VERIFICATION_CACHE_FILE, 'w', encoding='utf-8') as file: json.dump(verification_data, file, indent=2, ensure_ascii=False) print(f"[验证码系统] 为QQ {qq_number} 保存{code_type}验证码: {verification_code}, 过期时间: {expire_at}") return True except Exception as e: print(f"保存验证码失败: {str(e)}") return False @staticmethod def verify_code(qq_number, input_code, code_type="register"): """ 验证用户输入的验证码 参数: qq_number (str): QQ号 input_code (str): 用户输入的验证码 code_type (str): 验证码类型,"register" 或 "reset_password" 返回: bool: 验证成功返回True,否则返回False str: 成功或错误信息 """ import time # 检查缓存文件是否存在 if not os.path.exists(VERIFICATION_CACHE_FILE): print(f"[验证码系统] QQ {qq_number} 验证失败: 缓存文件不存在") return False, "验证码不存在或已过期" # 读取验证码数据 try: with open(VERIFICATION_CACHE_FILE, 'r', encoding='utf-8') as file: verification_data = json.load(file) except Exception as e: print(f"[验证码系统] 读取验证码文件失败: {str(e)}") return False, "验证码数据损坏" # 检查该QQ号是否有验证码 if qq_number not in verification_data: print(f"[验证码系统] QQ {qq_number} 验证失败: 没有找到验证码记录") return False, "验证码不存在,请重新获取" # 获取存储的验证码信息 code_info = verification_data[qq_number] stored_code = code_info.get("code", "") expire_at = code_info.get("expire_at", 0) stored_code_type = code_info.get("code_type", "register") is_used = code_info.get("used", False) created_at = code_info.get("created_at", 0) print(f"[验证码系统] QQ {qq_number} 验证码详情: 存储码={stored_code}, 输入码={input_code}, 类型={stored_code_type}, 已使用={is_used}, 创建时间={created_at}") # 检查验证码类型是否匹配 if stored_code_type != code_type: print(f"[验证码系统] QQ {qq_number} 验证失败: 验证码类型不匹配,存储类型={stored_code_type}, 请求类型={code_type}") return False, f"验证码类型不匹配,请重新获取{code_type}验证码" # 检查验证码是否已被使用 if is_used: print(f"[验证码系统] QQ {qq_number} 验证失败: 验证码已被使用") return False, "验证码已被使用,请重新获取" # 检查验证码是否过期 current_time = time.time() if current_time > expire_at: # 移除过期的验证码 del verification_data[qq_number] with open(VERIFICATION_CACHE_FILE, 'w', encoding='utf-8') as file: json.dump(verification_data, file, indent=2, ensure_ascii=False) print(f"[验证码系统] QQ {qq_number} 验证失败: 验证码已过期") return False, "验证码已过期,请重新获取" # 验证码比较(不区分大小写) if input_code.upper() == stored_code.upper(): # 验证成功,标记为已使用而不是删除 verification_data[qq_number]["used"] = True verification_data[qq_number]["used_at"] = current_time try: with open(VERIFICATION_CACHE_FILE, 'w', encoding='utf-8') as file: json.dump(verification_data, file, indent=2, ensure_ascii=False) print(f"[验证码系统] QQ {qq_number} 验证成功: 验证码已标记为已使用") return True, "验证码正确" except Exception as e: print(f"[验证码系统] 标记验证码已使用时失败: {str(e)}") return True, "验证码正确" # 即使标记失败,验证还是成功的 else: print(f"[验证码系统] QQ {qq_number} 验证失败: 验证码不匹配") return False, "验证码错误" @staticmethod def clean_expired_codes(): """ 清理过期的验证码和已使用的验证码 """ import time if not os.path.exists(VERIFICATION_CACHE_FILE): return try: with open(VERIFICATION_CACHE_FILE, 'r', encoding='utf-8') as file: verification_data = json.load(file) current_time = time.time() removed_keys = [] # 找出过期的验证码和已使用的验证码(超过1小时) for qq_number, code_info in verification_data.items(): expire_at = code_info.get("expire_at", 0) is_used = code_info.get("used", False) used_at = code_info.get("used_at", 0) should_remove = False # 过期的验证码 if current_time > expire_at: should_remove = True print(f"[验证码清理] 移除过期验证码: QQ {qq_number}") # 已使用超过1小时的验证码 elif is_used and used_at > 0 and (current_time - used_at) > 3600: should_remove = True print(f"[验证码清理] 移除已使用的验证码: QQ {qq_number}") if should_remove: removed_keys.append(qq_number) # 移除标记的验证码 for key in removed_keys: del verification_data[key] # 保存更新后的数据 if removed_keys: with open(VERIFICATION_CACHE_FILE, 'w', encoding='utf-8') as file: json.dump(verification_data, file, indent=2, ensure_ascii=False) print(f"[验证码清理] 共清理了 {len(removed_keys)} 个验证码") except Exception as e: print(f"清理验证码失败: {str(e)}") @staticmethod def get_verification_status(qq_number): """ 获取验证码状态(用于调试) 参数: qq_number (str): QQ号 返回: dict: 验证码状态信息 """ import time if not os.path.exists(VERIFICATION_CACHE_FILE): return {"status": "no_cache_file"} try: with open(VERIFICATION_CACHE_FILE, 'r', encoding='utf-8') as file: verification_data = json.load(file) if qq_number not in verification_data: return {"status": "no_code"} code_info = verification_data[qq_number] current_time = time.time() return { "status": "found", "code": code_info.get("code", ""), "code_type": code_info.get("code_type", "unknown"), "used": code_info.get("used", False), "expired": current_time > code_info.get("expire_at", 0), "created_at": code_info.get("created_at", 0), "expire_at": code_info.get("expire_at", 0), "used_at": code_info.get("used_at", 0) } except Exception as e: return {"status": "error", "message": str(e)} # 测试邮件发送 if __name__ == "__main__": # 清理过期验证码 EmailVerification.clean_expired_codes() # 生成验证码 test_qq = input("请输入测试QQ号: ") verification_code = EmailVerification.generate_verification_code() print(f"生成的验证码: {verification_code}") # 发送测试邮件 success, message = EmailVerification.send_verification_email(test_qq, verification_code) print(f"发送结果: {success}, 消息: {message}") if success: # 保存验证码 EmailVerification.save_verification_code(test_qq, verification_code) # 测试验证 test_input = input("请输入收到的验证码: ") verify_success, verify_message = EmailVerification.verify_code(test_qq, test_input) print(f"验证结果: {verify_success}, 消息: {verify_message}")