# Spaces: Sleeping
# File size: 5,285 Bytes
# 84ed1d1
# '''
# Author: mdhuang555 67590178+mdhuang555@users.noreply.github.com
# Date: 2025-03-30 15:57:22
# LastEditors: mdhuang555 67590178+mdhuang555@users.noreply.github.com
# LastEditTime: 2025-04-03 11:32:30
# FilePath: \Notyif\dataBaseConnecter.py
# Description: 数据库连接器,支持SSL连接
# '''
import socket
import json
import mysql.connector
from typing import Dict, Any, Optional
import yaml
from pathlib import Path
class DatabaseConnector:
    """Small TCP server that returns rows of a MySQL table as JSON.

    A client connects, sends one JSON object of the form
    ``{"table": "<name>", "column": "<name>" | "*"}`` and receives a
    4-byte big-endian length prefix followed by a UTF-8 JSON response:
    ``{"status": "success", "data": [...]}`` or
    ``{"status": "error", "message": "..."}``.

    NOTE(review): the listener defaults to ``0.0.0.0`` and performs no
    authentication, so anyone who can reach the port can read any table
    the configured MySQL user can read. Restrict ``server.host`` in the
    config or firewall the port.
    """

    # Upper bound on the bytes read for one request; the request is read
    # with a single ``recv`` call, so larger requests are truncated.
    _MAX_REQUEST_BYTES = 1024

    def __init__(self, port: int = 3306):
        """Create the listening socket and load the configuration.

        Args:
            port: TCP port the server will listen on.
        """
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.config = self._load_config()
        # The bind address comes from the config; a missing, null, or
        # malformed ``server`` section falls back to all interfaces.
        server_cfg = self.config.get('server')
        if not isinstance(server_cfg, dict):
            server_cfg = {}
        self.host = server_cfg.get('host') or '0.0.0.0'

    def _load_config(self) -> Dict[str, Any]:
        """Load ``Notify_config.yaml`` from the directory of this file.

        Returns:
            The parsed mapping, or an empty dict when the file cannot be
            read or parsed, is empty, or its top level is not a mapping.
        """
        try:
            config_path = Path(__file__).parent / "Notify_config.yaml"
            with open(config_path, "r", encoding="utf-8") as f:
                loaded = yaml.safe_load(f)
        except Exception as e:
            print(f"加载配置文件错误: {e}")
            return {}
        # safe_load yields None for an empty file and may yield a list or
        # scalar; callers index the result as a dict.
        return loaded if isinstance(loaded, dict) else {}

    def connect_db(self) -> Optional["mysql.connector.MySQLConnection"]:
        """Open an SSL connection to the MySQL server named in the config.

        The CA certificate ``DigiCertGlobalRootCA.crt.pem`` must sit next
        to this file.

        Returns:
            An open connection, or ``None`` when the certificate is
            missing, the config is incomplete, or the connection fails
            (the error is printed).
        """
        try:
            current_dir = Path(__file__).parent.absolute()
            ssl_ca_path = current_dir / "DigiCertGlobalRootCA.crt.pem"
            if not ssl_ca_path.exists():
                raise FileNotFoundError(f"SSL证书文件未找到: {ssl_ca_path}")
            mysql_cfg = self.config["mysql"]
            return mysql.connector.connect(
                host=mysql_cfg["host"],
                port=mysql_cfg.get("port", 3306),
                user=mysql_cfg["user"],
                password=mysql_cfg["password"],
                database=mysql_cfg["database"],
                ssl_ca=str(ssl_ca_path),
                ssl_disabled=False,
                charset='utf8mb4',
                collation='utf8mb4_unicode_ci'
            )
        except Exception as e:
            print(f"数据库连接错误: {e}")
            return None

    @staticmethod
    def _quote_identifier(name: Any) -> str:
        """Validate a (possibly dotted) SQL identifier and backtick-quote it.

        Only letters, digits, ``_`` and ``$`` are accepted in each
        dot-separated part, so the result can be embedded in a statement
        without allowing injection.

        Raises:
            ValueError: if *name* is not a string or contains any other
                character (including an empty part).
        """
        if not isinstance(name, str):
            raise ValueError(f"无效的标识符: {name!r}")
        parts = name.strip().split('.')
        for part in parts:
            if not part or not all(ch.isalnum() or ch in '_$' for ch in part):
                raise ValueError(f"无效的标识符: {name!r}")
        return '.'.join(f'`{part}`' for part in parts)

    @classmethod
    def _build_select_query(cls, table: Any, column: Any) -> str:
        """Build ``SELECT <column> FROM <table>`` from validated identifiers.

        *column* may be ``'*'``, a single column name, or a
        comma-separated list of column names.

        Raises:
            ValueError: if the table or any column is not a valid
                identifier.
        """
        quoted_table = cls._quote_identifier(table)
        if not isinstance(column, str):
            raise ValueError(f"无效的标识符: {column!r}")
        if column.strip() == '*':
            return f"SELECT * FROM {quoted_table}"
        quoted_columns = ', '.join(
            cls._quote_identifier(item) for item in column.split(',')
        )
        return f"SELECT {quoted_columns} FROM {quoted_table}"

    def extract_text(self, conn: "mysql.connector.MySQLConnection", table: str, column: str) -> list:
        """Fetch the given column(s) of every row in *table*.

        Args:
            conn: Open MySQL connection.
            table: Table name, optionally qualified as ``schema.table``.
            column: ``'*'`` for all columns, one column name, or a
                comma-separated list of column names.

        Returns:
            The rows fetched from a ``dictionary=True`` cursor, or an
            empty list when the identifiers are invalid or the query
            fails (the error is printed).
        """
        cursor = None
        try:
            # Identifiers cannot be bound as query parameters, so they are
            # validated and quoted instead of interpolated verbatim.
            query = self._build_select_query(table, column)
            cursor = conn.cursor(dictionary=True)
            cursor.execute(query)
            return cursor.fetchall()
        except Exception as e:
            print(f"提取文本错误: {e}")
            return []
        finally:
            # Close the cursor on the failure path too.
            if cursor is not None:
                try:
                    cursor.close()
                except Exception as e:
                    print(f"提取文本错误: {e}")

    @staticmethod
    def _json_default(value: Any) -> Any:
        """Convert database values that ``json`` cannot encode natively.

        Bytes are decoded as UTF-8 (undecodable bytes replaced), sets
        become lists, objects exposing ``isoformat()`` use it, and
        anything else is converted with ``str``.
        """
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode('utf-8', errors='replace')
        if isinstance(value, (set, frozenset)):
            return sorted(value, key=str)
        isoformat = getattr(value, 'isoformat', None)
        if callable(isoformat):
            return isoformat()
        return str(value)

    def _handle_request(self, raw: bytes) -> Dict[str, Any]:
        """Turn the raw bytes of one request into a response dict.

        Malformed requests and invalid identifiers produce an error
        response instead of an exception, so the client always gets a
        reply. The database is contacted only after the request has been
        validated.
        """
        try:
            request = json.loads(raw.decode('utf-8'))
        except ValueError as e:  # covers UnicodeDecodeError and JSONDecodeError
            print(f"处理请求错误: {e}")
            return {'status': 'error', 'message': f'请求格式无效: {e}'}
        if not isinstance(request, dict):
            return {'status': 'error', 'message': '请求必须是JSON对象'}

        table = request.get('table')
        column = request.get('column')
        try:
            self._build_select_query(table, column)
        except ValueError as e:
            return {'status': 'error', 'message': str(e)}

        conn = self.connect_db()
        if conn is None:
            return {'status': 'error', 'message': '数据库连接失败'}
        try:
            results = self.extract_text(conn, table, column)
            return {'status': 'success', 'data': results}
        except Exception as e:
            return {'status': 'error', 'message': str(e)}
        finally:
            conn.close()

    def _serve_client(self, client_socket: socket.socket) -> None:
        """Read one request from *client_socket*, reply, and close it."""
        # The context manager closes the socket even when reading,
        # processing, or sending raises.
        with client_socket:
            raw = client_socket.recv(self._MAX_REQUEST_BYTES)
            response = self._handle_request(raw)
            payload = json.dumps(
                response, ensure_ascii=False, default=self._json_default
            ).encode('utf-8')
            # Frame: 4-byte big-endian length, then the JSON body. sendall
            # retries until every byte has been written.
            length_prefix = len(payload).to_bytes(4, byteorder='big')
            client_socket.sendall(length_prefix + payload)

    def start_server(self):
        """Bind, listen, and serve clients one at a time, forever.

        Errors while serving a single client are printed and the loop
        continues with the next connection.
        """
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        print(f"服务器启动在 {self.host}:{self.port}")
        while True:
            try:
                client_socket, address = self.server_socket.accept()
                print(f"接受来自 {address} 的连接")
                self._serve_client(client_socket)
            except Exception as e:
                print(f"处理请求错误: {e}")
                continue
if __name__ == "__main__":
    # Run the query server with its default settings when this file is
    # executed as a script.
    server = DatabaseConnector()
    server.start_server()