114 lines
3.8 KiB
Python
114 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
MySQL 数据库操作模块
|
|
支持基本的增删改查操作
|
|
"""
|
|
|
|
import sys
|
|
try:
|
|
import pymysql
|
|
from pymysql.cursors import DictCursor
|
|
except ImportError as e:
|
|
print(f"错误: 无法导入 pymysql 库,请确保已安装: pip install pymysql")
|
|
print(f"详细错误: {e}")
|
|
sys.exit(1)
|
|
|
|
|
|
class MySQLDatabase:
|
|
"""MySQL 数据库连接和操作类"""
|
|
|
|
def __init__(self, host='localhost', port=3306, user='root', password='', database=None):
|
|
"""
|
|
初始化 MySQL 连接参数
|
|
:param host: 数据库主机地址
|
|
:param port: 数据库端口
|
|
:param user: 用户名
|
|
:param password: 密码
|
|
:param database: 数据库名(可选,不指定则只连接到服务器)
|
|
"""
|
|
self.host = host
|
|
self.port = port
|
|
self.user = user
|
|
self.password = password
|
|
self.database = database
|
|
self.connection = None
|
|
|
|
def connect(self):
|
|
"""建立数据库连接"""
|
|
try:
|
|
conn_params = {
|
|
'host': self.host,
|
|
'port': self.port,
|
|
'user': self.user,
|
|
'password': self.password,
|
|
'charset': 'utf8mb4',
|
|
'cursorclass': DictCursor
|
|
}
|
|
# 如果指定了数据库,则连接到该数据库
|
|
if self.database:
|
|
conn_params['database'] = self.database
|
|
|
|
self.connection = pymysql.connect(**conn_params)
|
|
msg = f"MySQL 连接成功" + (f" (数据库: {self.database})" if self.database else " (未选择数据库)")
|
|
return True, msg
|
|
except Exception as e:
|
|
return False, f"MySQL 连接失败: {str(e)}"
|
|
|
|
def execute(self, sql, params=None):
|
|
"""
|
|
执行 SQL 语句
|
|
:param sql: SQL 语句
|
|
:param params: 参数(可选)
|
|
:return: 执行结果
|
|
"""
|
|
if not self.connection:
|
|
return False, "未连接到数据库"
|
|
|
|
try:
|
|
with self.connection.cursor() as cursor:
|
|
cursor.execute(sql, params)
|
|
|
|
# 判断是查询还是修改操作
|
|
if sql.strip().upper().startswith(('SELECT', 'SHOW', 'DESC', 'DESCRIBE')):
|
|
result = cursor.fetchall()
|
|
return True, result
|
|
else:
|
|
self.connection.commit()
|
|
return True, f"影响行数: {cursor.rowcount}"
|
|
except Exception as e:
|
|
self.connection.rollback()
|
|
return False, f"执行失败: {str(e)}"
|
|
|
|
def list_databases(self):
|
|
"""列出所有数据库"""
|
|
if not self.connection:
|
|
return False, "未连接到数据库"
|
|
try:
|
|
with self.connection.cursor() as cursor:
|
|
cursor.execute("SHOW DATABASES")
|
|
result = cursor.fetchall()
|
|
databases = [db['Database'] for db in result]
|
|
return True, databases
|
|
except Exception as e:
|
|
return False, f"获取数据库列表失败: {str(e)}"
|
|
|
|
def use_database(self, database):
|
|
"""切换到指定数据库"""
|
|
if not self.connection:
|
|
return False, "未连接到数据库"
|
|
try:
|
|
self.connection.select_db(database)
|
|
self.database = database
|
|
return True, f"已切换到数据库: {database}"
|
|
except Exception as e:
|
|
return False, f"切换数据库失败: {str(e)}"
|
|
|
|
def close(self):
|
|
"""关闭数据库连接"""
|
|
if self.connection:
|
|
self.connection.close()
|
|
self.connection = None
|
|
return "MySQL 连接已关闭"
|
|
return "连接已经关闭"
|