You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
69 lines
2.3 KiB
69 lines
2.3 KiB
import pymysql
|
|
from typing import List, Dict, Any
|
|
from config import MYSQL_CONFIG
|
|
|
|
class MySQLClient:
    """Thin wrapper around a single PyMySQL connection.

    The connection is opened lazily: ``execute_query`` and ``execute_update``
    call ``connect()`` themselves when no connection is held yet. Failures
    are reported with ``print`` and signalled through the return value
    (``False``, ``[]`` or ``0``) instead of being raised to the caller.
    """

    def __init__(self):
        # No connection is opened here; it is created by connect().
        self.connection = None

    def connect(self):
        """Open a connection using the settings in ``MYSQL_CONFIG``.

        The connection is created with ``pymysql.cursors.DictCursor``, so
        cursors obtained from it yield rows as dictionaries.

        Returns:
            True if the connection was established, False otherwise.
        """
        try:
            new_connection = pymysql.connect(
                host=MYSQL_CONFIG["host"],
                port=MYSQL_CONFIG["port"],
                user=MYSQL_CONFIG["user"],
                password=MYSQL_CONFIG["password"],
                database=MYSQL_CONFIG["database"],
                charset=MYSQL_CONFIG["charset"],
                cursorclass=pymysql.cursors.DictCursor,
            )
        except Exception as e:
            print(f"MySQL数据库连接失败: {e}")
            return False

        # Drop a previously held connection only once the new one is open:
        # a failed reconnect keeps the old handle, a successful one does not
        # leak it.
        self.close()
        self.connection = new_connection
        print(f"MySQL数据库连接成功 - {MYSQL_CONFIG['host']}:{MYSQL_CONFIG['port']}/{MYSQL_CONFIG['database']}")
        return True

    def close(self) -> None:
        """Close the held connection, if any, and reset ``connection`` to None.

        Closing is best-effort: an error raised by the driver while closing
        is printed and otherwise ignored.
        """
        connection, self.connection = self.connection, None
        if connection is None:
            return
        try:
            connection.close()
        except Exception as e:
            print(f"关闭MySQL连接失败: {e}")

    def _ensure_connection(self) -> bool:
        """Return True if a connection is held, connecting first if needed."""
        return self.connection is not None or self.connect()

    def execute_query(self, sql: str, params: tuple = None) -> List[Dict[str, Any]]:
        """Run a query and return all fetched rows.

        Args:
            sql: Statement to execute; placeholders are filled from ``params``.
            params: Values passed to ``cursor.execute`` alongside ``sql``.

        Returns:
            The fetched rows as a list, or an empty list if no connection
            could be established or the query failed.
        """
        if not self._ensure_connection():
            return []

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(sql, params)
                # list() guarantees the annotated return type whatever
                # sequence type the driver hands back.
                return list(cursor.fetchall())
        except Exception as e:
            print(f"执行查询失败: {e}")
            return []

    def execute_update(self, sql: str, params: tuple = None) -> int:
        """Run a data-modifying statement (INSERT, UPDATE, DELETE) and commit.

        Args:
            sql: Statement to execute; placeholders are filled from ``params``.
            params: Values passed to ``cursor.execute`` alongside ``sql``.

        Returns:
            The value returned by ``cursor.execute`` (the affected row
            count), or 0 if no connection could be established or the
            statement failed. On failure a rollback is attempted.
        """
        if not self._ensure_connection():
            return 0

        try:
            with self.connection.cursor() as cursor:
                result = cursor.execute(sql, params)
                self.connection.commit()
                print(f"执行更新成功,影响行数: {result}")
                return result
        except Exception as e:
            print(f"执行更新失败: {e}")
            # The rollback can fail too (for instance when the statement
            # failed because the connection is gone); that must not escape
            # and break the "return 0 on failure" contract.
            try:
                self.connection.rollback()
            except Exception as rollback_error:
                print(f"回滚失败: {rollback_error}")
            return 0

    def is_connected(self) -> bool:
        """Check whether the connection is usable.

        Pings the server with ``reconnect=True``, so this call may itself
        re-establish a dropped connection.

        Returns:
            True if the ping succeeded, False if no connection is held or
            the ping raised.
        """
        if self.connection is None:
            return False
        try:
            self.connection.ping(reconnect=True)
        except Exception:
            return False
        return True
|
|
|
|
# Create the global MySQL client instance
|
|
# Module-level shared instance. Constructing it opens no connection:
# __init__ only sets ``connection`` to None.
mysql_client = MySQLClient()
|