| import os |
| import io |
| import json |
| from fastapi import FastAPI, HTTPException, Request, Header, UploadFile, File, Depends |
| from fastapi.responses import HTMLResponse, JSONResponse, FileResponse |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel |
| from typing import List, Optional |
| import httpx |
| from groq import AsyncGroq |
| from dotenv import load_dotenv |
| from supabase import create_client, Client |
| import logging |
| from logging.handlers import RotatingFileHandler |
| import time |
| import traceback |
|
|
# Load variables from a local .env file (if any) before configuration is read.
load_dotenv()

# Root logging runs at INFO; the application logger below inherits that level.
logging.basicConfig(level=logging.INFO)

# Application log file: rotated by size, five backups kept.
log_formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
log_handler = RotatingFileHandler("app.log", maxBytes=5_000_000, backupCount=5)
log_handler.setFormatter(log_formatter)

logger = logging.getLogger("HomeStack")
logger.addHandler(log_handler)
# Records stop at this logger, so they are not also handed to root handlers.
logger.propagate = False

app = FastAPI()
|
|
| |
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log method, path, status code and elapsed time of every HTTP request.

    Args:
        request: The incoming request.
        call_next: Awaitable callable that passes the request on and yields
            the response.

    Returns:
        The response produced by ``call_next``, unmodified.
    """
    # perf_counter is monotonic: the measured duration cannot be skewed (or
    # turn negative) if the system clock is adjusted while the request runs.
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time
    logger.info(
        "REQ: %s %s | STATUS: %s | DONE IN: %.4fs",
        request.method,
        request.url.path,
        response.status_code,
        duration,
    )
    return response
|
|
| |
@app.get("/", response_class=FileResponse)
async def root():
    """Serve ``index.html`` (given as a relative path) as the landing page."""
    landing_page = "index.html"
    return FileResponse(landing_page)
|
|
| |
# Runtime configuration, read once at import time. os.getenv returns None for
# any variable that is not set.
INVENTORY_PASSWORD = os.getenv("INVENTORY_PASSWORD")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

if not INVENTORY_PASSWORD:
    logger.error("INVENTORY_PASSWORD not found in environment variables.")

# Supabase client. Both the URL and the key are required: checking only the
# key let a missing URL reach create_client() and fail at import time. When
# either is missing the client stays None and the endpoints answer with 500.
supabase: Optional[Client] = None
if SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY:
    supabase = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
else:
    if not SUPABASE_URL:
        logger.error("SUPABASE_URL not found in environment variables.")
    if not SUPABASE_SERVICE_ROLE_KEY:
        logger.error("SUPABASE_SERVICE_ROLE_KEY not found in environment variables.")

# Groq client; None when no API key is configured.
groq_client: Optional[AsyncGroq] = None
if GROQ_API_KEY:
    groq_client = AsyncGroq(api_key=GROQ_API_KEY)
else:
    logger.error("GROQ_API_KEY not found in environment variables.")
|
|
| |
async def verify_internal_password(
    x_api_password: Optional[str] = Header(None),
    authorization: Optional[str] = Header(None),
):
    """Dependency that authorizes a request by password header or bearer token.

    Args:
        x_api_password: Value of the ``X-Api-Password`` header, if sent.
        authorization: Value of the ``Authorization`` header, if sent.

    Returns:
        True when either credential is accepted.

    Raises:
        HTTPException: 401 when neither credential is accepted.
    """
    import secrets  # local import keeps this block self-contained

    # An unset or empty INVENTORY_PASSWORD must never match. With a plain
    # `==`, a request that sent no header at all was accepted whenever the
    # variable was unset, because None == None.
    if INVENTORY_PASSWORD and x_api_password is not None:
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        if secrets.compare_digest(
            x_api_password.encode("utf-8"),
            INVENTORY_PASSWORD.encode("utf-8"),
        ):
            return True

    # NOTE(review): this is a fixed, shared token (the same literal the unlock
    # endpoint hands out), so it never expires and is identical for every
    # client. Kept for compatibility; consider per-session random tokens.
    if authorization == "Bearer vault-unlocked-session-token":
        return True

    raise HTTPException(status_code=401, detail="Unauthorized access")
|
|
| |
# Request body of POST /api/auth/unlock.
class LoginRequest(BaseModel):
    # Password submitted by the client; compared with INVENTORY_PASSWORD.
    password: str
|
|
# One entry of the conversation sent to POST /api/ai/chat.
class ChatMessage(BaseModel):
    # Speaker role; forwarded unchanged to the chat completion call.
    role: str
    # Message text; defaults to an empty string when omitted.
    content: Optional[str] = ""
|
|
# Request body of POST /api/ai/chat.
class ChatRequest(BaseModel):
    # Conversation so far; appended after the server-side system prompt.
    messages: List[ChatMessage]
|
|
# Request body of POST /api/inventory; dumped as-is into the "inventory" table.
class InventoryItem(BaseModel):
    name: str
    quantity: float
    # Unit label; "UNIT" when the client sends none.
    unit: Optional[str] = "UNIT"
    # Optional references to a container and/or a location.
    container_id: Optional[str] = None
    location_id: Optional[str] = None
|
|
# Request body of POST /api/containers; dumped as-is into the "containers" table.
class ContainerItem(BaseModel):
    name: str
    # Required reference to the location holding this container.
    location_id: str
|
|
# Request body of POST /api/locations; dumped as-is into the "locations" table.
class LocationItem(BaseModel):
    name: str
|
|
| |
@app.post("/api/auth/unlock")
async def unlock(request: LoginRequest):
    """Exchange the inventory password for the session token.

    Args:
        request: Body carrying the submitted password.

    Returns:
        ``{"success": True, "token": ...}`` when the password matches.

    Raises:
        HTTPException: 401 when the password is wrong or no password is
            configured on the server.
    """
    import secrets  # local import keeps this block self-contained

    # Refuse when no password is configured (an empty configured value would
    # otherwise match an empty submission), and compare in constant time.
    # Bytes are used because compare_digest rejects non-ASCII str arguments.
    if INVENTORY_PASSWORD and secrets.compare_digest(
        request.password.encode("utf-8"),
        INVENTORY_PASSWORD.encode("utf-8"),
    ):
        return {"success": True, "token": "vault-unlocked-session-token"}
    raise HTTPException(status_code=401, detail="Invalid Access Key")
|
|
@app.get("/api/inventory")
async def get_inventory(auth: bool = Depends(verify_internal_password)):
    """Return all rows of the ``inventory`` table, ordered by ``created_at`` descending.

    Raises:
        HTTPException: 500 when Supabase is not initialized or the query fails.
    """
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")

    try:
        newest_first = (
            supabase.table("inventory")
            .select("*")
            .order("created_at", desc=True)
        )
        rows = newest_first.execute().data
        return rows
    except Exception:
        logger.exception("Fetch Inventory Error")
        raise HTTPException(status_code=500, detail="Failed to fetch inventory")
|
|
@app.get("/api/locations")
async def get_locations(auth: bool = Depends(verify_internal_password)):
    """Return all rows of the ``locations`` table.

    Raises:
        HTTPException: 500 when Supabase is not initialized or the query fails.
    """
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")

    try:
        all_locations = supabase.table("locations").select("*")
        rows = all_locations.execute().data
        return rows
    except Exception:
        logger.exception("Fetch Locations Error")
        raise HTTPException(status_code=500, detail="Failed to fetch locations")
|
|
@app.post("/api/inventory")
async def add_inventory(item: InventoryItem, auth: bool = Depends(verify_internal_password)):
    """Insert one item into the ``inventory`` table and return the insert result data.

    Raises:
        HTTPException: 500 when Supabase is not initialized or the insert fails.
    """
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")

    try:
        row = item.model_dump()
        inserted = supabase.table("inventory").insert(row).execute()
        return inserted.data
    except Exception:
        logger.exception("Add Inventory Error")
        raise HTTPException(status_code=500, detail="Failed to add item")
|
|
@app.delete("/api/inventory/{item_id}")
async def delete_inventory(item_id: str, auth: bool = Depends(verify_internal_password)):
    """Delete the inventory row whose ``id`` equals ``item_id``.

    The result of the delete is not inspected, so success is reported whenever
    the call itself does not raise.

    Args:
        item_id: Value matched against the ``id`` column.

    Returns:
        ``{"success": True}``.

    Raises:
        HTTPException: 500 when Supabase is not initialized or the delete fails.
    """
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")
    try:
        supabase.table("inventory").delete().eq("id", item_id).execute()
        return {"success": True}
    except Exception:
        # Log through the application logger like every other endpoint; the
        # former print() lost the traceback and never reached the log file.
        logger.exception("Delete Inventory Error")
        raise HTTPException(status_code=500, detail="Failed to delete item")
|
|
@app.post("/api/locations")
async def add_location(location: LocationItem, auth: bool = Depends(verify_internal_password)):
    """Insert one row into the ``locations`` table and return the insert result data.

    Raises:
        HTTPException: 500 when Supabase is not initialized or the insert fails.
    """
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")

    try:
        row = location.model_dump()
        inserted = supabase.table("locations").insert(row).execute()
        return inserted.data
    except Exception:
        logger.exception("Add Location Error")
        raise HTTPException(status_code=500, detail="Failed to add location")
|
|
@app.get("/api/containers")
async def get_containers(auth: bool = Depends(verify_internal_password)):
    """Return all rows of the ``containers`` table.

    Raises:
        HTTPException: 500 when Supabase is not initialized or the query fails.
    """
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")

    try:
        all_containers = supabase.table("containers").select("*")
        rows = all_containers.execute().data
        return rows
    except Exception:
        logger.exception("Fetch Containers Error")
        raise HTTPException(status_code=500, detail="Failed to fetch containers")
|
|
@app.post("/api/containers")
async def add_container(container: ContainerItem, auth: bool = Depends(verify_internal_password)):
    """Insert one row into the ``containers`` table and return the insert result data.

    Raises:
        HTTPException: 500 when Supabase is not initialized or the insert fails.
    """
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")

    try:
        row = container.model_dump()
        inserted = supabase.table("containers").insert(row).execute()
        return inserted.data
    except Exception:
        logger.exception("Add Container Error")
        raise HTTPException(status_code=500, detail="Failed to add container")
|
|
def _execute_ai_tool(tool_name: str, args: dict) -> str:
    """Run one assistant-requested tool against Supabase and describe the outcome.

    Args:
        tool_name: Name of the tool the model asked for.
        args: Decoded tool arguments (an empty dict when none were usable).

    Returns:
        Text to send back to the model as the tool result: a success note,
        the JSON-encoded rows for read tools, or an error description. This
        function never raises; failures are reported in the returned text.
    """
    # Read-only tools: tool name -> table whose rows are returned as JSON.
    read_tables = {
        "get_locations": "locations",
        "get_containers": "containers",
        "get_inventory": "inventory",
    }
    try:
        if tool_name == "add_inventory_item":
            supabase.table("inventory").insert(args).execute()
            return f"Success: Added {args.get('name')}."
        if tool_name == "add_location":
            supabase.table("locations").insert(args).execute()
            return f"Success: Created location {args.get('name')}."
        if tool_name == "bulk_add_items":
            supabase.table("inventory").insert(args["items"]).execute()
            return f"Success: Bulk added {len(args['items'])} items."
        if tool_name == "add_container":
            supabase.table("containers").insert(args).execute()
            return f"Success: Created container {args.get('name')}."
        if tool_name in read_tables:
            res = supabase.table(read_tables[tool_name]).select("*").execute()
            return json.dumps(res.data)
        # An unrecognised tool used to leave the result unassigned, which
        # either crashed the request or replayed the previous tool's output.
        logger.error(f"Unknown tool requested: {tool_name}")
        return f"Error: Unknown tool '{tool_name}'."
    except Exception as tool_err:
        logger.error(f"Tool Execution Error: {str(tool_err)}")
        # "23505" is matched as a substring of the error text and treated as
        # "row already exists" so the model can carry on with the existing row.
        if "23505" in str(tool_err):
            return f"Note: '{args.get('name')}' already exists in the database. You can proceed using it."
        return f"Error executing {tool_name}: {str(tool_err)}"


@app.post("/api/ai/chat")
async def ai_chat(request: ChatRequest, auth: bool = Depends(verify_internal_password)):
    """Run a tool-calling chat turn with the assistant model.

    The client's messages are appended to a fixed system prompt and sent to
    Groq. While the model keeps requesting tools, each call is executed and
    its result fed back, up to ``max_turns`` rounds.

    Args:
        request: Conversation history supplied by the client.

    Returns:
        The dumped completion once the model answers without tool calls, or a
        fallback assistant message when the turn limit is reached.

    Raises:
        HTTPException: 500 when Groq or Supabase is not configured, or when
            anything in the exchange fails.
    """
    if not groq_client:
        raise HTTPException(status_code=500, detail="Groq API Key not configured")
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")

    try:
        messages = [
            {
                "role": "system",
                "content": (
                    "You are HomeStack Assistant, a friendly and extremely concise digital inventory curator for an elderly user.\n"
                    "Hierarchy: Physical Location -> Container (Box/Bin) -> Item.\n"
                    "Capabilities: Add/Bulk-add/Retrieve items, locations, and containers.\n"
                    "Data Integrity Protocol:\n"
                    "- STANDARD CAPITALIZATION: Always use Title Case for Item Names, Categories, Containers, and Locations.\n"
                    "- ERROR CORRECTION: Silently correct likely transcription errors (e.g., 'sum glu' -> 'Superglue', 'to big' -> 'Too Big'). Filter out filler words.\n"
                    "- DETAIL ENRICHMENT: Ensure items have specific names and appropriate units (e.g., 'Milk' -> 'Gallon of Milk' or simply 'Milk' with quantity 1). If details are missing, use sensible defaults based on context.\n"
                    "Tool Protocol:\n"
                    "- PROACTIVE HELP: If the user asks for advice, how to fix something, or what to pack, ALWAYS use 'get_inventory' FIRST to see what they own.\n"
                    "- ONLY suggest solutions or recommend items that currently exist in their inventory. If they lack the items, ask what they want to do.\n"
                    "- PROACTIVE LOCATION: When you mention an item the user owns, ALWAYS proactively tell them exactly where it is located.\n"
                    "- DO NOT give generic advice using items the user does not possess.\n"
                    "- DO NOT mention the names of the tools you are using. The user does not understand technical jargon.\n"
                    "- NEVER show raw database IDs (UUIDs) to the user. Always look up and use the actual readable names of locations and boxes.\n"
                    "- LANGUAGE TRANSLATION: If the user's input is transcribed in a language other than English, silently translate it to English to understand their intent, and ALWAYS conduct the conversation and respond exclusively in English.\n"
                    "- Tone: Very brief, simple language, no technical jargon. Use Markdown.\n"
                    "IMPORTANT: Always ask for confirmation before executing mutating actions (adding items/creating boxes)"
                ),
            }
        ]

        # Client history follows the system prompt; a null content is sent as
        # an empty string.
        for m in request.messages:
            msg_dict = m.model_dump()
            if msg_dict.get("content") is None:
                msg_dict["content"] = ""
            messages.append(msg_dict)

        tools = [
            {
                "type": "function",
                "function": {
                    "name": "add_inventory_item",
                    "description": "Adds a new item to the digital inventory.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "name": {"type": "string"},
                            "quantity": {"type": "number"},
                            "unit": {"type": "string", "description": "Unit of measurement e.g. UNIT, Pack, Pair, Kg, L"},
                            "container_id": {"type": "string", "description": "UUID of the container/box to place item in"},
                            "location_id": {"type": "string", "description": "UUID of the location if not placing in a container"},
                        },
                        "required": ["name", "quantity"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "add_location",
                    "description": "Adds a new storage location.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "name": {"type": "string"},
                        },
                        "required": ["name"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "bulk_add_items",
                    "description": "Adds multiple items to the inventory in a single batch operation.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "items": {
                                "type": "array",
                                "items": {
                                    "type": "object",
                                    "properties": {
                                        "name": {"type": "string"},
                                        "quantity": {"type": "number"},
                                        "unit": {"type": "string"},
                                        "container_id": {"type": "string"},
                                        "location_id": {"type": "string"},
                                    },
                                    "required": ["name", "quantity"],
                                },
                            },
                        },
                        "required": ["items"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "get_locations",
                    "description": "Retrieves all storage locations.",
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "get_inventory",
                    "description": "Retrieves the full list of inventory items.",
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "get_containers",
                    "description": "Retrieves all storage containers (boxes, bins).",
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "add_container",
                    "description": "Adds a new storage container (box/bin) to a location.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "name": {"type": "string"},
                            "location_id": {"type": "string"},
                        },
                        "required": ["name", "location_id"],
                    },
                },
            },
        ]

        max_turns = 100

        for turn_count in range(1, max_turns + 1):
            completion = await groq_client.chat.completions.create(
                model="openai/gpt-oss-120b",
                messages=messages,
                tools=tools,
                tool_choice="auto",
                temperature=0.3,
                max_tokens=2048,
            )

            response_message = completion.choices[0].message
            if not response_message.tool_calls:
                # Plain answer: hand the whole completion back to the client.
                return completion.model_dump()

            # The assistant message carrying the tool calls goes into the
            # history ahead of the tool results that answer it.
            messages.append(response_message)
            for tool_call in response_message.tool_calls:
                tool_name = tool_call.function.name
                logger.info(f"AI Strategy: Executing tool '{tool_name}' on turn {turn_count}")

                args = {}
                raw_args = getattr(tool_call.function, "arguments", None)
                if raw_args:
                    try:
                        parsed_args = json.loads(raw_args)
                    except json.JSONDecodeError:
                        logger.error(f"Failed to parse args for {tool_name}")
                    else:
                        # Valid JSON that is not an object (list, number, ...)
                        # cannot serve as keyword-style arguments.
                        if isinstance(parsed_args, dict):
                            args = parsed_args
                        else:
                            logger.error(f"Failed to parse args for {tool_name}")

                content = _execute_ai_tool(tool_name, args)

                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "name": tool_name,
                    "content": content,
                })

        logger.warning(f"Reached max turns limit ({max_turns}) for AI chat")
        return {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": "I'm still analyzing your complex request. I've performed multiple checks but might need more time. What would you like to verify?"
                }
            }]
        }

    except Exception:
        logger.exception("AI Chat Error")
        raise HTTPException(status_code=500, detail="AI communication failed")
|
|
@app.post("/api/audio/transcribe")
async def transcribe_audio(file: UploadFile = File(...), auth: bool = Depends(verify_internal_password)):
    """Forward an uploaded audio file to Groq's transcription endpoint.

    Returns:
        The decoded JSON body of Groq's response; the HTTP status of that
        response is not inspected.

    Raises:
        HTTPException: 500 when the Groq key is missing or the call fails.
    """
    if not GROQ_API_KEY:
        raise HTTPException(status_code=500, detail="Groq API Key not configured")

    transcription_url = "https://api.groq.com/openai/v1/audio/transcriptions"
    form_fields = {
        "model": "whisper-large-v3-turbo",
        "response_format": "verbose_json",
        "temperature": "0"
    }

    async with httpx.AsyncClient() as client:
        try:
            audio_bytes = await file.read()
            upload = {"file": (file.filename, audio_bytes, file.content_type)}
            groq_response = await client.post(
                transcription_url,
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                data=form_fields,
                files=upload,
                timeout=60.0,
            )
            return groq_response.json()
        except Exception:
            logger.exception("Transcription Error")
            raise HTTPException(status_code=500, detail="Transcription failed")
|
|
@app.post("/api/audio/process")
async def process_audio(request: Request, auth: bool = Depends(verify_internal_password)):
    """Extract inventory items from a transcript with Groq and store them.

    The request body must be a JSON object with a non-empty string
    ``transcript``. The model's reply is parsed as JSON; when it holds a
    non-empty ``items`` list, that list is inserted into ``inventory``.

    Returns:
        The decoded JSON body of Groq's chat completion response.

    Raises:
        HTTPException: 400 for an invalid body or missing transcript; 500
            when Groq or Supabase is not configured or processing fails.
    """
    if not GROQ_API_KEY:
        raise HTTPException(status_code=500, detail="Groq API Key not configured")
    # Same guard as the other endpoints; without it a missing client only
    # surfaced later as a generic "Processing failed".
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")

    # Validate before the broad try below so client mistakes are reported as
    # 400 rather than being turned into a 500.
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON") from None
    transcript = body.get("transcript") if isinstance(body, dict) else None
    if not isinstance(transcript, str) or not transcript.strip():
        raise HTTPException(status_code=400, detail="Missing 'transcript' in request body")

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                json={
                    "messages": [
                        {"role": "system", "content": "Extract items from the user's transcript. Return JSON with 'items' array. Each item: 'name', 'quantity' (number), 'unit', 'category', 'location'."},
                        {"role": "user", "content": transcript}
                    ],
                    "model": "openai/gpt-oss-120b"
                },
                timeout=60.0
            )
            data = response.json()

            if "choices" in data and data["choices"]:
                processed = json.loads(data["choices"][0]["message"]["content"])
                # The model may answer with something other than an object;
                # only a non-empty list under "items" is stored.
                items = processed.get("items") if isinstance(processed, dict) else None
                if isinstance(items, list) and items:
                    supabase.table("inventory").insert(items).execute()

            return data
        except Exception:
            logger.exception("Processing Error")
            raise HTTPException(status_code=500, detail="Processing failed")
|
|
if __name__ == "__main__":
    # Executed as a script: serve the app with uvicorn on 0.0.0.0:8000.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
|
|