# SolarumAsteridion
# fixes
# 70483cf
import hmac
import io
import json
import logging
import os
import time
import traceback
from logging.handlers import RotatingFileHandler
from typing import List, Optional

import httpx
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request, Header, UploadFile, File, Depends
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from groq import AsyncGroq
from pydantic import BaseModel
from supabase import create_client, Client
# Load variables from a .env file (if present) into the process environment
# so the os.getenv() lookups further down can see them.
load_dotenv()
# Logging Configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("HomeStack")
# Rotate app.log once it reaches 5,000,000 bytes, keeping up to 5 old files.
log_handler = RotatingFileHandler("app.log", maxBytes=5000000, backupCount=5)
log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log_handler.setFormatter(log_formatter)
logger.addHandler(log_handler)
# With propagation off, "HomeStack" records are handled only by the rotating
# file handler attached above and are not passed on to the root logger.
logger.propagate = False
app = FastAPI()
# Middleware for Logging
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log method, path, status code and handling time for every HTTP request.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request to the rest of the app
            and returns its response.

    Returns:
        The response produced by ``call_next``, unmodified.
    """
    # perf_counter() is monotonic, so the measured duration cannot go negative
    # or jump when the system clock is adjusted (time.time() can).
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time
    logger.info(f"REQ: {request.method} {request.url.path} | STATUS: {response.status_code} | DONE IN: {duration:.4f}s")
    return response
# Root Status
@app.get("/", response_class=FileResponse)
async def root():
    # Serve the frontend page. The path is relative, so it is resolved
    # against the working directory the server was started from.
    index_page = FileResponse("index.html")
    return index_page
# Configuration
# Secrets and endpoints are read from the environment; each is None when unset.
INVENTORY_PASSWORD = os.getenv("INVENTORY_PASSWORD")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")

if not INVENTORY_PASSWORD:
    # The password checks in this file compare against this value, so a
    # missing password is worth surfacing at startup.
    logger.error("INVENTORY_PASSWORD not found in environment variables.")

# Supabase Client
# create_client() needs both the URL and the key. Checking only the key would
# pass a missing URL straight into create_client() at import time. When either
# value is missing the client stays None and the endpoints report it.
supabase: Optional[Client] = None
if SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY:
    supabase = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
else:
    logger.error("SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY not found in environment variables.")

# Groq Client
groq_client: Optional[AsyncGroq] = None
if GROQ_API_KEY:
    groq_client = AsyncGroq(api_key=GROQ_API_KEY)
else:
    logger.error("GROQ_API_KEY not found in environment variables.")
# Security Dependency
async def verify_internal_password(
    x_api_password: Optional[str] = Header(None),
    authorization: Optional[str] = Header(None)
):
    """Dependency that rejects requests lacking a valid credential.

    A request is accepted when either:
      * the ``X-API-Password`` header equals ``INVENTORY_PASSWORD``, or
      * the ``Authorization`` header carries the session token handed out by
        the unlock endpoint.

    Args:
        x_api_password: Value of the ``X-API-Password`` header, if sent.
        authorization: Value of the ``Authorization`` header, if sent.

    Returns:
        True when the request is authorized.

    Raises:
        HTTPException: 401 when neither credential matches.
    """
    # Fail closed: when INVENTORY_PASSWORD is unset (None) a request without
    # the header would otherwise compare None == None and be let through.
    # An empty configured password is rejected for the same reason.
    if INVENTORY_PASSWORD and x_api_password is not None:
        # Constant-time comparison; bytes are used because compare_digest()
        # only accepts ASCII when given str.
        if hmac.compare_digest(
            x_api_password.encode("utf-8"), INVENTORY_PASSWORD.encode("utf-8")
        ):
            return True
    # NOTE(review): the session token is a fixed string shared by all clients
    # and never expires; consider issuing per-session random tokens.
    if authorization is not None and hmac.compare_digest(
        authorization.encode("utf-8"), b"Bearer vault-unlocked-session-token"
    ):
        return True
    raise HTTPException(status_code=401, detail="Unauthorized access")
# Models
class LoginRequest(BaseModel):
    # Request body carrying the access key submitted by the client.
    password: str
class ChatMessage(BaseModel):
    # One message of a chat history.
    role: str
    # Optional text; defaults to the empty string when omitted.
    content: Optional[str] = ""
class ChatRequest(BaseModel):
    # Request body holding the ordered list of chat messages.
    messages: List[ChatMessage]
class InventoryItem(BaseModel):
    # Payload describing one inventory item.
    name: str
    quantity: float
    # Unit label; "UNIT" is used when the client does not supply one.
    unit: Optional[str] = "UNIT"
    # Optional references to a container and/or location; both default to None.
    container_id: Optional[str] = None
    location_id: Optional[str] = None
class ContainerItem(BaseModel):
    # Payload describing one container; a location reference is required.
    name: str
    location_id: str
class LocationItem(BaseModel):
    # Payload describing one location; only a name is required.
    name: str
# Endpoints
@app.post("/api/auth/unlock")
async def unlock(request: LoginRequest):
    # Exchange the inventory password for the frontend session token.
    #
    # Returns {"success": True, "token": ...} when the submitted password
    # matches INVENTORY_PASSWORD; raises HTTP 401 otherwise.
    #
    # The comparison is constant-time so response timing does not reveal how
    # much of a guess matched. Bytes are compared because compare_digest()
    # only accepts ASCII when given str. An unset or empty INVENTORY_PASSWORD
    # never matches (fail closed).
    if INVENTORY_PASSWORD and hmac.compare_digest(
        request.password.encode("utf-8"), INVENTORY_PASSWORD.encode("utf-8")
    ):
        return {"success": True, "token": "vault-unlocked-session-token"}
    raise HTTPException(status_code=401, detail="Invalid Access Key")
@app.get("/api/inventory")
async def get_inventory(auth: bool = Depends(verify_internal_password)):
    # Return every row of the "inventory" table, ordered by created_at descending.
    # Responds with HTTP 500 when Supabase is not configured or the query fails.
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")
    try:
        query = supabase.table("inventory").select("*")
        result = query.order("created_at", desc=True).execute()
        return result.data
    except Exception:
        logger.exception("Fetch Inventory Error")
        raise HTTPException(status_code=500, detail="Failed to fetch inventory")
@app.get("/api/locations")
async def get_locations(auth: bool = Depends(verify_internal_password)):
    # Return every row of the "locations" table.
    # Responds with HTTP 500 when Supabase is not configured or the query fails.
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")
    try:
        locations_table = supabase.table("locations")
        result = locations_table.select("*").execute()
        return result.data
    except Exception:
        logger.exception("Fetch Locations Error")
        raise HTTPException(status_code=500, detail="Failed to fetch locations")
@app.post("/api/inventory")
async def add_inventory(item: InventoryItem, auth: bool = Depends(verify_internal_password)):
    # Insert the submitted item into the "inventory" table and return the
    # data reported by the insert.
    # Responds with HTTP 500 when Supabase is not configured or the insert fails.
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")
    try:
        row = item.model_dump()
        result = supabase.table("inventory").insert(row).execute()
        return result.data
    except Exception:
        logger.exception("Add Inventory Error")
        raise HTTPException(status_code=500, detail="Failed to add item")
@app.delete("/api/inventory/{item_id}")
async def delete_inventory(item_id: str, auth: bool = Depends(verify_internal_password)):
    # Delete the inventory row whose "id" equals item_id.
    # Returns {"success": True}; responds with HTTP 500 when Supabase is not
    # configured or the delete fails.
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")
    try:
        supabase.table("inventory").delete().eq("id", item_id).execute()
        return {"success": True}
    except Exception:
        # Log through the application logger (with traceback) like the other
        # endpoints, instead of print(), so the failure lands in app.log.
        logger.exception("Delete Inventory Error")
        raise HTTPException(status_code=500, detail="Failed to delete item")
@app.post("/api/locations")
async def add_location(location: LocationItem, auth: bool = Depends(verify_internal_password)):
    # Insert the submitted location into the "locations" table and return the
    # data reported by the insert.
    # Responds with HTTP 500 when Supabase is not configured or the insert fails.
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")
    try:
        row = location.model_dump()
        result = supabase.table("locations").insert(row).execute()
        return result.data
    except Exception:
        logger.exception("Add Location Error")
        raise HTTPException(status_code=500, detail="Failed to add location")
@app.get("/api/containers")
async def get_containers(auth: bool = Depends(verify_internal_password)):
    # Return every row of the "containers" table.
    # Responds with HTTP 500 when Supabase is not configured or the query fails.
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")
    try:
        containers_table = supabase.table("containers")
        result = containers_table.select("*").execute()
        return result.data
    except Exception:
        logger.exception("Fetch Containers Error")
        raise HTTPException(status_code=500, detail="Failed to fetch containers")
@app.post("/api/containers")
async def add_container(container: ContainerItem, auth: bool = Depends(verify_internal_password)):
    # Insert the submitted container into the "containers" table and return
    # the data reported by the insert.
    # Responds with HTTP 500 when Supabase is not configured or the insert fails.
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")
    try:
        row = container.model_dump()
        result = supabase.table("containers").insert(row).execute()
        return result.data
    except Exception:
        logger.exception("Add Container Error")
        raise HTTPException(status_code=500, detail="Failed to add container")
# System prompt sent as the first message of every chat; constant across requests.
_CHAT_SYSTEM_PROMPT = (
    "You are HomeStack Assistant, a friendly and extremely concise digital inventory curator for an elderly user.\n"
    "Hierarchy: Physical Location -> Container (Box/Bin) -> Item.\n"
    "Capabilities: Add/Bulk-add/Retrieve items, locations, and containers.\n"
    "Data Integrity Protocol:\n"
    "- STANDARD CAPITALIZATION: Always use Title Case for Item Names, Categories, Containers, and Locations.\n"
    "- ERROR CORRECTION: Silently correct likely transcription errors (e.g., 'sum glu' -> 'Superglue', 'to big' -> 'Too Big'). Filter out filler words.\n"
    "- DETAIL ENRICHMENT: Ensure items have specific names and appropriate units (e.g., 'Milk' -> 'Gallon of Milk' or simply 'Milk' with quantity 1). If details are missing, use sensible defaults based on context.\n"
    "Tool Protocol:\n"
    "- PROACTIVE HELP: If the user asks for advice, how to fix something, or what to pack, ALWAYS use 'get_inventory' FIRST to see what they own.\n"
    "- ONLY suggest solutions or recommend items that currently exist in their inventory. If they lack the items, ask what they want to do.\n"
    "- PROACTIVE LOCATION: When you mention an item the user owns, ALWAYS proactively tell them exactly where it is located.\n"
    "- DO NOT give generic advice using items the user does not possess.\n"
    "- DO NOT mention the names of the tools you are using. The user does not understand technical jargon.\n"
    "- NEVER show raw database IDs (UUIDs) to the user. Always look up and use the actual readable names of locations and boxes.\n"
    "- LANGUAGE TRANSLATION: If the user's input is transcribed in a language other than English, silently translate it to English to understand their intent, and ALWAYS conduct the conversation and respond exclusively in English.\n"
    "- Tone: Very brief, simple language, no technical jargon. Use Markdown.\n"
    "IMPORTANT: Always ask for confirmation before executing mutating actions (adding items/creating boxes)"
)

# Function-calling schema advertised to the model; constant across requests.
_CHAT_TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "add_inventory_item",
            "description": "Adds a new item to the digital inventory.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "quantity": {"type": "number"},
                    "unit": {"type": "string", "description": "Unit of measurement e.g. UNIT, Pack, Pair, Kg, L"},
                    "container_id": {"type": "string", "description": "UUID of the container/box to place item in"},
                    "location_id": {"type": "string", "description": "UUID of the location if not placing in a container"},
                },
                "required": ["name", "quantity"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "add_location",
            "description": "Adds a new storage location.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                },
                "required": ["name"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "bulk_add_items",
            "description": "Adds multiple items to the inventory in a single batch operation.",
            "parameters": {
                "type": "object",
                "properties": {
                    "items": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "name": {"type": "string"},
                                "quantity": {"type": "number"},
                                "unit": {"type": "string"},
                                "container_id": {"type": "string"},
                                "location_id": {"type": "string"},
                            },
                            "required": ["name", "quantity"],
                        },
                    },
                },
                "required": ["items"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_locations",
            "description": "Retrieves all storage locations.",
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_inventory",
            "description": "Retrieves the full list of inventory items.",
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_containers",
            "description": "Retrieves all storage containers (boxes, bins).",
        },
    },
    {
        "type": "function",
        "function": {
            "name": "add_container",
            "description": "Adds a new storage container (box/bin) to a location.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "location_id": {"type": "string"},
                },
                "required": ["name", "location_id"],
            },
        },
    },
]

# Tools that insert their arguments as a single row: name -> (table, success label).
_CHAT_INSERT_TOOLS = {
    "add_inventory_item": ("inventory", "Added"),
    "add_location": ("locations", "Created location"),
    "add_container": ("containers", "Created container"),
}

# Read-only tools that dump a whole table: name -> table.
_CHAT_SELECT_TOOLS = {
    "get_locations": "locations",
    "get_containers": "containers",
    "get_inventory": "inventory",
}

# High safety limit to allow complex loops; each turn is one model call.
_CHAT_MAX_TURNS = 100


def _run_chat_tool(tool_name, args):
    """Run one model-requested tool against Supabase and return its text result.

    Args:
        tool_name: Name of the tool the model asked for.
        args: Parsed tool arguments as a dict, or None when the model's
            arguments could not be parsed into a JSON object.

    Returns:
        The string to hand back to the model as the tool's output. Unknown
        tools and unusable arguments yield an error string instead of raising.

    Raises:
        Exception: Whatever the Supabase call (or a missing "items" key)
            raises; the caller turns it into an error message for the model.
    """
    if tool_name in _CHAT_SELECT_TOOLS:
        # Read-only tools take no arguments, so they run even if args is None.
        res = supabase.table(_CHAT_SELECT_TOOLS[tool_name]).select("*").execute()
        return json.dumps(res.data)

    is_insert = tool_name in _CHAT_INSERT_TOOLS
    if not is_insert and tool_name != "bulk_add_items":
        # Without this the caller would reuse a stale result or hit an
        # unbound variable when the model invents a tool name.
        return f"Error: Unknown tool '{tool_name}'."

    if args is None:
        # Never write to the database from arguments that failed to parse.
        return f"Error executing {tool_name}: arguments were not a valid JSON object. Nothing was changed."

    if is_insert:
        table, label = _CHAT_INSERT_TOOLS[tool_name]
        supabase.table(table).insert(args).execute()
        return f"Success: {label} {args.get('name')}."

    items = args["items"]
    supabase.table("inventory").insert(items).execute()
    return f"Success: Bulk added {len(items)} items."


@app.post("/api/ai/chat")
async def ai_chat(request: ChatRequest, auth: bool = Depends(verify_internal_password)):
    # Run the assistant conversation, executing tool calls until the model
    # produces a plain answer.
    #
    # Returns the final completion as a dict (completion.model_dump()), or a
    # minimal {"choices": [...]} fallback when the turn limit is reached.
    # Responds with HTTP 500 when Groq or Supabase is not configured, or when
    # anything in the exchange fails.
    if not groq_client:
        raise HTTPException(status_code=500, detail="Groq API Key not configured")
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")
    try:
        messages = [{"role": "system", "content": _CHAT_SYSTEM_PROMPT}]
        # Sanitize incoming history: Ensure content is not None
        # NOTE(review): roles are passed through as sent by the client, so a
        # client can include its own "system" messages - confirm this is intended.
        for m in request.messages:
            msg_dict = m.model_dump()
            if msg_dict.get("content") is None:
                msg_dict["content"] = ""
            messages.append(msg_dict)

        # Standard Multi-turn loop
        for turn_count in range(1, _CHAT_MAX_TURNS + 1):
            completion = await groq_client.chat.completions.create(
                model="openai/gpt-oss-120b",
                messages=messages,
                tools=_CHAT_TOOLS,
                tool_choice="auto",
                temperature=0.3,
                max_tokens=2048,
            )
            response_message = completion.choices[0].message
            if not response_message.tool_calls:
                return completion.model_dump()

            messages.append(response_message)
            for tool_call in response_message.tool_calls:
                tool_name = tool_call.function.name
                logger.info(f"AI Strategy: Executing tool '{tool_name}' on turn {turn_count}")

                # No arguments -> {}. Unparseable or non-object arguments -> None,
                # which stops mutating tools from writing garbage.
                args = {}
                raw_args = getattr(tool_call.function, "arguments", None)
                if raw_args:
                    try:
                        parsed = json.loads(raw_args)
                    except json.JSONDecodeError:
                        logger.error(f"Failed to parse args for {tool_name}")
                        args = None
                    else:
                        if isinstance(parsed, dict):
                            args = parsed
                        elif parsed is not None:
                            logger.error(f"Failed to parse args for {tool_name}")
                            args = None

                try:
                    content = _run_chat_tool(tool_name, args)
                except Exception as tool_err:
                    logger.error(f"Tool Execution Error: {str(tool_err)}")
                    content = f"Error executing {tool_name}: {str(tool_err)}"
                    # 23505 is matched as a substring of the error text and
                    # treated as "row already exists".
                    if "23505" in str(tool_err):
                        content = f"Note: '{(args or {}).get('name')}' already exists in the database. You can proceed using it."

                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call.id,
                    "name": tool_name,
                    "content": content,
                })

        # If we exhaust max_turns
        logger.warning(f"Reached max turns limit ({_CHAT_MAX_TURNS}) for AI chat")
        return {
            "choices": [{
                "message": {
                    "role": "assistant",
                    "content": "I'm still analyzing your complex request. I've performed multiple checks but might need more time. What would you like to verify?"
                }
            }]
        }
    except Exception:
        logger.exception("AI Chat Error")
        raise HTTPException(status_code=500, detail="AI communication failed")
@app.post("/api/audio/transcribe")
async def transcribe_audio(file: UploadFile = File(...), auth: bool = Depends(verify_internal_password)):
    # Forward the uploaded audio file to Groq's transcription endpoint and
    # return the upstream JSON body as-is.
    # Responds with HTTP 500 when the Groq key is missing or the call fails.
    if not GROQ_API_KEY:
        raise HTTPException(status_code=500, detail="Groq API Key not configured")
    async with httpx.AsyncClient() as client:
        try:
            audio_bytes = await file.read()
            upstream = await client.post(
                "https://api.groq.com/openai/v1/audio/transcriptions",
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                data={
                    "model": "whisper-large-v3-turbo",
                    "response_format": "verbose_json",
                    "temperature": "0",
                },
                files={"file": (file.filename, audio_bytes, file.content_type)},
                timeout=60.0,
            )
            return upstream.json()
        except Exception:
            logger.exception("Transcription Error")
            raise HTTPException(status_code=500, detail="Transcription failed")
@app.post("/api/audio/process")
async def process_audio(request: Request, auth: bool = Depends(verify_internal_password)):
    # Ask the model to extract inventory items from a transcript, insert them
    # into the "inventory" table, and return the raw completion JSON.
    #
    # Expects a JSON object body with a non-empty string "transcript".
    # Responds with HTTP 400 for an invalid body, and HTTP 500 when Groq or
    # Supabase is not configured or processing fails.
    if not GROQ_API_KEY:
        raise HTTPException(status_code=500, detail="Groq API Key not configured")
    # Check up front (like the other endpoints) instead of failing on
    # None.table(...) after the model call has already been made.
    if not supabase:
        raise HTTPException(status_code=500, detail="Supabase not initialized.")

    # Validate outside the try block below so these 400s are not rewritten
    # into a generic 500 by its catch-all handler.
    try:
        body = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON")
    transcript = body.get("transcript") if isinstance(body, dict) else None
    if not isinstance(transcript, str) or not transcript.strip():
        raise HTTPException(status_code=400, detail="A non-empty 'transcript' string is required")

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                json={
                    "messages": [
                        {"role": "system", "content": "Extract items from the user's transcript. Return JSON with 'items' array. Each item: 'name', 'quantity' (number), 'unit', 'category', 'location'."},
                        {"role": "user", "content": transcript}
                    ],
                    "model": "openai/gpt-oss-120b"
                },
                timeout=60.0
            )
            data = response.json()
            # Save items to Supabase
            if "choices" in data and data["choices"]:
                processed = json.loads(data["choices"][0]["message"]["content"])
                # NOTE(review): items are inserted exactly as the model returned
                # them (the prompt asks for 'category' and 'location' keys) -
                # confirm the inventory table has matching columns.
                if isinstance(processed, dict):
                    items = processed.get("items")
                    # Skip the insert for an empty list; there is nothing to save.
                    if isinstance(items, list) and items:
                        supabase.table("inventory").insert(items).execute()
            return data
        except Exception:
            logger.exception("Processing Error")
            raise HTTPException(status_code=500, detail="Processing failed")
if __name__ == "__main__":
    # Direct-execution entry point: serve the app with uvicorn on all
    # interfaces, port 8000. uvicorn is imported here so it is only needed
    # when the file is run as a script.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)