from fastapi import FastAPI, BackgroundTasks, HTTPException, Request from pydantic import BaseModel from typing import List, Optional, Dict, Any import logging import os import json # Import agents (we will create these files next) from agents.technical_auditor import TechnicalAuditorAgent from agents.content_optimizer import ContentOptimizationAgent from agents.competitor_intelligence import CompetitorIntelligenceAgent from agents.backlink_indexing import BacklinkIndexingAgent from agents.performance_analytics import PerformanceAnalyticsAgent from agents.orchestrator import OrchestratorAgent logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI(title="SEO Multi-Agent System", version="1.0.0") def setup_credentials_from_env(): """Security: Write env vars to files so agents can use them without committing secrets to repo""" os.makedirs("/app/credentials", exist_ok=True) # GSC gsc_json = os.environ.get("GSC_CREDENTIALS") if gsc_json: print("Loading GSC credentials from Environment Variable") with open("/app/credentials/gsc-credentials.json", "w") as f: f.write(gsc_json) # GA4 ga4_json = os.environ.get("GA4_CREDENTIALS") if ga4_json: print("Loading GA4 credentials from Environment Variable") with open("/app/credentials/ga4-credentials.json", "w") as f: f.write(ga4_json) # DEBUG: Print the email being used try: with open("/app/credentials/gsc-credentials.json", "r") as f: creds = json.load(f) print(f"🕵️ DEBUG: Using Service Account Email: {creds.get('client_email')}") print(f"🕵️ DEBUG: Make sure THIS email is an Owner in GSC for https://fixyfile.com") except Exception as e: print(f"Could not read credentials for debug: {e}") # Run setup immediately setup_credentials_from_env() def get_credential_path(filename): """Helper to find credentials in common paths""" possible_paths = [ f"/app/credentials/{filename}", f"/app/{filename}", f"./{filename}", filename ] for path in possible_paths: if os.path.exists(path): return path return f"/app/credentials/{filename}" # Default # Initialize agents # Note: In a real deployment, credentials would be loaded from env vars or mounted secrets technical_agent = TechnicalAuditorAgent() content_agent = ContentOptimizationAgent() competitor_agent = CompetitorIntelligenceAgent() indexing_agent = BacklinkIndexingAgent( gsc_credentials_path=get_credential_path("gsc-credentials.json"), site_url="https://fixyfile.com" ) analytics_agent = PerformanceAnalyticsAgent( ga4_property_id="YOUR_GA4_ID", # Replace with actual if known ga4_credentials_path=get_credential_path("ga4-credentials.json"), gsc_credentials_path=get_credential_path("gsc-credentials.json"), # CRITICAL: For Domain Properties (fixyfile.com), use 'sc-domain:' prefix site_url="sc-domain:fixyfile.com" ) # Orchestrator no longer needs Redis as we moved queue logic to Cloudflare D1 orchestrator = OrchestratorAgent(redis_host=None, redis_port=None, agents={}) # --- MODELS --- class PageAuditRequest(BaseModel): url: str checks: Optional[List[str]] = ["all"] class ContentOptimizeRequest(BaseModel): url: str tasks: Optional[List[str]] = ["all"] class CompetitorAnalysisRequest(BaseModel): keyword: str class IndexingRequest(BaseModel): urls: List[str] class WorkflowTrigger(BaseModel): workflow_name: str payload: Dict[str, Any] # --- ENDPOINTS --- @app.get("/") async def root(): return { "service": "SEO Multi-Agent System", "status": "running", "agents": ["technical", "content", "competitor", "indexing", "performance"] } @app.get("/health") async def health_check(): # Basic check, can expand to check agents return {"status": "healthy", "orchestrator": "online"} # TECHNICAL AUDITOR @app.post("/audit/execute") async def execute_audit(request: PageAuditRequest, background_tasks: BackgroundTasks): background_tasks.add_task(technical_agent.audit_page, request.url) return {"status": "started", "url": request.url} # CONTENT OPTIMIZER @app.post("/optimize") async def optimize_content(request: ContentOptimizeRequest): result = content_agent.analyze_page(request.url) return result # COMPETITOR INTELLIGENCE @app.post("/competitor/analyze") async def analyze_competitor(request: CompetitorAnalysisRequest, background_tasks: BackgroundTasks): background_tasks.add_task(competitor_agent.generate_competitive_report, request.keyword) return {"status": "started", "keyword": request.keyword} # INDEXING @app.post("/index/submit") async def submit_indexing(request: IndexingRequest): results = indexing_agent.auto_submit_new_pages(request.urls) return results @app.get("/index/status") async def indexing_status(): status = indexing_agent.check_indexing_status() errors = indexing_agent.get_indexing_errors() return {"indexed_pages": status, "errors": errors} # PERFORMANCE & ANALYTICS @app.get("/analytics/underperforming") async def get_underperforming(): try: pages = analytics_agent.get_underperforming_pages(days=30) return pages except Exception as e: logger.error(f"Error getting underperforming pages: {e}") return [] @app.get("/report/weekly") async def weekly_report(): try: report = analytics_agent.generate_weekly_report() return report except Exception as e: logger.error(f"Error generating weekly report: {e}") return {"error": str(e)} @app.get("/monitor/algorithm") async def check_algorithm(): try: status = analytics_agent.detect_algorithm_update() return status except Exception as e: return {"volatility_detected": False, "error": str(e)} @app.get("/monitor/cwv") async def check_cwv(url: str): result = analytics_agent.monitor_core_web_vitals(url) return result # ORCHESTRATOR @app.post("/workflow/trigger") async def trigger_workflow(request: WorkflowTrigger): workflow_id = orchestrator.create_workflow( request.workflow_name, request.payload ) return {"workflow_id": workflow_id, "status": "started"}