Call_Agent_API / main.py
martinbrahm's picture
Upload main.py
eeb5b5c verified
raw
history blame
5.44 kB
from fastapi import FastAPI, Request
import json
import os
import firebase_admin
from firebase_admin import credentials, firestore
app = FastAPI()
# ------------------------------------------------------
# 1. VERBINDUNG ZU FIREBASE HERSTELLEN
# ------------------------------------------------------
try:
# Wir holen den Schlüssel aus den Hugging Face Secrets
firebase_key_json = os.environ.get("FIREBASE_KEY")
if firebase_key_json:
# Den String wieder in ein echtes JSON-Objekt verwandeln
cred_dict = json.loads(firebase_key_json)
cred = credentials.Certificate(cred_dict)
# Prüfen, ob die App schon läuft, um Doppel-Start zu verhindern
if not firebase_admin._apps:
firebase_admin.initialize_app(cred)
db = firestore.client()
print("✅ Erfolgreich mit Firebase Firestore verbunden!")
else:
print("⚠️ WARNUNG: Kein FIREBASE_KEY in den Settings gefunden.")
db = None
except Exception as e:
print(f"❌ Kritischer Firebase Fehler: {e}")
db = None
# ------------------------------------------------------
# 2. KONFIGURATION
# ------------------------------------------------------
# Wie heißt deine Sammlung in Firebase?
# Laut deinem Pfad '/knowledge_base/...' wahrscheinlich so:
COLLECTION_NAME = "knowledge_base"
# Falls du die Fragen in einer 'inbox' sammeln willst:
INBOX_COLLECTION = "inbox"
@app.get("/")
def home():
if db:
return {"status": "Online & mit Firebase verbunden."}
else:
return {"status": "Online, aber KEINE Firebase-Verbindung (Key fehlt)."}
@app.post("/search")
async def search_knowledge(request: Request):
# --- A. DATEN EMPFANGEN (Vapi) ---
try:
data = await request.json()
print(f"📥 Vapi Anfrage: {json.dumps(data)}")
except:
return {"result": "Fehler: Kein JSON."}
query_text = ""
# Parsing Logik
if "query" in data and isinstance(data["query"], str):
query_text = data["query"]
elif "message" in data and "toolCalls" in data["message"]:
try:
args = data["message"]["toolCalls"][0]["function"]["arguments"]
if isinstance(args, str):
query_text = json.loads(args).get("query", "")
else:
query_text = args.get("query", "")
except:
pass
elif "toolCall" in data:
try:
args = data["toolCall"]["function"]["arguments"]
if isinstance(args, str):
query_text = json.loads(args).get("query", "")
else:
query_text = args.get("query", "")
except:
pass
print(f"🔎 Suche in Firebase nach: '{query_text}'")
if not query_text:
return {"result": "Ich habe die Frage nicht verstanden."}
if not db:
return {"result": "Server-Fehler: Keine Datenbankverbindung."}
# --- B. SUCHE IN FIREBASE ---
antwort = "Dazu habe ich leider keine Informationen in meiner Datenbank. Ich habe die Frage für unser Team notiert."
treffer_gefunden = False
try:
# Wir laden alle Dokumente aus der Knowledge Base
# (Bei sehr vielen Daten > 1000 Einträge müsste man das optimieren, aber für jetzt reicht das)
docs = db.collection(COLLECTION_NAME).stream()
query_lower = query_text.lower()
# Einfache Suche: Wir schauen, ob Keywords aus der DB in der Frage vorkommen
# Oder ob die Frage in der DB der Frage des Users ähnelt.
for doc in docs:
doc_data = doc.to_dict()
# Wir gehen davon aus, dass deine Dokumente Felder wie 'question', 'answer' oder 'keywords' haben
db_answer = doc_data.get("answer") or doc_data.get("Antwort")
# Check 1: Keywords Suche (Falls du ein Feld 'keywords' hast)
keywords = doc_data.get("keywords") or []
if keywords and isinstance(keywords, list):
if any(k.lower() in query_lower for k in keywords):
antwort = db_answer
treffer_gefunden = True
break
# Check 2: Frage-Text Suche (Falls du ein Feld 'question' hast)
db_question = doc_data.get("question") or doc_data.get("Frage") or ""
if db_question and db_question.lower() in query_lower:
antwort = db_answer
treffer_gefunden = True
break
# --- C. LERN-MODUS (Inbox) ---
if not treffer_gefunden:
print(f"⚠️ Kein Treffer. Speichere '{query_text}' in {INBOX_COLLECTION}...")
try:
db.collection(INBOX_COLLECTION).add({
"question": query_text,
"status": "open",
"timestamp": firestore.SERVER_TIMESTAMP
})
print("📩 Erfolgreich gespeichert.")
except Exception as e:
print(f"Fehler beim Speichern in Inbox: {e}")
except Exception as e:
print(f"❌ Firebase Abfrage-Fehler: {e}")
return {"result": "Es gab ein Problem beim Lesen der Datenbank."}
return {"result": antwort}