import os
import json
import re
import time
import sys
import traceback
import threading
import html as html_lib
import ipaddress
import random
from datetime import datetime
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from difflib import SequenceMatcher
import torch
import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer, AutoModel
from huggingface_hub import hf_hub_download, upload_file
try:
from bs4 import BeautifulSoup
except Exception:
BeautifulSoup = None
# =========================================================
# CONFIG – everything can be tuned here
# =========================================================
# --- MODEL ---
MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"
# Examples:
# "Qwen/Qwen3-0.6B"            → very small, fast, weaker
# "Qwen/Qwen3-1.7B"            → good balance (recommended)
# "Qwen/Qwen3-4B"              → smarter, needs more RAM
# "Qwen/Qwen2.5-1.5B-Instruct" → older model, stable
# --- HUGGING FACE ---
HF_DATASET = "RedJul2110/wissen-datenbank"
HF_TOKEN = os.environ.get("HF_TOKEN", "")
ADMIN_CODE = os.environ.get("CODE", "")  # Note: if CODE is unset, an empty admin code is accepted
# --- QUANTIZATION (4 for speed, 8 for sharpness, 16 for full power) ---
# Choose: 4, 8, or 16
LOAD_IN_BITS = 4  # Recommended for 16 GB RAM
# --- FILE PATHS ---
DATA_DIR = "/data" if os.path.isdir("/data") else "."
os.makedirs(DATA_DIR, exist_ok=True)
WISSEN_FILE = os.path.join(DATA_DIR, "wissen.json")
CHAT_FILE = os.path.join(DATA_DIR, "chat_history.json")
LOG_FILE = os.path.join(DATA_DIR, "ai_log.txt")
SETTINGS_FILE = os.path.join(DATA_DIR, "settings.json")
# --- ANSWER LENGTH (how long may the AI's answers be?) ---
MAX_NEW_TOKENS_CHAT = 100     # Normal chat answer
MAX_NEW_TOKENS_POLISH = 100   # Answer using the knowledge base
MAX_NEW_TOKENS_SUMMARY = 200  # Link summary
# Examples:
# 80  → very short answers (faster)
# 150 → medium answers (recommended)
# 300 → long, detailed answers (slower)
# --- CREATIVITY (how creative / random are the answers?) ---
TEMPERATURE_CHAT = 0.3    # Main chat
TEMPERATURE_POLISH = 0.4  # Answer with knowledge
# Examples:
# 0.1 → very precise, almost always the same answer
# 0.5 → balanced (recommended)
# 0.9 → creative, but sometimes inaccurate
# --- REPETITION GUARD ---
REPETITION_PENALTY = 1.15  # How strongly repetitions are penalized
NO_REPEAT_NGRAM_SIZE = 3   # No n-gram of this size may occur twice
# Examples for repetition_penalty:
# 1.0  → no guard (model may repeat itself)
# 1.15 → light guard (recommended)
# 1.4  → strong guard (can make answers sound unnatural)
# --- CHAT MEMORY (how many messages does it remember?) ---
MAX_CHAT_HISTORY = 10  # Maximum number of stored messages
MAX_CONTEXT_TURNS = 5  # How many messages are used as context
# Examples:
# 6  → short memory, faster
# 10 → medium memory (recommended)
# 20 → long memory (needs more compute)
# --- AI PERSONALITY ---
AI_NAME = "RedJul2110"
FALLBACK_NO_INFO = "Dazu habe ich gerade keine sichere Antwort."
SYSTEM_PROMPT_ZUSATZ = "Du bist ein brillanter KI-Assistent. Du kannst sehr gut programmieren und Code-Beispiele liefern, aber du kannst auch bei jeglichen anderen Themen helfen. Du antwortest immer klar und kurz."
# --- KNOWLEDGE BASE ---
USE_QWEN_POLISH = True            # True = the AI refines answers using stored knowledge
DB_DIRECT_MATCH_THRESHOLD = 0.88  # How similar must a question be? (0.0–1.0)
DB_FACT_MATCH_THRESHOLD = 0.70    # From which score does a fact count as relevant? (0.0–1.0)
# Threshold examples:
# 0.95 → only near-identical matches (very strict)
# 0.88 → good balance (recommended)
# 0.60 → many matches, including weaker ones
# =========================================================
# GLOBAL VARIABLES
# =========================================================
model = None
tokenizer = None
embed_model = None
embed_tokenizer = None
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
knowledge_lock = threading.Lock()
chat_lock = threading.Lock()
api_chat_historie = [] # list[dict]
upload_in_progress = False
letzter_hf_sync = None
letzter_upload = None
letzte_wissensänderung = None
letzte_api_latenz = None
letzter_fehler = None
# =========================================================
# HELPER FUNCTIONS
# =========================================================
def now_str():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def log_line(message):
try:
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(f"[{now_str()}] {message}\n")
    except Exception:
        pass
def log_error(where, exc):
global letzter_fehler
letzter_fehler = f"{where}: {exc}"
log_line(f"[ERROR] {where}: {exc}\n{traceback.format_exc()}")
def normalize_text(text):
text = (text or "").lower().strip()
text = (
text.replace("ä", "ae")
.replace("ö", "oe")
.replace("ü", "ue")
.replace("ß", "ss")
)
text = re.sub(r"[^a-z0-9]+", " ", text)
return re.sub(r"\s+", " ", text).strip()
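# Example: normalize_text("Straße in Köln?") -> "strasse in koeln"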
def text_tokens(text):
stopwords = {
"der", "die", "das", "ein", "eine", "einer", "eines", "und", "oder", "ist",
"sind", "war", "waren", "wie", "was", "wer", "wo", "wann", "warum", "wieso",
"woher", "wieviel", "wieviele", "im", "in", "am", "an", "zu", "mit", "von",
"für", "auf", "aus", "den", "dem", "des", "ich", "du", "er", "sie", "es",
"man", "nicht", "nur", "auch", "noch"
}
tokens = normalize_text(text).split()
return {t for t in tokens if t and t not in stopwords}
def ensure_json_list_file(path):
if not os.path.exists(path):
save_json_list(path, [])
def load_json_list(path):
if not os.path.exists(path):
return []
try:
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, list) else []
    except Exception:
        return []
def save_json_list(path, data):
tmp = path + ".tmp"
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(tmp, path)
def format_entry(item, idx=None):
titel = item.get("frage", "").strip()
text = item.get("antwort", "").strip()
kategorie = item.get("kategorie", "").strip()
quelle = item.get("quelle", "").strip()
created = item.get("created_at", "").strip()
out = []
if idx is None:
out.append(f"{titel}")
else:
out.append(f"{idx}. {titel}")
if kategorie:
out.append(f"[Kategorie: {kategorie}]")
if quelle:
out.append(f"[Quelle: {quelle}]")
if created:
out.append(f"[Zeit: {created}]")
out.append(text)
return "\n".join(out)
def history_to_context(history, max_turns=MAX_CONTEXT_TURNS):
if not history:
return ""
lines = []
if isinstance(history[0], tuple):
for user, assistant in history[-max_turns:]:
lines.append(f"User: {user}")
lines.append(f"Assistant: {assistant}")
return "\n".join(lines)
recent = history[-max_turns * 2:]
for msg in recent:
role = msg.get("role", "")
content = msg.get("content", "")
lines.append(f"{role}: {content}")
return "\n".join(lines)
def api_history_to_pairs(messages):
pairs = []
pending_user = None
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", "")
if role == "user":
pending_user = content
elif role == "assistant" and pending_user is not None:
pairs.append((pending_user, content))
pending_user = None
return pairs
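# Example:
#   api_history_to_pairs([{"role": "user", "content": "Hi"},
#                         {"role": "assistant", "content": "Hallo!"}])
#   -> [("Hi", "Hallo!")]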
def trim_api_history(max_messages=MAX_CHAT_HISTORY):
global api_chat_historie
if len(api_chat_historie) > max_messages:
api_chat_historie = api_chat_historie[-max_messages:]
def looks_like_factual_question(text):
t = normalize_text(text)
return "?" in text or t.startswith((
"was", "wer", "wie", "wann", "wo", "warum", "wieso",
"welche", "welcher", "welches", "nenn", "nenne", "erklaer", "erklär"
))
def compress_text(text, max_chars=220):
text = (text or "").strip()
if not text:
return ""
text = re.sub(r"\s+", " ", text)
if len(text) <= max_chars:
return text
cut = text[:max_chars].rsplit(" ", 1)[0].strip()
return cut + "..."
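# Example: compress_text("eins zwei drei vier", max_chars=10) -> "eins zwei..."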
def is_generic_or_placeholder_answer(text):
t = normalize_text(text)
if not t:
return True
markers = [
"interessante frage",
"was moechtest du wissen",
"was möchtest du wissen",
"ich bin sehr praezise",
"ich bin sehr präzise",
"dazu habe ich nichts",
"was genau",
"wie kann ich helfen",
"ruf mich an",
"ich kann dir",
"hallo, ich bin",
"hallo ich bin",
"hier ist mein angebot",
]
return any(m in t for m in markers)
def dedupe_facts(facts):
seen = set()
unique = []
for item in facts:
key = (
normalize_text(item.get("frage", "")),
normalize_text(item.get("antwort", "")),
normalize_text(item.get("quelle", "")),
)
if key not in seen:
seen.add(key)
unique.append(item)
return unique
# =========================================================
# KNOWLEDGE / DATABASE
# =========================================================
def load_wissen():
ensure_json_list_file(WISSEN_FILE)
return load_json_list(WISSEN_FILE)
def sync_wissen_from_hf():
global letzter_hf_sync
ensure_json_list_file(WISSEN_FILE)
if not HF_TOKEN:
log_line("[WARN] HF_TOKEN fehlt. Lokale Datei wird genutzt.")
return False, "HF_TOKEN fehlt. Lokale Datei wird genutzt."
try:
remote_path = hf_hub_download(
repo_id=HF_DATASET,
filename="wissen.json",
repo_type="dataset",
token=HF_TOKEN,
force_download=True
)
remote_data = load_json_list(remote_path)
save_json_list(WISSEN_FILE, remote_data)
letzter_hf_sync = now_str()
return True, f"✅ Wissen aus HF geladen ({len(remote_data)} Einträge)."
except Exception as e:
msg = str(e)
if "Entry Not Found" in msg or "404" in msg or "not found" in msg.lower():
return False, "ℹ️ Im HF-Dataset gibt es noch keine wissen.json. Lokale Datei wird genutzt."
log_error("sync_wissen_from_hf", e)
return False, f"⚠️ HF-Sync fehlgeschlagen, lokale Datei bleibt aktiv: {e}"
def upload_wissen_background():
global upload_in_progress, letzter_upload
if not HF_TOKEN:
log_line("[WARN] Upload übersprungen, weil HF_TOKEN fehlt.")
return
upload_in_progress = True
try:
upload_file(
path_or_fileobj=WISSEN_FILE,
path_in_repo="wissen.json",
repo_id=HF_DATASET,
repo_type="dataset",
token=HF_TOKEN,
commit_message=f"Update wissen.json ({now_str()})"
)
letzter_upload = now_str()
log_line("[OK] wissen.json erfolgreich hochgeladen.")
except Exception as e:
log_error("upload_wissen_background", e)
finally:
upload_in_progress = False
def init_embed_model():
global embed_tokenizer, embed_model
if embed_model is not None:
return
try:
log_line("[INFO] Lade Embedding-Modell für semantische Suche...")
embed_tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
embed_model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2").to(device)
embed_model.eval()
log_line("[INFO] Embedding-Modell erfolgreich geladen.")
except Exception as e:
log_error("init_embed_model", e)
def get_embedding(text):
init_embed_model()
if embed_model is None or embed_tokenizer is None or not text:
return []
try:
inputs = embed_tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
with torch.no_grad():
outputs = embed_model(**inputs)
# Mean Pooling
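            # (plain mean over all token positions; for a single, unpadded input
            # this equals attention-masked mean pooling)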
emb = outputs.last_hidden_state.mean(dim=1)
return emb[0].cpu().numpy().tolist()
except Exception as e:
log_error("get_embedding", e)
return []
def cosine_similarity(v1, v2):
if not v1 or not v2 or len(v1) != len(v2):
return 0.0
import math
dot = sum(a * b for a, b in zip(v1, v2))
mag1 = math.sqrt(sum(a * a for a in v1))
mag2 = math.sqrt(sum(b * b for b in v2))
if mag1 == 0 or mag2 == 0:
return 0.0
return dot / (mag1 * mag2)
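# Example: cosine_similarity([1.0, 0.0], [1.0, 0.0]) -> 1.0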
def db_match_score(query, item, query_emb=None):
q_norm = normalize_text(query)
frage = normalize_text(item.get("frage", ""))
antwort = item.get("antwort", "")
kategorie = normalize_text(item.get("kategorie", ""))
quelle = normalize_text(item.get("quelle", ""))
blob_norm = normalize_text(f"{item.get('frage', '')} {antwort} {item.get('kategorie','')} {item.get('quelle','')}")
if not q_norm or not frage:
return 0.0
seq = SequenceMatcher(None, q_norm, frage).ratio()
q_tokens = text_tokens(q_norm)
c_tokens = text_tokens(blob_norm)
token_score = len(q_tokens & c_tokens) / max(len(q_tokens), 1)
semantic_score = 0.0
item_emb = item.get("embedding", [])
if query_emb and item_emb:
semantic_score = cosine_similarity(query_emb, item_emb)
bonus = 0.0
if q_norm == frage:
bonus += 0.35
if q_norm in blob_norm:
bonus += 0.12
if quelle and q_norm == quelle:
bonus += 0.15
if kategorie and q_norm == kategorie:
bonus += 0.08
return max((seq * 0.6) + (token_score * 0.4) + bonus, semantic_score + bonus)
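# Rough intuition (illustrative numbers, not measured): an exact question match
# yields seq ≈ 1.0 and token_score = 1.0 plus the 0.35 exact-match bonus, well
# above DB_DIRECT_MATCH_THRESHOLD (0.88); looser matches depend on the 0.6/0.4
# blend of sequence and token overlap, or on embedding similarity.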
def exact_db_answer(user_message):
q = normalize_text(user_message)
if not q:
return None
data = load_wissen()
for item in data:
frage = normalize_text(item.get("frage", ""))
quelle = normalize_text(item.get("quelle", ""))
antwort = item.get("antwort", "").strip()
if q == frage or (quelle and q == quelle):
return antwort
return None
def best_db_answer(user_message, threshold=DB_DIRECT_MATCH_THRESHOLD):
exact = exact_db_answer(user_message)
if exact:
return exact
data = load_wissen()
if not data:
return None
best_item = None
best_score = 0.0
query_emb = get_embedding(user_message)
for item in data:
score = db_match_score(user_message, item, query_emb=query_emb)
if score > best_score:
best_score = score
best_item = item
if best_item is not None and best_score >= threshold:
return best_item.get("antwort", "").strip()
return None
def find_relevant_facts(query, max_items=6, min_score=DB_FACT_MATCH_THRESHOLD):
data = load_wissen()
if not data:
return []
scored = []
query_emb = get_embedding(query)
for item in data:
score = db_match_score(query, item, query_emb=query_emb)
if score >= min_score:
scored.append((score, item))
scored.sort(key=lambda x: x[0], reverse=True)
return [item for _, item in scored[:max_items]]
def get_knowledge_stats():
data = load_wissen()
categories = []
for item in data:
cat = item.get("kategorie", "").strip()
if cat and cat not in categories:
categories.append(cat)
return {
"count": len(data),
"categories": categories[:10],
}
def search_knowledge(query, max_results=8):
query = (query or "").strip()
if not query:
return "❌ Bitte gib einen Suchbegriff ein."
data = load_wissen()
if not data:
return "Keine Einträge vorhanden."
scored = []
query_emb = get_embedding(query)
for item in data:
score = db_match_score(query, item, query_emb=query_emb)
if score >= DB_FACT_MATCH_THRESHOLD:
scored.append((score, item))
scored.sort(key=lambda x: x[0], reverse=True)
matches = [item for _, item in scored[:max_results]]
if not matches:
return "❌ Keine passenden Einträge gefunden."
out = [f"✅ {len(matches)} Treffer:\n"]
for i, item in enumerate(matches, 1):
out.append(format_entry(item, i))
out.append("\n" + "-" * 40 + "\n")
return "\n".join(out).strip()
def delete_knowledge(query):
global letzte_wissensänderung
query = (query or "").strip()
if not query:
return False, "❌ Bitte einen Suchbegriff zum Löschen eingeben."
with knowledge_lock:
sync_wissen_from_hf()
data = load_wissen()
if not data:
return False, "Keine Einträge vorhanden."
new_data = []
removed = []
for item in data:
item_score = db_match_score(query, item)
if item_score >= DB_FACT_MATCH_THRESHOLD:
removed.append(item)
else:
new_data.append(item)
if not removed:
return False, "❌ Nichts gefunden, was gelöscht werden kann."
save_json_list(WISSEN_FILE, new_data)
letzte_wissensänderung = now_str()
threading.Thread(target=upload_wissen_background, daemon=True).start()
return True, f"✅ {len(removed)} Eintrag/Einträge gelöscht."
def delete_all_knowledge(admin_code):
global letzte_wissensänderung
if admin_code != ADMIN_CODE:
return False, "❌ Falscher Admin-Code."
with knowledge_lock:
save_json_list(WISSEN_FILE, [])
letzte_wissensänderung = now_str()
threading.Thread(target=upload_wissen_background, daemon=True).start()
return True, "✅ Alle Wissenseinträge wurden gelöscht."
def save_knowledge_entry(frage, antwort, kategorie="", quelle="", embedding=None):
global letzte_wissensänderung
frage = (frage or "").strip()
antwort = (antwort or "").strip()
kategorie = (kategorie or "").strip()
quelle = (quelle or "").strip()
if not frage or not antwort:
return False, "❌ Thema/Stichwort und Text dürfen nicht leer sein."
with knowledge_lock:
sync_wissen_from_hf()
data = load_wissen()
q_norm = normalize_text(frage)
for item in data:
if normalize_text(item.get("frage", "")) == q_norm:
return False, "ℹ️ Dieser Eintrag ist schon vorhanden."
if embedding is None:
embedding = get_embedding(frage)
entry = {
"frage": frage,
"antwort": antwort,
"kategorie": kategorie,
"quelle": quelle,
"embedding": embedding,
"created_at": now_str()
}
data.append(entry)
save_json_list(WISSEN_FILE, data)
letzte_wissensänderung = now_str()
threading.Thread(target=upload_wissen_background, daemon=True).start()
return True, f"✅ Lokal gespeichert. Upload läuft im Hintergrund.\n\nThema: {frage}"
# =========================================================
# LEARNING FROM WEB LINKS
# =========================================================
def is_private_or_local_host(hostname):
if not hostname:
return True
host = hostname.lower().strip()
if host in {"localhost", "127.0.0.1", "::1"}:
return True
try:
ip = ipaddress.ip_address(host)
return ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved
    except ValueError:
        # Not an IP literal, so it's a hostname; allow it (no DNS lookup is done)
        return False
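# Caveat: only IP literals are rejected here; a public hostname that resolves
# to a private address would still pass this check.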
def extract_webpage_text(url, timeout=15, max_bytes=2_000_000):
parsed = urlparse(url.strip())
if parsed.scheme not in {"http", "https"}:
raise ValueError("Nur http/https Links sind erlaubt.")
if is_private_or_local_host(parsed.hostname):
raise ValueError("Lokale oder private Adressen sind nicht erlaubt.")
req = Request(
url,
headers={
"User-Agent": "Mozilla/5.0 (compatible; KnowledgeBot/1.0)"
}
)
with urlopen(req, timeout=timeout) as resp:
raw = resp.read(max_bytes)
charset = resp.headers.get_content_charset() or "utf-8"
html_text = raw.decode(charset, errors="ignore")
title = ""
m = re.search(r"(?is)
]*>(.*?)", html_text)
if m:
title = html_lib.unescape(m.group(1)).strip()
if BeautifulSoup is not None:
soup = BeautifulSoup(html_text, "html.parser")
for tag in soup(["script", "style", "noscript"]):
tag.decompose()
page_text = soup.get_text(" ", strip=True)
else:
page_text = re.sub(r"(?is)<(script|style|noscript).*?>.*?\1>", " ", html_text)
page_text = re.sub(r"(?is).*?", " ", page_text)
page_text = re.sub(r"(?is)<[^>]+>", " ", page_text)
page_text = html_lib.unescape(page_text)
page_text = re.sub(r"\s+", " ", page_text).strip()
if not title:
title = parsed.netloc
if len(page_text) > 12000:
page_text = page_text[:12000]
return title, page_text
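# Usage sketch (requires network access; URL purely illustrative):
#   title, text = extract_webpage_text("https://example.com")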
def heuristic_summary(text, max_sentences=5, max_chars=2200):
if not text:
return ""
sentences = re.split(r"(?<=[.!?])\s+", text)
picked = []
total = 0
for s in sentences:
s = s.strip()
if not s:
continue
picked.append(s)
total += len(s)
if len(picked) >= max_sentences or total >= max_chars:
break
summary = " ".join(picked).strip()
return summary[:max_chars].strip()
def summarize_web_text(title, raw_text):
if not raw_text:
return ""
base = heuristic_summary(raw_text)
if not USE_QWEN_POLISH or model is None or tokenizer is None:
return base
messages = [
{
"role": "system",
"content": (
"Du fasst nur den gegebenen Webseitentext zusammen. "
"Erfinde nichts. Keine neuen Fakten. "
"Schreibe 4 bis 8 kurze Sätze auf Deutsch."
)
},
{
"role": "user",
"content": (
f"Titel: {title}\n\n"
f"Webseitentext:\n{raw_text}\n\n"
"Fasse das als kompakten Lerntest zusammen."
)
}
]
try:
out = model_generate(messages, max_new_tokens=180, temperature=0.35, do_sample=True)
out = (out or "").strip()
return out if out else base
except Exception as e:
log_error("summarize_web_text", e)
return base
def save_link_as_knowledge(url, thema="", kategorie="web"):
url = (url or "").strip()
thema = (thema or "").strip()
kategorie = (kategorie or "").strip() or "web"
if not url:
return False, "❌ Bitte einen Link eingeben."
try:
title, raw_text = extract_webpage_text(url)
except Exception as e:
log_error("extract_webpage_text", e)
return False, f"❌ Link konnte nicht gelesen werden: {e}"
if not raw_text or len(raw_text) < 50:
return False, "❌ Auf der Seite konnte kein ausreichender Text gefunden werden."
summary = summarize_web_text(title, raw_text)
if not summary or len(summary.strip()) < 30:
summary = heuristic_summary(raw_text, max_sentences=6, max_chars=3000)
if not summary:
return False, "❌ Der Text war zu leer oder unlesbar."
topic = thema or title or url
ok, msg = save_knowledge_entry(
frage=topic,
antwort=summary,
kategorie=kategorie,
quelle=url
)
if ok:
return True, f"✅ Link gelernt!\n\nThema: {topic}\nQuelle: {url}\n\nZusammenfassung:\n{summary}"
return ok, msg
# =========================================================
# CHAT / STORAGE
# =========================================================
def load_chat_history():
    ensure_json_list_file(CHAT_FILE)
    try:
        with open(CHAT_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        return []
    # The file may hold a flat message list or the per-session dict written by
    # reset_chat_history(); flatten to the "default" session for the readers.
    if isinstance(data, dict):
        data = data.get("default", [])
    return data if isinstance(data, list) else []
def save_chat_history(history):
    save_json_list(CHAT_FILE, history)
def get_chat_session(session_id="default"):
global api_chat_historie
if not isinstance(api_chat_historie, dict):
if isinstance(api_chat_historie, list):
api_chat_historie = {"default": api_chat_historie}
else:
api_chat_historie = {"default": []}
if session_id not in api_chat_historie:
api_chat_historie[session_id] = []
return api_chat_historie[session_id]
def reset_chat_history():
global api_chat_historie
with chat_lock:
api_chat_historie = {"default": []}
save_chat_history(api_chat_historie)
log_line("[CHAT] Chat-Historie zurückgesetzt.")
return True, "✅ Chat-Historie gelöscht."
def chat_history_status():
history = load_chat_history()
if not history:
return "Chat-Historie ist leer."
out = [f"📜 Gespeicherte Nachrichten: {len(history)}\n"]
for i, msg in enumerate(history[-12:], 1):
role = msg.get("role", "?")
content = msg.get("content", "")
out.append(f"{i}. {role}: {content[:250]}")
out.append("\n")
return "\n".join(out).strip()
def load_visible_chat_history_for_ui():
pairs = api_history_to_pairs(load_chat_history())
return pairs, pairs
# =========================================================
# MODEL / QWEN
# =========================================================
def init_model_if_needed():
global model, tokenizer, device
if model is not None and tokenizer is not None:
return
print("=" * 60)
print("🤖 Initialisiere Modell")
print("=" * 60)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
print(f"--- Lade Modell im {LOAD_IN_BITS}-Bit Modus ---")
if LOAD_IN_BITS == 4:
        # 4-bit mode (very frugal, roughly 5 GB RAM)
from transformers import BitsAndBytesConfig
bnb_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_quant_type="nf4",
bnb_4bit_use_double_quant=True,
llm_int8_enable_fp32_cpu_offload=True
)
model = AutoModelForCausalLM.from_pretrained(
MODEL_NAME,
quantization_config=bnb_config,
device_map="auto",
low_cpu_mem_usage=True,
token=HF_TOKEN,
offload_folder="offload"
)
    elif LOAD_IN_BITS == 8:
        from transformers import BitsAndBytesConfig
        # 8-bit config; fp32 CPU offload catches layers that don't fit on the GPU
        bnb_config_8 = BitsAndBytesConfig(
            load_in_8bit=True,
            llm_int8_enable_fp32_cpu_offload=True
        )
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            quantization_config=bnb_config_8,
            device_map="auto",  # IMPORTANT: spreads layers across GPU AND CPU
            low_cpu_mem_usage=True,
            token=HF_TOKEN
        )
    else:
        # 16-bit (fp16) mode: full quality, highest memory use.
        # The config documents 16 as a valid choice; without this branch the
        # function would fall through and crash on model.eval() below.
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            torch_dtype=torch.float16,
            device_map="auto",
            low_cpu_mem_usage=True,
            token=HF_TOKEN
        )
model.eval()
print(f"✅ Modell geladen auf: {device}")
def format_messages_for_model(messages_history):
if tokenizer is None:
return ""
try:
return tokenizer.apply_chat_template(
messages_history,
tokenize=False,
add_generation_prompt=True,
enable_thinking=False
)
except TypeError:
        # Fallback for older chat templates without enable_thinking
return tokenizer.apply_chat_template(
messages_history,
tokenize=False,
add_generation_prompt=True
)
except Exception:
lines = []
for m in messages_history:
role = m.get("role", "user").capitalize()
content = m.get("content", "")
lines.append(f"{role}: {content}")
lines.append("Assistant:")
return "\n".join(lines)
def is_repetitive_output(text, threshold=0.6):
"""Prüft ob der generierte Text sich selbst stark wiederholt."""
if not text or len(text) < 40:
return False
words = text.split()
if len(words) < 10:
return False
half = len(words) // 2
first_half = " ".join(words[:half])
second_half = " ".join(words[half:])
ratio = SequenceMatcher(None, first_half, second_half).ratio()
return ratio > threshold
def model_generate(messages_history, max_new_tokens=120, temperature=0.6, do_sample=True):
if model is None or tokenizer is None:
return ""
prompt_text = format_messages_for_model(messages_history)
if not prompt_text:
return ""
inputs = tokenizer(
[prompt_text],
return_tensors="pt",
truncation=True,
max_length=2048
).to(device)
attention_mask = inputs.get("attention_mask", None)
with torch.no_grad():
output = model.generate(
inputs.input_ids,
attention_mask=attention_mask,
max_new_tokens=max_new_tokens,
do_sample=do_sample,
temperature=temperature,
top_p=0.90,
top_k=40,
repetition_penalty=REPETITION_PENALTY,
no_repeat_ngram_size=NO_REPEAT_NGRAM_SIZE,
pad_token_id=tokenizer.eos_token_id,
eos_token_id=tokenizer.eos_token_id
)
new_tokens = output[0][inputs.input_ids.shape[-1]:]
text = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
    # Strip Qwen3 thinking blocks (<think>...</think>)
    text = re.sub(r"<think>[\s\S]*?</think>", "", text).strip()
    # Also strip a truncated, unclosed <think> tag
    text = re.sub(r"<think>[\s\S]*", "", text).strip()
    # Anti-loop: if the output repeats itself heavily, discard it
if is_repetitive_output(text):
log_line("[WARN] Wiederholende Ausgabe erkannt, wird verworfen.")
return ""
return text
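# Call sketch (messages use the chat format expected by apply_chat_template):
#   model_generate([{"role": "system", "content": "Du bist ein Assistent."},
#                   {"role": "user", "content": "Hallo"}], max_new_tokens=50)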
def model_generate_stream(messages_history, max_new_tokens=120, temperature=0.6, do_sample=True):
if model is None or tokenizer is None:
yield "Modell nicht geladen."
return
prompt_text = format_messages_for_model(messages_history)
if not prompt_text:
return
inputs = tokenizer(
[prompt_text],
return_tensors="pt",
truncation=True,
max_length=2048
).to(device)
attention_mask = inputs.get("attention_mask", None)
streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
generation_kwargs = {
"input_ids": inputs.input_ids,
"attention_mask": attention_mask,
"max_new_tokens": max_new_tokens,
"do_sample": do_sample,
"temperature": temperature,
"top_p": 0.90,
"top_k": 40,
"repetition_penalty": REPETITION_PENALTY,
"no_repeat_ngram_size": NO_REPEAT_NGRAM_SIZE,
"pad_token_id": tokenizer.eos_token_id,
"eos_token_id": tokenizer.eos_token_id,
"streamer": streamer
}
thread = threading.Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
generated_text = ""
for new_text in streamer:
generated_text += new_text
yield generated_text.strip()
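# Note: unlike model_generate(), the streaming variant neither strips <think>
# blocks nor applies the is_repetitive_output() anti-loop check.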
def build_system_prompt(user_message=""):
facts = find_relevant_facts(user_message, max_items=6)
if not facts:
facts = load_wissen()[:6]
fact_lines = []
for idx, item in enumerate(facts, 1):
fact_lines.append(
f"Fakt {idx}:\n"
f"Thema: {item.get('frage', '')}\n"
f"Text: {item.get('antwort', '')}"
)
fact_block = "\n\n".join(fact_lines) if fact_lines else "Keine gespeicherten Fakten vorhanden."
return f"""Du bist {AI_NAME}, ein hilfreicher KI-Assistent. Antworte kurz und klar auf Deutsch.
Bekannte Fakten:
{fact_block}"""
def get_system_prompt():
return build_system_prompt("")
def compose_draft_from_facts(facts):
if not facts:
return ""
pieces = []
for item in facts[:4]:
topic = (item.get("frage", "") or "").strip()
ans = compress_text(item.get("antwort", ""), 220)
if topic and ans:
pieces.append(f"{topic}: {ans}")
elif ans:
pieces.append(ans)
if not pieces:
return ""
random.shuffle(pieces)
return "\n".join(pieces)
def polish_with_model(user_message, draft, facts, history_context=""):
if not USE_QWEN_POLISH:
return draft
if model is None or tokenizer is None:
return draft
fact_lines = []
for idx, item in enumerate(facts, 1):
fact_lines.append(
f"{idx}. Thema: {item.get('frage', '')}\n"
f" Zusatzwissen: {compress_text(item.get('antwort', ''), 260)}"
)
fact_block = "\n".join(fact_lines) if fact_lines else "Keine zusätzlichen Fakten."
extra = f"\nZusatzinfo: {draft}" if draft else ""
messages = [
{
"role": "system",
"content": f"Du bist {AI_NAME}. Antworte immer auf Deutsch. Kurz, direkt, hilfreich. Keine Floskeln."
},
{
"role": "user",
"content": f"{user_message}{extra}"
}
]
try:
out = model_generate(messages, max_new_tokens=MAX_NEW_TOKENS_POLISH, temperature=TEMPERATURE_POLISH, do_sample=True)
if not out:
return draft
return out.strip()
except Exception as e:
log_error("polish_with_model", e)
return draft
def general_chat_reply(user_message, history_context=""):
if model is None or tokenizer is None:
return "Dazu habe ich gerade keine sichere Antwort."
messages = [
{
"role": "system",
"content": f"Du bist {AI_NAME}. Antworte immer auf Deutsch. Kurz, direkt, hilfreich. Keine Floskeln. Bei Unsicherheit: 'Ich bin nicht sicher, aber ich glaube...'"
},
{
"role": "user",
"content": user_message
}
]
try:
out = model_generate(messages, max_new_tokens=MAX_NEW_TOKENS_CHAT, temperature=TEMPERATURE_CHAT, do_sample=True)
        return (out or "").strip() or FALLBACK_NO_INFO
except Exception as e:
log_error("general_chat_reply", e)
return "Dazu habe ich gerade keine sichere Antwort."
def generate_reply(user_message, history_context=""):
query = f"{user_message} {history_context}".strip()
facts = find_relevant_facts(query, max_items=6)
facts = dedupe_facts(facts)
exact = exact_db_answer(user_message)
if exact and not is_generic_or_placeholder_answer(exact):
extra_fact = {
"frage": user_message,
"antwort": exact,
"kategorie": "",
"quelle": "",
"created_at": ""
}
facts = dedupe_facts([extra_fact] + facts)
draft = compose_draft_from_facts(facts)
    if facts:
        reply = polish_with_model(user_message, draft, facts, history_context)
        # If polishing worked and the result isn't a stock phrase, use it
        if reply and not is_generic_or_placeholder_answer(reply):
            return reply
    # No facts, or polishing produced junk: fall back to plain chat
return general_chat_reply(user_message, history_context)
def general_chat_reply_stream(user_message, history_context=""):
if model is None or tokenizer is None:
yield "Dazu habe ich gerade keine sichere Antwort."
return
messages = [
{
"role": "system",
"content": f"Du bist {AI_NAME}. Antworte immer auf Deutsch. Kurz, direkt, hilfreich. Keine Floskeln. Bei Unsicherheit: 'Ich bin nicht sicher, aber ich glaube...'"
},
{
"role": "user",
"content": user_message
}
]
try:
for chunk in model_generate_stream(messages, max_new_tokens=MAX_NEW_TOKENS_CHAT, temperature=TEMPERATURE_CHAT, do_sample=True):
            yield chunk or FALLBACK_NO_INFO
except Exception as e:
log_error("general_chat_reply_stream", e)
yield "Dazu habe ich gerade keine sichere Antwort."
def generate_reply_stream(user_message, history_context=""):
query = f"{user_message} {history_context}".strip()
facts = find_relevant_facts(query, max_items=6)
facts = dedupe_facts(facts)
exact = exact_db_answer(user_message)
if exact and not is_generic_or_placeholder_answer(exact):
extra_fact = {
"frage": user_message,
"antwort": exact,
"kategorie": "",
"quelle": "",
"created_at": ""
}
facts = dedupe_facts([extra_fact] + facts)
draft = compose_draft_from_facts(facts)
    if facts:
        reply = polish_with_model(user_message, draft, facts, history_context)
        if reply and not is_generic_or_placeholder_answer(reply):
            yield reply
            return
    # Plain chat stream
for chunk in general_chat_reply_stream(user_message, history_context):
yield chunk
# =========================================================
# API
# =========================================================
def gradio_simple_api(user_message, session_id="default"):
global api_chat_historie, letzte_api_latenz
start = time.perf_counter()
session_id = session_id or "default"
with chat_lock:
history = get_chat_session(session_id)
history_context = history_to_context(history)
reply = generate_reply(user_message, history_context=history_context)
history.append({"role": "user", "content": user_message})
history.append({"role": "assistant", "content": reply})
if len(history) > MAX_CHAT_HISTORY:
api_chat_historie[session_id] = history[-MAX_CHAT_HISTORY:]
save_chat_history(api_chat_historie)
log_line(f"[USER] {user_message} (Session: {session_id})")
log_line(f"[ASSISTANT] {reply}")
letzte_api_latenz = f"{(time.perf_counter() - start) * 1000:.2f} ms"
return reply
def gradio_stream_api(user_message, session_id="default"):
global api_chat_historie, letzte_api_latenz
start = time.perf_counter()
session_id = session_id or "default"
with chat_lock:
history = get_chat_session(session_id)
history_context = history_to_context(history)
reply = ""
for chunk in generate_reply_stream(user_message, history_context=history_context):
reply = chunk
yield reply
with chat_lock:
history = get_chat_session(session_id)
history.append({"role": "user", "content": user_message})
history.append({"role": "assistant", "content": reply})
if len(history) > MAX_CHAT_HISTORY:
api_chat_historie[session_id] = history[-MAX_CHAT_HISTORY:]
save_chat_history(api_chat_historie)
log_line(f"[USER] {user_message} (Session: {session_id})")
log_line(f"[ASSISTANT] {reply}")
letzte_api_latenz = f"{(time.perf_counter() - start) * 1000:.2f} ms"
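# Client-side usage sketch (assumes the gradio_client package is installed and
# that the Space id shown in ui_zeige_status is current):
#
#   from gradio_client import Client
#   client = Client("RedJul2110/MyfirstAI")
#   print(client.predict("Hallo!", "default", api_name="/predict"))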
# =========================================================
# UI FUNCTIONS
# =========================================================
def ui_zeige_status():
facts = load_wissen()
stats = get_knowledge_stats()
chat_entries = len(load_chat_history())
return f"""🟢 SYSTEM ONLINE
🤖 Modell: {MODEL_NAME}
🖥️ Device: {device}
🏠 Space: RedJul2110/MyfirstAI
📦 Datenbank: {HF_DATASET}
💾 Gespeicherte Fakten: {len(facts)}
🗂️ Kategorien: {", ".join(stats["categories"]) if stats["categories"] else "keine"}
💬 Gespeicherte Chat-Nachrichten: {chat_entries}
⏱️ Letzte API-Antwortzeit: {letzte_api_latenz if letzte_api_latenz else "noch keine"}
🔁 Letzter HF-Sync: {letzter_hf_sync if letzter_hf_sync else "noch keiner"}
⬆️ Letzter Upload: {letzter_upload if letzter_upload else "noch keiner"}
🧠 Letzte Wissensänderung: {letzte_wissensänderung if letzte_wissensänderung else "noch keine"}
🔄 Upload läuft: {"ja" if upload_in_progress else "nein"}
⚠️ Letzter Fehler: {letzter_fehler if letzter_fehler else "keiner"}
Lokale Wissensdatei: {WISSEN_FILE}
Chat-Datei: {CHAT_FILE}
Log-Datei: {LOG_FILE}
"""
def ui_sync_wissen():
ok, msg = sync_wissen_from_hf()
return msg
def ui_web_lernen(passwort, frage, antwort, kategorie):
if passwort != ADMIN_CODE:
return "❌ Zugriff verweigert! Falscher Admin-Code."
ok, msg = save_knowledge_entry(frage, antwort, kategorie)
return msg
def ui_link_lernen_multi(passwort, urls_text, topics_text, cats_text):
if passwort != ADMIN_CODE:
return "❌ Zugriff verweigert! Falscher Admin-Code."
    urls = [u.strip() for u in (urls_text or "").replace(",", "\n").split("\n") if u.strip()]
if not urls:
return "❌ Keine gültigen URLs gefunden."
topics = [t.strip() for t in (topics_text or "").replace(",", "\n").split("\n")]
cats = [c.strip() for c in (cats_text or "").replace(",", "\n").split("\n")]
while len(topics) < len(urls):
topics.append("")
while len(cats) < len(urls):
cats.append("")
results = []
for u, t, c in zip(urls, topics, cats):
if not u: continue
ok, msg = save_link_as_knowledge(u, t, c)
results.append(f"[{u}]: {msg}")
return "\n\n".join(results)
def extract_pdf_text_fallback(filepath):
try:
import PyPDF2
text = ""
with open(filepath, "rb") as f:
reader = PyPDF2.PdfReader(f)
for page in reader.pages:
page_text = page.extract_text()
if page_text:
text += page_text + "\n"
return text.strip()
except ImportError:
return "ERROR_NO_PYPDF2"
except Exception as e:
return f"ERROR_READ: {e}"
def ui_pdf_lernen(passwort, file_obj, thema, kategorie):
if passwort != ADMIN_CODE:
return "❌ Zugriff verweigert! Falscher Admin-Code."
if file_obj is None:
return "❌ Bitte eine Datei hochladen."
    filepath = file_obj if isinstance(file_obj, str) else getattr(file_obj, "name", None)
if not filepath:
return "❌ Dateipfad konnte nicht ermittelt werden."
text = extract_pdf_text_fallback(filepath)
if text == "ERROR_NO_PYPDF2":
return "❌ Das Paket 'PyPDF2' fehlt. Bitte füge 'PyPDF2' zu deiner 'requirements.txt' in Hugging Face hinzu!"
elif text.startswith("ERROR_READ:"):
return f"❌ Fehler beim Lesen: {text}"
if len(text) < 50:
return "❌ In der PDF konnte kein/kaum Text gefunden werden."
topic = thema or "PDF Dokument"
cat = kategorie or "dokument"
summary = summarize_web_text(topic, text)
if not summary or len(summary.strip()) < 30:
summary = heuristic_summary(text, max_sentences=6, max_chars=3000)
ok, msg = save_knowledge_entry(frage=topic, antwort=summary, kategorie=cat, quelle="PDF Upload")
return msg
def ui_wissen_suchen(suchbegriff):
return search_knowledge(suchbegriff)
def ui_wissen_loeschen(passwort, suchbegriff):
if passwort != ADMIN_CODE:
return "❌ Zugriff verweigert! Falscher Admin-Code."
ok, msg = delete_knowledge(suchbegriff)
return msg
def ui_wissen_alle_loeschen(passwort):
if passwort != ADMIN_CODE:
return "❌ Zugriff verweigert! Falscher Admin-Code."
ok, msg = delete_all_knowledge(passwort)
return msg
def ui_chat_send(user_message, visible_history):
    global api_chat_historie, letzte_api_latenz
    user_message = (user_message or "").strip()
    if not user_message:
        return "", visible_history, visible_history
    start = time.perf_counter()
    if visible_history is None:
        visible_history = []
    history_context = history_to_context(visible_history)
    reply = generate_reply(user_message, history_context=history_context)
    visible_history = visible_history + [(user_message, reply)]
    with chat_lock:
        # Persist via the session dict (get_chat_session normalizes the layout);
        # appending to api_chat_historie directly breaks once it is a dict.
        history = get_chat_session("default")
        history.append({"role": "user", "content": user_message})
        history.append({"role": "assistant", "content": reply})
        if len(history) > MAX_CHAT_HISTORY:
            api_chat_historie["default"] = history[-MAX_CHAT_HISTORY:]
        save_chat_history(api_chat_historie)
    log_line(f"[CHAT USER] {user_message}")
    log_line(f"[CHAT BOT] {reply}")
    letzte_api_latenz = f"{(time.perf_counter() - start) * 1000:.2f} ms"
    return reply, visible_history, visible_history
def ui_chat_reset():
    # reset_chat_history() already resets the global session dict and saves it;
    # reassigning a flat list here would corrupt the session layout.
    ok, msg = reset_chat_history()
    return "", [], [], msg
def ui_chat_status():
return chat_history_status()
# =========================================================
# APP
# =========================================================
def erzeuge_gradio_app():
custom_css = """
body { background: linear-gradient(135deg, #0f2027, #203a43, #2c5364); color: #fff; font-family: 'Inter', sans-serif; }
.gradio-container { background: rgba(255, 255, 255, 0.05); backdrop-filter: blur(15px); border-radius: 12px; border: 1px solid rgba(255,255,255,0.1); box-shadow: 0 8px 32px 0 rgba(0, 0, 0, 0.37); }
button.primary { background: linear-gradient(90deg, #00C9FF 0%, #92FE9D 100%); border: none; color: black; font-weight: bold; }
button.primary:hover { transform: translateY(-2px); box-shadow: 0 5px 15px rgba(0,201,255,0.4); }
"""
with gr.Blocks(title="KI Status", theme=gr.themes.Soft(), css=custom_css) as demo:
hidden_msg = gr.Textbox(value="", visible=False)
hidden_session = gr.Textbox(value="default", visible=False)
hidden_out = gr.Textbox(value="", visible=False)
api_trigger = gr.Button(visible=False)
api_trigger.click(
gradio_simple_api,
inputs=[hidden_msg, hidden_session],
outputs=[hidden_out],
api_name="predict"
)
api_trigger_stream = gr.Button(visible=False)
api_trigger_stream.click(
gradio_stream_api,
inputs=[hidden_msg, hidden_session],
outputs=[hidden_out],
api_name="stream"
)
gr.Markdown("# 🤖 KI Status")
gr.Markdown("Die KI nutzt zuerst ihre eigenen Antworten. Gefundene Fakten aus der Datenbank dienen nur als Zusatzwissen.")
with gr.Tab("📊 System Status"):
status_text = gr.Textbox(label="Systembericht", lines=16, interactive=False)
with gr.Row():
refresh_btn = gr.Button("Status aktualisieren")
sync_btn = gr.Button("Wissen von HF neu laden")
refresh_btn.click(ui_zeige_status, outputs=status_text)
sync_btn.click(ui_sync_wissen, outputs=status_text)
demo.load(ui_zeige_status, outputs=status_text)
with gr.Tab("🔒 Admin-Bereich"):
login_col = gr.Column(visible=True)
admin_col = gr.Column(visible=False)
with login_col:
gr.Markdown("### Bitte Admin-Code eingeben, um Einstellungen und Lern-Tools freizuschalten.")
admin_pw = gr.Textbox(label="Admin-Code", type="password")
login_btn = gr.Button("Login", variant="primary")
login_err = gr.Markdown(visible=False)
with admin_col:
with gr.Tabs():
with gr.Tab("🧠 Wissen lernen"):
gr.Markdown("Speichere neue Fakten direkt in die Datenbank.")
q_input = gr.Textbox(label="Thema / Stichwort", placeholder="z. B. Frankreich, Mars")
a_input = gr.Textbox(label="Text", placeholder="Langer Infotext", lines=6)
k_input = gr.Textbox(label="Kategorie / Bereich (optional)", placeholder="z. B. Geschichte")
lern_btn = gr.Button("Wissen speichern", variant="primary")
lern_out = gr.Textbox(label="Ergebnis", interactive=False)
lern_btn.click(ui_web_lernen, inputs=[admin_pw, q_input, a_input, k_input], outputs=lern_out)
with gr.Tab("🌐 Link lernen (Multi)"):
gr.Markdown("Ein oder mehrere öffentliche Links einfügen (durch neue Zeile getrennt). Die KI liest und lernt diese.")
link_urls = gr.Textbox(label="Links (Eine URL pro Zeile)", placeholder="https://...\nhttps://...", lines=5)
link_topic = gr.Textbox(label="Themen (Optional, ein Thema pro Zeile passend zur URL)", lines=5)
link_cat = gr.Textbox(label="Kategorien (Optional, eine Kategorie pro Zeile)", lines=5)
link_btn = gr.Button("Links lernen", variant="primary")
link_out = gr.Textbox(label="Ergebnis", lines=8, interactive=False)
link_btn.click(ui_link_lernen_multi, inputs=[admin_pw, link_urls, link_topic, link_cat], outputs=link_out)
with gr.Tab("📄 PDF lernen"):
gr.Markdown("Lade eine PDF-Datei hoch, um ihren Text zu analysieren und als Wissen zu speichern.")
pdf_file = gr.File(label="PDF Datei", file_types=[".pdf"])
pdf_topic = gr.Textbox(label="Thema / Stichwort (optional)")
pdf_cat = gr.Textbox(label="Kategorie / Bereich (optional)")
pdf_btn = gr.Button("Dokument lernen", variant="primary")
pdf_out = gr.Textbox(label="Ergebnis", lines=6, interactive=False)
pdf_btn.click(ui_pdf_lernen, inputs=[admin_pw, pdf_file, pdf_topic, pdf_cat], outputs=pdf_out)
with gr.Tab("🔍 Suchen / Löschen"):
gr.Markdown("Suche in der Datenbank oder lösche Einträge wieder.")
search_box = gr.Textbox(label="Suchbegriff", placeholder="z. B. Frankreich")
search_btn = gr.Button("Suchen")
search_out = gr.Textbox(label="Treffer", lines=12, interactive=False)
del_box = gr.Textbox(label="Löschen nach Begriff", placeholder="z. B. Frankreich")
del_btn = gr.Button("Löschen", variant="secondary")
del_out = gr.Textbox(label="Lösch-Ergebnis", interactive=False)
all_del_btn = gr.Button("ALLES löschen", variant="stop")
all_del_out = gr.Textbox(label="Alles löschen", interactive=False)
search_btn.click(ui_wissen_suchen, inputs=[search_box], outputs=search_out)
del_btn.click(ui_wissen_loeschen, inputs=[admin_pw, del_box], outputs=del_out)
all_del_btn.click(ui_wissen_alle_loeschen, inputs=[admin_pw], outputs=all_del_out)
def do_login(pw):
if pw == ADMIN_CODE:
return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
return gr.update(visible=True), gr.update(visible=False), gr.update(value="**❌ Falscher Admin-Code**", visible=True)
login_btn.click(do_login, inputs=[admin_pw], outputs=[login_col, admin_col, login_err])
demo.queue(default_concurrency_limit=8)
return demo
# =========================================================
# LOCAL CHAT (FALLBACK)
# =========================================================
def local_terminal_chat():
print("Lokaler Chat gestartet. Tippe 'exit' zum Beenden.")
while True:
user = input("Du: ").strip()
if user.lower() in {"exit", "quit", "ende"}:
break
if not user:
continue
reply = generate_reply(user)
print("Bot:", reply)
# =========================================================
# BOOTSTRAP
# =========================================================
def bootstrap():
global api_chat_historie
ensure_json_list_file(WISSEN_FILE)
ensure_json_list_file(CHAT_FILE)
sync_wissen_from_hf()
api_chat_historie = load_chat_history()
init_model_if_needed()
if os.environ.get("SPACE_ID"):
app = erzeuge_gradio_app()
app.queue().launch()
else:
local_terminal_chat()
if __name__ == "__main__":
bootstrap()