commit c9259b51c7e0d8bc67663550a61a0e726273a830 Author: 树萌芽 <3205788256@qq.com> Date: Mon Dec 29 22:08:58 2025 +0800 初始化提交 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..169ce1b --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__ +test \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..a782864 --- /dev/null +++ b/README.md @@ -0,0 +1,214 @@ +# 多数据库交互脚本 + +一个支持多种数据库的 Python 交互式命令行工具,可以方便地连接和操作各种数据库。 + +## 支持的数据库 + +- ✅ MySQL +- ✅ MongoDB +- ✅ Redis +- ✅ SQLite +- ✅ PostgreSQL + +## 系统支持 + +- ✅ Windows +- ✅ Linux +- ✅ macOS +- ✅ Termux (Android) + +## 安装依赖 + +```bash +# 安装所有依赖 +pip install -r requirements.txt + +# 或者单独安装需要的数据库驱动 +pip install pymysql # MySQL +pip install pymongo # MongoDB +pip install redis # Redis +pip install psycopg2-binary # PostgreSQL +# SQLite 是 Python 内置模块,无需安装 +``` + +## 使用方法 + +### 1. 启动脚本 + +```bash +python main.py +``` + +### 2. 选择数据库类型 + +启动后会提示选择数据库类型: + +``` +请选择数据库类型: +1. MySQL +2. MongoDB +3. Redis +4. SQLite +5. PostgreSQL +``` + +### 3. 输入连接信息 + +根据提示输入数据库的连接信息(主机、端口、用户名、密码等)。 + +### 4. 执行命令 + +连接成功后,就可以输入相应的命令进行数据库操作。 + +## 命令示例 + +### MySQL / PostgreSQL / SQLite + +```sql +-- 查询 +SELECT * FROM users; +SELECT name, age FROM users WHERE age > 18; + +-- 插入 +INSERT INTO users (name, age) VALUES ('张三', 25); + +-- 更新 +UPDATE users SET age = 26 WHERE name = '张三'; + +-- 删除 +DELETE FROM users WHERE name = '张三'; + +-- 创建表 +CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER); +``` + +### MongoDB + +MongoDB 使用 JSON 格式的命令: + +```json +// 查询所有文档 +{"collection": "users", "operation": "find", "query": {}} + +// 条件查询 +{"collection": "users", "operation": "find", "query": {"age": {"$gt": 18}}} + +// 插入文档 +{"collection": "users", "operation": "insert", "document": {"name": "张三", "age": 25}} + +// 更新文档 +{"collection": "users", "operation": "update", "query": {"name": "张三"}, "update": {"$set": {"age": 26}}} + +// 删除文档 +{"collection": "users", "operation": "delete", "query": {"name": "张三"}} + +// 统计数量 +{"collection": "users", "operation": "count", "query": {}} +``` + +### Redis + +``` +# 字符串操作 +SET name 张三 +GET name +DEL name + +# 哈希操作 +HSET user:1 name 张三 +HSET user:1 age 25 +HGET user:1 name +HGETALL user:1 + +# 列表操作 +LPUSH mylist item1 item2 item3 +LRANGE mylist 0 -1 + +# 集合操作 +SADD myset member1 member2 +SMEMBERS myset + +# 查询所有键 +KEYS * + +# 测试连接 +PING +``` + +## 内置命令 + +- `help` - 显示帮助信息 +- `connect` - 连接到数据库 +- `disconnect` - 断开数据库连接 +- `status` - 查看连接状态 +- `switch ` - 切换数据库类型 (mysql/mongodb/redis/sqlite/postgres) +- `quit` 或 `exit` - 退出程序 + +## 文件说明 + +- `main.py` - 主程序入口 +- `mysql.py` - MySQL 数据库操作模块 +- `mongodb.py` - MongoDB 数据库操作模块 +- `redis.py` - Redis 数据库操作模块 +- `sqlite.py` - SQLite 数据库操作模块 +- `postgres.py` - PostgreSQL 数据库操作模块 +- `requirements.txt` - 依赖包列表 + +## 注意事项 + +1. 首次使用前请确保已安装相应的数据库驱动 +2. SQLite 会在当前目录创建数据库文件 +3. 确保数据库服务已启动并可以访问 +4. 使用完毕后建议执行 `disconnect` 命令断开连接 + +## 常见问题 + +### Q: 连接失败怎么办? + +A: 请检查: +- 数据库服务是否已启动 +- 主机地址和端口是否正确 +- 用户名和密码是否正确 +- 防火墙是否允许连接 + +### Q: Redis 连接时提示 "module 'redis' has no attribute 'Redis'" 怎么办? + +A: 这个问题已经修复。如果仍然遇到,请: +1. 确保安装了正确的 redis 包:`pip install redis` +2. 检查是否有其他名为 redis.py 的文件冲突 +3. 运行测试脚本检查依赖:`python test_import.py` + +### Q: 如何检查依赖是否正确安装? + +A: 运行测试脚本: +```bash +python test_import.py +``` +这将检查所有必需的数据库驱动是否正确安装。 + +### Q: 如何在 Termux 中使用? + +A: 在 Termux 中需要先安装 Python 和必要的依赖: + +```bash +pkg install python +pkg install build-essential # 编译某些包需要 +pip install -r requirements.txt +``` + +### Q: 如何切换数据库? + +A: 使用 `switch <数据库类型>` 命令,例如: + +``` +switch mysql +switch mongodb +``` + +### Q: MongoDB 需要用户认证怎么办? + +A: 在配置时会询问是否需要认证,选择 y 后输入用户名和密码即可。 + +## 许可证 + +MIT License diff --git a/main.py b/main.py new file mode 100644 index 0000000..262a456 --- /dev/null +++ b/main.py @@ -0,0 +1,447 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +多数据库交互脚本 +支持 MySQL, MongoDB, Redis, SQLite, PostgreSQL +跨平台支持: Windows, Linux, Termux 等 +""" + +import sys +import json + +# 导入数据库模块(处理可能的导入错误) +try: + from mysql_db import MySQLDatabase + from mongodb_db import MongoDatabase + from redis_db import RedisDatabase + from sqlite_db import SQLiteDatabase + from postgres_db import PostgreSQLDatabase +except ImportError as e: + print(f"错误: 导入数据库模块失败") + print(f"详细信息: {e}") + print("\n请确保已安装所需的依赖包:") + print(" pip install -r requirements.txt") + sys.exit(1) + + +def print_banner(): + """打印欢迎信息""" + banner = """ + ╔══════════════════════════════════════════╗ + ║ 多数据库交互工具 v1.0 ║ + ║ 支持: MySQL, MongoDB, Redis, ║ + ║ SQLite, PostgreSQL ║ + ╚══════════════════════════════════════════╝ + """ + print(banner) + + +def print_help(): + """打印帮助信息""" + help_text = """ +======================================================================== +脚本通用命令: + - help : 显示帮助信息 + - connect : 连接到数据库 + - disconnect : 断开数据库连接 + - status : 查看连接状态 + - switch : 切换数据库类型 (mysql/mongodb/redis/sqlite/postgres) + - show databases: 列出所有数据库 (仅MySQL/PostgreSQL) + - use : 切换到指定数据库 (仅MySQL/PostgreSQL) + - quit/exit : 退出程序 +======================================================================== +MySQL/PostgreSQL/SQLite 通用SQL语句: + SELECT * FROM users; + INSERT INTO users (name, age) VALUES ('张三', 25); + UPDATE users SET age = 26 WHERE name = '张三'; + DELETE FROM users WHERE name = '张三'; +======================================================================== +SQLite 点命令: + .tables - 列出所有表 + .schema - 显示所有表的结构 + .schema - 显示特定表的结构 + .databases - 列出数据库信息 + .indexes - 列出所有索引 + .indexes
- 列出特定表的索引 +======================================================================== +MongoDB Shell 命令: + show dbs - 列出所有数据库 + show collections - 列出当前数据库的集合 + use - 切换数据库 +======================================================================== +MongoDB JSON 命令: + {"collection": "users", "operation": "find", "query": {}} + {"collection": "users", "operation": "insert", "document": {"name": "张三", "age": 25}} + {"collection": "users", "operation": "update", "query": {"name": "张三"}, "update": {"$set": {"age": 26}}} + {"collection": "users", "operation": "delete", "query": {"name": "张三"}} +======================================================================== +Redis 键值命令: + SET name 张三 + GET name + HSET user:1 name 张三 age 25 + HGETALL user:1 + KEYS * +======================================================================== + """ + print(help_text) + + +def get_database_config(db_type): + """ + 获取数据库配置 + :param db_type: 数据库类型 + :return: 配置字典 + """ + config = {} + + if db_type == 'mysql': + print("\n请输入 MySQL 配置信息:") + config['host'] = input("主机地址 [localhost]: ").strip() or 'localhost' + config['port'] = int(input("端口 [3306]: ").strip() or '3306') + config['user'] = input("用户名 [root]: ").strip() or 'root' + config['password'] = input("密码: ").strip() + + # 默认先列出数据库,然后让用户选择 + config['database'] = None # 先不指定数据库 + config['_list_databases'] = True # 标记需要列出数据库 + + elif db_type == 'mongodb': + print("\n请输入 MongoDB 配置信息:") + config['host'] = input("主机地址 [localhost]: ").strip() or 'localhost' + config['port'] = int(input("端口 [27017]: ").strip() or '27017') + config['database'] = input("数据库名 [test]: ").strip() or 'test' + + # 询问是否需要认证 + need_auth = input("是否需要用户认证? (y/n) [n]: ").strip().lower() + if need_auth == 'y': + config['username'] = input("用户名: ").strip() + config['password'] = input("密码: ").strip() + else: + config['username'] = None + config['password'] = None + + elif db_type == 'redis': + print("\n请输入 Redis 配置信息:") + config['host'] = input("主机地址 [localhost]: ").strip() or 'localhost' + config['port'] = int(input("端口 [6379]: ").strip() or '6379') + config['db'] = int(input("数据库编号 [0]: ").strip() or '0') + password = input("密码 (留空表示无密码): ").strip() + config['password'] = password if password else None + + elif db_type == 'sqlite': + print("\n请输入 SQLite 配置信息:") + config['database'] = input("数据库文件路径 [database.db]: ").strip() or 'database.db' + + elif db_type == 'postgres': + print("\n请输入 PostgreSQL 配置信息:") + config['host'] = input("主机地址 [localhost]: ").strip() or 'localhost' + config['port'] = int(input("端口 [5432]: ").strip() or '5432') + config['user'] = input("用户名 [postgres]: ").strip() or 'postgres' + config['password'] = input("密码: ").strip() + + # 默认先列出数据库,然后让用户选择 + config['database'] = 'postgres' # 先连接到默认数据库 + config['_list_databases'] = True # 标记需要列出数据库 + + return config + + +def create_database_instance(db_type, config): + """ + 创建数据库实例 + :param db_type: 数据库类型 + :param config: 配置字典 + :return: 数据库实例 + """ + if db_type == 'mysql': + return MySQLDatabase(**config) + elif db_type == 'mongodb': + return MongoDatabase(**config) + elif db_type == 'redis': + return RedisDatabase(**config) + elif db_type == 'sqlite': + return SQLiteDatabase(**config) + elif db_type == 'postgres': + return PostgreSQLDatabase(**config) + else: + return None + + +def format_output(data): + """ + 格式化输出结果 + :param data: 数据 + :return: 格式化后的字符串 + """ + if isinstance(data, list): + # 检查是否是字符串列表(如 .schema 的输出) + if data and all(isinstance(item, str) for item in data): + # 直接输出,保留换行符 + return '\n\n'.join(data) + else: + # JSON 格式化 + return json.dumps(data, ensure_ascii=False, indent=2) + elif isinstance(data, dict): + return json.dumps(data, ensure_ascii=False, indent=2) + else: + return str(data) + + +def main(): + """主函数""" + print_banner() + + # 选择数据库类型 + print("请选择数据库类型:") + print("1. MySQL") + print("2. MongoDB") + print("3. Redis") + print("4. SQLite") + print("5. PostgreSQL") + + choice = input("\n请输入选项 (1-5): ").strip() + + db_types = { + '1': 'mysql', + '2': 'mongodb', + '3': 'redis', + '4': 'sqlite', + '5': 'postgres' + } + + if choice not in db_types: + print("无效的选项!") + return + + current_db_type = db_types[choice] + print(f"\n已选择: {current_db_type.upper()}") + + # 获取配置并创建数据库实例 + config = get_database_config(current_db_type) + list_databases = config.pop('_list_databases', False) + db = create_database_instance(current_db_type, config) + + if not db: + print("创建数据库实例失败!") + return + + # 是否自动连接 + #auto_connect = input("\n是否立即连接? (y/n) [y]: ").strip().lower() + #if auto_connect != 'n': + success, message = db.connect() + print(f"\n{message}") + connected = success + + # 如果需要列出数据库 + if connected and list_databases and hasattr(db, 'list_databases'): + success_list, result = db.list_databases() + if success_list: + print("\n可用的数据库:") + for i, db_name in enumerate(result, 1): + print(f" {i}. {db_name}") + + # 让用户选择数据库 + if current_db_type in ['mysql', 'postgres']: + while True: + db_choice = input("\n请输入数据库名或编号 (直接回车跳过): ").strip() + if not db_choice: + break + + # 判断是数字还是数据库名 + selected_db = None + if db_choice.isdigit(): + index = int(db_choice) - 1 + if 0 <= index < len(result): + selected_db = result[index] + else: + print(f"错误: 编号超出范围,请输入 1-{len(result)} 之间的数字") + retry = input("是否重新输入? (y/n) [y]: ").strip().lower() + if retry == 'n': + break + continue + else: + selected_db = db_choice + + # 尝试切换数据库 + use_success, use_msg = db.use_database(selected_db) + print(use_msg) + + if use_success: + break + else: + retry = input("是否重新输入? (y/n) [y]: ").strip().lower() + if retry == 'n': + break + else: + print(f"\n{result}") + + + print("\n输入 'help' 查看帮助信息") + print("=" * 50) + + # 主循环 + while True: + try: + # 显示提示符 + prompt = f"\n[{current_db_type.upper()}{'*' if connected else ''}]> " + command = input(prompt).strip() + + if not command: + continue + + # 处理特殊命令 + cmd_lower = command.lower() + + if cmd_lower in ['quit', 'exit']: + if connected: + print(db.close()) + print("\n再见!") + break + + elif cmd_lower == 'help': + print_help() + continue + + elif cmd_lower == 'connect': + if connected: + print("已经连接到数据库") + else: + success, message = db.connect() + print(message) + connected = success + continue + + elif cmd_lower == 'disconnect': + if not connected: + print("未连接到数据库") + else: + print(db.close()) + connected = False + continue + + elif cmd_lower == 'status': + status = "已连接" if connected else "未连接" + print(f"数据库类型: {current_db_type.upper()}") + print(f"连接状态: {status}") + if connected and hasattr(db, 'database') and db.database: + print(f"当前数据库: {db.database}") + continue + + elif cmd_lower == 'show databases': + if not connected: + print("请先连接到数据库 (使用 'connect' 命令)") + elif not hasattr(db, 'list_databases'): + print("当前数据库类型不支持此命令") + else: + success, result = db.list_databases() + if success: + print("\n可用的数据库:") + for i, db_name in enumerate(result, 1): + print(f" {i}. {db_name}") + else: + print(f"\n{result}") + continue + + elif cmd_lower.startswith('use '): + if not connected: + print("请先连接到数据库 (使用 'connect' 命令)") + elif not hasattr(db, 'use_database'): + print("当前数据库类型不支持此命令") + else: + db_input = command.split(maxsplit=1)[1].strip() + + # 先获取数据库列表 + list_success, db_list = db.list_databases() + + # 判断是数字还是数据库名 + selected_db = None + if db_input.isdigit() and list_success: + index = int(db_input) - 1 + if 0 <= index < len(db_list): + selected_db = db_list[index] + else: + print(f"错误: 编号超出范围,请使用 'show databases' 查看可用数据库") + continue + else: + selected_db = db_input + + # 尝试切换数据库 + success, message = db.use_database(selected_db) + print(message) + + # 如果失败,询问是否重试 + if not success: + retry = input("是否重新输入数据库名? (y/n) [y]: ").strip().lower() + if retry != 'n': + while True: + new_db = input("请输入数据库名或编号: ").strip() + if not new_db: + break + + # 再次判断数字或名称 + if new_db.isdigit() and list_success: + idx = int(new_db) - 1 + if 0 <= idx < len(db_list): + new_db = db_list[idx] + else: + print(f"错误: 编号超出范围") + continue + + success2, message2 = db.use_database(new_db) + print(message2) + if success2: + break + + retry_again = input("是否继续重试? (y/n) [y]: ").strip().lower() + if retry_again == 'n': + break + continue + + elif cmd_lower.startswith('switch '): + new_type = cmd_lower.split()[1] + if new_type not in ['mysql', 'mongodb', 'redis', 'sqlite', 'postgres']: + print("无效的数据库类型!") + continue + + # 关闭当前连接 + if connected: + print(db.close()) + connected = False + + # 切换数据库 + current_db_type = new_type + config = get_database_config(current_db_type) + db = create_database_instance(current_db_type, config) + print(f"已切换到 {current_db_type.upper()}") + continue + + # 执行数据库命令 + if not connected: + print("请先连接到数据库 (使用 'connect' 命令)") + continue + + success, result = db.execute(command) + + if success: + print(f"\n执行成功:") + print(format_output(result)) + else: + print(f"\n执行失败:") + print(result) + + except KeyboardInterrupt: + print("\n\n操作已取消") + continue + except EOFError: + print("\n\n再见!") + break + except Exception as e: + print(f"\n发生错误: {str(e)}") + + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print("\n\n程序已退出") + sys.exit(0) diff --git a/mongodb_db.py b/mongodb_db.py new file mode 100644 index 0000000..5a09ada --- /dev/null +++ b/mongodb_db.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +MongoDB 数据库操作模块 +支持基本的文档操作 +""" + +import sys +import json + +try: + from pymongo import MongoClient +except ImportError as e: + print(f"错误: 无法导入 pymongo 库,请确保已安装: pip install pymongo") + print(f"详细错误: {e}") + sys.exit(1) + + +class MongoDatabase: + """MongoDB 数据库连接和操作类""" + + def __init__(self, host='localhost', port=27017, database='test', username=None, password=None): + """ + 初始化 MongoDB 连接参数 + :param host: 数据库主机地址 + :param port: 数据库端口 + :param database: 数据库名 + :param username: 用户名(可选) + :param password: 密码(可选) + """ + self.host = host + self.port = port + self.database_name = database + self.username = username + self.password = password + self.client = None + self.db = None + + def connect(self): + """建立数据库连接""" + try: + # 构建连接参数 + if self.username and self.password: + # 使用用户名密码认证 + self.client = MongoClient( + host=self.host, + port=self.port, + username=self.username, + password=self.password, + authSource=self.database_name, # 认证数据库 + serverSelectionTimeoutMS=5000 + ) + else: + # 无需认证 + self.client = MongoClient( + host=self.host, + port=self.port, + serverSelectionTimeoutMS=5000 + ) + + # 测试连接 + self.client.admin.command('ping') + self.db = self.client[self.database_name] + return True, f"MongoDB 连接成功 (数据库: {self.database_name})" + except Exception as e: + return False, f"MongoDB 连接失败: {str(e)}" + + def execute(self, command): + """ + 执行 MongoDB 命令 + :param command: MongoDB 命令字符串(JSON 格式或 shell 命令) + 示例: {"collection": "users", "operation": "find", "query": {}} + 或: show dbs, show collections, use + """ + if self.db is None: + return False, "未连接到数据库" + + # 处理 MongoDB shell 命令 + cmd_lower = command.strip().lower() + + # show dbs - 列出所有数据库 + if cmd_lower in ['show dbs', 'show databases']: + try: + db_list = self.client.list_database_names() + result = [] + for db_name in db_list: + db_stats = self.client[db_name].command('dbstats') + size_mb = db_stats.get('dataSize', 0) / (1024 * 1024) + result.append(f"{db_name}: {size_mb:.2f} MB") + return True, result + except Exception as e: + return False, f"执行失败: {str(e)}" + + # show collections - 列出当前数据库的所有集合 + elif cmd_lower in ['show collections', 'show tables']: + try: + collections = self.db.list_collection_names() + return True, collections + except Exception as e: + return False, f"执行失败: {str(e)}" + + # use - 切换数据库 + elif cmd_lower.startswith('use '): + try: + db_name = command.strip().split(maxsplit=1)[1] + self.db = self.client[db_name] + self.database_name = db_name + return True, f"已切换到数据库: {db_name}" + except Exception as e: + return False, f"切换数据库失败: {str(e)}" + + try: + # 解析命令 + if isinstance(command, str): + cmd = json.loads(command) + else: + cmd = command + + collection_name = cmd.get('collection') + operation = cmd.get('operation') + + if not collection_name or not operation: + return False, "命令必须包含 collection 和 operation 字段" + + collection = self.db[collection_name] + + # 根据操作类型执行不同的命令 + if operation == 'find': + query = cmd.get('query', {}) + limit = cmd.get('limit', 0) + result = list(collection.find(query).limit(limit)) + # 转换 ObjectId 为字符串 + for doc in result: + if '_id' in doc: + doc['_id'] = str(doc['_id']) + return True, result + + elif operation == 'insert': + document = cmd.get('document') + if not document: + return False, "插入操作需要 document 字段" + result = collection.insert_one(document) + return True, f"插入成功, ID: {result.inserted_id}" + + elif operation == 'update': + query = cmd.get('query', {}) + update = cmd.get('update') + if not update: + return False, "更新操作需要 update 字段" + result = collection.update_many(query, update) + return True, f"更新了 {result.modified_count} 条记录" + + elif operation == 'delete': + query = cmd.get('query', {}) + result = collection.delete_many(query) + return True, f"删除了 {result.deleted_count} 条记录" + + elif operation == 'count': + query = cmd.get('query', {}) + count = collection.count_documents(query) + return True, f"共有 {count} 条记录" + + else: + return False, f"不支持的操作: {operation}" + + except json.JSONDecodeError: + return False, "命令格式错误,请使用 JSON 格式" + except Exception as e: + return False, f"执行失败: {str(e)}" + + def close(self): + """关闭数据库连接""" + if self.client: + self.client.close() + self.client = None + self.db = None + return "MongoDB 连接已关闭" + return "连接已经关闭" diff --git a/mysql_db.py b/mysql_db.py new file mode 100644 index 0000000..4369b5c --- /dev/null +++ b/mysql_db.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +MySQL 数据库操作模块 +支持基本的增删改查操作 +""" + +import sys +try: + import pymysql + from pymysql.cursors import DictCursor +except ImportError as e: + print(f"错误: 无法导入 pymysql 库,请确保已安装: pip install pymysql") + print(f"详细错误: {e}") + sys.exit(1) + + +class MySQLDatabase: + """MySQL 数据库连接和操作类""" + + def __init__(self, host='localhost', port=3306, user='root', password='', database=None): + """ + 初始化 MySQL 连接参数 + :param host: 数据库主机地址 + :param port: 数据库端口 + :param user: 用户名 + :param password: 密码 + :param database: 数据库名(可选,不指定则只连接到服务器) + """ + self.host = host + self.port = port + self.user = user + self.password = password + self.database = database + self.connection = None + + def connect(self): + """建立数据库连接""" + try: + conn_params = { + 'host': self.host, + 'port': self.port, + 'user': self.user, + 'password': self.password, + 'charset': 'utf8mb4', + 'cursorclass': DictCursor + } + # 如果指定了数据库,则连接到该数据库 + if self.database: + conn_params['database'] = self.database + + self.connection = pymysql.connect(**conn_params) + msg = f"MySQL 连接成功" + (f" (数据库: {self.database})" if self.database else " (未选择数据库)") + return True, msg + except Exception as e: + return False, f"MySQL 连接失败: {str(e)}" + + def execute(self, sql, params=None): + """ + 执行 SQL 语句 + :param sql: SQL 语句 + :param params: 参数(可选) + :return: 执行结果 + """ + if not self.connection: + return False, "未连接到数据库" + + try: + with self.connection.cursor() as cursor: + cursor.execute(sql, params) + + # 判断是查询还是修改操作 + if sql.strip().upper().startswith(('SELECT', 'SHOW', 'DESC', 'DESCRIBE')): + result = cursor.fetchall() + return True, result + else: + self.connection.commit() + return True, f"影响行数: {cursor.rowcount}" + except Exception as e: + self.connection.rollback() + return False, f"执行失败: {str(e)}" + + def list_databases(self): + """列出所有数据库""" + if not self.connection: + return False, "未连接到数据库" + try: + with self.connection.cursor() as cursor: + cursor.execute("SHOW DATABASES") + result = cursor.fetchall() + databases = [db['Database'] for db in result] + return True, databases + except Exception as e: + return False, f"获取数据库列表失败: {str(e)}" + + def use_database(self, database): + """切换到指定数据库""" + if not self.connection: + return False, "未连接到数据库" + try: + self.connection.select_db(database) + self.database = database + return True, f"已切换到数据库: {database}" + except Exception as e: + return False, f"切换数据库失败: {str(e)}" + + def close(self): + """关闭数据库连接""" + if self.connection: + self.connection.close() + self.connection = None + return "MySQL 连接已关闭" + return "连接已经关闭" diff --git a/postgres_db.py b/postgres_db.py new file mode 100644 index 0000000..d6bc295 --- /dev/null +++ b/postgres_db.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +PostgreSQL 数据库操作模块 +支持基本的增删改查操作 +""" + +import sys +try: + import psycopg2 + from psycopg2.extras import RealDictCursor +except ImportError as e: + print(f"错误: 无法导入 psycopg2 库,请确保已安装: pip install psycopg2-binary") + print(f"详细错误: {e}") + sys.exit(1) + + +class PostgreSQLDatabase: + """PostgreSQL 数据库连接和操作类""" + + def __init__(self, host='localhost', port=5432, user='postgres', password='', database='postgres'): + """ + 初始化 PostgreSQL 连接参数 + :param host: 数据库主机地址 + :param port: 数据库端口 + :param user: 用户名 + :param password: 密码 + :param database: 数据库名(默认postgres系统数据库) + """ + self.host = host + self.port = port + self.user = user + self.password = password + self.database = database + self.connection = None + + def connect(self): + """建立数据库连接""" + try: + self.connection = psycopg2.connect( + host=self.host, + port=self.port, + user=self.user, + password=self.password, + database=self.database, + cursor_factory=RealDictCursor, + connect_timeout=5 + ) + return True, "PostgreSQL 连接成功" + except Exception as e: + return False, f"PostgreSQL 连接失败: {str(e)}" + + def execute(self, sql, params=None): + """ + 执行 SQL 语句 + :param sql: SQL 语句 + :param params: 参数(可选) + :return: 执行结果 + """ + if not self.connection: + return False, "未连接到数据库" + + try: + with self.connection.cursor() as cursor: + cursor.execute(sql, params) + + # 判断是查询还是修改操作 + if sql.strip().upper().startswith(('SELECT', 'SHOW', 'WITH')): + result = cursor.fetchall() + # 转换为字典列表 + return True, [dict(row) for row in result] + else: + self.connection.commit() + return True, f"影响行数: {cursor.rowcount}" + except Exception as e: + self.connection.rollback() + return False, f"执行失败: {str(e)}" + + def list_databases(self): + """列出所有数据库""" + if not self.connection: + return False, "未连接到数据库" + try: + with self.connection.cursor() as cursor: + cursor.execute("SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname") + result = cursor.fetchall() + databases = [db['datname'] for db in result] + return True, databases + except Exception as e: + return False, f"获取数据库列表失败: {str(e)}" + + def use_database(self, database): + """切换到指定数据库(需要重新连接)""" + if not self.connection: + return False, "未连接到数据库" + try: + # PostgreSQL 需要重新连接来切换数据库 + self.connection.close() + self.database = database + return self.connect() + except Exception as e: + return False, f"切换数据库失败: {str(e)}" + + def close(self): + """关闭数据库连接""" + if self.connection: + self.connection.close() + self.connection = None + return "PostgreSQL 连接已关闭" + return "连接已经关闭" diff --git a/redis_db.py b/redis_db.py new file mode 100644 index 0000000..ed14016 --- /dev/null +++ b/redis_db.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Redis 数据库操作模块 +支持基本的键值操作 +""" + +import sys +import os +import json + +# 避免文件名冲突,确保导入正确的redis库 +try: + from redis import Redis as RedisClient + from redis import ConnectionError as RedisConnectionError +except ImportError as e: + print(f"错误: 无法导入 redis 库,请确保已安装: pip install redis") + print(f"详细错误: {e}") + sys.exit(1) + + +class RedisDatabase: + """Redis 数据库连接和操作类""" + + def __init__(self, host='localhost', port=6379, db=0, password=None): + """ + 初始化 Redis 连接参数 + :param host: Redis 主机地址 + :param port: Redis 端口 + :param db: 数据库编号(0-15) + :param password: 密码(可选) + """ + self.host = host + self.port = port + self.db = db + self.password = password + self.client = None + + def connect(self): + """建立数据库连接""" + try: + self.client = RedisClient( + host=self.host, + port=self.port, + db=self.db, + password=self.password, + decode_responses=True, + socket_connect_timeout=5 + ) + # 测试连接 + self.client.ping() + return True, "Redis 连接成功" + except RedisConnectionError as e: + return False, f"Redis 连接失败: 无法连接到 Redis 服务器 - {str(e)}" + except Exception as e: + return False, f"Redis 连接失败: {str(e)}" + + def execute(self, command): + """ + 执行 Redis 命令 + :param command: Redis 命令字符串 + 示例: "SET key value" 或 "GET key" + """ + if not self.client: + return False, "未连接到数据库" + + try: + # 解析命令 + parts = command.strip().split() + if not parts: + return False, "命令不能为空" + + cmd = parts[0].upper() + args = parts[1:] + + # 执行常用命令 + if cmd == 'SET' and len(args) >= 2: + key = args[0] + value = ' '.join(args[1:]) + result = self.client.set(key, value) + return True, "OK" if result else "FAIL" + + elif cmd == 'GET' and len(args) >= 1: + key = args[0] + result = self.client.get(key) + return True, result if result is not None else "(nil)" + + elif cmd == 'DEL' and len(args) >= 1: + result = self.client.delete(*args) + return True, f"删除了 {result} 个键" + + elif cmd == 'EXISTS' and len(args) >= 1: + result = self.client.exists(*args) + return True, f"存在 {result} 个键" + + elif cmd == 'KEYS' and len(args) >= 1: + pattern = args[0] + result = self.client.keys(pattern) + return True, result + + elif cmd == 'HSET' and len(args) >= 3: + key = args[0] + field = args[1] + value = ' '.join(args[2:]) + result = self.client.hset(key, field, value) + return True, f"设置字段成功: {result}" + + elif cmd == 'HGET' and len(args) >= 2: + key = args[0] + field = args[1] + result = self.client.hget(key, field) + return True, result if result is not None else "(nil)" + + elif cmd == 'HGETALL' and len(args) >= 1: + key = args[0] + result = self.client.hgetall(key) + return True, result + + elif cmd == 'LPUSH' and len(args) >= 2: + key = args[0] + result = self.client.lpush(key, *args[1:]) + return True, f"列表长度: {result}" + + elif cmd == 'LRANGE' and len(args) >= 3: + key = args[0] + start = int(args[1]) + stop = int(args[2]) + result = self.client.lrange(key, start, stop) + return True, result + + elif cmd == 'SADD' and len(args) >= 2: + key = args[0] + result = self.client.sadd(key, *args[1:]) + return True, f"添加了 {result} 个元素" + + elif cmd == 'SMEMBERS' and len(args) >= 1: + key = args[0] + result = self.client.smembers(key) + return True, list(result) + + elif cmd == 'PING': + result = self.client.ping() + return True, "PONG" if result else "FAIL" + + elif cmd == 'FLUSHDB': + result = self.client.flushdb() + return True, "OK" if result else "FAIL" + + elif cmd == 'DBSIZE': + result = self.client.dbsize() + return True, f"数据库键数量: {result}" + + else: + # 尝试执行原始命令 + result = self.client.execute_command(*parts) + return True, result + + except Exception as e: + return False, f"执行失败: {str(e)}" + + def close(self): + """关闭数据库连接""" + if self.client: + self.client.close() + self.client = None + return "Redis 连接已关闭" + return "连接已经关闭" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ae881e6 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,16 @@ +# 数据库交互脚本依赖包 +# 安装命令: pip install -r requirements.txt + +# MySQL 数据库 +pymysql>=1.0.2 + +# MongoDB 数据库 +pymongo>=4.0.0 + +# Redis 数据库 +redis>=4.0.0 + +# PostgreSQL 数据库 +psycopg2-binary>=2.9.0 + +# SQLite 是 Python 内置模块,无需额外安装 diff --git a/sqlite_db.py b/sqlite_db.py new file mode 100644 index 0000000..14c1318 --- /dev/null +++ b/sqlite_db.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +SQLite 数据库操作模块 +支持基本的增删改查操作 +""" + +import sqlite3 + + +class SQLiteDatabase: + """SQLite 数据库连接和操作类""" + + def __init__(self, database='database.db'): + """ + 初始化 SQLite 连接参数 + :param database: 数据库文件路径 + """ + self.database = database + self.connection = None + + def connect(self): + """建立数据库连接""" + try: + self.connection = sqlite3.connect(self.database) + # 设置行工厂,使查询结果以字典形式返回 + self.connection.row_factory = sqlite3.Row + return True, f"SQLite 连接成功 (数据库: {self.database})" + except Exception as e: + return False, f"SQLite 连接失败: {str(e)}" + + def execute(self, sql, params=None): + """ + 执行 SQL 语句或 SQLite 点命令 + :param sql: SQL 语句或点命令(如 .tables, .schema) + :param params: 参数(可选) + :return: 执行结果 + """ + if not self.connection: + return False, "未连接到数据库" + + # 处理 SQLite 点命令 + cmd = sql.strip() + if cmd.startswith('.'): + try: + # .tables - 列出所有表 + if cmd in ['.tables', '.table']: + cursor = self.connection.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") + tables = [row[0] for row in cursor.fetchall()] + return True, tables + + # .schema - 显示所有表的结构 + elif cmd == '.schema': + cursor = self.connection.cursor() + cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND sql IS NOT NULL ORDER BY name") + schemas = [row[0] for row in cursor.fetchall()] + return True, schemas + + # .schema
- 显示特定表的结构 + elif cmd.startswith('.schema '): + table_name = cmd.split(maxsplit=1)[1] + cursor = self.connection.cursor() + cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table_name,)) + result = cursor.fetchone() + if result: + return True, result[0] + else: + return False, f"表 '{table_name}' 不存在" + + # .databases - 列出数据库信息 + elif cmd in ['.databases', '.database']: + return True, [f"main: {self.database}"] + + # .indexes - 列出所有索引 + elif cmd in ['.indexes', '.index']: + cursor = self.connection.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='index' ORDER BY name") + indexes = [row[0] for row in cursor.fetchall()] + return True, indexes + + # .indexes
- 列出特定表的索引 + elif cmd.startswith('.indexes ') or cmd.startswith('.index '): + table_name = cmd.split(maxsplit=1)[1] + cursor = self.connection.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND tbl_name=? ORDER BY name", (table_name,)) + indexes = [row[0] for row in cursor.fetchall()] + return True, indexes + + else: + return False, f"不支持的点命令: {cmd.split()[0]}" + + except Exception as e: + return False, f"执行失败: {str(e)}" + + # 执行标准 SQL 语句 + try: + cursor = self.connection.cursor() + cursor.execute(sql, params or ()) + + # 判断是查询还是修改操作 + if sql.strip().upper().startswith(('SELECT', 'PRAGMA')): + rows = cursor.fetchall() + # 转换为字典列表 + result = [dict(row) for row in rows] + return True, result + else: + self.connection.commit() + return True, f"影响行数: {cursor.rowcount}" + except Exception as e: + self.connection.rollback() + return False, f"执行失败: {str(e)}" + + def close(self): + """关闭数据库连接""" + if self.connection: + self.connection.close() + self.connection = None + return "SQLite 连接已关闭" + return "连接已经关闭"