Spaces:
Sleeping
Sleeping
File size: 4,323 Bytes
14589fa |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
import os
import subprocess
def _requirement_name(line):
    """Return the lower-cased package name on a requirements line ("" if none)."""
    # Drop trailing comments, then keep only the leading run of name characters so
    # extras and version specifiers ("httpx[http2]>=0.25") do not affect the match.
    spec = line.split("#", 1)[0].strip()
    end = 0
    while end < len(spec) and (spec[end].isalnum() or spec[end] in "-_."):
        end += 1
    return spec[:end].lower()


def update_requirements():
    """Make sure ``requirements.txt`` in the current directory lists ``httpx``.

    If the file does not exist it is created containing only ``httpx``.
    Otherwise ``httpx`` is appended on its own line unless some requirement
    line already names that package. A message describing the action taken is
    printed in every case.
    """
    req_file = "requirements.txt"

    if not os.path.exists(req_file):
        with open(req_file, "w", encoding="utf-8") as f:
            f.write("httpx\n")
        print(f"Created {req_file} with httpx.")
        return

    with open(req_file, "r", encoding="utf-8") as f:
        content = f.read()

    # Compare package names line by line: a plain substring test would treat
    # "httpx-sse" or a comment mentioning httpx as if httpx itself were listed.
    if any(_requirement_name(line) == "httpx" for line in content.splitlines()):
        print("httpx already in requirements.txt.")
        return

    # Only insert a newline when the last existing line is unterminated, so the
    # file does not accumulate blank lines.
    separator = "" if not content or content.endswith("\n") else "\n"
    with open(req_file, "a", encoding="utf-8") as f:
        f.write(f"{separator}httpx\n")
    print("Appended httpx to requirements.txt.")
def update_main():
    """Overwrite ``main.py`` in the current directory with the N8N-enabled app.

    The generated module is a FastAPI application that serves
    ``dashboard.html``, runs the visual/memory/writer agent pipeline on an
    uploaded image, and, when the ``N8N_WEBHOOK_URL`` environment variable is
    set, posts the result to that webhook in the background. The file is
    written as UTF-8 because the template contains emoji.
    """
    main_content = r'''import os
import httpx
import asyncio
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import HTMLResponse, JSONResponse
from dotenv import load_dotenv

# Import Agents
from agents.visual_analyst import VisualAnalyst
from agents.memory_agent import MemoryAgent
from agents.writer_agent import WriterAgent

load_dotenv()
app = FastAPI()

# Strong references to in-flight webhook tasks: the event loop only keeps weak
# references, so an unreferenced task may be garbage-collected before it finishes.
_pending_webhooks = set()

# Initialize Agents
try:
    visual_agent = VisualAnalyst()
    memory_agent = MemoryAgent()
    writer_agent = WriterAgent()
    memory_agent.seed_database()
    print("✅ All Agents Online")
except Exception as e:
    print(f"⚠️ Agent Startup Warning: {e}")


@app.get("/", response_class=HTMLResponse)
async def read_root():
    try:
        with open("dashboard.html", "r") as f:
            return f.read()
    except FileNotFoundError:
        return "<h1>Error: dashboard.html not found</h1>"


@app.post("/generate-catalog")
async def generate_catalog(file: UploadFile = File(...)):
    file_path = None
    try:
        # 1. Save Temp File
        os.makedirs("uploads", exist_ok=True)
        # The filename is client-controlled: keep only its final path component so
        # a name such as "../../main.py" cannot escape the uploads directory.
        safe_name = os.path.basename(file.filename or "")
        if safe_name in ("", ".", ".."):
            safe_name = "upload"
        file_path = f"uploads/{safe_name}"
        with open(file_path, "wb") as f:
            f.write(await file.read())

        # 2. Run AI Pipeline
        visual_data = await visual_agent.analyze_image(file_path)
        query = f"{visual_data.get('main_color', '')} {visual_data.get('product_type', 'product')}"
        seo_keywords = memory_agent.retrieve_keywords(query)
        listing = writer_agent.write_listing(visual_data, seo_keywords)

        # 3. Construct Final Payload
        final_data = {
            "visual_data": visual_data,
            "seo_keywords": seo_keywords,
            "listing": listing
        }

        # 4. ⚡ N8N AUTOMATION TRIGGER ⚡
        n8n_url = os.getenv("N8N_WEBHOOK_URL")
        if n8n_url:
            print(f"🚀 Sending data to N8N: {n8n_url}")
            # Fire and forget (don't make the user wait for n8n)
            task = asyncio.create_task(send_to_n8n(n8n_url, final_data))
            _pending_webhooks.add(task)
            task.add_done_callback(_pending_webhooks.discard)

        return JSONResponse(content=final_data)
    except Exception as e:
        return JSONResponse(content={"error": str(e)}, status_code=500)
    finally:
        # Cleanup runs on success and on failure so uploads never accumulate.
        if file_path and os.path.exists(file_path):
            os.remove(file_path)


# Async Helper to send data without blocking
async def send_to_n8n(url, data):
    try:
        async with httpx.AsyncClient() as client:
            await client.post(url, json=data, timeout=5.0)
        print("✅ N8N Webhook Sent Successfully")
    except Exception as e:
        print(f"❌ N8N Webhook Failed: {e}")


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
'''
    with open("main.py", "w", encoding="utf-8") as f:
        f.write(main_content)
    print("Updated main.py with N8N integration logic.")
def deploy():
    """Stage, commit and push the working tree to the ``space`` git remote.

    Runs ``git add .``, commits with the message "Add N8N Integration" when
    ``git status --porcelain`` reports any change, then pushes the local
    ``clean_deploy`` branch to ``main`` on the ``space`` remote. A failing git
    command or a missing ``git`` executable is reported on stdout instead of
    being raised.
    """
    try:
        subprocess.run(["git", "add", "."], check=True)

        # Check if there are changes to commit. check=True matters here: without
        # it a failing `git status` yields empty output and would be misread as
        # "nothing to commit".
        status = subprocess.run(
            ["git", "status", "--porcelain"],
            capture_output=True,
            text=True,
            check=True,
        )
        if status.stdout.strip():
            subprocess.run(["git", "commit", "-m", "Add N8N Integration"], check=True)
            print("Git commit successful.")
        else:
            print("No changes to commit.")

        print("Pushing to space...")
        subprocess.run(["git", "push", "space", "clean_deploy:main"], check=True)
        print("✅ Successfully deployed to Hugging Face Space.")
    except subprocess.CalledProcessError as e:
        print(f"❌ Deployment failed: {e}")
    except FileNotFoundError as e:
        # Raised by subprocess when the git executable itself cannot be found.
        print(f"❌ Deployment failed: git executable not found ({e})")
if __name__ == "__main__":
    # Script entry point: add the httpx dependency, regenerate main.py, then
    # commit and push the result. The steps run in this order so the push
    # includes both updated files.
    print("Starting N8N Integration Setup...")
    update_requirements()
    update_main()
    deploy()
    print("✅ connect_n8n.py completed.")
|