|
|
from fastapi import FastAPI, BackgroundTasks, HTTPException, Request |
|
|
from pydantic import BaseModel |
|
|
from typing import List, Optional, Dict, Any |
|
|
import logging |
|
|
import os |
|
|
import json |
|
|
|
|
|
|
|
|
from agents.technical_auditor import TechnicalAuditorAgent |
|
|
from agents.content_optimizer import ContentOptimizationAgent |
|
|
from agents.competitor_intelligence import CompetitorIntelligenceAgent |
|
|
from agents.backlink_indexing import BacklinkIndexingAgent |
|
|
from agents.performance_analytics import PerformanceAnalyticsAgent |
|
|
from agents.orchestrator import OrchestratorAgent |
|
|
|
|
|
# Configure root logging once, at import time, with INFO as the threshold.
logging.basicConfig(level=logging.INFO)

# Module-scoped logger used by the route handlers in this file.
logger = logging.getLogger(__name__)

# Application object that the route decorators below register against.
app = FastAPI(
    title="SEO Multi-Agent System",
    version="1.0.0",
)
|
|
|
|
|
def _write_private_file(path, content):
    """Write *content* to *path* with owner-only (0o600) permissions."""
    # Passing the mode to os.open() means a newly created file is never
    # group/world readable, not even for a moment.
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        # The creation mode is ignored when the file already existed, so
        # tighten it explicitly before the secret is written.
        os.chmod(path, 0o600)
        handle.write(content)


def setup_credentials_from_env(credentials_dir="/app/credentials"):
    """Write credential JSON held in environment variables to files on disk.

    Security: the secrets arrive through the environment so they never have to
    be committed to the repository; the agents then read them from files.

    ``GSC_CREDENTIALS`` is written to ``gsc-credentials.json`` and
    ``GA4_CREDENTIALS`` to ``ga4-credentials.json`` inside *credentials_dir*.
    A variable that is unset or empty is skipped, leaving any existing file
    untouched. Afterwards the GSC file is read back and the service-account
    e-mail is printed as a setup hint; a failure of that step is printed and
    never raised.

    Args:
        credentials_dir: Directory that receives the files; created if
            missing. Defaults to ``/app/credentials``.

    Raises:
        OSError: If the directory or a credential file cannot be written.
    """
    os.makedirs(credentials_dir, exist_ok=True)

    # (environment variable, target file name, label used in the message)
    sources = (
        ("GSC_CREDENTIALS", "gsc-credentials.json", "GSC"),
        ("GA4_CREDENTIALS", "ga4-credentials.json", "GA4"),
    )
    for env_var, filename, label in sources:
        payload = os.environ.get(env_var)
        if payload:
            print(f"Loading {label} credentials from Environment Variable")
            _write_private_file(os.path.join(credentials_dir, filename), payload)

    # Best-effort diagnostics only: whatever goes wrong here (missing file,
    # invalid JSON, unexpected JSON shape, console encoding) is reported and
    # must not abort start-up, hence the deliberately broad handler.
    try:
        gsc_path = os.path.join(credentials_dir, "gsc-credentials.json")
        with open(gsc_path, "r", encoding="utf-8") as handle:
            creds = json.load(handle)
        print(f"🕵️ DEBUG: Using Service Account Email: {creds.get('client_email')}")
        print("🕵️ DEBUG: Make sure THIS email is an Owner in GSC for https://fixyfile.com")
    except Exception as e:
        print(f"Could not read credentials for debug: {e}")
|
|
|
|
|
|
|
|
# Runs at import time, before the agents further down are constructed with
# the paths of these credential files.
setup_credentials_from_env()
|
|
|
|
|
def get_credential_path(filename: str) -> str:
    """Return the first existing location of the credential file *filename*.

    Candidates are tried in this order: ``/app/credentials/<filename>``,
    ``/app/<filename>``, ``./<filename>`` and finally *filename* exactly as
    given (the last form is what lets an absolute path be found).

    Args:
        filename: File name, or path, of the credential file.

    Returns:
        The first candidate that is an existing regular file. If none is,
        ``/app/credentials/<filename>`` is returned even though it does not
        exist, so callers always receive a path.
    """
    default_path = f"/app/credentials/{filename}"
    candidates = (default_path, f"/app/{filename}", f"./{filename}", filename)
    # isfile() rather than exists(): a directory that happens to match a
    # candidate (e.g. "/app/credentials/" for an empty name) is not a usable
    # credential file and must not be returned as one.
    return next((path for path in candidates if os.path.isfile(path)), default_path)
|
|
|
|
|
|
|
|
|
|
|
# Agent singletons, constructed once at import time and shared by the route
# handlers below.
technical_agent = TechnicalAuditorAgent()
content_agent = ContentOptimizationAgent()
competitor_agent = CompetitorIntelligenceAgent()

indexing_agent = BacklinkIndexingAgent(
    gsc_credentials_path=get_credential_path("gsc-credentials.json"),
    site_url="https://fixyfile.com",
)

analytics_agent = PerformanceAnalyticsAgent(
    # The property id can be supplied through GA4_PROPERTY_ID. The literal
    # placeholder stays as the fallback so nothing changes when the variable
    # is unset or empty.
    ga4_property_id=os.environ.get("GA4_PROPERTY_ID") or "YOUR_GA4_ID",
    ga4_credentials_path=get_credential_path("ga4-credentials.json"),
    gsc_credentials_path=get_credential_path("gsc-credentials.json"),
    # NOTE(review): indexing_agent is given "https://fixyfile.com" while this
    # agent gets "sc-domain:fixyfile.com" - confirm that the two different
    # site identifiers are intentional.
    site_url="sc-domain:fixyfile.com",
)

# NOTE(review): no Redis host/port and an empty agent mapping are passed here
# - confirm the orchestrator is meant to run without them.
orchestrator = OrchestratorAgent(redis_host=None, redis_port=None, agents={})
|
|
|
|
|
|
|
|
|
|
|
class PageAuditRequest(BaseModel):
    """Request body accepted by ``POST /audit/execute``."""

    # Address of the page to audit.
    url: str
    # Defaults to ["all"].
    # NOTE(review): the /audit/execute handler in this file reads only `url`;
    # confirm whether `checks` should be forwarded to the agent.
    checks: Optional[List[str]] = ["all"]
|
|
|
|
|
class ContentOptimizeRequest(BaseModel):
    """Request body accepted by ``POST /optimize``."""

    # Address of the page to analyze.
    url: str
    # Defaults to ["all"].
    # NOTE(review): the /optimize handler in this file reads only `url`;
    # confirm whether `tasks` should be forwarded to the agent.
    tasks: Optional[List[str]] = ["all"]
|
|
|
|
|
class CompetitorAnalysisRequest(BaseModel):
    """Request body accepted by ``POST /competitor/analyze``."""

    # Keyword handed to the competitor agent's report generation.
    keyword: str
|
|
|
|
|
class IndexingRequest(BaseModel):
    """Request body accepted by ``POST /index/submit``."""

    # URLs handed to the indexing agent for submission.
    urls: List[str]
|
|
|
|
|
class WorkflowTrigger(BaseModel):
    """Request body accepted by ``POST /workflow/trigger``."""

    # Name passed as the first argument to the orchestrator's create_workflow.
    workflow_name: str
    # Free-form mapping passed through to create_workflow unchanged.
    payload: Dict[str, Any]
|
|
|
|
|
|
|
|
|
|
|
@app.get("/")
async def root():
    """Describe the service: its name, a fixed status and the agent names."""
    agent_names = ["technical", "content", "competitor", "indexing", "performance"]
    service_info = {
        "service": "SEO Multi-Agent System",
        "status": "running",
        "agents": agent_names,
    }
    return service_info
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Liveness probe: replies with a constant payload and performs no checks."""
    return dict(status="healthy", orchestrator="online")
|
|
|
|
|
|
|
|
@app.post("/audit/execute")
async def execute_audit(request: PageAuditRequest, background_tasks: BackgroundTasks):
    """Queue a technical audit of ``request.url`` and acknowledge at once.

    The audit is registered on ``background_tasks``; the response carries only
    the acknowledgement, not the audit result.
    """
    page_url = request.url
    # NOTE(review): request.checks is accepted but not forwarded to the agent
    # - confirm this is intended.
    background_tasks.add_task(technical_agent.audit_page, page_url)
    return {"status": "started", "url": page_url}
|
|
|
|
|
|
|
|
@app.post("/optimize")
async def optimize_content(request: ContentOptimizeRequest):
    """Return the content agent's analysis of ``request.url`` unchanged."""
    # NOTE(review): analyze_page is called synchronously inside a coroutine;
    # if it performs blocking I/O the event loop stalls for its duration -
    # confirm against the agent implementation.
    return content_agent.analyze_page(request.url)
|
|
|
|
|
|
|
|
@app.post("/competitor/analyze")
async def analyze_competitor(request: CompetitorAnalysisRequest, background_tasks: BackgroundTasks):
    """Queue a competitive report for ``request.keyword`` and acknowledge at once.

    The report generation is registered on ``background_tasks``; the response
    carries only the acknowledgement.
    """
    search_keyword = request.keyword
    background_tasks.add_task(
        competitor_agent.generate_competitive_report,
        search_keyword,
    )
    return {"status": "started", "keyword": search_keyword}
|
|
|
|
|
|
|
|
@app.post("/index/submit")
async def submit_indexing(request: IndexingRequest):
    """Hand ``request.urls`` to the indexing agent and return its result as-is."""
    # NOTE(review): the agent call is synchronous inside a coroutine - confirm
    # it does not block the event loop for long.
    return indexing_agent.auto_submit_new_pages(request.urls)
|
|
|
|
|
@app.get("/index/status")
async def indexing_status():
    """Report the indexing agent's status together with its indexing errors."""
    # The dict values are evaluated top to bottom, so the status query still
    # runs before the error query.
    return {
        "indexed_pages": indexing_agent.check_indexing_status(),
        "errors": indexing_agent.get_indexing_errors(),
    }
|
|
|
|
|
|
|
|
@app.get("/analytics/underperforming")
async def get_underperforming():
    """Return the analytics agent's underperforming pages for the last 30 days.

    Returns:
        Whatever ``analytics_agent.get_underperforming_pages(days=30)`` yields,
        or an empty list if that call raises.
    """
    try:
        return analytics_agent.get_underperforming_pages(days=30)
    except Exception as e:
        # Request boundary: the failure is turned into an empty result, so the
        # log is the only trace of it. logger.exception keeps the same message
        # line and adds the traceback.
        logger.exception("Error getting underperforming pages: %s", e)
        return []
|
|
|
|
|
@app.get("/report/weekly")
async def weekly_report():
    """Return the analytics agent's weekly report.

    Returns:
        Whatever ``analytics_agent.generate_weekly_report()`` yields, or
        ``{"error": <message>}`` if that call raises.
    """
    try:
        return analytics_agent.generate_weekly_report()
    except Exception as e:
        # Request boundary: the client only receives the message text, so the
        # traceback is recorded in the log via logger.exception.
        logger.exception("Error generating weekly report: %s", e)
        return {"error": str(e)}
|
|
|
|
|
@app.get("/monitor/algorithm")
async def check_algorithm():
    """Return the analytics agent's algorithm-update detection result.

    Returns:
        Whatever ``analytics_agent.detect_algorithm_update()`` yields, or
        ``{"volatility_detected": False, "error": <message>}`` if that call
        raises.
    """
    try:
        return analytics_agent.detect_algorithm_update()
    except Exception as e:
        # Log the failure like the other analytics handlers do; without this
        # the error would be visible only in the response body.
        logger.exception("Error detecting algorithm update: %s", e)
        return {"volatility_detected": False, "error": str(e)}
|
|
|
|
|
@app.get("/monitor/cwv")
async def check_cwv(url: str):
    """Return what ``analytics_agent.monitor_core_web_vitals`` yields for *url*.

    ``url`` is taken from the query string; exceptions raised by the agent are
    not caught here.
    """
    return analytics_agent.monitor_core_web_vitals(url)
|
|
|
|
|
|
|
|
@app.post("/workflow/trigger")
async def trigger_workflow(request: WorkflowTrigger):
    """Create a workflow through the orchestrator and return its identifier."""
    new_workflow_id = orchestrator.create_workflow(request.workflow_name, request.payload)
    return {"workflow_id": new_workflow_id, "status": "started"}
|
|
|