# =========================================================
# Imports
# =========================================================
import os
import json
import re
import time
import sys
import traceback
import threading
import html as html_lib
import ipaddress
import random
from datetime import datetime
from urllib.parse import urlparse
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError
from difflib import SequenceMatcher

import torch
import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer, AutoModel
from huggingface_hub import hf_hub_download, upload_file

# BeautifulSoup is optional — when missing, the web scraper falls back to
# regex-based HTML stripping.
try:
    from bs4 import BeautifulSoup
except Exception:
    BeautifulSoup = None

# =========================================================
# CONFIG — everything tunable lives here
# =========================================================

# --- MODEL ---
MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"
# Examples:
#   "Qwen/Qwen3-0.6B"            → very small, fast, weaker
#   "Qwen/Qwen3-1.7B"            → good balance (recommended)
#   "Qwen/Qwen3-4B"              → smarter, needs more RAM
#   "Qwen/Qwen2.5-1.5B-Instruct" → older model, stable

# --- HUGGING FACE ---
HF_DATASET = "RedJul2110/wissen-datenbank"
HF_TOKEN = os.environ.get("HF_TOKEN", "")
ADMIN_CODE = os.environ.get("CODE", "")

# --- QUANTIZATION (4 for speed, 8 for sharpness, 16 for full power) ---
# Valid values: 4, 8 or 16
LOAD_IN_BITS = 4  # recommended for 16 GB RAM

# --- FILE PATHS ---
DATA_DIR = "/data" if os.path.isdir("/data") else "."
os.makedirs(DATA_DIR, exist_ok=True)
WISSEN_FILE = os.path.join(DATA_DIR, "wissen.json")
CHAT_FILE = os.path.join(DATA_DIR, "chat_history.json")
LOG_FILE = os.path.join(DATA_DIR, "ai_log.txt")
SETTINGS_FILE = os.path.join(DATA_DIR, "settings.json")

# --- ANSWER LENGTH (how long may the AI answer?)
# ---
MAX_NEW_TOKENS_CHAT = 100     # normal chat answer
MAX_NEW_TOKENS_POLISH = 100   # answer backed by the knowledge base
MAX_NEW_TOKENS_SUMMARY = 200  # link summarization
# Examples:
#   80  → very short answers (faster)
#   150 → medium answers (recommended)
#   300 → long, detailed answers (slower)

# --- CREATIVITY (how creative / random are the answers?) ---
TEMPERATURE_CHAT = 0.3    # main chat
TEMPERATURE_POLISH = 0.4  # answer with knowledge
# Examples:
#   0.1 → very precise, nearly deterministic
#   0.5 → balanced (recommended)
#   0.9 → creative but sometimes inaccurate

# --- REPETITION PROTECTION ---
REPETITION_PENALTY = 1.15  # how strongly repetitions are penalized
NO_REPEAT_NGRAM_SIZE = 3   # never repeat X consecutive tokens
# Examples for repetition_penalty:
#   1.0  → no protection (may repeat itself)
#   1.15 → light protection (recommended)
#   1.4  → strong protection (can make answers unnatural)

# --- CHAT MEMORY (how many messages are remembered?) ---
MAX_CHAT_HISTORY = 10  # maximum stored messages
MAX_CONTEXT_TURNS = 5  # how many messages are used as context
# Examples:
#   6  → short memory, faster
#   10 → medium memory (recommended)
#   20 → long memory (needs more compute)

# --- AI PERSONALITY ---
AI_NAME = "RedJul2110"
FALLBACK_NO_INFO = "Dazu habe ich gerade keine sichere Antwort."
SYSTEM_PROMPT_ZUSATZ = "Du bist ein brillanter KI-Assistent. Du kannst sehr gut programmieren und Code-Beispiele liefern, aber du kannst auch bei jeglichen anderen Themen helfen. Du antwortest immer klar und kurz."

# --- KNOWLEDGE BASE ---
USE_QWEN_POLISH = True            # True = model refines answers with stored knowledge
DB_DIRECT_MATCH_THRESHOLD = 0.88  # how similar must a question be? (0.0-1.0)
DB_FACT_MATCH_THRESHOLD = 0.70    # from which score does a fact count as matching?
# (0.0-1.0)
# Threshold examples:
#   0.95 → only near-identical hits (very strict)
#   0.88 → good balance (recommended)
#   0.60 → many hits, including weaker matches

# =========================================================
# GLOBAL STATE
# =========================================================
model = None
tokenizer = None
embed_model = None
embed_tokenizer = None
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

knowledge_lock = threading.Lock()
chat_lock = threading.Lock()

# Starts as a flat list[dict]; get_chat_session() migrates it into a
# dict[str, list] keyed by session id on first use.
api_chat_historie = []
upload_in_progress = False
letzter_hf_sync = None
letzter_upload = None
letzte_wissensänderung = None
letzte_api_latenz = None
letzter_fehler = None


# =========================================================
# HELPERS
# =========================================================
def now_str():
    """Current local time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def log_line(message):
    """Append a timestamped line to the log file. Best effort: never raises."""
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(f"[{now_str()}] {message}\n")
    except Exception:
        # Logging must never take the application down.
        pass


def log_error(where, exc):
    """Record an exception (with traceback) and remember it as the last error."""
    global letzter_fehler
    letzter_fehler = f"{where}: {exc}"
    log_line(f"[ERROR] {where}: {exc}\n{traceback.format_exc()}")


def normalize_text(text):
    """Lowercase, transliterate German umlauts, and keep only [a-z0-9] words."""
    text = (text or "").lower().strip()
    text = (
        text.replace("ä", "ae")
        .replace("ö", "oe")
        .replace("ü", "ue")
        .replace("ß", "ss")
    )
    text = re.sub(r"[^a-z0-9]+", " ", text)
    return re.sub(r"\s+", " ", text).strip()


def text_tokens(text):
    """Return the set of normalized tokens with German stopwords removed."""
    stopwords = {
        "der", "die", "das", "ein", "eine", "einer", "eines", "und", "oder",
        "ist", "sind", "war", "waren", "wie", "was", "wer", "wo", "wann",
        "warum", "wieso", "woher", "wieviel", "wieviele", "im", "in", "am",
        "an", "zu", "mit", "von", "für", "auf", "aus", "den", "dem", "des",
        "ich", "du", "er", "sie", "es", "man", "nicht", "nur", "auch", "noch"
    }
    tokens = normalize_text(text).split()
    return {t for t in tokens if t and t not in stopwords}


def ensure_json_list_file(path):
    """Create an empty JSON list file at *path* if it does not exist yet."""
    if not os.path.exists(path):
        save_json_list(path, [])


def load_json_list(path):
    """Load a JSON list from *path*; return [] on any error or non-list content."""
    if not os.path.exists(path):
        return []
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        return data if isinstance(data, list) else []
    except Exception:
        return []


def save_json_list(path, data):
    """Atomically write *data* as JSON: write to a temp file, then replace."""
    tmp = path + ".tmp"
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    os.replace(tmp, path)


def format_entry(item, idx=None):
    """Render a knowledge entry as display text, optionally numbered with *idx*."""
    titel = item.get("frage", "").strip()
    text = item.get("antwort", "").strip()
    kategorie = item.get("kategorie", "").strip()
    quelle = item.get("quelle", "").strip()
    out = []
    if idx is None:
        out.append(f"{titel}")
    else:
        out.append(f"{idx}. {titel}")
    if kategorie:
        out.append(f"[Kategorie: {kategorie}]")
    if quelle:
        out.append(f"[Quelle: {quelle}]")
    if created := item.get("created_at", "").strip():
        out.append(f"[Zeit: {created}]")
    out.append(text)
    return "\n".join(out)


def history_to_context(history, max_turns=MAX_CONTEXT_TURNS):
    """Flatten chat history (tuple pairs or role/content dicts) into context text."""
    if not history:
        return ""
    lines = []
    if isinstance(history[0], tuple):
        # Legacy format: list of (user, assistant) pairs.
        for user, assistant in history[-max_turns:]:
            lines.append(f"User: {user}")
            lines.append(f"Assistant: {assistant}")
        return "\n".join(lines)
    # Message-dict format: each turn is two messages (user + assistant).
    recent = history[-max_turns * 2:]
    for msg in recent:
        role = msg.get("role", "")
        content = msg.get("content", "")
        lines.append(f"{role}: {content}")
    return "\n".join(lines)


def api_history_to_pairs(messages):
    """Convert role/content messages into (user, assistant) tuples."""
    pairs = []
    pending_user = None
    for msg in messages:
        role = msg.get("role", "")
        content = msg.get("content", "")
        if role == "user":
            pending_user = content
        elif role == "assistant" and pending_user is not None:
            pairs.append((pending_user, content))
            pending_user = None
    return pairs


def trim_api_history(max_messages=MAX_CHAT_HISTORY):
    """Cap the global history length.

    NOTE(review): this slices api_chat_historie as a list, but
    get_chat_session() migrates it into a dict — confirm this helper is
    only called before that migration.
    """
    global api_chat_historie
    if len(api_chat_historie) > max_messages:
        api_chat_historie = api_chat_historie[-max_messages:]


def looks_like_factual_question(text):
    """Heuristic: does *text* look like a factual question (W-word or '?')."""
    t = normalize_text(text)
    return "?" in text or t.startswith((
        "was", "wer", "wie", "wann", "wo", "warum", "wieso",
        "welche", "welcher", "welches", "nenn", "nenne", "erklaer", "erklär"
    ))


def compress_text(text, max_chars=220):
    """Collapse whitespace and truncate at a word boundary with an ellipsis."""
    text = (text or "").strip()
    if not text:
        return ""
    text = re.sub(r"\s+", " ", text)
    if len(text) <= max_chars:
        return text
    cut = text[:max_chars].rsplit(" ", 1)[0].strip()
    return cut + "..."


def is_generic_or_placeholder_answer(text):
    """True if *text* is empty or looks like a stock filler phrase."""
    t = normalize_text(text)
    if not t:
        return True
    markers = [
        "interessante frage",
        "was moechtest du wissen",
        "was möchtest du wissen",
        "ich bin sehr praezise",
        "ich bin sehr präzise",
        "dazu habe ich nichts",
        "was genau",
        "wie kann ich helfen",
        "ruf mich an",
        "ich kann dir",
        "hallo, ich bin",
        "hallo ich bin",
        "hier ist mein angebot",
    ]
    return any(m in t for m in markers)


def dedupe_facts(facts):
    """Remove duplicate facts, keyed on normalized (frage, antwort, quelle)."""
    seen = set()
    unique = []
    for item in facts:
        key = (
            normalize_text(item.get("frage", "")),
            normalize_text(item.get("antwort", "")),
            normalize_text(item.get("quelle", "")),
        )
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique


# =========================================================
# KNOWLEDGE / DATABASE
# =========================================================
def load_wissen():
    """Load the local knowledge base, creating the file if necessary."""
    ensure_json_list_file(WISSEN_FILE)
    return load_json_list(WISSEN_FILE)


def sync_wissen_from_hf():
    """Pull wissen.json from the HF dataset into the local file.

    Returns (ok, message). Falls back to the local file on any failure.
    """
    global letzter_hf_sync
    ensure_json_list_file(WISSEN_FILE)
    if not HF_TOKEN:
        log_line("[WARN] HF_TOKEN fehlt. Lokale Datei wird genutzt.")
        return False, "HF_TOKEN fehlt. Lokale Datei wird genutzt."
    try:
        remote_path = hf_hub_download(
            repo_id=HF_DATASET,
            filename="wissen.json",
            repo_type="dataset",
            token=HF_TOKEN,
            force_download=True
        )
        remote_data = load_json_list(remote_path)
        save_json_list(WISSEN_FILE, remote_data)
        letzter_hf_sync = now_str()
        return True, f"✅ Wissen aus HF geladen ({len(remote_data)} Einträge)."
    except Exception as e:
        msg = str(e)
        # A missing remote file is expected on first run — not an error.
        if "Entry Not Found" in msg or "404" in msg or "not found" in msg.lower():
            return False, "ℹ️ Im HF-Dataset gibt es noch keine wissen.json. Lokale Datei wird genutzt."
        log_error("sync_wissen_from_hf", e)
        return False, f"⚠️ HF-Sync fehlgeschlagen, lokale Datei bleibt aktiv: {e}"


def upload_wissen_background():
    """Upload the local wissen.json to the HF dataset (meant for a daemon thread)."""
    global upload_in_progress, letzter_upload
    if not HF_TOKEN:
        log_line("[WARN] Upload übersprungen, weil HF_TOKEN fehlt.")
        return
    upload_in_progress = True
    try:
        upload_file(
            path_or_fileobj=WISSEN_FILE,
            path_in_repo="wissen.json",
            repo_id=HF_DATASET,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=f"Update wissen.json ({now_str()})"
        )
        letzter_upload = now_str()
        log_line("[OK] wissen.json erfolgreich hochgeladen.")
    except Exception as e:
        log_error("upload_wissen_background", e)
    finally:
        upload_in_progress = False


def init_embed_model():
    """Lazily load the MiniLM embedding model used for semantic search."""
    global embed_tokenizer, embed_model
    if embed_model is not None:
        return
    try:
        log_line("[INFO] Lade Embedding-Modell für semantische Suche...")
        embed_tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
        embed_model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2").to(device)
        embed_model.eval()
        log_line("[INFO] Embedding-Modell erfolgreich geladen.")
    except Exception as e:
        log_error("init_embed_model", e)


def get_embedding(text):
    """Embed *text* via mean pooling; returns a list[float] or [] on failure."""
    init_embed_model()
    if embed_model is None or embed_tokenizer is None or not text:
        return []
    try:
        inputs = embed_tokenizer(
            text, return_tensors="pt", padding=True, truncation=True, max_length=512
        ).to(device)
        with torch.no_grad():
            outputs = embed_model(**inputs)
        # Mean pooling over the token dimension.
        emb = outputs.last_hidden_state.mean(dim=1)
        return emb[0].cpu().numpy().tolist()
    except Exception as e:
        log_error("get_embedding", e)
        return []


def cosine_similarity(v1, v2):
    """Cosine similarity of two equal-length vectors; 0.0 for degenerate input."""
    if not v1 or not v2 or len(v1) != len(v2):
        return 0.0
    import math
    dot = sum(a * b for a, b in zip(v1, v2))
    mag1 = math.sqrt(sum(a * a for a in v1))
    mag2 = math.sqrt(sum(b * b for b in v2))
    if mag1 == 0 or mag2 == 0:
        return 0.0
    return dot / (mag1 * mag2)


def db_match_score(query, item, query_emb=None):
    """Score how well a knowledge entry matches *query* in [0, ~1.5].

    Combines SequenceMatcher similarity, token overlap, optional embedding
    similarity, plus bonuses for exact matches on question/source/category.
    """
    q_norm = normalize_text(query)
    frage = normalize_text(item.get("frage", ""))
    antwort = item.get("antwort", "")
    kategorie = normalize_text(item.get("kategorie", ""))
    quelle = normalize_text(item.get("quelle", ""))
    blob_norm = normalize_text(
        f"{item.get('frage', '')} {antwort} {item.get('kategorie','')} {item.get('quelle','')}"
    )
    if not q_norm or not frage:
        return 0.0

    seq = SequenceMatcher(None, q_norm, frage).ratio()
    q_tokens = text_tokens(q_norm)
    c_tokens = text_tokens(blob_norm)
    token_score = len(q_tokens & c_tokens) / max(len(q_tokens), 1)

    semantic_score = 0.0
    item_emb = item.get("embedding", [])
    if query_emb and item_emb:
        semantic_score = cosine_similarity(query_emb, item_emb)

    bonus = 0.0
    if q_norm == frage:
        bonus += 0.35
    if q_norm in blob_norm:
        bonus += 0.12
    if quelle and q_norm == quelle:
        bonus += 0.15
    if kategorie and q_norm == kategorie:
        bonus += 0.08

    # Best of: lexical blend vs. semantic similarity, each with bonuses.
    return max((seq * 0.6) + (token_score * 0.4) + bonus, semantic_score + bonus)


def exact_db_answer(user_message):
    """Return the stored answer when the question/source matches exactly."""
    q = normalize_text(user_message)
    if not q:
        return None
    data = load_wissen()
    for item in data:
        frage = normalize_text(item.get("frage", ""))
        quelle = normalize_text(item.get("quelle", ""))
        antwort = item.get("antwort", "").strip()
        if q == frage or (quelle and q == quelle):
            return antwort
    return None


def best_db_answer(user_message, threshold=DB_DIRECT_MATCH_THRESHOLD):
    """Return the best-matching stored answer above *threshold*, or None."""
    exact = exact_db_answer(user_message)
    if exact:
        return exact
    data = load_wissen()
    if not data:
        return None
    best_item = None
    best_score = 0.0
    query_emb = get_embedding(user_message)
    for item in data:
        score = db_match_score(user_message, item, query_emb=query_emb)
        if score > best_score:
            best_score = score
            best_item = item
    if best_item is not None and best_score >= threshold:
        return best_item.get("antwort", "").strip()
    return None


def find_relevant_facts(query, max_items=6, min_score=DB_FACT_MATCH_THRESHOLD):
    """Return up to *max_items* knowledge entries scoring >= *min_score*."""
    data = load_wissen()
    if not data:
        return []
    scored = []
    query_emb = get_embedding(query)
    for item in data:
        score = db_match_score(query, item, query_emb=query_emb)
        if score >= min_score:
            scored.append((score, item))
    scored.sort(key=lambda x: x[0], reverse=True)
    return [item for _, item in scored[:max_items]]


def get_knowledge_stats():
    """Return {'count': N, 'categories': [...]} (first 10 distinct categories)."""
    data = load_wissen()
    categories = []
    for item in data:
        cat = item.get("kategorie", "").strip()
        if cat and cat not in categories:
            categories.append(cat)
    return {
        "count": len(data),
        "categories": categories[:10],
    }


def search_knowledge(query, max_results=8):
    """Full-text + semantic search over the knowledge base; returns display text."""
    query = (query or "").strip()
    if not query:
        return "❌ Bitte gib einen Suchbegriff ein."
    data = load_wissen()
    if not data:
        return "Keine Einträge vorhanden."
    scored = []
    query_emb = get_embedding(query)
    for item in data:
        score = db_match_score(query, item, query_emb=query_emb)
        if score >= DB_FACT_MATCH_THRESHOLD:
            scored.append((score, item))
    scored.sort(key=lambda x: x[0], reverse=True)
    matches = [item for _, item in scored[:max_results]]
    if not matches:
        return "❌ Keine passenden Einträge gefunden."
    out = [f"✅ {len(matches)} Treffer:\n"]
    for i, item in enumerate(matches, 1):
        out.append(format_entry(item, i))
        out.append("\n" + "-" * 40 + "\n")
    return "\n".join(out).strip()


def delete_knowledge(query):
    """Delete all entries matching *query* above the fact threshold.

    Returns (ok, message). Syncs from HF first so deletions apply to the
    freshest data, then uploads the result in the background.
    """
    global letzte_wissensänderung
    query = (query or "").strip()
    if not query:
        return False, "❌ Bitte einen Suchbegriff zum Löschen eingeben."
    with knowledge_lock:
        sync_wissen_from_hf()
        data = load_wissen()
        if not data:
            return False, "Keine Einträge vorhanden."
        new_data = []
        removed = []
        for item in data:
            item_score = db_match_score(query, item)
            if item_score >= DB_FACT_MATCH_THRESHOLD:
                removed.append(item)
            else:
                new_data.append(item)
        if not removed:
            return False, "❌ Nichts gefunden, was gelöscht werden kann."
        save_json_list(WISSEN_FILE, new_data)
        letzte_wissensänderung = now_str()
    threading.Thread(target=upload_wissen_background, daemon=True).start()
    return True, f"✅ {len(removed)} Eintrag/Einträge gelöscht."


def delete_all_knowledge(admin_code):
    """Wipe the entire knowledge base; requires the admin code."""
    global letzte_wissensänderung
    if admin_code != ADMIN_CODE:
        return False, "❌ Falscher Admin-Code."
    with knowledge_lock:
        save_json_list(WISSEN_FILE, [])
        letzte_wissensänderung = now_str()
    threading.Thread(target=upload_wissen_background, daemon=True).start()
    return True, "✅ Alle Wissenseinträge wurden gelöscht."


def save_knowledge_entry(frage, antwort, kategorie="", quelle="", embedding=None):
    """Add a new knowledge entry (deduplicated on normalized question).

    Returns (ok, message). Computes an embedding when none is given and
    uploads the updated file in the background.
    """
    global letzte_wissensänderung
    frage = (frage or "").strip()
    antwort = (antwort or "").strip()
    kategorie = (kategorie or "").strip()
    quelle = (quelle or "").strip()
    if not frage or not antwort:
        return False, "❌ Thema/Stichwort und Text dürfen nicht leer sein."
    with knowledge_lock:
        sync_wissen_from_hf()
        data = load_wissen()
        q_norm = normalize_text(frage)
        for item in data:
            if normalize_text(item.get("frage", "")) == q_norm:
                return False, "ℹ️ Dieser Eintrag ist schon vorhanden."
        if embedding is None:
            embedding = get_embedding(frage)
        entry = {
            "frage": frage,
            "antwort": antwort,
            "kategorie": kategorie,
            "quelle": quelle,
            "embedding": embedding,
            "created_at": now_str()
        }
        data.append(entry)
        save_json_list(WISSEN_FILE, data)
        letzte_wissensänderung = now_str()
    threading.Thread(target=upload_wissen_background, daemon=True).start()
    return True, f"✅ Lokal gespeichert. Upload läuft im Hintergrund.\n\nThema: {frage}"


# =========================================================
# WEB-LINK LEARNING
# =========================================================
def is_private_or_local_host(hostname):
    """SSRF guard: True for localhost, private, loopback, link-local, reserved."""
    if not hostname:
        return True
    host = hostname.lower().strip()
    if host in {"localhost", "127.0.0.1", "::1"}:
        return True
    try:
        ip = ipaddress.ip_address(host)
        return ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved
    except ValueError:
        # Not an IP literal → a DNS name; allow it.
        return False


def extract_webpage_text(url, timeout=15, max_bytes=2_000_000):
    """Fetch *url* and return (title, plain_text), capped at 12000 chars.

    Raises ValueError for non-http(s) schemes or private/local hosts.
    """
    parsed = urlparse(url.strip())
    if parsed.scheme not in {"http", "https"}:
        raise ValueError("Nur http/https Links sind erlaubt.")
    if is_private_or_local_host(parsed.hostname):
        raise ValueError("Lokale oder private Adressen sind nicht erlaubt.")

    req = Request(
        url,
        headers={
            "User-Agent": "Mozilla/5.0 (compatible; KnowledgeBot/1.0)"
        }
    )
    with urlopen(req, timeout=timeout) as resp:
        raw = resp.read(max_bytes)
        charset = resp.headers.get_content_charset() or "utf-8"
    html_text = raw.decode(charset, errors="ignore")

    # FIX: the original regex had lost its tag text (HTML-mangled source);
    # restored the <title>...</title> pattern.
    title = ""
    m = re.search(r"(?is)<title[^>]*>(.*?)</title>", html_text)
    if m:
        title = html_lib.unescape(m.group(1)).strip()

    if BeautifulSoup is not None:
        soup = BeautifulSoup(html_text, "html.parser")
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        page_text = soup.get_text(" ", strip=True)
    else:
        # Regex fallback — closing tags restored (they were stripped from
        # the mangled source). First drop script/style/noscript bodies,
        # then HTML comments, then all remaining tags.
        page_text = re.sub(r"(?is)<(script|style|noscript)[^>]*>.*?</\1>", " ", html_text)
        page_text = re.sub(r"(?is)<!--.*?-->", " ", page_text)
        page_text = re.sub(r"(?is)<[^>]+>", " ", page_text)
        page_text = html_lib.unescape(page_text)

    page_text = re.sub(r"\s+", " ", page_text).strip()
    if not title:
        title = parsed.netloc
    if len(page_text) > 12000:
        page_text = page_text[:12000]
    return title, page_text


def heuristic_summary(text, max_sentences=5, max_chars=2200):
    """Naive summary: take the first sentences up to the given limits."""
    if not text:
        return ""
    sentences = re.split(r"(?<=[.!?])\s+", text)
    picked = []
    total = 0
    for s in sentences:
        s = s.strip()
        if not s:
            continue
        picked.append(s)
        total += len(s)
        if len(picked) >= max_sentences or total >= max_chars:
            break
    summary = " ".join(picked).strip()
    return summary[:max_chars].strip()


def summarize_web_text(title, raw_text):
    """Summarize web text with the LLM, falling back to heuristic_summary()."""
    if not raw_text:
        return ""
    base = heuristic_summary(raw_text)
    if not USE_QWEN_POLISH or model is None or tokenizer is None:
        return base
    messages = [
        {
            "role": "system",
            "content": (
                "Du fasst nur den gegebenen Webseitentext zusammen. "
                "Erfinde nichts. Keine neuen Fakten. "
                "Schreibe 4 bis 8 kurze Sätze auf Deutsch."
            )
        },
        {
            "role": "user",
            "content": (
                f"Titel: {title}\n\n"
                f"Webseitentext:\n{raw_text}\n\n"
                "Fasse das als kompakten Lerntest zusammen."
            )
        }
    ]
    try:
        # FIX: use the dedicated config constant instead of a hard-coded 180.
        out = model_generate(messages, max_new_tokens=MAX_NEW_TOKENS_SUMMARY,
                             temperature=0.35, do_sample=True)
        out = (out or "").strip()
        return out if out else base
    except Exception as e:
        log_error("summarize_web_text", e)
        return base


def save_link_as_knowledge(url, thema="", kategorie="web"):
    """Fetch a URL, summarize it, and store the summary as a knowledge entry."""
    url = (url or "").strip()
    thema = (thema or "").strip()
    kategorie = (kategorie or "").strip() or "web"
    if not url:
        return False, "❌ Bitte einen Link eingeben."
    try:
        title, raw_text = extract_webpage_text(url)
    except Exception as e:
        log_error("extract_webpage_text", e)
        return False, f"❌ Link konnte nicht gelesen werden: {e}"
    if not raw_text or len(raw_text) < 50:
        return False, "❌ Auf der Seite konnte kein ausreichender Text gefunden werden."
    summary = summarize_web_text(title, raw_text)
    if not summary or len(summary.strip()) < 30:
        summary = heuristic_summary(raw_text, max_sentences=6, max_chars=3000)
    if not summary:
        return False, "❌ Der Text war zu leer oder unlesbar."
    topic = thema or title or url
    ok, msg = save_knowledge_entry(
        frage=topic,
        antwort=summary,
        kategorie=kategorie,
        quelle=url
    )
    if ok:
        return True, f"✅ Link gelernt!\n\nThema: {topic}\nQuelle: {url}\n\nZusammenfassung:\n{summary}"
    return ok, msg


# =========================================================
# CHAT / STORAGE
# =========================================================
def load_chat_history():
    """Load the persisted chat history list, creating the file if missing."""
    ensure_json_list_file(CHAT_FILE)
    return load_json_list(CHAT_FILE)


def save_chat_history(history):
    """Persist the chat history to disk."""
    save_json_list(CHAT_FILE, history)


def get_chat_session(session_id="default"):
    """Return the mutable message list for *session_id*.

    Migrates the legacy flat-list global into a dict keyed by session id.
    """
    global api_chat_historie
    if not isinstance(api_chat_historie, dict):
        if isinstance(api_chat_historie, list):
            api_chat_historie = {"default": api_chat_historie}
        else:
            api_chat_historie = {"default": []}
    if session_id not in api_chat_historie:
        api_chat_historie[session_id] = []
    return api_chat_historie[session_id]


def reset_chat_history():
    """Clear all chat sessions in memory and on disk."""
    global api_chat_historie
    with chat_lock:
        api_chat_historie = {"default": []}
        save_chat_history(api_chat_historie)
    log_line("[CHAT] Chat-Historie zurückgesetzt.")
    return True, "✅ Chat-Historie gelöscht."


def chat_history_status():
    """Human-readable summary of the last stored chat messages."""
    history = load_chat_history()
    if not history:
        return "Chat-Historie ist leer."
    out = [f"📜 Gespeicherte Nachrichten: {len(history)}\n"]
    for i, msg in enumerate(history[-12:], 1):
        role = msg.get("role", "?")
        content = msg.get("content", "")
        out.append(f"{i}. {role}: {content[:250]}")
        out.append("\n")
    return "\n".join(out).strip()


def load_visible_chat_history_for_ui():
    """Return (pairs, pairs) of (user, assistant) tuples for the Gradio UI."""
    pairs = api_history_to_pairs(load_chat_history())
    return pairs, pairs


# =========================================================
# MODEL / QWEN
# =========================================================
def init_model_if_needed():
    """Load tokenizer and model once, honoring LOAD_IN_BITS (4 / 8 / 16)."""
    global model, tokenizer, device
    if model is not None and tokenizer is not None:
        return

    print("=" * 60)
    print("🤖 Initialisiere Modell")
    print("=" * 60)

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    print(f"--- Lade Modell im {LOAD_IN_BITS}-Bit Modus ---")

    if LOAD_IN_BITS == 4:
        # 4-bit mode (very frugal, ~5 GB RAM)
        from transformers import BitsAndBytesConfig
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_use_double_quant=True,
            llm_int8_enable_fp32_cpu_offload=True
        )
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            quantization_config=bnb_config,
            device_map="auto",
            low_cpu_mem_usage=True,
            token=HF_TOKEN,
            offload_folder="offload"
        )
    elif LOAD_IN_BITS == 8:
        from transformers import BitsAndBytesConfig
        # fp32 CPU offload lets layers spill from GPU to CPU.
        bnb_config_8 = BitsAndBytesConfig(
            load_in_8bit=True,
            llm_int8_enable_fp32_cpu_offload=True
        )
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            quantization_config=bnb_config_8,
            device_map="auto",  # spreads across GPU AND CPU
            low_cpu_mem_usage=True,
            token=HF_TOKEN
        )
    else:
        # FIX: the config documents 16 as a valid value, but the original had
        # no branch for it, leaving `model` as None and crashing on .eval().
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_NAME,
            torch_dtype=torch.float16,
            device_map="auto",
            low_cpu_mem_usage=True,
            token=HF_TOKEN
        )

    model.eval()
    print(f"✅ Modell geladen auf: {device}")


def format_messages_for_model(messages_history):
    """Render message dicts into a prompt string via the chat template.

    Falls back to a plain 'Role: content' transcript when no template works.
    """
    if tokenizer is None:
        return ""
    try:
        return tokenizer.apply_chat_template(
            messages_history,
            tokenize=False,
            add_generation_prompt=True,
            enable_thinking=False
        )
    except TypeError:
        # Older templates don't accept enable_thinking.
        return tokenizer.apply_chat_template(
            messages_history,
            tokenize=False,
            add_generation_prompt=True
        )
    except Exception:
        lines = []
        for m in messages_history:
            role = m.get("role", "user").capitalize()
            content = m.get("content", "")
            lines.append(f"{role}: {content}")
        lines.append("Assistant:")
        return "\n".join(lines)


def is_repetitive_output(text, threshold=0.6):
    """Check whether generated text heavily repeats itself (halves too similar)."""
    if not text or len(text) < 40:
        return False
    words = text.split()
    if len(words) < 10:
        return False
    half = len(words) // 2
    first_half = " ".join(words[:half])
    second_half = " ".join(words[half:])
    ratio = SequenceMatcher(None, first_half, second_half).ratio()
    return ratio > threshold


def model_generate(messages_history, max_new_tokens=120, temperature=0.6, do_sample=True):
    """Generate a reply (non-streaming). Returns '' on failure or bad output."""
    if model is None or tokenizer is None:
        return ""
    prompt_text = format_messages_for_model(messages_history)
    if not prompt_text:
        return ""
    inputs = tokenizer(
        [prompt_text],
        return_tensors="pt",
        truncation=True,
        max_length=2048
    ).to(device)
    attention_mask = inputs.get("attention_mask", None)
    with torch.no_grad():
        output = model.generate(
            inputs.input_ids,
            attention_mask=attention_mask,
            max_new_tokens=max_new_tokens,
            do_sample=do_sample,
            temperature=temperature,
            top_p=0.90,
            top_k=40,
            repetition_penalty=REPETITION_PENALTY,
            no_repeat_ngram_size=NO_REPEAT_NGRAM_SIZE,
            pad_token_id=tokenizer.eos_token_id,
            eos_token_id=tokenizer.eos_token_id
        )
    new_tokens = output[0][inputs.input_ids.shape[-1]:]
    text = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()
    # FIX: the tag text had been stripped from these regexes in the mangled
    # source, making them no-ops. Restore removal of Qwen3 <think> blocks,
    # including a truncated (unclosed) trailing block.
    text = re.sub(r"<think>[\s\S]*?</think>", "", text).strip()
    text = re.sub(r"<think>[\s\S]*", "", text).strip()
    # Anti-loop: discard output that mostly repeats itself.
    if is_repetitive_output(text):
        log_line("[WARN] Wiederholende Ausgabe erkannt, wird verworfen.")
        return ""
    return text


def model_generate_stream(messages_history, max_new_tokens=120, temperature=0.6, do_sample=True):
    """Generate a reply as a stream of growing partial strings."""
    if model is None or tokenizer is None:
        yield "Modell nicht geladen."
        return
    prompt_text = format_messages_for_model(messages_history)
    if not prompt_text:
        return
    inputs = tokenizer(
        [prompt_text],
        return_tensors="pt",
        truncation=True,
        max_length=2048
    ).to(device)
    attention_mask = inputs.get("attention_mask", None)
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = {
        "input_ids": inputs.input_ids,
        "attention_mask": attention_mask,
        "max_new_tokens": max_new_tokens,
        "do_sample": do_sample,
        "temperature": temperature,
        "top_p": 0.90,
        "top_k": 40,
        "repetition_penalty": REPETITION_PENALTY,
        "no_repeat_ngram_size": NO_REPEAT_NGRAM_SIZE,
        "pad_token_id": tokenizer.eos_token_id,
        "eos_token_id": tokenizer.eos_token_id,
        "streamer": streamer
    }
    # generate() runs in a worker thread; the streamer feeds this generator.
    thread = threading.Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()
    generated_text = ""
    for new_text in streamer:
        generated_text += new_text
        yield generated_text.strip()


def build_system_prompt(user_message=""):
    """Assemble the system prompt with up to six relevant knowledge facts."""
    facts = find_relevant_facts(user_message, max_items=6)
    if not facts:
        facts = load_wissen()[:6]
    fact_lines = []
    for idx, item in enumerate(facts, 1):
        fact_lines.append(
            f"Fakt {idx}:\n"
            f"Thema: {item.get('frage', '')}\n"
            f"Text: {item.get('antwort', '')}"
        )
    fact_block = "\n\n".join(fact_lines) if fact_lines else "Keine gespeicherten Fakten vorhanden."
    return f"""Du bist {AI_NAME}, ein hilfreicher KI-Assistent. Antworte kurz und klar auf Deutsch.

Bekannte Fakten:
{fact_block}"""


def get_system_prompt():
    """System prompt without a user message (generic fact selection)."""
    return build_system_prompt("")


def compose_draft_from_facts(facts):
    """Build a rough draft answer by concatenating up to four compressed facts."""
    if not facts:
        return ""
    pieces = []
    for item in facts[:4]:
        topic = (item.get("frage", "") or "").strip()
        ans = compress_text(item.get("antwort", ""), 220)
        if topic and ans:
            pieces.append(f"{topic}: {ans}")
        elif ans:
            pieces.append(ans)
    if not pieces:
        return ""
    random.shuffle(pieces)
    return "\n".join(pieces)


def polish_with_model(user_message, draft, facts, history_context=""):
    """Let the LLM refine *draft* using the facts; returns *draft* on failure.

    NOTE(review): `facts` and `history_context` are currently unused in the
    prompt beyond the draft itself — kept for interface compatibility.
    """
    if not USE_QWEN_POLISH:
        return draft
    if model is None or tokenizer is None:
        return draft
    fact_lines = []
    for idx, item in enumerate(facts, 1):
        fact_lines.append(
            f"{idx}. Thema: {item.get('frage', '')}\n"
            f"   Zusatzwissen: {compress_text(item.get('antwort', ''), 260)}"
        )
    fact_block = "\n".join(fact_lines) if fact_lines else "Keine zusätzlichen Fakten."
    extra = f"\nZusatzinfo: {draft}" if draft else ""
    messages = [
        {
            "role": "system",
            "content": f"Du bist {AI_NAME}. Antworte immer auf Deutsch. Kurz, direkt, hilfreich. Keine Floskeln."
        },
        {
            "role": "user",
            "content": f"{user_message}{extra}"
        }
    ]
    try:
        out = model_generate(messages, max_new_tokens=MAX_NEW_TOKENS_POLISH,
                             temperature=TEMPERATURE_POLISH, do_sample=True)
        if not out:
            return draft
        return out.strip()
    except Exception as e:
        log_error("polish_with_model", e)
        return draft


def general_chat_reply(user_message, history_context=""):
    """Plain LLM chat reply without knowledge-base grounding."""
    if model is None or tokenizer is None:
        return "Dazu habe ich gerade keine sichere Antwort."
    messages = [
        {
            "role": "system",
            "content": f"Du bist {AI_NAME}. Antworte immer auf Deutsch. Kurz, direkt, hilfreich. Keine Floskeln. Bei Unsicherheit: 'Ich bin nicht sicher, aber ich glaube...'"
        },
        {
            "role": "user",
            "content": user_message
        }
    ]
    try:
        out = model_generate(messages, max_new_tokens=MAX_NEW_TOKENS_CHAT,
                             temperature=TEMPERATURE_CHAT, do_sample=True)
        return (out or "").strip() or "Dazu habe ich gerade keine sichere Antwort."
    except Exception as e:
        log_error("general_chat_reply", e)
        return "Dazu habe ich gerade keine sichere Antwort."


def generate_reply(user_message, history_context=""):
    """Main reply pipeline: DB facts → polish with LLM → plain chat fallback."""
    query = f"{user_message} {history_context}".strip()
    facts = dedupe_facts(find_relevant_facts(query, max_items=6))

    exact = exact_db_answer(user_message)
    if exact and not is_generic_or_placeholder_answer(exact):
        extra_fact = {
            "frage": user_message,
            "antwort": exact,
            "kategorie": "",
            "quelle": "",
            "created_at": ""
        }
        facts = dedupe_facts([extra_fact] + facts)

    draft = compose_draft_from_facts(facts)

    if facts:
        reply = polish_with_model(user_message, draft, facts, history_context)
        # Use the polished reply unless it is a stock filler phrase.
        if reply and not is_generic_or_placeholder_answer(reply):
            return reply

    # No facts, or polishing produced junk: plain chat.
    return general_chat_reply(user_message, history_context)


def general_chat_reply_stream(user_message, history_context=""):
    """Streaming variant of general_chat_reply()."""
    if model is None or tokenizer is None:
        yield "Dazu habe ich gerade keine sichere Antwort."
        return
    messages = [
        {
            "role": "system",
            "content": f"Du bist {AI_NAME}. Antworte immer auf Deutsch. Kurz, direkt, hilfreich. Keine Floskeln. Bei Unsicherheit: 'Ich bin nicht sicher, aber ich glaube...'"
        },
        {
            "role": "user",
            "content": user_message
        }
    ]
    try:
        for chunk in model_generate_stream(messages, max_new_tokens=MAX_NEW_TOKENS_CHAT,
                                           temperature=TEMPERATURE_CHAT, do_sample=True):
            yield chunk or "Dazu habe ich gerade keine sichere Antwort."
    except Exception as e:
        log_error("general_chat_reply_stream", e)
        yield "Dazu habe ich gerade keine sichere Antwort."
def generate_reply_stream(user_message, history_context=""):
    """Streaming variant of generate_reply.

    Yields the polished fact-based answer as one chunk when facts are
    available; otherwise streams the plain chat reply chunk by chunk.
    """
    query = f"{user_message} {history_context}".strip()
    facts = dedupe_facts(find_relevant_facts(query, max_items=6))
    exact = exact_db_answer(user_message)
    if exact and not is_generic_or_placeholder_answer(exact):
        extra_fact = {
            "frage": user_message,
            "antwort": exact,
            "kategorie": "",
            "quelle": "",
            "created_at": ""
        }
        facts = dedupe_facts([extra_fact] + facts)
    draft = compose_draft_from_facts(facts)
    if facts:
        reply = polish_with_model(user_message, draft, facts, history_context)
        if reply and not is_generic_or_placeholder_answer(reply):
            yield reply
            return
    # No usable facts: stream the plain chat answer.
    for chunk in general_chat_reply_stream(user_message, history_context):
        yield chunk


# =========================================================
# API
# =========================================================
def gradio_simple_api(user_message, session_id="default"):
    """Non-streaming API endpoint: answer one message and persist the exchange."""
    global api_chat_historie, letzte_api_latenz
    start = time.perf_counter()
    session_id = session_id or "default"
    with chat_lock:
        history = get_chat_session(session_id)
        history_context = history_to_context(history)
    # BUGFIX/consistency: generate OUTSIDE the lock (as gradio_stream_api
    # already does) so one slow model call does not block every other session.
    reply = generate_reply(user_message, history_context=history_context)
    with chat_lock:
        history = get_chat_session(session_id)
        history.append({"role": "user", "content": user_message})
        history.append({"role": "assistant", "content": reply})
        if len(history) > MAX_CHAT_HISTORY:
            api_chat_historie[session_id] = history[-MAX_CHAT_HISTORY:]
        save_chat_history(api_chat_historie)
    log_line(f"[USER] {user_message} (Session: {session_id})")
    log_line(f"[ASSISTANT] {reply}")
    letzte_api_latenz = f"{(time.perf_counter() - start) * 1000:.2f} ms"
    return reply


def gradio_stream_api(user_message, session_id="default"):
    """Streaming API endpoint: yields the growing reply, then persists it."""
    global api_chat_historie, letzte_api_latenz
    start = time.perf_counter()
    session_id = session_id or "default"
    with chat_lock:
        history = get_chat_session(session_id)
        history_context = history_to_context(history)
    reply = ""
    for chunk in generate_reply_stream(user_message,
                                       history_context=history_context):
        # NOTE(review): chunks are treated as cumulative text (reply is
        # replaced, not appended) — confirm model_generate_stream yields
        # the full text so far rather than deltas.
        reply = chunk
        yield reply
    with chat_lock:
        history = get_chat_session(session_id)
        history.append({"role": "user", "content": user_message})
        history.append({"role": "assistant", "content": reply})
        if len(history) > MAX_CHAT_HISTORY:
            api_chat_historie[session_id] = history[-MAX_CHAT_HISTORY:]
        save_chat_history(api_chat_historie)
    log_line(f"[USER] {user_message} (Session: {session_id})")
    log_line(f"[ASSISTANT] {reply}")
    letzte_api_latenz = f"{(time.perf_counter() - start) * 1000:.2f} ms"


# =========================================================
# UI FUNKTIONEN
# =========================================================
def ui_zeige_status():
    """Render the system status report shown on the status tab."""
    facts = load_wissen()
    stats = get_knowledge_stats()
    chat_entries = len(load_chat_history())
    return f"""🟢 SYSTEM ONLINE
🤖 Modell: {MODEL_NAME}
🖥️ Device: {device}
🏠 Space: RedJul2110/MyfirstAI
📦 Datenbank: {HF_DATASET}
💾 Gespeicherte Fakten: {len(facts)}
🗂️ Kategorien: {", ".join(stats["categories"]) if stats["categories"] else "keine"}
💬 Gespeicherte Chat-Nachrichten: {chat_entries}
⏱️ Letzte API-Antwortzeit: {letzte_api_latenz if letzte_api_latenz else "noch keine"}
🔁 Letzter HF-Sync: {letzter_hf_sync if letzter_hf_sync else "noch keiner"}
⬆️ Letzter Upload: {letzter_upload if letzter_upload else "noch keiner"}
🧠 Letzte Wissensänderung: {letzte_wissensänderung if letzte_wissensänderung else "noch keine"}
🔄 Upload läuft: {"ja" if upload_in_progress else "nein"}
⚠️ Letzter Fehler: {letzter_fehler if letzter_fehler else "keiner"}
Lokale Wissensdatei: {WISSEN_FILE}
Chat-Datei: {CHAT_FILE}
Log-Datei: {LOG_FILE}
"""


def ui_sync_wissen():
    """Re-sync the knowledge base from Hugging Face and report the result."""
    ok, msg = sync_wissen_from_hf()
    return msg


def ui_web_lernen(passwort, frage, antwort, kategorie):
    """Admin: store a manually entered knowledge entry."""
    if passwort != ADMIN_CODE:
        return "❌ Zugriff verweigert! Falscher Admin-Code."
    ok, msg = save_knowledge_entry(frage, antwort, kategorie)
    return msg


def ui_link_lernen_multi(passwort, urls_text, topics_text, cats_text):
    """Admin: learn from one or more URLs, with optional per-line topic/category."""
    if passwort != ADMIN_CODE:
        return "❌ Zugriff verweigert! Falscher Admin-Code."
    urls = [u.strip() for u in urls_text.replace(",", "\n").split("\n") if u.strip()]
    if not urls:
        return "❌ Keine gültigen URLs gefunden."
    topics = [t.strip() for t in (topics_text or "").replace(",", "\n").split("\n")]
    cats = [c.strip() for c in (cats_text or "").replace(",", "\n").split("\n")]
    # Pad so zip() covers every URL even when topic/category lines are missing.
    topics += [""] * (len(urls) - len(topics))
    cats += [""] * (len(urls) - len(cats))
    results = []
    for u, t, c in zip(urls, topics, cats):
        if not u:
            continue
        ok, msg = save_link_as_knowledge(u, t, c)
        results.append(f"[{u}]: {msg}")
    return "\n\n".join(results)


def extract_pdf_text_fallback(filepath):
    """Extract the full text of a PDF via PyPDF2.

    Returns the text, or one of the sentinel strings "ERROR_NO_PYPDF2" /
    "ERROR_READ: ..." on failure (callers check these prefixes).
    """
    try:
        import PyPDF2
        pages = []
        with open(filepath, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            for page in reader.pages:
                page_text = page.extract_text()
                if page_text:
                    pages.append(page_text)
        # join + strip instead of quadratic string concatenation.
        return "\n".join(pages).strip()
    except ImportError:
        return "ERROR_NO_PYPDF2"
    except Exception as e:
        return f"ERROR_READ: {e}"


def ui_pdf_lernen(passwort, file_obj, thema, kategorie):
    """Admin: summarize an uploaded PDF and store it as knowledge."""
    if passwort != ADMIN_CODE:
        return "❌ Zugriff verweigert! Falscher Admin-Code."
    if file_obj is None:
        return "❌ Bitte eine Datei hochladen."
    # Gradio may hand over a plain path string or a tempfile-like object.
    filepath = file_obj if isinstance(file_obj, str) else getattr(file_obj, "name", None)
    if not filepath:
        return "❌ Dateipfad konnte nicht ermittelt werden."
    text = extract_pdf_text_fallback(filepath)
    if text == "ERROR_NO_PYPDF2":
        return ("❌ Das Paket 'PyPDF2' fehlt. Bitte füge 'PyPDF2' zu deiner "
                "'requirements.txt' in Hugging Face hinzu!")
    if text.startswith("ERROR_READ:"):
        return f"❌ Fehler beim Lesen: {text}"
    if len(text) < 50:
        return "❌ In der PDF konnte kein/kaum Text gefunden werden."
    topic = thema or "PDF Dokument"
    cat = kategorie or "dokument"
    summary = summarize_web_text(topic, text)
    if not summary or len(summary.strip()) < 30:
        # Model summary failed or was too short: fall back to a heuristic one.
        summary = heuristic_summary(text, max_sentences=6, max_chars=3000)
    ok, msg = save_knowledge_entry(frage=topic, antwort=summary,
                                   kategorie=cat, quelle="PDF Upload")
    return msg


def ui_wissen_suchen(suchbegriff):
    """Search the knowledge base for a term."""
    return search_knowledge(suchbegriff)


def ui_wissen_loeschen(passwort, suchbegriff):
    """Admin: delete knowledge entries matching a term."""
    if passwort != ADMIN_CODE:
        return "❌ Zugriff verweigert! Falscher Admin-Code."
    ok, msg = delete_knowledge(suchbegriff)
    return msg


def ui_wissen_alle_loeschen(passwort):
    """Admin: wipe the entire knowledge base."""
    if passwort != ADMIN_CODE:
        return "❌ Zugriff verweigert! Falscher Admin-Code."
    ok, msg = delete_all_knowledge(passwort)
    return msg


def ui_chat_send(user_message, visible_history):
    """UI chat: answer one message, update visible history, persist the log."""
    global api_chat_historie, letzte_api_latenz
    user_message = (user_message or "").strip()
    if not user_message:
        return "", visible_history, visible_history
    start = time.perf_counter()
    if visible_history is None:
        visible_history = []
    history_context = history_to_context(visible_history)
    reply = generate_reply(user_message, history_context=history_context)
    visible_history = visible_history + [(user_message, reply)]
    with chat_lock:
        api_chat_historie.append({"role": "user", "content": user_message})
        api_chat_historie.append({"role": "assistant", "content": reply})
        trim_api_history(10)
        save_chat_history(api_chat_historie)
    log_line(f"[CHAT USER] {user_message}")
    log_line(f"[CHAT BOT] {reply}")
    letzte_api_latenz = f"{(time.perf_counter() - start) * 1000:.2f} ms"
    # NOTE(review): the empty-input path returns "" as the first output
    # (which would clear a textbox) while this path returns `reply` —
    # confirm the first output component is really meant to get the reply.
    return reply, visible_history, visible_history


def ui_chat_reset():
    """Clear the persisted chat history and the in-memory copy."""
    global api_chat_historie
    ok, msg = reset_chat_history()
    api_chat_historie = []
    return "", [], [], msg


def ui_chat_status():
    """Report the current chat-history status."""
    return chat_history_status()


def load_visible_chat_history_for_ui():
    """Load the saved chat history as (user, bot) pairs for the UI."""
    pairs = api_history_to_pairs(load_chat_history())
    return pairs, pairs


# =========================================================
# APP
#
# =========================================================
def erzeuge_gradio_app():
    """Build the Gradio app: status tab, admin area, and hidden API endpoints."""
    custom_css = """
    body { background: linear-gradient(135deg, #0f2027, #203a43, #2c5364); color: #fff; font-family: 'Inter', sans-serif; }
    .gradio-container { background: rgba(255, 255, 255, 0.05); backdrop-filter: blur(15px); border-radius: 12px; border: 1px solid rgba(255,255,255,0.1); box-shadow: 0 8px 32px 0 rgba(0, 0, 0, 0.37); }
    button.primary { background: linear-gradient(90deg, #00C9FF 0%, #92FE9D 100%); border: none; color: black; font-weight: bold; }
    button.primary:hover { transform: translateY(-2px); box-shadow: 0 5px 15px rgba(0,201,255,0.4); }
    """
    with gr.Blocks(title="KI Status", theme=gr.themes.Soft(), css=custom_css) as demo:
        # Hidden components expose /predict and /stream as API endpoints
        # without showing anything in the UI.
        hidden_msg = gr.Textbox(value="", visible=False)
        hidden_session = gr.Textbox(value="default", visible=False)
        hidden_out = gr.Textbox(value="", visible=False)

        api_trigger = gr.Button(visible=False)
        api_trigger.click(
            gradio_simple_api,
            inputs=[hidden_msg, hidden_session],
            outputs=[hidden_out],
            api_name="predict"
        )
        api_trigger_stream = gr.Button(visible=False)
        api_trigger_stream.click(
            gradio_stream_api,
            inputs=[hidden_msg, hidden_session],
            outputs=[hidden_out],
            api_name="stream"
        )

        gr.Markdown("# 🤖 KI Status")
        gr.Markdown("Die KI nutzt zuerst ihre eigenen Antworten. Gefundene Fakten aus der Datenbank dienen nur als Zusatzwissen.")

        with gr.Tab("📊 System Status"):
            status_text = gr.Textbox(label="Systembericht", lines=16, interactive=False)
            with gr.Row():
                refresh_btn = gr.Button("Status aktualisieren")
                sync_btn = gr.Button("Wissen von HF neu laden")
            refresh_btn.click(ui_zeige_status, outputs=status_text)
            sync_btn.click(ui_sync_wissen, outputs=status_text)
            demo.load(ui_zeige_status, outputs=status_text)

        with gr.Tab("🔒 Admin-Bereich"):
            login_col = gr.Column(visible=True)
            admin_col = gr.Column(visible=False)
            with login_col:
                gr.Markdown("### Bitte Admin-Code eingeben, um Einstellungen und Lern-Tools freizuschalten.")
                admin_pw = gr.Textbox(label="Admin-Code", type="password")
                login_btn = gr.Button("Login", variant="primary")
                login_err = gr.Markdown(visible=False)
            with admin_col:
                with gr.Tabs():
                    with gr.Tab("🧠 Wissen lernen"):
                        gr.Markdown("Speichere neue Fakten direkt in die Datenbank.")
                        q_input = gr.Textbox(label="Thema / Stichwort", placeholder="z. B. Frankreich, Mars")
                        a_input = gr.Textbox(label="Text", placeholder="Langer Infotext", lines=6)
                        k_input = gr.Textbox(label="Kategorie / Bereich (optional)", placeholder="z. B. Geschichte")
                        lern_btn = gr.Button("Wissen speichern", variant="primary")
                        lern_out = gr.Textbox(label="Ergebnis", interactive=False)
                        lern_btn.click(ui_web_lernen, inputs=[admin_pw, q_input, a_input, k_input], outputs=lern_out)
                    with gr.Tab("🌐 Link lernen (Multi)"):
                        gr.Markdown("Ein oder mehrere öffentliche Links einfügen (durch neue Zeile getrennt). Die KI liest und lernt diese.")
                        link_urls = gr.Textbox(label="Links (Eine URL pro Zeile)", placeholder="https://...\nhttps://...", lines=5)
                        link_topic = gr.Textbox(label="Themen (Optional, ein Thema pro Zeile passend zur URL)", lines=5)
                        link_cat = gr.Textbox(label="Kategorien (Optional, eine Kategorie pro Zeile)", lines=5)
                        link_btn = gr.Button("Links lernen", variant="primary")
                        link_out = gr.Textbox(label="Ergebnis", lines=8, interactive=False)
                        link_btn.click(ui_link_lernen_multi, inputs=[admin_pw, link_urls, link_topic, link_cat], outputs=link_out)
                    with gr.Tab("📄 PDF lernen"):
                        gr.Markdown("Lade eine PDF-Datei hoch, um ihren Text zu analysieren und als Wissen zu speichern.")
                        pdf_file = gr.File(label="PDF Datei", file_types=[".pdf"])
                        pdf_topic = gr.Textbox(label="Thema / Stichwort (optional)")
                        pdf_cat = gr.Textbox(label="Kategorie / Bereich (optional)")
                        pdf_btn = gr.Button("Dokument lernen", variant="primary")
                        pdf_out = gr.Textbox(label="Ergebnis", lines=6, interactive=False)
                        pdf_btn.click(ui_pdf_lernen, inputs=[admin_pw, pdf_file, pdf_topic, pdf_cat], outputs=pdf_out)
                    with gr.Tab("🔍 Suchen / Löschen"):
                        gr.Markdown("Suche in der Datenbank oder lösche Einträge wieder.")
                        search_box = gr.Textbox(label="Suchbegriff", placeholder="z. B. Frankreich")
                        search_btn = gr.Button("Suchen")
                        search_out = gr.Textbox(label="Treffer", lines=12, interactive=False)
                        del_box = gr.Textbox(label="Löschen nach Begriff", placeholder="z. B. Frankreich")
                        del_btn = gr.Button("Löschen", variant="secondary")
                        del_out = gr.Textbox(label="Lösch-Ergebnis", interactive=False)
                        all_del_btn = gr.Button("ALLES löschen", variant="stop")
                        all_del_out = gr.Textbox(label="Alles löschen", interactive=False)
                        search_btn.click(ui_wissen_suchen, inputs=[search_box], outputs=search_out)
                        del_btn.click(ui_wissen_loeschen, inputs=[admin_pw, del_box], outputs=del_out)
                        all_del_btn.click(ui_wissen_alle_loeschen, inputs=[admin_pw], outputs=all_del_out)

            def do_login(pw):
                # Reveal the admin tools only for the correct code.
                if pw == ADMIN_CODE:
                    return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
                return gr.update(visible=True), gr.update(visible=False), gr.update(value="**❌ Falscher Admin-Code**", visible=True)

            login_btn.click(do_login, inputs=[admin_pw], outputs=[login_col, admin_col, login_err])

    demo.queue(default_concurrency_limit=8)
    return demo


# =========================================================
# LOKALER CHAT (FALLBACK)
# =========================================================
def local_terminal_chat():
    """Minimal terminal REPL chat for running outside a HF Space."""
    print("Lokaler Chat gestartet. Tippe 'exit' zum Beenden.")
    while True:
        user = input("Du: ").strip()
        if user.lower() in {"exit", "quit", "ende"}:
            break
        if not user:
            continue
        reply = generate_reply(user)
        print("Bot:", reply)


# =========================================================
# BOOTSTRAP
# =========================================================
def bootstrap():
    """Initialize data files, knowledge, history and model; then start UI or REPL."""
    global api_chat_historie
    ensure_json_list_file(WISSEN_FILE)
    ensure_json_list_file(CHAT_FILE)
    sync_wissen_from_hf()
    api_chat_historie = load_chat_history()
    init_model_if_needed()
    if os.environ.get("SPACE_ID"):
        app = erzeuge_gradio_app()
        # BUGFIX: erzeuge_gradio_app() already configures the queue with
        # default_concurrency_limit=8; calling .queue() again here replaced
        # that configuration with the defaults.
        app.launch()
    else:
        local_terminal_chat()


if __name__ == "__main__":
    bootstrap()