# Call_Agent_API / main.py
# Uploaded by martinbrahm (commit a0a7178, verified)
from fastapi import FastAPI, Request
import json
import os
import re
import firebase_admin
from firebase_admin import credentials, firestore
from datetime import datetime
app = FastAPI()
# --- SETUP ---
COLLECTION_KNOWLEDGE = "knowledge_base"
COLLECTION_RULES = "availability_rules"
COLLECTION_INBOX = "inbox"
KNOWLEDGE_CACHE = []
# --- FIREBASE VERBINDUNG ---
db = None
try:
key = os.environ.get("FIREBASE_KEY")
if key:
cred = credentials.Certificate(json.loads(key))
if not firebase_admin._apps:
firebase_admin.initialize_app(cred)
db = firestore.client()
print("✅ DB VERBUNDEN")
else:
print("❌ FEHLER: FIREBASE_KEY fehlt!")
except Exception as e:
print(f"❌ DB CRASH: {e}")
# --- CACHE LADEN ---
def reload_knowledge():
global KNOWLEDGE_CACHE
if not db: return
try:
docs = db.collection(COLLECTION_KNOWLEDGE).stream()
KNOWLEDGE_CACHE = [d.to_dict() for d in docs]
print(f"📚 {len(KNOWLEDGE_CACHE)} Einträge geladen.")
except Exception as e:
print(f"❌ Cache Fehler: {e}")
@app.on_event("startup")
async def startup():
reload_knowledge()
# --- HELPER: STEMMING & TOKENIZING ---
def get_stem(word):
# Einfaches Stemming
w = word.lower().strip()
suffixes = ["ungen", "innen", "keit", "sch", "ern", "en", "er", "es", "st", "te", "e", "s", "t"]
for end in suffixes:
if w.endswith(end) and len(w) > len(end) + 2:
return w[:-len(end)]
return w
def tokenize(text):
# Entfernt Sonderzeichen und zerlegt in Stämme
clean_text = re.sub(r'[^\w\s]', '', text.lower())
return [get_stem(w) for w in clean_text.split() if w]
# --- HELPER: VAPI REQUEST PARSER ---
def parse_vapi_request(data):
tool_call_id = "unknown"
args = {}
try:
msg = data.get("message", {})
if "toolCallList" in msg:
call = msg["toolCallList"][0]
tool_call_id = call["id"]
if "function" in call and "arguments" in call["function"]:
args = call["function"]["arguments"]
elif "toolCalls" in msg:
call = msg["toolCalls"][0]
tool_call_id = call["id"]
if "function" in call and "arguments" in call["function"]:
args = call["function"]["arguments"]
if isinstance(args, str):
args = json.loads(args)
except Exception as e:
print(f"⚠️ Parsing Info: {e}")
return tool_call_id, args
# ==========================================
# TOOL: SUCHE (OPTIMIERT)
# ==========================================
@app.post("/search")
async def search(request: Request):
data = await request.json()
tool_call_id, args = parse_vapi_request(data)
query = args.get("search_query") or args.get("query") or data.get("search_query")
print(f"🔎 QUERY: '{query}'")
answer_text = "Dazu habe ich leider keine Informationen in meiner Datenbank."
if query:
# --- STOP WÖRTER LISTE (MASSIV ERWEITERT) ---
# Diese Wörter werden komplett ignoriert und geben 0 Punkte.
STOP_WORDS = [
# Kommunikation
"hallo", "guten", "tag", "moin", "bitte", "danke", "frage", "sagen", "kannst", "du", "mir",
"was", "ist", "wer", "wie", "wo", "wann", "erzähl", "über", "möchte", "will", "haben",
# Artikel & Füllwörter (DIE KILLER!)
"der", "die", "das", "dem", "den", "des", "ein", "eine", "einer", "eines",
"im", "in", "von", "zu", "bei", "mit", "für", "auf", "aus", "um", "und", "oder",
# Generische Business-Wörter (die alles matchen würden)
"anbieten", "machen", "tun", "geben", "helfen", "unterstützen", "bieten",
"firma", "unternehmen", "gmbh", "produkt", "system", "plattform"
# "plattform" ist hier Stop-Wort, damit "Kosten der Plattform" nicht beim "Plattform-Feature" landet!
]
# 1. Query bereinigen
query_stems = [w for w in tokenize(query) if w not in STOP_WORDS and len(w) > 2]
print(f"🧐 Relevante Tokens: {query_stems}")
found = False
if query_stems:
best_doc = None
best_score = 0
for doc in KNOWLEDGE_CACHE:
score = 0
hits = []
# Dokument Inhalte tokenizen
# WICHTIG: Keywords zählen wir doppelt so stark, wenn sie exakt passen
doc_keywords = [get_stem(k) for k in doc.get("keywords", [])]
doc_title_stems = tokenize(doc.get("question", ""))
for q_stem in query_stems:
# 1. KEYWORD MATCH (Der "Router") -> 100 Punkte!
if q_stem in doc_keywords:
score += 100
hits.append(f"KEYWORD '{q_stem}'")
# 2. TITEL MATCH -> 50 Punkte
elif q_stem in doc_title_stems:
score += 50
hits.append(f"TITLE '{q_stem}'")
# (Wir ignorieren den Fließtext für das Scoring, um Zufallstreffer zu vermeiden)
if score > best_score:
best_score = score
best_doc = doc
if score > 0:
print(f" Kandidat: {score} Pkt ({hits}) -> {doc.get('question')[:30]}...")
# SCHWELLE: 50 PUNKTE
# Es muss mindestens ein Titel-Treffer (50) oder Keyword (100) sein.
if best_doc and best_score >= 50:
print(f"🏆 GEWINNER ({best_score} Pkt): {best_doc.get('question')}")
answer_text = best_doc.get("answer")
found = True
else:
print(f"⚠️ Kein Treffer (Max Score: {best_score})")
# --- INBOX ---
if not found and db:
print("📥 Ab in die Inbox.")
db.collection(COLLECTION_INBOX).add({
"query": query,
"timestamp": datetime.now(),
"status": "open"
})
return {"results": [{"toolCallId": tool_call_id, "result": answer_text}]}
# --- ANDERE ENDPOINTS ---
@app.post("/check_availability")
async def check_availability(request: Request):
data = await request.json()
tool_call_id, _ = parse_vapi_request(data)
today = datetime.now().strftime("%Y-%m-%d")
status, instruction = "available", "Normal arbeiten"
if db:
rules = db.collection(COLLECTION_RULES).where("active", "==", True).stream()
for r in rules:
rd = r.to_dict()
if rd.get('start_date') <= today <= rd.get('end_date'):
status = "limited" if "ferien" in rd.get('name', '').lower() else "unavailable"
instruction = rd.get('instruction_text')
break
return {"results": [{"toolCallId": tool_call_id, "result": {"status": status, "instruction": instruction}}]}
@app.post("/vapi-incoming")
async def dummy_incoming(request: Request): return {"status": "ok"}
@app.get("/")
def home(): return {"status": "Online"}