import os import io import json from fastapi import FastAPI, HTTPException, Request, Header, UploadFile, File, Depends from fastapi.responses import HTMLResponse, JSONResponse, FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from typing import List, Optional import httpx from groq import AsyncGroq from dotenv import load_dotenv from supabase import create_client, Client import logging from logging.handlers import RotatingFileHandler import time import traceback load_dotenv() # Logging Configuration logging.basicConfig(level=logging.INFO) logger = logging.getLogger("HomeStack") log_handler = RotatingFileHandler("app.log", maxBytes=5000000, backupCount=5) log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') log_handler.setFormatter(log_formatter) logger.addHandler(log_handler) logger.propagate = False app = FastAPI() # Middleware for Logging @app.middleware("http") async def log_requests(request: Request, call_next): start_time = time.time() response = await call_next(request) duration = time.time() - start_time logger.info(f"REQ: {request.method} {request.url.path} | STATUS: {response.status_code} | DONE IN: {duration:.4f}s") return response # Root Status @app.get("/", response_class=FileResponse) async def root(): return FileResponse("index.html") # Configuration INVENTORY_PASSWORD = os.getenv("INVENTORY_PASSWORD") GROQ_API_KEY = os.getenv("GROQ_API_KEY") SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") # Supabase Client if not SUPABASE_SERVICE_ROLE_KEY: logger.error("SUPABASE_SERVICE_ROLE_KEY not found in environment variables.") supabase: Optional[Client] = None else: supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY) # Groq Client if not GROQ_API_KEY: logger.error("GROQ_API_KEY not found in environment variables.") groq_client: Optional[AsyncGroq] = None else: groq_client = AsyncGroq(api_key=GROQ_API_KEY) # Security Dependency async def verify_internal_password( x_api_password: Optional[str] = Header(None), authorization: Optional[str] = Header(None) ): # Allow either internal X-API-Password or the frontend's Authorization token if x_api_password == INVENTORY_PASSWORD: return True if authorization == "Bearer vault-unlocked-session-token": return True raise HTTPException(status_code=401, detail="Unauthorized access") # Models class LoginRequest(BaseModel): password: str class ChatMessage(BaseModel): role: str content: Optional[str] = "" class ChatRequest(BaseModel): messages: List[ChatMessage] class InventoryItem(BaseModel): name: str quantity: float unit: Optional[str] = "UNIT" container_id: Optional[str] = None location_id: Optional[str] = None class ContainerItem(BaseModel): name: str location_id: str class LocationItem(BaseModel): name: str # Endpoints @app.post("/api/auth/unlock") async def unlock(request: LoginRequest): if request.password == INVENTORY_PASSWORD: return {"success": True, "token": "vault-unlocked-session-token"} raise HTTPException(status_code=401, detail="Invalid Access Key") @app.get("/api/inventory") async def get_inventory(auth: bool = Depends(verify_internal_password)): if not supabase: raise HTTPException(status_code=500, detail="Supabase not initialized.") try: response = supabase.table("inventory").select("*").order("created_at", desc=True).execute() return response.data except Exception as e: logger.exception("Fetch Inventory Error") raise HTTPException(status_code=500, detail="Failed to fetch inventory") @app.get("/api/locations") async def get_locations(auth: bool = Depends(verify_internal_password)): if not supabase: raise HTTPException(status_code=500, detail="Supabase not initialized.") try: response = supabase.table("locations").select("*").execute() return response.data except Exception as e: logger.exception("Fetch Locations Error") raise HTTPException(status_code=500, detail="Failed to fetch locations") @app.post("/api/inventory") async def add_inventory(item: InventoryItem, auth: bool = Depends(verify_internal_password)): if not supabase: raise HTTPException(status_code=500, detail="Supabase not initialized.") try: response = supabase.table("inventory").insert(item.model_dump()).execute() return response.data except Exception as e: logger.exception("Add Inventory Error") raise HTTPException(status_code=500, detail="Failed to add item") @app.delete("/api/inventory/{item_id}") async def delete_inventory(item_id: str, auth: bool = Depends(verify_internal_password)): if not supabase: raise HTTPException(status_code=500, detail="Supabase not initialized.") try: supabase.table("inventory").delete().eq("id", item_id).execute() return {"success": True} except Exception as e: print(f"Delete Inventory Error: {e}") raise HTTPException(status_code=500, detail="Failed to delete item") @app.post("/api/locations") async def add_location(location: LocationItem, auth: bool = Depends(verify_internal_password)): if not supabase: raise HTTPException(status_code=500, detail="Supabase not initialized.") try: response = supabase.table("locations").insert(location.model_dump()).execute() return response.data except Exception as e: logger.exception("Add Location Error") raise HTTPException(status_code=500, detail="Failed to add location") @app.get("/api/containers") async def get_containers(auth: bool = Depends(verify_internal_password)): if not supabase: raise HTTPException(status_code=500, detail="Supabase not initialized.") try: response = supabase.table("containers").select("*").execute() return response.data except Exception as e: logger.exception("Fetch Containers Error") raise HTTPException(status_code=500, detail="Failed to fetch containers") @app.post("/api/containers") async def add_container(container: ContainerItem, auth: bool = Depends(verify_internal_password)): if not supabase: raise HTTPException(status_code=500, detail="Supabase not initialized.") try: response = supabase.table("containers").insert(container.model_dump()).execute() return response.data except Exception as e: logger.exception("Add Container Error") raise HTTPException(status_code=500, detail="Failed to add container") @app.post("/api/ai/chat") async def ai_chat(request: ChatRequest, auth: bool = Depends(verify_internal_password)): if not groq_client: raise HTTPException(status_code=500, detail="Groq API Key not configured") if not supabase: raise HTTPException(status_code=500, detail="Supabase not initialized.") try: messages = [ { "role": "system", "content": ( "You are HomeStack Assistant, a friendly and extremely concise digital inventory curator for an elderly user.\n" "Hierarchy: Physical Location -> Container (Box/Bin) -> Item.\n" "Capabilities: Add/Bulk-add/Retrieve items, locations, and containers.\n" "Data Integrity Protocol:\n" "- STANDARD CAPITALIZATION: Always use Title Case for Item Names, Categories, Containers, and Locations.\n" "- ERROR CORRECTION: Silently correct likely transcription errors (e.g., 'sum glu' -> 'Superglue', 'to big' -> 'Too Big'). Filter out filler words.\n" "- DETAIL ENRICHMENT: Ensure items have specific names and appropriate units (e.g., 'Milk' -> 'Gallon of Milk' or simply 'Milk' with quantity 1). If details are missing, use sensible defaults based on context.\n" "Tool Protocol:\n" "- PROACTIVE HELP: If the user asks for advice, how to fix something, or what to pack, ALWAYS use 'get_inventory' FIRST to see what they own.\n" "- ONLY suggest solutions or recommend items that currently exist in their inventory. If they lack the items, ask what they want to do.\n" "- PROACTIVE LOCATION: When you mention an item the user owns, ALWAYS proactively tell them exactly where it is located.\n" "- DO NOT give generic advice using items the user does not possess.\n" "- DO NOT mention the names of the tools you are using. The user does not understand technical jargon.\n" "- NEVER show raw database IDs (UUIDs) to the user. Always look up and use the actual readable names of locations and boxes.\n" "- LANGUAGE TRANSLATION: If the user's input is transcribed in a language other than English, silently translate it to English to understand their intent, and ALWAYS conduct the conversation and respond exclusively in English.\n" "- Tone: Very brief, simple language, no technical jargon. Use Markdown.\n" "IMPORTANT: Always ask for confirmation before executing mutating actions (adding items/creating boxes)" ) } ] # Sanitize incoming history: Ensure content is not None for m in request.messages: msg_dict = m.model_dump() if msg_dict.get("content") is None: msg_dict["content"] = "" messages.append(msg_dict) tools = [ { "type": "function", "function": { "name": "add_inventory_item", "description": "Adds a new item to the digital inventory.", "parameters": { "type": "object", "properties": { "name": { "type": "string" }, "quantity": { "type": "number" }, "unit": { "type": "string", "description": "Unit of measurement e.g. UNIT, Pack, Pair, Kg, L" }, "container_id": { "type": "string", "description": "UUID of the container/box to place item in" }, "location_id": { "type": "string", "description": "UUID of the location if not placing in a container" } }, "required": ["name", "quantity"] } } }, { "type": "function", "function": { "name": "add_location", "description": "Adds a new storage location.", "parameters": { "type": "object", "properties": { "name": { "type": "string" } }, "required": ["name"] } } }, { "type": "function", "function": { "name": "bulk_add_items", "description": "Adds multiple items to the inventory in a single batch operation.", "parameters": { "type": "object", "properties": { "items": { "type": "array", "items": { "type": "object", "properties": { "name": { "type": "string" }, "quantity": { "type": "number" }, "unit": { "type": "string" }, "container_id": { "type": "string" }, "location_id": { "type": "string" } }, "required": ["name", "quantity"] } } }, "required": ["items"] } } }, { "type": "function", "function": { "name": "get_locations", "description": "Retrieves all storage locations." } }, { "type": "function", "function": { "name": "get_inventory", "description": "Retrieves the full list of inventory items." } }, { "type": "function", "function": { "name": "get_containers", "description": "Retrieves all storage containers (boxes, bins)." } }, { "type": "function", "function": { "name": "add_container", "description": "Adds a new storage container (box/bin) to a location.", "parameters": { "type": "object", "properties": { "name": { "type": "string" }, "location_id": { "type": "string" } }, "required": ["name", "location_id"] } } } ] # Standard Multi-turn loop turn_count = 0 max_turns = 100 # High safety limit to allow complex loops while turn_count < max_turns: turn_count += 1 completion = await groq_client.chat.completions.create( model="openai/gpt-oss-120b", messages=messages, tools=tools, tool_choice="auto", temperature=0.3, max_tokens=2048 ) response_message = completion.choices[0].message if not response_message.tool_calls: return completion.model_dump() messages.append(response_message) for tool_call in response_message.tool_calls: tool_name = tool_call.function.name logger.info(f"AI Strategy: Executing tool '{tool_name}' on turn {turn_count}") args = {} if getattr(tool_call.function, 'arguments', None): try: args = json.loads(tool_call.function.arguments) except json.JSONDecodeError: logger.error(f"Failed to parse args for {tool_name}") try: if tool_name == "add_inventory_item": supabase.table("inventory").insert(args).execute() content = f"Success: Added {args.get('name')}." elif tool_name == "add_location": supabase.table("locations").insert(args).execute() content = f"Success: Created location {args.get('name')}." elif tool_name == "bulk_add_items": supabase.table("inventory").insert(args["items"]).execute() content = f"Success: Bulk added {len(args['items'])} items." elif tool_name == "add_container": supabase.table("containers").insert(args).execute() content = f"Success: Created container {args.get('name')}." elif tool_name == "get_locations": res = supabase.table("locations").select("*").execute() content = json.dumps(res.data) elif tool_name == "get_containers": res = supabase.table("containers").select("*").execute() content = json.dumps(res.data) elif tool_name == "get_inventory": res = supabase.table("inventory").select("*").execute() content = json.dumps(res.data) except Exception as tool_err: logger.error(f"Tool Execution Error: {str(tool_err)}") content = f"Error executing {tool_name}: {str(tool_err)}" if "23505" in str(tool_err): content = f"Note: '{args.get('name')}' already exists in the database. You can proceed using it." messages.append({ "role": "tool", "tool_call_id": tool_call.id, "name": tool_name, "content": content }) # If we exhaust max_turns logger.warning(f"Reached max turns limit ({max_turns}) for AI chat") return { "choices": [{ "message": { "role": "assistant", "content": "I'm still analyzing your complex request. I've performed multiple checks but might need more time. What would you like to verify?" } }] } except Exception as e: logger.exception("AI Chat Error") raise HTTPException(status_code=500, detail="AI communication failed") @app.post("/api/audio/transcribe") async def transcribe_audio(file: UploadFile = File(...), auth: bool = Depends(verify_internal_password)): if not GROQ_API_KEY: raise HTTPException(status_code=500, detail="Groq API Key not configured") async with httpx.AsyncClient() as client: try: content = await file.read() files = {"file": (file.filename, content, file.content_type)} data = { "model": "whisper-large-v3-turbo", "response_format": "verbose_json", "temperature": "0" } response = await client.post( "https://api.groq.com/openai/v1/audio/transcriptions", headers={"Authorization": f"Bearer {GROQ_API_KEY}"}, data=data, files=files, timeout=60.0 ) return response.json() except Exception as e: logger.exception("Transcription Error") raise HTTPException(status_code=500, detail="Transcription failed") @app.post("/api/audio/process") async def process_audio(request: Request, auth: bool = Depends(verify_internal_password)): if not GROQ_API_KEY: raise HTTPException(status_code=500, detail="Groq API Key not configured") body = await request.json() transcript = body.get("transcript") async with httpx.AsyncClient() as client: try: response = await client.post( "https://api.groq.com/openai/v1/chat/completions", headers={"Authorization": f"Bearer {GROQ_API_KEY}"}, json={ "messages": [ {"role": "system", "content": "Extract items from the user's transcript. Return JSON with 'items' array. Each item: 'name', 'quantity' (number), 'unit', 'category', 'location'."}, {"role": "user", "content": transcript} ], "model": "openai/gpt-oss-120b" }, timeout=60.0 ) data = response.json() # Save items to Supabase if "choices" in data and data["choices"]: processed = json.loads(data["choices"][0]["message"]["content"]) if "items" in processed and isinstance(processed["items"], list): supabase.table("inventory").insert(processed["items"]).execute() return data except Exception as e: logger.exception("Processing Error") raise HTTPException(status_code=500, detail="Processing failed") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)