XHS / orchestrator /feishu_sync.py
Trae Bot
Upload Spider_XHS project
c481f8a
import sqlite3
import os
import sys
from datetime import datetime
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from orchestrator.config import DB_PATH
def ensure_last_sync_at_column():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Check if last_sync_at exists
cursor.execute("PRAGMA table_info(generated_post)")
columns = [info[1] for info in cursor.fetchall()]
if 'last_sync_at' not in columns:
cursor.execute("ALTER TABLE generated_post ADD COLUMN last_sync_at TEXT")
conn.commit()
cursor.execute("PRAGMA table_info(lead)")
columns = [info[1] for info in cursor.fetchall()]
if 'last_sync_at' not in columns:
cursor.execute("ALTER TABLE lead ADD COLUMN last_sync_at TEXT")
conn.commit()
conn.close()
def sync_to_feishu():
ensure_last_sync_at_column()
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# 1. Sync generated_post
cursor.execute("SELECT * FROM generated_post WHERE last_sync_at IS NULL")
records = cursor.fetchall()
if records:
print(f"[Feishu Sync] 找到 {len(records)} 条新生成的帖子需要同步至飞书多维表格。")
for row in records:
record_id = row['id']
print(f" -> 模拟同步帖子 ID: {record_id}")
current_time = datetime.now().isoformat()
cursor.execute("UPDATE generated_post SET last_sync_at = ? WHERE id = ?", (current_time, record_id))
# 2. Sync leads
cursor.execute("SELECT * FROM lead WHERE last_sync_at IS NULL")
lead_records = cursor.fetchall()
if lead_records:
print(f"[Feishu Sync] 找到 {len(lead_records)} 条新线索需要同步至飞书多维表格。")
for row in lead_records:
record_id = row['id']
print(f" -> 模拟同步线索 ID: {record_id}, 状态: {row['status']}")
current_time = datetime.now().isoformat()
cursor.execute("UPDATE lead SET last_sync_at = ? WHERE id = ?", (current_time, record_id))
if not records and not lead_records:
print("[Feishu Sync] 没有需要同步的新记录。")
conn.commit()
print("[Feishu Sync] 同步检查完成。")
conn.close()
if __name__ == '__main__':
sync_to_feishu()