Spaces:
Running
Running
File size: 5,201 Bytes
b08d8a8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 |
#!/usr/bin/env python3
"""
Automation Runner
Main orchestrator that runs all automation tasks:
- File watching
- Scheduled Google Sheets sync
- Initial conversion
"""
import json
import time
import signal
import sys
import threading
import subprocess
from pathlib import Path
from datetime import datetime
# Paths
BASE_DIR = Path(__file__).parent.parent
CONFIG_FILE = BASE_DIR / "config.json"
SHEETS_SYNC_SCRIPT = BASE_DIR / "scripts" / "sheets_sync.py"
FILE_WATCHER_SCRIPT = BASE_DIR / "scripts" / "file_watcher.py"
CSV_TO_JSON_SCRIPT = BASE_DIR / "scripts" / "csv_to_json.py"
class AutomationRunner:
"""Manages all automation tasks"""
def __init__(self):
self.config = self.load_config()
self.running = True
self.watcher_process = None
self.sync_thread = None
# Set up signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, sig, frame):
"""Handle shutdown signals"""
print("\n\n🛑 Shutdown signal received...")
self.running = False
def load_config(self):
"""Load configuration"""
if not CONFIG_FILE.exists():
print(f"❌ Config file not found: {CONFIG_FILE}")
sys.exit(1)
with open(CONFIG_FILE, 'r') as f:
return json.load(f)
def run_initial_conversion(self):
"""Run initial CSV-to-JSON conversion"""
print("🔄 Running initial CSV-to-JSON conversion...")
try:
subprocess.run([sys.executable, str(CSV_TO_JSON_SCRIPT)], check=True)
except subprocess.CalledProcessError:
print("⚠️ Initial conversion failed (this is OK if CSV files don't exist yet)")
def start_file_watcher(self):
"""Start the file watcher in a separate process"""
if not self.config.get('file_watcher', {}).get('enabled', True):
print("⚠️ File watcher disabled in config")
return
print("👀 Starting file watcher...")
self.watcher_process = subprocess.Popen(
[sys.executable, str(FILE_WATCHER_SCRIPT)],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
# Stream output in a separate thread
def stream_output():
for line in self.watcher_process.stdout:
print(line, end='')
threading.Thread(target=stream_output, daemon=True).start()
def sync_sheets_once(self):
"""Run Google Sheets sync once"""
print(f"\n⏰ [{datetime.now().strftime('%H:%M:%S')}] Running Google Sheets sync...")
try:
result = subprocess.run(
[sys.executable, str(SHEETS_SYNC_SCRIPT)],
capture_output=True,
text=True
)
print(result.stdout)
if result.returncode != 0:
print(result.stderr)
except Exception as e:
print(f"❌ Sync error: {e}")
def start_scheduled_sync(self):
"""Start scheduled Google Sheets sync"""
sheets_config = self.config.get('google_sheets', {})
if not sheets_config.get('enabled', False):
print("⚠️ Google Sheets sync disabled in config")
return
interval_minutes = sheets_config.get('sync_interval_minutes', 5)
print(f"⏰ Starting scheduled sync (every {interval_minutes} minutes)...")
def sync_loop():
# Run initial sync
self.sync_sheets_once()
# Continue syncing at intervals
while self.running:
time.sleep(interval_minutes * 60)
if self.running:
self.sync_sheets_once()
self.sync_thread = threading.Thread(target=sync_loop, daemon=True)
self.sync_thread.start()
def run(self):
"""Main run loop"""
print("=" * 60)
print("🚀 Dialect Map Automation System")
print("=" * 60)
print()
# Run initial conversion
self.run_initial_conversion()
print()
# Start file watcher
self.start_file_watcher()
# Start scheduled sync
self.start_scheduled_sync()
print()
print("=" * 60)
print("✅ All automation tasks started!")
print("=" * 60)
print("\n💡 Press Ctrl+C to stop all automation\n")
# Keep running until stopped
try:
while self.running:
time.sleep(1)
except KeyboardInterrupt:
pass
# Cleanup
print("\n🧹 Cleaning up...")
if self.watcher_process:
self.watcher_process.terminate()
self.watcher_process.wait()
print("✅ Automation stopped gracefully\n")
def main():
"""Main entry point"""
runner = AutomationRunner()
runner.run()
if __name__ == "__main__":
main()
|