Spaces:
Running
Running
| import os | |
| import json | |
| import re | |
| import time | |
| import sys | |
| import traceback | |
| import threading | |
| import html as html_lib | |
| import ipaddress | |
| import random | |
| from datetime import datetime | |
| from urllib.parse import urlparse | |
| from urllib.request import Request, urlopen | |
| from urllib.error import URLError, HTTPError | |
| from difflib import SequenceMatcher | |
| import torch | |
| import gradio as gr | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer, AutoModel | |
| from huggingface_hub import hf_hub_download, upload_file | |
| try: | |
| from bs4 import BeautifulSoup | |
| except Exception: | |
| BeautifulSoup = None | |
# =========================================================
# CONFIG – everything can be adjusted here
# =========================================================
# --- MODEL ---
MODEL_NAME = "Qwen/Qwen2.5-3B-Instruct"
# Examples:
# "Qwen/Qwen3-0.6B" → very small, fast, weaker
# "Qwen/Qwen3-1.7B" → good balance (recommended)
# "Qwen/Qwen3-4B" → smarter, needs more RAM
# "Qwen/Qwen2.5-1.5B-Instruct" → older model, stable
# --- HUGGING FACE ---
HF_DATASET = "RedJul2110/wissen-datenbank"
# Both secrets come from environment variables and default to "" when unset.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
ADMIN_CODE = os.environ.get("CODE", "")
# --- QUANTIZATION (4 for speed, 8 for sharpness, 16 for full power) ---
# Choose: 4, 8 or 16
LOAD_IN_BITS = 4 # Recommended for 16 GB RAM 4 8 16
# --- FILE PATHS ---
# Use /data when that directory exists, otherwise the current directory.
DATA_DIR = "/data" if os.path.isdir("/data") else "."
os.makedirs(DATA_DIR, exist_ok=True)
WISSEN_FILE = os.path.join(DATA_DIR, "wissen.json")
CHAT_FILE = os.path.join(DATA_DIR, "chat_history.json")
LOG_FILE = os.path.join(DATA_DIR, "ai_log.txt")
SETTINGS_FILE = os.path.join(DATA_DIR, "settings.json")
# --- ANSWER LENGTH (how long may the AI's answers be?) ---
MAX_NEW_TOKENS_CHAT = 80 # Normal chat answer
MAX_NEW_TOKENS_POLISH = 60 # Answer backed by the knowledge base
MAX_NEW_TOKENS_SUMMARY = 50 # Link summary
# Examples:
# 80 → very short answers (faster)
# 150 → medium answers (recommended)
# 300 → long, detailed answers (slower)
# --- CREATIVITY (how creative / random are the AI's answers?) ---
TEMPERATURE_CHAT = 0.45 # Main chat
TEMPERATURE_POLISH = 0.55 # Answer with knowledge
# Examples:
# 0.1 → very precise, almost always the same answers
# 0.5 → balanced (recommended)
# 0.9 → creative, but sometimes inaccurate
# --- REPETITION PROTECTION ---
REPETITION_PENALTY = 1.25 # How strongly repetitions are penalised
NO_REPEAT_NGRAM_SIZE = 3 # Do not repeat X words in a row
# Examples for repetition_penalty:
# 1.0 → no protection (may repeat itself)
# 1.15 → light protection (recommended)
# 1.4 → strong protection (can make the answer sound unnatural)
# --- CHAT MEMORY (how many messages does it remember?) ---
MAX_CHAT_HISTORY = 5 # Maximum number of stored messages
MAX_CONTEXT_TURNS = 2 # How many messages are used as context
# Examples:
# 6 → short memory, faster
# 10 → medium memory (recommended)
# 20 → long memory (needs more compute)
# --- AI PERSONALITY ---
AI_NAME = "RedJul2110"
FALLBACK_NO_INFO = "Dazu habe ich gerade keine sichere Antwort."
SYSTEM_PROMPT_ZUSATZ = "Du bist ein brillanter KI-Assistent. Du kannst sehr gut programmieren und Code-Beispiele liefern."
# --- KNOWLEDGE BASE ---
USE_QWEN_POLISH = True # True = the AI refines answers using stored knowledge
DB_DIRECT_MATCH_THRESHOLD = 0.88 # How similar must a question be? (0.0–1.0)
DB_FACT_MATCH_THRESHOLD = 0.70 # From which score on does a fact count as matching? (0.0–1.0)
# Examples for the thresholds:
# 0.95 → only nearly identical hits (very strict)
# 0.88 → good balance (recommended)
# 0.60 → many hits, including less fitting ones
# =========================================================
# GLOBAL VARIABLES
# =========================================================
# Language model and embedding model are loaded lazily; None until then.
model = None
tokenizer = None
embed_model = None
embed_tokenizer = None
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
knowledge_lock = threading.Lock()
chat_lock = threading.Lock()
api_chat_historie = [] # list[dict]
# Status values updated by the functions below (None = nothing recorded yet).
upload_in_progress = False
letzter_hf_sync = None
letzter_upload = None
letzte_wissensänderung = None
letzte_api_latenz = None
letzter_fehler = None
| # ========================================================= | |
| # HILFSFUNKTIONEN | |
| # ========================================================= | |
def now_str():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    timestamp_format = "%Y-%m-%d %H:%M:%S"
    current = datetime.now()
    return current.strftime(timestamp_format)
def log_line(message):
    """Append one timestamped line to ``LOG_FILE`` on a best-effort basis.

    Args:
        message: Text to write; it is formatted into the line with an f-string.

    Failures while writing are ignored on purpose so that logging can never
    break the caller.
    """
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(f"[{now_str()}] {message}\n")
    except Exception:
        # Deliberately best effort. ``Exception`` (instead of a bare except)
        # still lets KeyboardInterrupt and SystemExit propagate.
        pass
def log_error(where, exc):
    """Remember the latest error and write it, with traceback, to the log.

    Args:
        where: Label of the place where the error happened.
        exc: The exception (or any object) describing the error.

    Side effects:
        Updates the module-level ``letzter_fehler`` and calls ``log_line``.
    """
    global letzter_fehler
    summary = f"{where}: {exc}"
    letzter_fehler = summary
    stack = traceback.format_exc()
    log_line(f"[ERROR] {summary}\n{stack}")
def normalize_text(text):
    """Return a lowercase, ASCII-only, single-spaced form of *text*.

    German umlauts and ``ß`` are transliterated (ä→ae, ö→oe, ü→ue, ß→ss);
    every other character outside ``a-z0-9`` acts as a word separator.
    ``None`` and empty input yield ``""``.
    """
    transliteration = str.maketrans({"ä": "ae", "ö": "oe", "ü": "ue", "ß": "ss"})
    lowered = (text or "").lower().strip().translate(transliteration)
    # Joining the alphanumeric runs with single spaces is the same as turning
    # every run of other characters into one space and trimming the ends.
    words = re.findall(r"[a-z0-9]+", lowered)
    return " ".join(words)
def text_tokens(text):
    """Return the set of normalized words of *text* without German stop words.

    The words come from ``normalize_text``, so they are lowercase and have
    umlauts transliterated. The stop words therefore have to be spelled in
    that same normalized form ("fuer", not "für"), otherwise they can never
    match.
    """
    stopwords = {
        "der", "die", "das", "ein", "eine", "einer", "eines", "und", "oder", "ist",
        "sind", "war", "waren", "wie", "was", "wer", "wo", "wann", "warum", "wieso",
        "woher", "wieviel", "wieviele", "im", "in", "am", "an", "zu", "mit", "von",
        "fuer", "auf", "aus", "den", "dem", "des", "ich", "du", "er", "sie", "es",
        "man", "nicht", "nur", "auch", "noch"
    }
    tokens = normalize_text(text).split()
    return {t for t in tokens if t and t not in stopwords}
def ensure_json_list_file(path):
    """Create *path* holding an empty JSON list if the file does not exist."""
    if os.path.exists(path):
        return
    save_json_list(path, [])
def load_json_list(path):
    """Load a JSON list from *path*.

    Args:
        path: File to read (UTF-8 encoded JSON).

    Returns:
        The parsed list, or ``[]`` when the file is missing, unreadable,
        not valid JSON, or does not contain a list at the top level.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: invalid JSON
        # (JSONDecodeError) or invalid UTF-8 (UnicodeDecodeError).
        return []
    return data if isinstance(data, list) else []
def save_json_list(path, data):
    """Write *data* as pretty-printed UTF-8 JSON to *path* atomically.

    The data is first written to ``path + ".tmp"`` and then moved over the
    target with ``os.replace``, so readers never see a half-written file.

    Args:
        path: Destination file.
        data: JSON-serialisable object (callers pass lists).

    Raises:
        OSError: When writing or replacing the file fails.
        TypeError, ValueError: When *data* cannot be serialised.
    """
    tmp = path + ".tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp, path)
    except BaseException:
        # Do not leave a stale, possibly truncated temp file behind.
        try:
            os.remove(tmp)
        except OSError:
            pass
        raise
def format_entry(item, idx=None):
    """Render one knowledge entry as a multi-line text block.

    Args:
        item: Mapping with the optional string keys ``frage``, ``antwort``,
            ``kategorie``, ``quelle`` and ``created_at``.
        idx: Optional number placed in front of the title.

    Returns:
        The title line, one bracketed line per non-empty metadata field and
        finally the answer text, joined with newlines.
    """
    def field(key):
        return item.get(key, "").strip()

    titel = field("frage")
    text = field("antwort")
    kategorie = field("kategorie")
    quelle = field("quelle")
    created = field("created_at")

    heading = f"{titel}" if idx is None else f"{idx}. {titel}"
    lines = [heading]
    tags = [
        (kategorie, f"[Kategorie: {kategorie}]"),
        (quelle, f"[Quelle: {quelle}]"),
        (created, f"[Zeit: {created}]"),
    ]
    lines.extend(rendered for value, rendered in tags if value)
    lines.append(text)
    return "\n".join(lines)
def history_to_context(history, max_turns=MAX_CONTEXT_TURNS):
    """Turn the most recent chat history into a plain-text context block.

    Two history shapes are supported, detected from the first element:

    * pairs ``(user, assistant)`` given as tuples or lists; the last
      *max_turns* pairs are rendered as ``User:`` / ``Assistant:`` lines;
    * message dicts with ``role`` and ``content``; the last
      ``max_turns * 2`` messages are rendered as ``role: content`` lines.

    Args:
        history: Sequence of pairs or message dicts; may be empty or None.
        max_turns: Number of turns to include.

    Returns:
        The context text, or ``""`` for empty history or non-positive
        *max_turns*.
    """
    # A slice like history[-0:] would return the WHOLE history, so handle
    # "no context wanted" explicitly.
    if not history or max_turns <= 0:
        return ""
    lines = []
    # Pairs may arrive as lists instead of tuples (e.g. after a JSON round
    # trip); treating them as message dicts would fail on ``.get``.
    if isinstance(history[0], (tuple, list)):
        for user, assistant in history[-max_turns:]:
            lines.append(f"User: {user}")
            lines.append(f"Assistant: {assistant}")
        return "\n".join(lines)
    for msg in history[-max_turns * 2:]:
        role = msg.get("role", "")
        content = msg.get("content", "")
        lines.append(f"{role}: {content}")
    return "\n".join(lines)
def api_history_to_pairs(messages):
    """Convert role/content messages into ``(user, assistant)`` tuples.

    Each assistant message is paired with the most recent user message that
    has not been answered yet. Assistant messages without such a user
    message, and messages with any other role, are skipped.
    """
    pairs = []
    waiting = None
    for entry in messages:
        role, content = entry.get("role", ""), entry.get("content", "")
        if role == "user":
            waiting = content
            continue
        if role != "assistant" or waiting is None:
            continue
        pairs.append((waiting, content))
        waiting = None
    return pairs
def trim_api_history(max_messages=MAX_CHAT_HISTORY):
    """Drop the oldest entries so at most *max_messages* remain.

    ``api_chat_historie`` starts out as a list, but ``get_chat_session``
    converts it into a dict of session id → message list. Both shapes are
    handled: a list is trimmed as a whole, a dict is trimmed per session.

    Args:
        max_messages: Maximum number of messages to keep; values below zero
            are treated as zero.
    """
    global api_chat_historie
    keep = max(max_messages, 0)
    if isinstance(api_chat_historie, dict):
        for messages in api_chat_historie.values():
            overflow = len(messages) - keep
            if overflow > 0:
                # Trim in place so that references handed out by
                # get_chat_session stay valid.
                del messages[:overflow]
        return
    overflow = len(api_chat_historie) - keep
    if overflow > 0:
        # Explicit start index: a "[-keep:]" slice would keep everything
        # when keep is 0.
        api_chat_historie = api_chat_historie[overflow:]
def looks_like_factual_question(text):
    """Heuristically decide whether *text* is a question or explain-request.

    True when the raw text contains a question mark, or when its normalized
    form starts with one of the German question/request prefixes below.
    """
    normalized = normalize_text(text)
    if "?" in text:
        return True
    # Prefix match only: "wo" also matches any word that merely begins
    # with these letters.
    question_starts = (
        "was", "wer", "wie", "wann", "wo", "warum", "wieso",
        "welche", "welcher", "welches", "nenn", "nenne", "erklaer", "erklär"
    )
    return normalized.startswith(question_starts)
| def compress_text(text, max_chars=220): | |
| text = (text or "").strip() | |
| if not text: | |
| return "" | |
| text = re.sub(r"\s+", " ", text) | |
| if len(text) <= max_chars: | |
| return text | |
| cut = text[:max_chars].rsplit(" ", 1)[0].strip() | |
| return cut + "..." | |
def is_generic_or_placeholder_answer(text):
    """Return True for empty answers and answers containing a stock phrase.

    The text is normalized with ``normalize_text`` first and then searched
    for each marker as a substring.
    """
    normalized = normalize_text(text)
    if not normalized:
        return True
    markers = (
        "interessante frage",
        "was moechtest du wissen",
        "was möchtest du wissen",
        "ich bin sehr praezise",
        "ich bin sehr präzise",
        "dazu habe ich nichts",
        "was genau",
        "wie kann ich helfen",
        "ruf mich an",
        "ich kann dir",
        "hallo, ich bin",
        "hallo ich bin",
        "hier ist mein angebot",
    )
    for marker in markers:
        if marker in normalized:
            return True
    return False
def dedupe_facts(facts):
    """Return *facts* without duplicates, keeping the first occurrence.

    Two entries count as duplicates when their normalized ``frage``,
    ``antwort`` and ``quelle`` values are all equal.
    """
    fingerprints = set()
    result = []
    for fact in facts:
        fingerprint = tuple(
            normalize_text(fact.get(name, ""))
            for name in ("frage", "antwort", "quelle")
        )
        if fingerprint in fingerprints:
            continue
        fingerprints.add(fingerprint)
        result.append(fact)
    return result
| # ========================================================= | |
| # KNOWLEDGE / DATENBANK | |
| # ========================================================= | |
def load_wissen():
    """Return all knowledge entries from ``WISSEN_FILE``.

    The file is created with an empty list first if it does not exist.
    """
    ensure_json_list_file(WISSEN_FILE)
    entries = load_json_list(WISSEN_FILE)
    return entries
def sync_wissen_from_hf():
    """Replace the local knowledge file with ``wissen.json`` from the HF dataset.

    Returns:
        ``(ok, message)``. ``ok`` is True only when the remote file was
        downloaded and written to ``WISSEN_FILE``; in every other case the
        local file is left as it is and the message explains why.

    Side effects:
        Overwrites ``WISSEN_FILE`` and sets ``letzter_hf_sync`` on success.
    """
    global letzter_hf_sync
    ensure_json_list_file(WISSEN_FILE)
    if not HF_TOKEN:
        warning = "HF_TOKEN fehlt. Lokale Datei wird genutzt."
        log_line(f"[WARN] {warning}")
        return False, warning
    try:
        downloaded = hf_hub_download(
            repo_id=HF_DATASET,
            filename="wissen.json",
            repo_type="dataset",
            token=HF_TOKEN,
            force_download=True,
        )
        entries = load_json_list(downloaded)
        save_json_list(WISSEN_FILE, entries)
        letzter_hf_sync = now_str()
        return True, f"✅ Wissen aus HF geladen ({len(entries)} Einträge)."
    except Exception as e:
        detail = str(e)
        # A missing remote file is an expected state, not an error; it is
        # recognised from the exception text.
        remote_missing = any((
            "Entry Not Found" in detail,
            "404" in detail,
            "not found" in detail.lower(),
        ))
        if remote_missing:
            return False, "ℹ️ Im HF-Dataset gibt es noch keine wissen.json. Lokale Datei wird genutzt."
        log_error("sync_wissen_from_hf", e)
        return False, f"⚠️ HF-Sync fehlgeschlagen, lokale Datei bleibt aktiv: {e}"
def upload_wissen_background():
    """Upload ``WISSEN_FILE`` to the HF dataset as ``wissen.json``.

    Callers in this file run it in a background thread. Without ``HF_TOKEN``
    the upload is skipped with a warning. ``upload_in_progress`` is True
    while the upload runs; ``letzter_upload`` is set on success. Errors are
    logged, never raised.
    """
    global upload_in_progress, letzter_upload
    if not HF_TOKEN:
        log_line("[WARN] Upload übersprungen, weil HF_TOKEN fehlt.")
        return
    upload_in_progress = True
    try:
        upload_args = {
            "path_or_fileobj": WISSEN_FILE,
            "path_in_repo": "wissen.json",
            "repo_id": HF_DATASET,
            "repo_type": "dataset",
            "token": HF_TOKEN,
            "commit_message": f"Update wissen.json ({now_str()})",
        }
        upload_file(**upload_args)
        letzter_upload = now_str()
        log_line("[OK] wissen.json erfolgreich hochgeladen.")
    except Exception as e:
        log_error("upload_wissen_background", e)
    finally:
        # Always clear the flag, also after a failed upload.
        upload_in_progress = False
def init_embed_model():
    """Load the sentence-embedding tokenizer and model once (lazy init).

    Does nothing when ``embed_model`` is already set. Loading errors are
    logged via ``log_error`` and not raised.
    """
    global embed_tokenizer, embed_model
    if embed_model is not None:
        return
    checkpoint = "sentence-transformers/all-MiniLM-L6-v2"
    try:
        log_line("[INFO] Lade Embedding-Modell für semantische Suche...")
        embed_tokenizer = AutoTokenizer.from_pretrained(checkpoint)
        loaded = AutoModel.from_pretrained(checkpoint)
        embed_model = loaded.to(device)
        embed_model.eval()
        log_line("[INFO] Embedding-Modell erfolgreich geladen.")
    except Exception as e:
        log_error("init_embed_model", e)
def get_embedding(text):
    """Return an embedding vector for *text* as a list of floats.

    Returns ``[]`` when the embedding model is unavailable, *text* is empty,
    or anything fails while computing the embedding (the error is logged).
    """
    init_embed_model()
    models_ready = embed_model is not None and embed_tokenizer is not None
    if not models_ready or not text:
        return []
    try:
        encoded = embed_tokenizer(
            text,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
        ).to(device)
        with torch.no_grad():
            result = embed_model(**encoded)
            # Mean pooling: average last_hidden_state over dim 1
            # (presumably the token axis; confirm against the model docs).
            pooled = result.last_hidden_state.mean(dim=1)
        return pooled[0].cpu().numpy().tolist()
    except Exception as e:
        log_error("get_embedding", e)
        return []
def cosine_similarity(v1, v2):
    """Return the cosine similarity of two equally long number sequences.

    Returns ``0.0`` when either vector is empty, the lengths differ, or one
    of the vectors has zero length (norm).
    """
    import math

    if not v1 or not v2:
        return 0.0
    if len(v1) != len(v2):
        return 0.0
    dot_product = sum(x * y for x, y in zip(v1, v2))
    norm_first = math.sqrt(sum(x * x for x in v1))
    norm_second = math.sqrt(sum(y * y for y in v2))
    if norm_first == 0 or norm_second == 0:
        return 0.0
    return dot_product / (norm_first * norm_second)
def db_match_score(query, item, query_emb=None):
    """Score how well knowledge entry *item* matches *query*.

    The lexical score is ``0.6 * SequenceMatcher ratio`` (query vs. the
    entry's ``frage``) plus ``0.4 * token overlap`` (query tokens found
    anywhere in the entry). The semantic score is the cosine similarity of
    *query_emb* and the entry's stored ``embedding`` when both exist.
    Bonuses for exact matches are added to both, and the larger total is
    returned, so the result can exceed 1.0.

    Args:
        query: Search text.
        item: Knowledge entry dict.
        query_emb: Optional embedding of *query*.

    Returns:
        The score as float; ``0.0`` when the normalized query or the
        entry's normalized ``frage`` is empty.
    """
    q_norm = normalize_text(query)
    frage = normalize_text(item.get("frage", ""))
    antwort = item.get("antwort", "")
    kategorie = normalize_text(item.get("kategorie", ""))
    quelle = normalize_text(item.get("quelle", ""))
    blob_norm = normalize_text(f"{item.get('frage', '')} {antwort} {item.get('kategorie','')} {item.get('quelle','')}")
    if not (q_norm and frage):
        return 0.0

    seq = SequenceMatcher(None, q_norm, frage).ratio()
    q_tokens = text_tokens(q_norm)
    shared_tokens = q_tokens & text_tokens(blob_norm)
    token_score = len(shared_tokens) / max(len(q_tokens), 1)

    item_emb = item.get("embedding", [])
    if query_emb and item_emb:
        semantic_score = cosine_similarity(query_emb, item_emb)
    else:
        semantic_score = 0.0

    # Each rule that applies adds its weight, in this order.
    bonus_rules = (
        (q_norm == frage, 0.35),
        (q_norm in blob_norm, 0.12),
        (bool(quelle) and q_norm == quelle, 0.15),
        (bool(kategorie) and q_norm == kategorie, 0.08),
    )
    bonus = 0.0
    for applies, weight in bonus_rules:
        if applies:
            bonus += weight

    lexical_score = (seq * 0.6) + (token_score * 0.4)
    return max(lexical_score + bonus, semantic_score + bonus)
def exact_db_answer(user_message):
    """Return the answer of the first entry matching *user_message* exactly.

    An entry matches when the normalized message equals its normalized
    ``frage`` or its normalized, non-empty ``quelle``.

    Returns:
        The stripped ``antwort`` of the first match, or ``None`` when the
        message is empty after normalization or nothing matches.
    """
    wanted = normalize_text(user_message)
    if not wanted:
        return None
    for entry in load_wissen():
        frage = normalize_text(entry.get("frage", ""))
        quelle = normalize_text(entry.get("quelle", ""))
        antwort = entry.get("antwort", "").strip()
        # ``wanted`` is non-empty here, so equality with ``quelle`` already
        # implies that ``quelle`` is non-empty.
        if wanted == frage or wanted == quelle:
            return antwort
    return None
def best_db_answer(user_message, threshold=DB_DIRECT_MATCH_THRESHOLD):
    """Return the stored answer that matches *user_message* best.

    A non-empty exact match (see ``exact_db_answer``) wins immediately.
    Otherwise every entry is scored with ``db_match_score`` and the highest
    scoring one is used if its score reaches *threshold*.

    Returns:
        The stripped answer text, or ``None`` when nothing is good enough.
    """
    direct_hit = exact_db_answer(user_message)
    if direct_hit:
        return direct_hit
    entries = load_wissen()
    if not entries:
        return None
    query_emb = get_embedding(user_message)
    top_entry, top_score = None, 0.0
    for entry in entries:
        current = db_match_score(user_message, entry, query_emb=query_emb)
        # Strictly greater: on ties the earlier entry stays, and entries
        # scoring 0 are never selected.
        if current > top_score:
            top_entry, top_score = entry, current
    if top_entry is None or not (top_score >= threshold):
        return None
    return top_entry.get("antwort", "").strip()
def find_relevant_facts(query, max_items=6, min_score=DB_FACT_MATCH_THRESHOLD):
    """Return up to *max_items* knowledge entries relevant to *query*.

    Entries are scored with ``db_match_score``; those scoring at least
    *min_score* are returned best first.
    """
    entries = load_wissen()
    if not entries:
        return []
    query_emb = get_embedding(query)
    candidates = []
    for entry in entries:
        relevance = db_match_score(query, entry, query_emb=query_emb)
        if relevance >= min_score:
            candidates.append((relevance, entry))
    ranked = sorted(candidates, key=lambda pair: pair[0], reverse=True)
    return [entry for _, entry in ranked[:max_items]]
def get_knowledge_stats():
    """Return the entry count and the first ten distinct categories.

    Returns:
        Dict with ``count`` (number of entries) and ``categories`` (up to
        ten distinct, non-empty category names in order of first
        appearance).
    """
    entries = load_wissen()
    stripped_names = (entry.get("kategorie", "").strip() for entry in entries)
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    distinct = list(dict.fromkeys(name for name in stripped_names if name))
    return {
        "count": len(entries),
        "categories": distinct[:10],
    }
def search_knowledge(query, max_results=8):
    """Search the knowledge base and return a human-readable result text.

    Entries scoring at least ``DB_FACT_MATCH_THRESHOLD`` are listed best
    first, at most *max_results* of them, each followed by a dashed
    separator. For empty queries, an empty database or no hits a short
    message is returned instead.
    """
    needle = (query or "").strip()
    if not needle:
        return "❌ Bitte gib einen Suchbegriff ein."
    entries = load_wissen()
    if not entries:
        return "Keine Einträge vorhanden."
    query_emb = get_embedding(needle)
    hits = []
    for entry in entries:
        relevance = db_match_score(needle, entry, query_emb=query_emb)
        if relevance >= DB_FACT_MATCH_THRESHOLD:
            hits.append((relevance, entry))
    hits.sort(key=lambda pair: pair[0], reverse=True)
    matches = [entry for _, entry in hits[:max_results]]
    if not matches:
        return "❌ Keine passenden Einträge gefunden."
    separator = "\n" + "-" * 40 + "\n"
    parts = [f"✅ {len(matches)} Treffer:\n"]
    for position, entry in enumerate(matches, 1):
        parts.extend((format_entry(entry, position), separator))
    return "\n".join(parts).strip()
def delete_knowledge(query):
    """Delete every knowledge entry that matches *query*.

    Under ``knowledge_lock`` the local file is first synced from HF, then
    all entries scoring at least ``DB_FACT_MATCH_THRESHOLD`` (scored without
    embeddings) are removed, the file is saved and a background upload is
    started.

    Returns:
        ``(ok, message)``; ``ok`` is True only when something was deleted.
    """
    global letzte_wissensänderung
    needle = (query or "").strip()
    if not needle:
        return False, "❌ Bitte einen Suchbegriff zum Löschen eingeben."
    with knowledge_lock:
        sync_wissen_from_hf()
        entries = load_wissen()
        if not entries:
            return False, "Keine Einträge vorhanden."
        kept, removed = [], []
        for entry in entries:
            matches = db_match_score(needle, entry) >= DB_FACT_MATCH_THRESHOLD
            (removed if matches else kept).append(entry)
        if not removed:
            return False, "❌ Nichts gefunden, was gelöscht werden kann."
        save_json_list(WISSEN_FILE, kept)
        letzte_wissensänderung = now_str()
        uploader = threading.Thread(target=upload_wissen_background, daemon=True)
        uploader.start()
        return True, f"✅ {len(removed)} Eintrag/Einträge gelöscht."
def delete_all_knowledge(admin_code):
    """Delete ALL knowledge entries after verifying the admin code.

    Args:
        admin_code: Code entered by the user; must be a string equal to
            ``ADMIN_CODE``.

    Returns:
        ``(ok, message)``. The request is rejected when the code is wrong
        and also when no admin code is configured at all: ``ADMIN_CODE``
        defaults to ``""``, and an empty input must not wipe the database.

    Side effects:
        On success overwrites ``WISSEN_FILE`` with an empty list, updates
        ``letzte_wissensänderung`` and starts a background upload.
    """
    global letzte_wissensänderung
    import hmac

    expected = (ADMIN_CODE or "").encode("utf-8")
    supplied = admin_code.encode("utf-8") if isinstance(admin_code, str) else b""
    # compare_digest avoids leaking how many leading characters matched;
    # bytes are used because it only accepts ASCII-only str arguments.
    if not expected or not hmac.compare_digest(supplied, expected):
        return False, "❌ Falscher Admin-Code."
    with knowledge_lock:
        save_json_list(WISSEN_FILE, [])
        letzte_wissensänderung = now_str()
    threading.Thread(target=upload_wissen_background, daemon=True).start()
    return True, "✅ Alle Wissenseinträge wurden gelöscht."
def save_knowledge_entry(frage, antwort, kategorie="", quelle="", embedding=None):
    """Add a new entry to the knowledge base.

    Under ``knowledge_lock`` the local file is synced from HF, an entry
    with the same normalized ``frage`` is rejected as duplicate, and the
    new entry is appended and saved. Afterwards a background upload is
    started.

    Args:
        frage: Topic / keyword of the entry (required).
        antwort: Text of the entry (required).
        kategorie: Optional category.
        quelle: Optional source, e.g. a URL.
        embedding: Optional precomputed embedding; computed from *frage*
            when None.

    Returns:
        ``(ok, message)``.
    """
    global letzte_wissensänderung
    frage, antwort, kategorie, quelle = (
        (value or "").strip() for value in (frage, antwort, kategorie, quelle)
    )
    if not (frage and antwort):
        return False, "❌ Thema/Stichwort und Text dürfen nicht leer sein."
    with knowledge_lock:
        sync_wissen_from_hf()
        entries = load_wissen()
        wanted = normalize_text(frage)
        if any(normalize_text(existing.get("frage", "")) == wanted for existing in entries):
            return False, "ℹ️ Dieser Eintrag ist schon vorhanden."
        if embedding is None:
            embedding = get_embedding(frage)
        entries.append({
            "frage": frage,
            "antwort": antwort,
            "kategorie": kategorie,
            "quelle": quelle,
            "embedding": embedding,
            "created_at": now_str(),
        })
        save_json_list(WISSEN_FILE, entries)
        letzte_wissensänderung = now_str()
        uploader = threading.Thread(target=upload_wissen_background, daemon=True)
        uploader.start()
        return True, f"✅ Lokal gespeichert. Upload läuft im Hintergrund.\n\nThema: {frage}"
| # ========================================================= | |
| # WEB-LINK LERNEN | |
| # ========================================================= | |
def is_private_or_local_host(hostname):
    """Return True when *hostname* must not be fetched (local/private target).

    Blocked are: empty host names, ``localhost`` and any ``*.localhost``
    name (with or without trailing dot), and IP literals that are private,
    loopback, link-local, reserved, multicast or unspecified. IPv4-mapped
    IPv6 literals (``::ffff:a.b.c.d``) are judged by the embedded IPv4
    address.

    Note:
        Names that are not IP literals are NOT resolved, so a public name
        that resolves to a private address is not detected here.

    Args:
        hostname: Host part of a URL, or None.

    Returns:
        True when the host is considered local/private, otherwise False.
    """
    if not hostname:
        return True
    # Tolerate a trailing dot ("localhost.") and bracketed IPv6 literals.
    host = hostname.lower().strip().rstrip(".").strip("[]")
    if not host:
        return True
    if host == "localhost" or host.endswith(".localhost"):
        return True
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        # Not an IP literal: treated as an ordinary public host name.
        return False
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        ip = mapped
    return (
        ip.is_private
        or ip.is_loopback
        or ip.is_link_local
        or ip.is_reserved
        or ip.is_multicast
        or ip.is_unspecified
    )
def extract_webpage_text(url, timeout=15, max_bytes=2_000_000):
    """Download a web page and return ``(title, visible_text)``.

    Args:
        url: http/https address of the page.
        timeout: Timeout in seconds passed to ``urlopen``.
        max_bytes: Maximum number of response bytes that are read.

    Returns:
        Tuple of the page title (falls back to the host name) and the page
        text with markup removed, whitespace collapsed and cut to 12000
        characters.

    Raises:
        ValueError: For non-http(s) URLs and for local/private hosts,
            including hosts reached through a redirect.
        urllib.error.URLError / OSError: When the download fails.
    """
    url = url.strip()
    parsed = urlparse(url)
    if parsed.scheme not in {"http", "https"}:
        raise ValueError("Nur http/https Links sind erlaubt.")
    if is_private_or_local_host(parsed.hostname):
        raise ValueError("Lokale oder private Adressen sind nicht erlaubt.")
    req = Request(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (compatible; KnowledgeBot/1.0)"
        }
    )
    with urlopen(req, timeout=timeout) as resp:
        # urlopen follows redirects, so the host check has to be repeated
        # for the final URL. The redirected request has already been sent at
        # this point; the check only keeps its content from being used.
        final_host = urlparse(resp.geturl()).hostname
        if is_private_or_local_host(final_host):
            raise ValueError("Lokale oder private Adressen sind nicht erlaubt.")
        raw = resp.read(max_bytes)
        charset = resp.headers.get_content_charset() or "utf-8"
    try:
        html_text = raw.decode(charset, errors="ignore")
    except LookupError:
        # The server announced a charset Python does not know.
        html_text = raw.decode("utf-8", errors="ignore")
    title = ""
    m = re.search(r"(?is)<title[^>]*>(.*?)</title>", html_text)
    if m:
        title = html_lib.unescape(m.group(1)).strip()
    if BeautifulSoup is not None:
        soup = BeautifulSoup(html_text, "html.parser")
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        page_text = soup.get_text(" ", strip=True)
    else:
        # Regex fallback when bs4 is not installed.
        page_text = re.sub(r"(?is)<(script|style|noscript).*?>.*?</\1>", " ", html_text)
        page_text = re.sub(r"(?is)<head.*?>.*?</head>", " ", page_text)
        page_text = re.sub(r"(?is)<[^>]+>", " ", page_text)
        page_text = html_lib.unescape(page_text)
    page_text = re.sub(r"\s+", " ", page_text).strip()
    if not title:
        title = parsed.netloc
    if len(page_text) > 12000:
        page_text = page_text[:12000]
    return title, page_text
def heuristic_summary(text, max_sentences=5, max_chars=2200):
    """Build a simple extractive summary from the first sentences of *text*.

    Sentences are split after ``.``, ``!`` or ``?`` followed by whitespace.
    Sentences are collected until *max_sentences* are picked or their
    combined length reaches *max_chars*; the joined result is finally cut
    to *max_chars* characters.
    """
    if not text:
        return ""
    chosen = []
    used_chars = 0
    for raw_sentence in re.split(r"(?<=[.!?])\s+", text):
        sentence = raw_sentence.strip()
        if not sentence:
            continue
        chosen.append(sentence)
        used_chars += len(sentence)
        if len(chosen) >= max_sentences or used_chars >= max_chars:
            break
    joined = " ".join(chosen).strip()
    return joined[:max_chars].strip()
def summarize_web_text(title, raw_text, max_prompt_chars=5000):
    """Summarise web page text, using the language model when available.

    Args:
        title: Page title, included in the prompt.
        raw_text: Extracted page text.
        max_prompt_chars: Maximum number of characters of *raw_text* that
            are put into the model prompt.

    Returns:
        The model's summary, or the result of ``heuristic_summary`` when
        polishing is disabled, no model is loaded, the model returns
        nothing, or generation fails. ``""`` for empty *raw_text*.
    """
    if not raw_text:
        return ""
    base = heuristic_summary(raw_text)
    if not USE_QWEN_POLISH or model is None or tokenizer is None:
        return base
    # model_generate tokenizes the prompt with truncation at 2048 tokens.
    # Page text can be up to 12000 characters long, and since the
    # instruction comes AFTER the page text it would be the part that gets
    # cut off (assuming the tokenizer truncates on the right, its default).
    # The character budget is a heuristic, not an exact token count.
    prompt_text = raw_text[:max_prompt_chars] if max_prompt_chars > 0 else raw_text
    messages = [
        {
            "role": "system",
            "content": (
                "Du fasst nur den gegebenen Webseitentext zusammen. "
                "Erfinde nichts. Keine neuen Fakten. "
                "Schreibe 4 bis 8 kurze Sätze auf Deutsch."
            )
        },
        {
            "role": "user",
            "content": (
                f"Titel: {title}\n\n"
                f"Webseitentext:\n{prompt_text}\n\n"
                "Fasse das als kompakten Lerntest zusammen."
            )
        }
    ]
    try:
        out = model_generate(messages, max_new_tokens=180, temperature=0.35, do_sample=True)
        out = (out or "").strip()
        return out if out else base
    except Exception as e:
        log_error("summarize_web_text", e)
        return base
def save_link_as_knowledge(url, thema="", kategorie="web"):
    """Fetch a web page, summarise it and store it as a knowledge entry.

    Args:
        url: Address of the page to learn from.
        thema: Optional topic; defaults to the page title, then the URL.
        kategorie: Category of the entry; empty values become ``"web"``.

    Returns:
        ``(ok, message)`` describing the outcome for the user.
    """
    link = (url or "").strip()
    thema = (thema or "").strip()
    kategorie = (kategorie or "").strip() or "web"
    if not link:
        return False, "❌ Bitte einen Link eingeben."
    try:
        title, raw_text = extract_webpage_text(link)
    except Exception as e:
        log_error("extract_webpage_text", e)
        return False, f"❌ Link konnte nicht gelesen werden: {e}"
    if not raw_text or len(raw_text) < 50:
        return False, "❌ Auf der Seite konnte kein ausreichender Text gefunden werden."
    summary = summarize_web_text(title, raw_text)
    summary_too_short = not summary or len(summary.strip()) < 30
    if summary_too_short:
        # Fall back to a somewhat longer extractive summary.
        summary = heuristic_summary(raw_text, max_sentences=6, max_chars=3000)
    if not summary:
        return False, "❌ Der Text war zu leer oder unlesbar."
    topic = thema or title or link
    ok, msg = save_knowledge_entry(
        frage=topic,
        antwort=summary,
        kategorie=kategorie,
        quelle=link,
    )
    if not ok:
        return ok, msg
    return True, f"✅ Link gelernt!\n\nThema: {topic}\nQuelle: {link}\n\nZusammenfassung:\n{summary}"
| # ========================================================= | |
| # CHAT / SPEICHER | |
| # ========================================================= | |
def load_chat_history():
    """Return the chat history stored in ``CHAT_FILE``.

    The file is created with an empty list first if it does not exist.
    """
    ensure_json_list_file(CHAT_FILE)
    stored = load_json_list(CHAT_FILE)
    return stored
def save_chat_history(history):
    """Write *history* to ``CHAT_FILE`` via ``save_json_list``."""
    target = CHAT_FILE
    save_json_list(target, history)
def get_chat_session(session_id="default"):
    """Return the message list of a chat session, creating it when missing.

    ``api_chat_historie`` is converted into a dict of session id → message
    list on first use; an existing list becomes the ``"default"`` session.
    """
    global api_chat_historie
    if not isinstance(api_chat_historie, dict):
        legacy = api_chat_historie if isinstance(api_chat_historie, list) else []
        api_chat_historie = {"default": legacy}
    return api_chat_historie.setdefault(session_id, [])
def reset_chat_history():
    """Clear the in-memory chat sessions and the persisted chat history.

    Returns:
        ``(True, message)``.

    Side effects:
        Resets ``api_chat_historie`` to a single empty ``"default"``
        session and writes an empty list to the chat file.
    """
    global api_chat_historie
    with chat_lock:
        api_chat_historie = {"default": []}
        # The chat file is a JSON list (load_json_list ignores anything
        # else), so persist an empty list rather than the session dict.
        save_chat_history([])
    log_line("[CHAT] Chat-Historie zurückgesetzt.")
    return True, "✅ Chat-Historie gelöscht."
def chat_history_status():
    """Return a readable overview of the persisted chat history.

    Shows the total number of stored messages and the last twelve of them,
    each cut to 250 characters.
    """
    stored = load_chat_history()
    if not stored:
        return "Chat-Historie ist leer."
    report = [f"📜 Gespeicherte Nachrichten: {len(stored)}\n"]
    for position, message in enumerate(stored[-12:], 1):
        role = message.get("role", "?")
        content = message.get("content", "")
        # NOTE(review): the blank-line entry is assumed to belong inside
        # the loop; the original indentation was lost.
        report.extend((f"{position}. {role}: {content[:250]}", "\n"))
    return "\n".join(report).strip()
def load_visible_chat_history_for_ui():
    """Return the stored chat as ``(user, assistant)`` pairs, twice.

    The same list object is returned in both positions of the tuple.
    """
    stored_messages = load_chat_history()
    pairs = api_history_to_pairs(stored_messages)
    return pairs, pairs
| # ========================================================= | |
| # MODEL / QWEN | |
| # ========================================================= | |
def init_model_if_needed():
    """Load tokenizer and language model once, according to ``LOAD_IN_BITS``.

    * 4  → bitsandbytes 4-bit (NF4, double quantisation, fp16 compute)
    * 8  → bitsandbytes 8-bit with fp32 CPU offload enabled
    * 16 → no quantisation, half-precision weights

    Does nothing when model and tokenizer are already loaded.

    Raises:
        ValueError: When ``LOAD_IN_BITS`` is not 4, 8 or 16.
    """
    global model, tokenizer, device
    if model is not None and tokenizer is not None:
        return
    if LOAD_IN_BITS not in (4, 8, 16):
        raise ValueError(f"LOAD_IN_BITS muss 4, 8 oder 16 sein, nicht {LOAD_IN_BITS!r}.")
    print("=" * 60)
    print("🤖 Initialisiere Modell")
    print("=" * 60)
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    print(f"--- Lade Modell im {LOAD_IN_BITS}-Bit Modus ---")
    load_kwargs = {
        "device_map": "auto",
        "low_cpu_mem_usage": True,
        "token": HF_TOKEN,
    }
    if LOAD_IN_BITS == 4:
        # 4-bit mode (very economical, approx. 5 GB RAM)
        from transformers import BitsAndBytesConfig
        load_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True
        )
    elif LOAD_IN_BITS == 8:
        from transformers import BitsAndBytesConfig
        # fp32 CPU offload lets device_map="auto" spread the model over
        # GPU and CPU.
        load_kwargs["quantization_config"] = BitsAndBytesConfig(
            load_in_8bit=True,
            llm_int8_enable_fp32_cpu_offload=True
        )
    else:
        # 16-bit: the documented option that previously had no branch, so
        # the model stayed None and model.eval() failed.
        # NOTE(review): float16 on GPU / bfloat16 on CPU is a choice made
        # here, not taken from the original code; adjust if needed.
        load_kwargs["torch_dtype"] = (
            torch.float16 if torch.cuda.is_available() else torch.bfloat16
        )
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **load_kwargs)
    model.eval()
    print(f"✅ Modell geladen auf: {device}")
def format_messages_for_model(messages_history):
    """Render chat messages into the prompt string for the model.

    Uses the tokenizer's chat template with ``enable_thinking=False``,
    retries without that argument on ``TypeError``, and falls back to a
    plain ``Role: content`` transcript on any other error.

    Returns:
        The prompt text, or ``""`` when no tokenizer is loaded.
    """
    if tokenizer is None:
        return ""
    template_args = {"tokenize": False, "add_generation_prompt": True}
    try:
        return tokenizer.apply_chat_template(
            messages_history,
            enable_thinking=False,
            **template_args,
        )
    except TypeError:
        # Fallback for older models without enable_thinking
        return tokenizer.apply_chat_template(messages_history, **template_args)
    except Exception:
        transcript = [
            f"{message.get('role', 'user').capitalize()}: {message.get('content', '')}"
            for message in messages_history
        ]
        transcript.append("Assistant:")
        return "\n".join(transcript)
def is_repetitive_output(text, threshold=0.6):
    """Check whether the generated text strongly repeats itself.

    The words are split into a first and a second half; the text counts as
    repetitive when the ``SequenceMatcher`` ratio of the two halves is
    greater than *threshold*. Texts shorter than 40 characters or 10 words
    are never considered repetitive.
    """
    if not text or len(text) < 40:
        return False
    words = text.split()
    word_count = len(words)
    if word_count < 10:
        return False
    midpoint = word_count // 2
    leading = " ".join(words[:midpoint])
    trailing = " ".join(words[midpoint:])
    similarity = SequenceMatcher(None, leading, trailing).ratio()
    return similarity > threshold
def model_generate(messages_history, max_new_tokens=120, temperature=0.6, do_sample=True):
    """Generate one complete reply for the given chat messages.

    Args:
        messages_history: List of role/content message dicts.
        max_new_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        do_sample: Whether to sample.

    Returns:
        The generated text with ``<think>`` blocks removed, or ``""`` when
        no model is loaded, the prompt is empty, or the output is rejected
        as repetitive.
    """
    if model is None or tokenizer is None:
        return ""
    prompt_text = format_messages_for_model(messages_history)
    if not prompt_text:
        return ""
    encoded = tokenizer(
        [prompt_text],
        return_tensors="pt",
        truncation=True,
        max_length=2048,
    ).to(device)
    prompt_length = encoded.input_ids.shape[-1]
    generation_options = {
        "attention_mask": encoded.get("attention_mask", None),
        "max_new_tokens": max_new_tokens,
        "do_sample": do_sample,
        "temperature": temperature,
        "top_p": 0.90,
        "top_k": 40,
        "repetition_penalty": REPETITION_PENALTY,
        "no_repeat_ngram_size": NO_REPEAT_NGRAM_SIZE,
        "pad_token_id": tokenizer.eos_token_id,
        "eos_token_id": tokenizer.eos_token_id,
    }
    with torch.no_grad():
        generated = model.generate(encoded.input_ids, **generation_options)
    # Keep only the tokens produced after the prompt.
    completion_ids = generated[0][prompt_length:]
    reply = tokenizer.decode(completion_ids, skip_special_tokens=True).strip()
    # Remove Qwen3 thinking blocks (<think>...</think>) first, then any
    # think tag that was cut off before it was closed.
    for pattern in (r"<think>[\s\S]*?</think>", r"<think>[\s\S]*"):
        reply = re.sub(pattern, "", reply).strip()
    # Anti-loop: discard output that repeats itself heavily.
    if is_repetitive_output(reply):
        log_line("[WARN] Wiederholende Ausgabe erkannt, wird verworfen.")
        return ""
    return reply
def model_generate_stream(messages_history, max_new_tokens=120, temperature=0.6, do_sample=True):
    """Generate a reply and yield the accumulated text as it grows.

    Generation runs in a worker thread that feeds a
    ``TextIteratorStreamer``; this generator yields the stripped text
    produced so far after every new chunk.

    Args:
        messages_history: List of role/content message dicts.
        max_new_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature.
        do_sample: Whether to sample.

    Yields:
        The reply text generated so far. When no model is loaded a single
        notice is yielded; an empty prompt yields nothing.
    """
    if model is None or tokenizer is None:
        yield "Modell nicht geladen."
        return
    prompt_text = format_messages_for_model(messages_history)
    if not prompt_text:
        return
    inputs = tokenizer(
        [prompt_text],
        return_tensors="pt",
        truncation=True,
        max_length=2048
    ).to(device)
    attention_mask = inputs.get("attention_mask", None)
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = {
        "input_ids": inputs.input_ids,
        "attention_mask": attention_mask,
        "max_new_tokens": max_new_tokens,
        "do_sample": do_sample,
        "temperature": temperature,
        "top_p": 0.90,
        "top_k": 40,
        "repetition_penalty": REPETITION_PENALTY,
        "no_repeat_ngram_size": NO_REPEAT_NGRAM_SIZE,
        "pad_token_id": tokenizer.eos_token_id,
        "eos_token_id": tokenizer.eos_token_id,
        "streamer": streamer
    }

    def _run_generation():
        # If generate() fails, nothing would ever signal the end of the
        # stream and the consumer loop below would block forever.
        try:
            model.generate(**generation_kwargs)
        except Exception as e:
            log_error("model_generate_stream", e)
            streamer.end()

    thread = threading.Thread(target=_run_generation)
    thread.start()
    generated_text = ""
    for new_text in streamer:
        generated_text += new_text
        yield generated_text.strip()
    # The stream has ended, so the worker is finished or about to finish.
    thread.join()
def build_system_prompt(user_message=""):
    """Build the system prompt, including up to six knowledge entries.

    Entries relevant to *user_message* are preferred; when there are none
    the first six stored entries are used instead.
    """
    facts = find_relevant_facts(user_message, max_items=6) or load_wissen()[:6]
    if facts:
        fact_block = "\n\n".join(
            f"Fakt {number}:\n"
            f"Thema: {fact.get('frage', '')}\n"
            f"Text: {fact.get('antwort', '')}"
            for number, fact in enumerate(facts, 1)
        )
    else:
        fact_block = "Keine gespeicherten Fakten vorhanden."
    return f"""Du bist {AI_NAME}, ein hilfreicher KI-Assistent. Antworte kurz und klar auf Deutsch.
Bekannte Fakten:
{fact_block}"""
def get_system_prompt():
    """Return the system prompt built for an empty user message."""
    no_query = ""
    return build_system_prompt(no_query)
def compose_draft_from_facts(facts):
    """Turn at most the first four facts into a newline-joined draft text.

    Each fact contributes ``"<frage>: <antwort>"`` (or only the answer when
    the topic is empty); the answer is passed through ``compress_text`` with
    a limit of 220. Facts without an answer are dropped. The resulting lines
    are shuffled before joining. Returns ``""`` when nothing usable remains.
    """
    if not facts:
        return ""
    lines = []
    for entry in facts[:4]:
        heading = (entry.get("frage", "") or "").strip()
        body = compress_text(entry.get("antwort", ""), 220)
        if not body:
            continue
        lines.append(f"{heading}: {body}" if heading else body)
    if not lines:
        return ""
    random.shuffle(lines)
    return "\n".join(lines)
def polish_with_model(user_message, draft, facts, history_context=""):
    """Ask the model for a reply to ``user_message`` enriched with ``draft``.

    Args:
        user_message: The user's question.
        draft: Fact-based draft text; appended to the user turn as
            "Zusatzinfo" when non-empty.
        facts: Accepted for interface compatibility; not used in the prompt
            (the draft already carries the fact text).
        history_context: Accepted for interface compatibility; not used.

    Returns:
        The stripped model output, or ``draft`` unchanged when polishing is
        disabled (``USE_QWEN_POLISH``), the model/tokenizer is not loaded,
        the model returns nothing, or generation raises (the error is logged).
    """
    if not USE_QWEN_POLISH:
        return draft
    if model is None or tokenizer is None:
        return draft
    # The original also built a per-fact text block here that was never put
    # into the prompt; that dead computation has been removed.
    extra = f"\nZusatzinfo: {draft}" if draft else ""
    messages = [
        {
            "role": "system",
            "content": f"Du bist {AI_NAME}. Antworte immer auf Deutsch. Kurz, direkt, hilfreich. Keine Floskeln."
        },
        {
            "role": "user",
            "content": f"{user_message}{extra}"
        }
    ]
    try:
        out = model_generate(messages, max_new_tokens=MAX_NEW_TOKENS_POLISH, temperature=TEMPERATURE_POLISH, do_sample=True)
        if not out:
            return draft
        return out.strip()
    except Exception as e:
        log_error("polish_with_model", e)
        return draft
def general_chat_reply(user_message, history_context=""):
    """Answer ``user_message`` with the model alone (no stored facts).

    ``history_context`` is accepted but not used. A fixed fallback sentence
    is returned when the model/tokenizer is not loaded, when the model output
    is empty, or when generation raises (the error is logged).
    """
    fallback = "Dazu habe ich gerade keine sichere Antwort."
    if model is None or tokenizer is None:
        return fallback
    system_turn = {
        "role": "system",
        "content": f"Du bist {AI_NAME}. Antworte immer auf Deutsch. Kurz, direkt, hilfreich. Keine Floskeln. Bei Unsicherheit: 'Ich bin nicht sicher, aber ich glaube...'"
    }
    user_turn = {
        "role": "user",
        "content": user_message
    }
    try:
        raw = model_generate(
            [system_turn, user_turn],
            max_new_tokens=MAX_NEW_TOKENS_CHAT,
            temperature=TEMPERATURE_CHAT,
            do_sample=True,
        )
        cleaned = (raw or "").strip()
        return cleaned if cleaned else fallback
    except Exception as e:
        log_error("general_chat_reply", e)
        return fallback
def generate_reply(user_message, history_context=""):
    """Produce a reply, preferring a fact-backed answer over plain chat.

    Relevant facts are looked up for the message plus history context and
    de-duplicated. A non-placeholder exact database answer is put in front of
    them as an extra fact. When any facts exist, the model polishes a draft
    built from them; that result is returned unless it is empty or a
    generic/placeholder answer. Otherwise the plain chat reply is returned.
    """
    search_query = f"{user_message} {history_context}".strip()
    facts = dedupe_facts(find_relevant_facts(search_query, max_items=6))
    exact = exact_db_answer(user_message)
    if exact and not is_generic_or_placeholder_answer(exact):
        pinned = dict(
            frage=user_message,
            antwort=exact,
            kategorie="",
            quelle="",
            created_at="",
        )
        facts = dedupe_facts([pinned] + facts)
    draft = compose_draft_from_facts(facts)
    if facts:
        polished = polish_with_model(user_message, draft, facts, history_context)
        # Use the polished text only if it worked and is not a stock phrase.
        if polished and not is_generic_or_placeholder_answer(polished):
            return polished
    # No facts available, or polishing produced nothing useful: plain chat.
    return general_chat_reply(user_message, history_context)
def general_chat_reply_stream(user_message, history_context=""):
    """Stream a model-only answer to ``user_message``.

    ``history_context`` is accepted but not used.

    Yields:
        The accumulated reply text from ``model_generate_stream``. Empty
        intermediate chunks are skipped. A fixed fallback sentence is yielded
        when the model/tokenizer is not loaded, when the stream produced no
        text at all, or when streaming raises (the error is logged).
    """
    fallback = "Dazu habe ich gerade keine sichere Antwort."
    if model is None or tokenizer is None:
        yield fallback
        return
    messages = [
        {
            "role": "system",
            "content": f"Du bist {AI_NAME}. Antworte immer auf Deutsch. Kurz, direkt, hilfreich. Keine Floskeln. Bei Unsicherheit: 'Ich bin nicht sicher, aber ich glaube...'"
        },
        {
            "role": "user",
            "content": user_message
        }
    ]
    produced_text = False
    try:
        for chunk in model_generate_stream(messages, max_new_tokens=MAX_NEW_TOKENS_CHAT, temperature=TEMPERATURE_CHAT, do_sample=True):
            # Leading chunks can be empty after stripping. Emitting the
            # fallback for them would flash an error sentence before the
            # real answer, so they are skipped instead.
            if chunk:
                produced_text = True
                yield chunk
    except Exception as e:
        log_error("general_chat_reply_stream", e)
        yield fallback
        return
    # Mirror general_chat_reply: an empty result becomes the fallback.
    if not produced_text:
        yield fallback
def generate_reply_stream(user_message, history_context=""):
    """Streaming counterpart of ``generate_reply``.

    Performs the same fact lookup and polishing. A usable polished answer is
    yielded once as a whole; otherwise the chunks of
    ``general_chat_reply_stream`` are passed through.
    """
    search_query = f"{user_message} {history_context}".strip()
    facts = dedupe_facts(find_relevant_facts(search_query, max_items=6))
    exact = exact_db_answer(user_message)
    if exact and not is_generic_or_placeholder_answer(exact):
        pinned = dict(
            frage=user_message,
            antwort=exact,
            kategorie="",
            quelle="",
            created_at="",
        )
        facts = dedupe_facts([pinned] + facts)
    draft = compose_draft_from_facts(facts)
    if facts:
        polished = polish_with_model(user_message, draft, facts, history_context)
        if polished and not is_generic_or_placeholder_answer(polished):
            yield polished
            return
    # Plain chat stream.
    yield from general_chat_reply_stream(user_message, history_context)
| # ========================================================= | |
| # API | |
| # ========================================================= | |
def gradio_simple_api(user_message, session_id="default"):
    """Non-streaming API endpoint: answer ``user_message`` for a session.

    The session history is read under ``chat_lock``, the reply is generated
    without holding the lock, and the new user/assistant turns are appended,
    trimmed to ``MAX_CHAT_HISTORY`` and saved under the lock again. This is
    the same two-phase locking used by ``gradio_stream_api``, so slow model
    inference never blocks other users of ``chat_lock``.

    Args:
        user_message: The user's message text.
        session_id: Session key; falsy values fall back to ``"default"``.

    Returns:
        The reply text. Also updates the module-level ``letzte_api_latenz``.
    """
    global api_chat_historie, letzte_api_latenz
    start = time.perf_counter()
    session_id = session_id or "default"
    with chat_lock:
        history_context = history_to_context(get_chat_session(session_id))
    reply = generate_reply(user_message, history_context=history_context)
    with chat_lock:
        # Re-fetch: the session may have changed while the lock was released.
        history = get_chat_session(session_id)
        history.append({"role": "user", "content": user_message})
        history.append({"role": "assistant", "content": reply})
        if len(history) > MAX_CHAT_HISTORY:
            api_chat_historie[session_id] = history[-MAX_CHAT_HISTORY:]
        save_chat_history(api_chat_historie)
    log_line(f"[USER] {user_message} (Session: {session_id})")
    log_line(f"[ASSISTANT] {reply}")
    letzte_api_latenz = f"{(time.perf_counter() - start) * 1000:.2f} ms"
    return reply
def gradio_stream_api(user_message, session_id="default"):
    """Streaming API endpoint: yield the growing reply for a session.

    Reads the session history under ``chat_lock``, streams the chunks of
    ``generate_reply_stream`` without holding the lock, then records the
    user message and the last chunk as the assistant turn (trimmed to
    ``MAX_CHAT_HISTORY`` and saved) under the lock. Finally logs both turns
    and updates the module-level ``letzte_api_latenz``.
    """
    global api_chat_historie, letzte_api_latenz
    started_at = time.perf_counter()
    session_id = session_id or "default"
    with chat_lock:
        history_context = history_to_context(get_chat_session(session_id))
    reply = ""
    # The loop variable doubles as "last chunk seen" once streaming ends.
    for reply in generate_reply_stream(user_message, history_context=history_context):
        yield reply
    with chat_lock:
        session_history = get_chat_session(session_id)
        session_history.append({"role": "user", "content": user_message})
        session_history.append({"role": "assistant", "content": reply})
        if len(session_history) > MAX_CHAT_HISTORY:
            api_chat_historie[session_id] = session_history[-MAX_CHAT_HISTORY:]
        save_chat_history(api_chat_historie)
    log_line(f"[USER] {user_message} (Session: {session_id})")
    log_line(f"[ASSISTANT] {reply}")
    elapsed_ms = (time.perf_counter() - started_at) * 1000
    letzte_api_latenz = f"{elapsed_ms:.2f} ms"
| # ========================================================= | |
| # UI FUNKTIONEN | |
| # ========================================================= | |
def ui_zeige_status():
    """Return a multi-line status report for the "System Status" tab.

    The report is assembled from the loaded facts, ``get_knowledge_stats()``,
    the number of stored chat entries, and several module-level status
    variables and configuration constants.
    """
    facts = load_wissen()
    stats = get_knowledge_stats()
    chat_entries = len(load_chat_history())
    return f"""🟢 SYSTEM ONLINE
🤖 Modell: {MODEL_NAME}
🖥️ Device: {device}
🏠 Space: RedJul2110/MyfirstAI
📦 Datenbank: {HF_DATASET}
💾 Gespeicherte Fakten: {len(facts)}
🗂️ Kategorien: {", ".join(stats["categories"]) if stats["categories"] else "keine"}
💬 Gespeicherte Chat-Nachrichten: {chat_entries}
⏱️ Letzte API-Antwortzeit: {letzte_api_latenz if letzte_api_latenz else "noch keine"}
🔁 Letzter HF-Sync: {letzter_hf_sync if letzter_hf_sync else "noch keiner"}
⬆️ Letzter Upload: {letzter_upload if letzter_upload else "noch keiner"}
🧠 Letzte Wissensänderung: {letzte_wissensänderung if letzte_wissensänderung else "noch keine"}
🔄 Upload läuft: {"ja" if upload_in_progress else "nein"}
⚠️ Letzter Fehler: {letzter_fehler if letzter_fehler else "keiner"}
Lokale Wissensdatei: {WISSEN_FILE}
Chat-Datei: {CHAT_FILE}
Log-Datei: {LOG_FILE}
"""
def ui_sync_wissen():
    """UI callback: run ``sync_wissen_from_hf`` and return its message."""
    _succeeded, message = sync_wissen_from_hf()
    return message
def ui_web_lernen(passwort, frage, antwort, kategorie):
    """UI callback: store one knowledge entry after checking the admin code.

    Args:
        passwort: Admin code entered in the UI.
        frage: Topic / keyword of the entry.
        antwort: Text of the entry.
        kategorie: Optional category.

    Returns:
        The message from ``save_knowledge_entry``, or an access-denied
        message when the admin code is wrong or not configured.
    """
    # Fail closed: ADMIN_CODE falls back to "" when the CODE environment
    # variable is unset, and an empty password must not grant admin access.
    if not ADMIN_CODE or passwort != ADMIN_CODE:
        return "❌ Zugriff verweigert! Falscher Admin-Code."
    ok, msg = save_knowledge_entry(frage, antwort, kategorie)
    return msg
def ui_link_lernen_multi(passwort, urls_text, topics_text, cats_text):
    """UI callback: learn several links, each with optional topic/category.

    All three text inputs are split on newlines and commas. Topics and
    categories are matched to URLs by position and padded with empty strings
    when fewer were given than URLs.

    Args:
        passwort: Admin code entered in the UI.
        urls_text: URLs, one per line (or comma separated).
        topics_text: Optional topics, positionally matching the URLs.
        cats_text: Optional categories, positionally matching the URLs.

    Returns:
        One ``"[url]: message"`` line per URL, separated by blank lines, or
        an error message when access is denied or no URL was given.
    """
    # Fail closed: ADMIN_CODE falls back to "" when the CODE environment
    # variable is unset, and an empty password must not grant admin access.
    if not ADMIN_CODE or passwort != ADMIN_CODE:
        return "❌ Zugriff verweigert! Falscher Admin-Code."
    # Guard against None like the topic/category inputs below.
    urls = [u.strip() for u in (urls_text or "").replace(",", "\n").split("\n") if u.strip()]
    if not urls:
        return "❌ Keine gültigen URLs gefunden."
    # Empty entries are kept here on purpose so positions stay aligned.
    topics = [t.strip() for t in (topics_text or "").replace(",", "\n").split("\n")]
    cats = [c.strip() for c in (cats_text or "").replace(",", "\n").split("\n")]
    topics += [""] * (len(urls) - len(topics))
    cats += [""] * (len(urls) - len(cats))
    results = []
    for u, t, c in zip(urls, topics, cats):
        ok, msg = save_link_as_knowledge(u, t, c)
        results.append(f"[{u}]: {msg}")
    return "\n\n".join(results)
def extract_pdf_text_fallback(filepath):
    """Extract the text of a PDF with PyPDF2.

    Returns:
        The stripped text of all pages that yielded text, each page followed
        by a newline. Returns the sentinel ``"ERROR_NO_PYPDF2"`` when PyPDF2
        cannot be imported, or ``"ERROR_READ: <details>"`` on any other error.
    """
    try:
        import PyPDF2
        with open(filepath, "rb") as handle:
            reader = PyPDF2.PdfReader(handle)
            # Extraction must happen while the file is still open.
            page_texts = (page.extract_text() for page in reader.pages)
            chunks = [content + "\n" for content in page_texts if content]
        return "".join(chunks).strip()
    except ImportError:
        return "ERROR_NO_PYPDF2"
    except Exception as e:
        return f"ERROR_READ: {e}"
def ui_pdf_lernen(passwort, file_obj, thema, kategorie):
    """UI callback: extract a PDF's text, summarize it and store it.

    Args:
        passwort: Admin code entered in the UI.
        file_obj: Uploaded file: either a path string or an object whose
            ``name`` attribute holds the path.
        thema: Optional topic; defaults to "PDF Dokument".
        kategorie: Optional category; defaults to "dokument".

    Returns:
        The message from ``save_knowledge_entry``, or an error message when
        access is denied, no file/path is available, the PDF cannot be read,
        or fewer than 50 characters of text were extracted.
    """
    # Fail closed: ADMIN_CODE falls back to "" when the CODE environment
    # variable is unset, and an empty password must not grant admin access.
    if not ADMIN_CODE or passwort != ADMIN_CODE:
        return "❌ Zugriff verweigert! Falscher Admin-Code."
    if file_obj is None:
        return "❌ Bitte eine Datei hochladen."
    filepath = file_obj if isinstance(file_obj, str) else getattr(file_obj, "name", None)
    if not filepath:
        return "❌ Dateipfad konnte nicht ermittelt werden."
    text = extract_pdf_text_fallback(filepath)
    # extract_pdf_text_fallback reports failures through sentinel strings.
    if text == "ERROR_NO_PYPDF2":
        return "❌ Das Paket 'PyPDF2' fehlt. Bitte füge 'PyPDF2' zu deiner 'requirements.txt' in Hugging Face hinzu!"
    if text.startswith("ERROR_READ:"):
        return f"❌ Fehler beim Lesen: {text}"
    if len(text) < 50:
        return "❌ In der PDF konnte kein/kaum Text gefunden werden."
    topic = thema or "PDF Dokument"
    cat = kategorie or "dokument"
    summary = summarize_web_text(topic, text)
    # Fall back to the heuristic summary when the first one is empty/too short.
    if not summary or len(summary.strip()) < 30:
        summary = heuristic_summary(text, max_sentences=6, max_chars=3000)
    ok, msg = save_knowledge_entry(frage=topic, antwort=summary, kategorie=cat, quelle="PDF Upload")
    return msg
def ui_wissen_suchen(suchbegriff):
    """UI callback: return the result of ``search_knowledge`` for the term."""
    treffer = search_knowledge(suchbegriff)
    return treffer
def ui_wissen_loeschen(passwort, suchbegriff):
    """UI callback: delete knowledge entries matching ``suchbegriff``.

    Returns the message from ``delete_knowledge``, or an access-denied
    message when the admin code is wrong or not configured.
    """
    # Fail closed: ADMIN_CODE falls back to "" when the CODE environment
    # variable is unset, and an empty password must not grant admin access.
    if not ADMIN_CODE or passwort != ADMIN_CODE:
        return "❌ Zugriff verweigert! Falscher Admin-Code."
    ok, msg = delete_knowledge(suchbegriff)
    return msg
def ui_wissen_alle_loeschen(passwort):
    """UI callback: delete all knowledge entries after an admin check.

    Returns the message from ``delete_all_knowledge`` (which also receives
    the password), or an access-denied message when the admin code is wrong
    or not configured.
    """
    # Fail closed: ADMIN_CODE falls back to "" when the CODE environment
    # variable is unset, and an empty password must not grant admin access.
    if not ADMIN_CODE or passwort != ADMIN_CODE:
        return "❌ Zugriff verweigert! Falscher Admin-Code."
    ok, msg = delete_all_knowledge(passwort)
    return msg
def ui_chat_send(user_message, visible_history):
    """UI callback: answer a chat message and extend the visible history.

    Blank messages return ``("", visible_history, visible_history)``
    unchanged. Otherwise a reply is generated from the visible history, the
    pair is added to the visible history, both turns are stored in
    ``api_chat_historie`` (trimmed and saved under ``chat_lock``), the
    exchange is logged and ``letzte_api_latenz`` is updated.

    Returns:
        ``(reply, visible_history, visible_history)``.
    """
    global api_chat_historie, letzte_api_latenz
    user_message = (user_message or "").strip()
    if not user_message:
        return "", visible_history, visible_history
    started_at = time.perf_counter()
    shown = [] if visible_history is None else visible_history
    context = history_to_context(shown)
    reply = generate_reply(user_message, history_context=context)
    shown = shown + [(user_message, reply)]
    with chat_lock:
        for role, content in (("user", user_message), ("assistant", reply)):
            api_chat_historie.append({"role": role, "content": content})
        trim_api_history(10)
        save_chat_history(api_chat_historie)
    log_line(f"[CHAT USER] {user_message}")
    log_line(f"[CHAT BOT] {reply}")
    elapsed_ms = (time.perf_counter() - started_at) * 1000
    letzte_api_latenz = f"{elapsed_ms:.2f} ms"
    return reply, shown, shown
def ui_chat_reset():
    """UI callback: reset the stored chat and clear the in-memory history.

    Returns ``("", [], [], message)`` where ``message`` comes from
    ``reset_chat_history``.
    """
    global api_chat_historie
    _succeeded, status_message = reset_chat_history()
    api_chat_historie = []
    return "", [], [], status_message
def ui_chat_status():
    """UI callback: return the result of ``chat_history_status``."""
    status = chat_history_status()
    return status
def load_visible_chat_history_for_ui():
    """Load the stored chat and return it twice as UI message pairs.

    The same object produced by ``api_history_to_pairs`` is returned in both
    tuple positions.
    """
    stored = load_chat_history()
    pairs = api_history_to_pairs(stored)
    return (pairs, pairs)
| # ========================================================= | |
| # APP | |
| # ========================================================= | |
def erzeuge_gradio_app():
    """Build and return the Gradio ``Blocks`` app.

    The app contains:
      * hidden components exposing two API endpoints, ``predict``
        (``gradio_simple_api``) and ``stream`` (``gradio_stream_api``);
      * a "System Status" tab with refresh and HF-sync buttons;
      * an "Admin-Bereich" tab whose learning/search/delete tools become
        visible only after a successful login with the admin code.

    The queue is configured with ``default_concurrency_limit=8`` before the
    app is returned.
    """
    custom_css = """
    body { background: linear-gradient(135deg, #0f2027, #203a43, #2c5364); color: #fff; font-family: 'Inter', sans-serif; }
    .gradio-container { background: rgba(255, 255, 255, 0.05); backdrop-filter: blur(15px); border-radius: 12px; border: 1px solid rgba(255,255,255,0.1); box-shadow: 0 8px 32px 0 rgba(0, 0, 0, 0.37); }
    button.primary { background: linear-gradient(90deg, #00C9FF 0%, #92FE9D 100%); border: none; color: black; font-weight: bold; }
    button.primary:hover { transform: translateY(-2px); box-shadow: 0 5px 15px rgba(0,201,255,0.4); }
    """
    with gr.Blocks(title="Privates KI Kontrollzentrum", theme=gr.themes.Soft(), css=custom_css) as demo:
        # Hidden components that only exist to expose named API endpoints.
        hidden_msg = gr.Textbox(value="", visible=False)
        hidden_session = gr.Textbox(value="default", visible=False)
        hidden_out = gr.Textbox(value="", visible=False)
        api_trigger = gr.Button(visible=False)
        api_trigger.click(
            gradio_simple_api,
            inputs=[hidden_msg, hidden_session],
            outputs=[hidden_out],
            api_name="predict"
        )
        api_trigger_stream = gr.Button(visible=False)
        api_trigger_stream.click(
            gradio_stream_api,
            inputs=[hidden_msg, hidden_session],
            outputs=[hidden_out],
            api_name="stream"
        )
        gr.Markdown("# 🤖 Privates KI Kontrollzentrum")
        gr.Markdown("Die KI nutzt zuerst ihre eigenen Antworten. Gefundene Fakten aus der Datenbank dienen nur als Zusatzwissen.")
        with gr.Tab("📊 System Status"):
            status_text = gr.Textbox(label="Systembericht", lines=16, interactive=False)
            with gr.Row():
                refresh_btn = gr.Button("Status aktualisieren")
                sync_btn = gr.Button("Wissen von HF neu laden")
            refresh_btn.click(ui_zeige_status, outputs=status_text)
            sync_btn.click(ui_sync_wissen, outputs=status_text)
            demo.load(ui_zeige_status, outputs=status_text)
        with gr.Tab("🔒 Admin-Bereich"):
            login_col = gr.Column(visible=True)
            admin_col = gr.Column(visible=False)
            with login_col:
                gr.Markdown("### Bitte Admin-Code eingeben, um Einstellungen und Lern-Tools freizuschalten.")
                admin_pw = gr.Textbox(label="Admin-Code", type="password")
                login_btn = gr.Button("Login", variant="primary")
                login_err = gr.Markdown(visible=False)
            with admin_col:
                with gr.Tabs():
                    with gr.Tab("🧠 Wissen lernen"):
                        gr.Markdown("Speichere neue Fakten direkt in die Datenbank.")
                        q_input = gr.Textbox(label="Thema / Stichwort", placeholder="z. B. Frankreich, Mars")
                        a_input = gr.Textbox(label="Text", placeholder="Langer Infotext", lines=6)
                        k_input = gr.Textbox(label="Kategorie / Bereich (optional)", placeholder="z. B. Geschichte")
                        lern_btn = gr.Button("Wissen speichern", variant="primary")
                        lern_out = gr.Textbox(label="Ergebnis", interactive=False)
                        lern_btn.click(ui_web_lernen, inputs=[admin_pw, q_input, a_input, k_input], outputs=lern_out)
                    with gr.Tab("🌐 Link lernen (Multi)"):
                        gr.Markdown("Ein oder mehrere öffentliche Links einfügen (durch neue Zeile getrennt). Die KI liest und lernt diese.")
                        link_urls = gr.Textbox(label="Links (Eine URL pro Zeile)", placeholder="https://...\nhttps://...", lines=5)
                        link_topic = gr.Textbox(label="Themen (Optional, ein Thema pro Zeile passend zur URL)", lines=5)
                        link_cat = gr.Textbox(label="Kategorien (Optional, eine Kategorie pro Zeile)", lines=5)
                        link_btn = gr.Button("Links lernen", variant="primary")
                        link_out = gr.Textbox(label="Ergebnis", lines=8, interactive=False)
                        link_btn.click(ui_link_lernen_multi, inputs=[admin_pw, link_urls, link_topic, link_cat], outputs=link_out)
                    with gr.Tab("📄 PDF lernen"):
                        gr.Markdown("Lade eine PDF-Datei hoch, um ihren Text zu analysieren und als Wissen zu speichern.")
                        pdf_file = gr.File(label="PDF Datei", file_types=[".pdf"])
                        pdf_topic = gr.Textbox(label="Thema / Stichwort (optional)")
                        pdf_cat = gr.Textbox(label="Kategorie / Bereich (optional)")
                        pdf_btn = gr.Button("Dokument lernen", variant="primary")
                        pdf_out = gr.Textbox(label="Ergebnis", lines=6, interactive=False)
                        pdf_btn.click(ui_pdf_lernen, inputs=[admin_pw, pdf_file, pdf_topic, pdf_cat], outputs=pdf_out)
                    with gr.Tab("🔍 Suchen / Löschen"):
                        gr.Markdown("Suche in der Datenbank oder lösche Einträge wieder.")
                        search_box = gr.Textbox(label="Suchbegriff", placeholder="z. B. Frankreich")
                        search_btn = gr.Button("Suchen")
                        search_out = gr.Textbox(label="Treffer", lines=12, interactive=False)
                        del_box = gr.Textbox(label="Löschen nach Begriff", placeholder="z. B. Frankreich")
                        del_btn = gr.Button("Löschen", variant="secondary")
                        del_out = gr.Textbox(label="Lösch-Ergebnis", interactive=False)
                        all_del_btn = gr.Button("ALLES löschen", variant="stop")
                        all_del_out = gr.Textbox(label="Alles löschen", interactive=False)
                        search_btn.click(ui_wissen_suchen, inputs=[search_box], outputs=search_out)
                        del_btn.click(ui_wissen_loeschen, inputs=[admin_pw, del_box], outputs=del_out)
                        all_del_btn.click(ui_wissen_alle_loeschen, inputs=[admin_pw], outputs=all_del_out)

            def do_login(pw):
                """Toggle login/admin columns depending on the entered code."""
                # Fail closed: ADMIN_CODE falls back to "" when the CODE
                # environment variable is unset, and an empty password must
                # not unlock the admin area.
                if ADMIN_CODE and pw == ADMIN_CODE:
                    return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
                return gr.update(visible=True), gr.update(visible=False), gr.update(value="**❌ Falscher Admin-Code**", visible=True)

            login_btn.click(do_login, inputs=[admin_pw], outputs=[login_col, admin_col, login_err])
    demo.queue(default_concurrency_limit=8)
    return demo
| # ========================================================= | |
| # LOKALER CHAT (FALLBACK) | |
| # ========================================================= | |
def local_terminal_chat():
    """Run a simple blocking chat loop on the terminal.

    Reads user input line by line and prints the result of
    ``generate_reply``. The loop ends when the user types "exit", "quit" or
    "ende" (case-insensitive), on end-of-input, or on Ctrl+C. Empty input is
    ignored.
    """
    print("Lokaler Chat gestartet. Tippe 'exit' zum Beenden.")
    while True:
        try:
            user = input("Du: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl+C: leave the loop instead of crashing
            # with a traceback.
            print()
            break
        if user.lower() in {"exit", "quit", "ende"}:
            break
        if not user:
            continue
        reply = generate_reply(user)
        print("Bot:", reply)
| # ========================================================= | |
| # BOOTSTRAP | |
| # ========================================================= | |
def bootstrap():
    """Initialize data files, knowledge, chat history and model, then start.

    When the ``SPACE_ID`` environment variable is set, the Gradio app is
    built and launched; otherwise the local terminal chat is started.
    """
    global api_chat_historie
    ensure_json_list_file(WISSEN_FILE)
    ensure_json_list_file(CHAT_FILE)
    sync_wissen_from_hf()
    api_chat_historie = load_chat_history()
    init_model_if_needed()
    if os.environ.get("SPACE_ID"):
        app = erzeuge_gradio_app()
        # erzeuge_gradio_app() already configures the queue
        # (default_concurrency_limit=8). Calling queue() again here would
        # rebuild it with default settings and discard that limit.
        app.launch()
    else:
        local_terminal_chat()
# Script entry point: run the startup sequence only when executed directly.
if __name__ == "__main__":
    bootstrap()