File size: 6,363 Bytes
e5ab217 2af9cda 17cf1d9 2af9cda e5ab217 2af9cda ac4e854 e5ab217 ac4e854 2af9cda 527b9a1 e5ab217 ae5a136 e5ab217 b1617f1 e5ab217 b1617f1 e5ab217 b1617f1 e5ab217 b1617f1 e5ab217 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
from fastapi import FastAPI, BackgroundTasks, HTTPException, Request
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
import logging
import os
import json
# Import agents (we will create these files next)
from agents.technical_auditor import TechnicalAuditorAgent
from agents.content_optimizer import ContentOptimizationAgent
from agents.competitor_intelligence import CompetitorIntelligenceAgent
from agents.backlink_indexing import BacklinkIndexingAgent
from agents.performance_analytics import PerformanceAnalyticsAgent
from agents.orchestrator import OrchestratorAgent
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="SEO Multi-Agent System", version="1.0.0")
def setup_credentials_from_env():
"""Security: Write env vars to files so agents can use them without committing secrets to repo"""
os.makedirs("/app/credentials", exist_ok=True)
# GSC
gsc_json = os.environ.get("GSC_CREDENTIALS")
if gsc_json:
print("Loading GSC credentials from Environment Variable")
with open("/app/credentials/gsc-credentials.json", "w") as f:
f.write(gsc_json)
# GA4
ga4_json = os.environ.get("GA4_CREDENTIALS")
if ga4_json:
print("Loading GA4 credentials from Environment Variable")
with open("/app/credentials/ga4-credentials.json", "w") as f:
f.write(ga4_json)
# DEBUG: Print the email being used
try:
with open("/app/credentials/gsc-credentials.json", "r") as f:
creds = json.load(f)
print(f"🕵️ DEBUG: Using Service Account Email: {creds.get('client_email')}")
print(f"🕵️ DEBUG: Make sure THIS email is an Owner in GSC for https://fixyfile.com")
except Exception as e:
print(f"Could not read credentials for debug: {e}")
# Run setup immediately
setup_credentials_from_env()
def get_credential_path(filename):
"""Helper to find credentials in common paths"""
possible_paths = [
f"/app/credentials/{filename}",
f"/app/{filename}",
f"./{filename}",
filename
]
for path in possible_paths:
if os.path.exists(path):
return path
return f"/app/credentials/{filename}" # Default
# Initialize agents
# Note: In a real deployment, credentials would be loaded from env vars or mounted secrets
technical_agent = TechnicalAuditorAgent()
content_agent = ContentOptimizationAgent()
competitor_agent = CompetitorIntelligenceAgent()
indexing_agent = BacklinkIndexingAgent(
gsc_credentials_path=get_credential_path("gsc-credentials.json"),
site_url="https://fixyfile.com"
)
analytics_agent = PerformanceAnalyticsAgent(
ga4_property_id="YOUR_GA4_ID", # Replace with actual if known
ga4_credentials_path=get_credential_path("ga4-credentials.json"),
gsc_credentials_path=get_credential_path("gsc-credentials.json"),
# CRITICAL: For Domain Properties (fixyfile.com), use 'sc-domain:' prefix
site_url="sc-domain:fixyfile.com"
)
# Orchestrator no longer needs Redis as we moved queue logic to Cloudflare D1
orchestrator = OrchestratorAgent(redis_host=None, redis_port=None, agents={})
# --- MODELS ---
class PageAuditRequest(BaseModel):
url: str
checks: Optional[List[str]] = ["all"]
class ContentOptimizeRequest(BaseModel):
url: str
tasks: Optional[List[str]] = ["all"]
class CompetitorAnalysisRequest(BaseModel):
keyword: str
class IndexingRequest(BaseModel):
urls: List[str]
class WorkflowTrigger(BaseModel):
workflow_name: str
payload: Dict[str, Any]
# --- ENDPOINTS ---
@app.get("/")
async def root():
return {
"service": "SEO Multi-Agent System",
"status": "running",
"agents": ["technical", "content", "competitor", "indexing", "performance"]
}
@app.get("/health")
async def health_check():
# Basic check, can expand to check agents
return {"status": "healthy", "orchestrator": "online"}
# TECHNICAL AUDITOR
@app.post("/audit/execute")
async def execute_audit(request: PageAuditRequest, background_tasks: BackgroundTasks):
background_tasks.add_task(technical_agent.audit_page, request.url)
return {"status": "started", "url": request.url}
# CONTENT OPTIMIZER
@app.post("/optimize")
async def optimize_content(request: ContentOptimizeRequest):
result = content_agent.analyze_page(request.url)
return result
# COMPETITOR INTELLIGENCE
@app.post("/competitor/analyze")
async def analyze_competitor(request: CompetitorAnalysisRequest, background_tasks: BackgroundTasks):
background_tasks.add_task(competitor_agent.generate_competitive_report, request.keyword)
return {"status": "started", "keyword": request.keyword}
# INDEXING
@app.post("/index/submit")
async def submit_indexing(request: IndexingRequest):
results = indexing_agent.auto_submit_new_pages(request.urls)
return results
@app.get("/index/status")
async def indexing_status():
status = indexing_agent.check_indexing_status()
errors = indexing_agent.get_indexing_errors()
return {"indexed_pages": status, "errors": errors}
# PERFORMANCE & ANALYTICS
@app.get("/analytics/underperforming")
async def get_underperforming():
try:
pages = analytics_agent.get_underperforming_pages(days=30)
return pages
except Exception as e:
logger.error(f"Error getting underperforming pages: {e}")
return []
@app.get("/report/weekly")
async def weekly_report():
try:
report = analytics_agent.generate_weekly_report()
return report
except Exception as e:
logger.error(f"Error generating weekly report: {e}")
return {"error": str(e)}
@app.get("/monitor/algorithm")
async def check_algorithm():
try:
status = analytics_agent.detect_algorithm_update()
return status
except Exception as e:
return {"volatility_detected": False, "error": str(e)}
@app.get("/monitor/cwv")
async def check_cwv(url: str):
result = analytics_agent.monitor_core_web_vitals(url)
return result
# ORCHESTRATOR
@app.post("/workflow/trigger")
async def trigger_workflow(request: WorkflowTrigger):
workflow_id = orchestrator.create_workflow(
request.workflow_name,
request.payload
)
return {"workflow_id": workflow_id, "status": "started"}
|