# ToDoAgent / Notify / dataBaseConnecter.py
# Siyu Wang
# updated to KK_Server
# 84ed1d1
# '''
# Author: mdhuang555 67590178+mdhuang555@users.noreply.github.com
# Date: 2025-03-30 15:57:22
# LastEditors: mdhuang555 67590178+mdhuang555@users.noreply.github.com
# LastEditTime: 2025-04-03 11:32:30
# FilePath: \Notyif\dataBaseConnecter.py
# Description: 数据库连接器,支持SSL连接
# '''
import json
import re
import socket
from pathlib import Path
from typing import Any, Dict, Optional

import mysql.connector
import yaml
class DatabaseConnector:
    """TCP front end that serves MySQL table contents as JSON.

    A client connects and sends one JSON object with ``table`` and ``column``
    keys. The server answers with a 4-byte big-endian length prefix followed
    by a UTF-8 JSON document whose ``status`` is ``"success"`` (with ``data``)
    or ``"error"`` (with ``message``), then closes the connection.
    """

    # Whitelist for one identifier component. Table and column names arrive
    # from the network, so anything outside this set is rejected instead of
    # being interpolated into SQL.
    _IDENTIFIER_RE = re.compile(r"[A-Za-z0-9_$]+")

    def __init__(self, port: int = 3306):
        """Create the listening socket and load ``Notify_config.yaml``.

        Args:
            port: TCP port that ``start_server`` binds to.
        """
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.config = self._load_config()
        # A ``server:`` key that is missing, empty (None) or not a mapping
        # falls back to listening on all interfaces.
        server_cfg = self.config.get('server')
        if not isinstance(server_cfg, dict):
            server_cfg = {}
        self.host = server_cfg.get('host', '0.0.0.0')

    def _load_config(self) -> Dict[str, Any]:
        """Load ``Notify_config.yaml`` from the directory of this file.

        Returns:
            The parsed mapping, or an empty dict when the file cannot be read
            or parsed, is empty, or does not contain a mapping at top level.
        """
        config_path = Path(__file__).parent / "Notify_config.yaml"
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                loaded = yaml.safe_load(f)
        except Exception as e:
            # Deliberately best-effort: a broken config must not prevent
            # construction; later steps report the missing settings.
            print(f"加载配置文件错误: {e}")
            return {}
        if loaded is None:
            # ``safe_load`` returns None for an empty document.
            return {}
        if not isinstance(loaded, dict):
            print("加载配置文件错误: 配置文件顶层必须是映射")
            return {}
        return loaded

    def connect_db(self) -> Optional["mysql.connector.MySQLConnection"]:
        """Open an SSL-protected connection to the configured MySQL server.

        The CA bundle ``DigiCertGlobalRootCA.crt.pem`` is expected next to
        this file.

        Returns:
            An open connection, or ``None`` when the certificate is missing,
            the ``mysql`` config section is incomplete, or connecting fails
            (the error is printed).
        """
        try:
            ssl_ca_path = Path(__file__).parent.absolute() / "DigiCertGlobalRootCA.crt.pem"
            if not ssl_ca_path.exists():
                raise FileNotFoundError(f"SSL证书文件未找到: {ssl_ca_path}")
            mysql_cfg = self.config["mysql"]
            return mysql.connector.connect(
                host=mysql_cfg["host"],
                port=mysql_cfg.get("port", 3306),
                user=mysql_cfg["user"],
                password=mysql_cfg["password"],
                database=mysql_cfg["database"],
                ssl_ca=str(ssl_ca_path),
                ssl_disabled=False,
                charset='utf8mb4',
                collation='utf8mb4_unicode_ci',
            )
        except Exception as e:
            print(f"数据库连接错误: {e}")
            return None

    @classmethod
    def _quote_identifier(cls, name: Any) -> str:
        """Return *name* as a backtick-quoted, optionally dotted, identifier.

        Raises:
            ValueError: if *name* is not a string or any dot-separated part
                contains characters outside ``[A-Za-z0-9_$]``.
        """
        if not isinstance(name, str):
            raise ValueError(f"非法的标识符: {name!r}")
        parts = name.strip().split('.')
        if not all(cls._IDENTIFIER_RE.fullmatch(part) for part in parts):
            raise ValueError(f"非法的标识符: {name!r}")
        return '.'.join(f"`{part}`" for part in parts)

    @classmethod
    def _build_select(cls, table: Any, column: Any) -> str:
        """Build the ``SELECT`` statement for *table* and *column*.

        *column* is either ``'*'`` or a comma-separated list of column names.
        Arbitrary SQL expressions are intentionally not accepted.

        Raises:
            ValueError: if the table or any column name is not a plain
                identifier.
        """
        quoted_table = cls._quote_identifier(table)
        if isinstance(column, str) and column.strip() == '*':
            select_list = '*'
        elif isinstance(column, str):
            select_list = ', '.join(
                cls._quote_identifier(part) for part in column.split(',')
            )
        else:
            raise ValueError(f"非法的标识符: {column!r}")
        return f"SELECT {select_list} FROM {quoted_table}"

    def extract_text(self, conn: "mysql.connector.MySQLConnection", table: str, column: str) -> list:
        """Fetch *column* (or every column for ``'*'``) from *table*.

        Args:
            conn: Open MySQL connection.
            table: Table name, optionally schema-qualified (``db.table``).
            column: ``'*'`` or a comma-separated list of column names.

        Returns:
            A list of row dicts, or ``[]`` when the names are rejected or the
            query fails (the error is printed).
        """
        try:
            # Validate before touching the database so hostile names never
            # reach the server.
            query = self._build_select(table, column)
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute(query)
                return cursor.fetchall()
            finally:
                # Release the cursor even when execute/fetchall raises.
                cursor.close()
        except Exception as e:
            print(f"提取文本错误: {e}")
            return []

    @staticmethod
    def _json_default(value: Any) -> str:
        """Convert values ``json`` cannot encode (dates, decimals, bytes) to text."""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode('utf-8', errors='replace')
        return str(value)

    @classmethod
    def _encode_response(cls, response: Dict[str, Any]) -> bytes:
        """Serialise *response* as a 4-byte big-endian length plus UTF-8 JSON."""
        payload = json.dumps(
            response, ensure_ascii=False, default=cls._json_default
        ).encode('utf-8')
        return len(payload).to_bytes(4, byteorder='big') + payload

    def _process_request(self, raw: bytes) -> Dict[str, Any]:
        """Turn the raw request bytes into a response dict.

        Malformed requests (bad UTF-8, bad JSON, non-object payloads, unsafe
        table/column names) produce an error response without opening a
        database connection.
        """
        try:
            request = json.loads(raw.decode('utf-8'))
            if not isinstance(request, dict):
                raise ValueError('请求必须是JSON对象')
            table = request.get('table')
            column = request.get('column')
            self._build_select(table, column)
        except ValueError as e:
            # UnicodeDecodeError and json.JSONDecodeError are ValueErrors too.
            return {'status': 'error', 'message': f'无效的请求: {e}'}

        conn = self.connect_db()
        if conn is None:
            return {'status': 'error', 'message': '数据库连接失败'}
        try:
            results = self.extract_text(conn, table, column)
            return {'status': 'success', 'data': results}
        except Exception as e:
            return {'status': 'error', 'message': str(e)}
        finally:
            conn.close()

    def start_server(self):
        """Bind, listen and serve requests sequentially until interrupted.

        Each accepted connection is read once, answered with one
        length-prefixed JSON response and then closed. Errors while serving a
        client are printed and the loop moves on to the next connection.
        """
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        print(f"服务器启动在 {self.host}:{self.port}")
        while True:
            try:
                client_socket, address = self.server_socket.accept()
                print(f"接受来自 {address} 的连接")
                # The context manager closes the client socket on every path,
                # including failures while reading or replying.
                with client_socket:
                    # NOTE: the request has no framing, so only one read of up
                    # to 1024 bytes is attempted; a longer or fragmented
                    # request is answered with an error response.
                    raw = client_socket.recv(1024)
                    response = self._process_request(raw)
                    # sendall retries partial writes; prefix and payload go
                    # out as a single frame.
                    client_socket.sendall(self._encode_response(response))
            except Exception as e:
                print(f"处理请求错误: {e}")
                continue
if __name__ == "__main__":
    # Script entry point: build a connector with the default port and serve
    # requests in the foreground (start_server loops until interrupted).
    DatabaseConnector().start_server()