import os
import time
from datetime import datetime

from server.celery_app import celery_app
from server.db import SessionLocal, Action


@celery_app.task(name="server.executor.execute_content_task", bind=True, max_retries=3)
def execute_content_task(self, action_id: int):
    """Execute a content-related action (e.g. "Generate Article").

    Loads the ``Action`` row, marks it ``"executing"``, records the "before"
    impact snapshot, simulates the work, stores a result payload, updates the
    impact snapshot and finally marks the action ``"done"``.

    Args:
        action_id: Primary key of the ``Action`` row to execute.

    Returns:
        ``{"status": "success", "action_id": action_id}`` on success, or
        ``{"status": "error", "message": "Action not found"}`` when no row
        matches ``action_id`` (no retry is scheduled in that case).

    Raises:
        Whatever ``self.retry`` raises: any exception from the body rolls the
        session back and re-queues the task with a 5 second countdown
        (``max_retries=3`` on the task decorator).
    """
    db = SessionLocal()
    try:
        action = db.query(Action).filter(Action.id == action_id).first()
        if not action:
            return {"status": "error", "message": "Action not found"}

        # Committed immediately so the in-progress state is persisted before
        # the (slow) work starts.
        # NOTE(review): the rollback in the except branch cannot undo this
        # commit, so the row appears to stay "executing" if every retry
        # fails - confirm whether a terminal failure status is wanted.
        action.status = "executing"
        db.commit()

        # Phase 3.5: Record Snapshot BEFORE
        # Imported inside the function, as in the original code; presumably to
        # avoid an import cycle at module load - TODO confirm.
        from server import impact_engine
        impact_engine.record_initial_metrics(action_id)

        # Simulate execution (Here we would call OpenAI/Groq to write an article)
        print(f"Executing content task: {action.task}")
        time.sleep(3)

        # Simulate Impact Tracking & Result
        action.status = "done"
        action.result = {
            "message": "Content generated successfully",
            "traffic_gain_est": "15%",
            "words": 1200,
        }

        # Phase 3.5: Update Snapshot AFTER
        # NOTE(review): this runs before the "done" state is committed below;
        # verify impact_engine does not need to read the committed row.
        impact_engine.update_action_impact(action_id)

        db.commit()
        return {"status": "success", "action_id": action_id}
    except Exception as e:
        # Discard uncommitted changes, then hand the error to Celery's retry
        # machinery; ``raise`` makes the control flow explicit.
        db.rollback()
        raise self.retry(exc=e, countdown=5)
    finally:
        db.close()


@celery_app.task(name="server.executor.execute_technical_task", bind=True, max_retries=3)
def execute_technical_task(self, action_id: int):
    """Execute an SEO/technical/outreach action.

    Loads the ``Action`` row, marks it ``"executing"``, simulates the work,
    stores a result payload, updates the impact snapshot and marks the action
    ``"done"``. Outreach actions additionally log a campaign step.

    Args:
        action_id: Primary key of the ``Action`` row to execute.

    Returns:
        ``{"status": "success"}`` on success, or
        ``{"status": "error", "message": "Action not found"}`` when no row
        matches ``action_id`` (no retry is scheduled in that case).

    Raises:
        Whatever ``self.retry`` raises: any exception from the body rolls the
        session back and re-queues the task with a 5 second countdown
        (``max_retries=3`` on the task decorator).
    """
    db = SessionLocal()
    try:
        action = db.query(Action).filter(Action.id == action_id).first()
        if not action:
            return {"status": "error", "message": "Action not found"}

        # NOTE(review): committed before the work; the rollback in the except
        # branch cannot undo it, so the row appears to stay "executing" if
        # every retry fails - confirm whether that is intended.
        action.status = "executing"
        db.commit()

        # Simulate execution
        print(f"Executing technical task: {action.task}")
        time.sleep(2)

        action.status = "done"
        action.result = {
            "message": "Task processed successfully",
            "impact_est": "High",
        }

        # Phase 3.5: Update Impact for tech task as well
        from server import impact_engine
        impact_engine.update_action_impact(action_id)

        # Handle Outreach Specifically
        if action.type == 'outreach':
            # Not referenced below; kept in case importing the module has
            # side effects - TODO confirm and drop if it does not.
            from server import outreach_engine
            # Simplification: we mark it sent if it's an outreach action
            # In a real system, we'd lookup the Lead by action context
            print("Processing outreach campaign step...")

        db.commit()
        return {"status": "success"}
    except Exception as e:
        db.rollback()
        raise self.retry(exc=e, countdown=5)
    finally:
        db.close()


def dispatch_action(action_id: int, action_type: str):
    """Queue the action on the executor task that matches its type.

    Args:
        action_id: Primary key of the ``Action`` row to execute.
        action_type: Action category; compared case-insensitively after being
            converted with ``str()``.

    Returns:
        Always ``True`` once the task has been queued with ``.delay()``.
    """
    # Ensure celery is loaded
    act_type = str(action_type).lower()

    # Only 'content' has a dedicated executor; every other type (including
    # outreach) goes to the technical task.
    if act_type == 'content':
        execute_content_task.delay(action_id)
    else:
        execute_technical_task.delay(action_id)

    return True