| import os |
| import time |
| import concurrent.futures |
| import gradio as gr |
| from openai import OpenAI |
| import PyPDF2 |
| import chromadb |
| from chromadb.utils import embedding_functions |
| from theme import CustomTheme |
| import base64 |
| import mimetypes |
|
|
| |
# --- Configuration -----------------------------------------------------------

# Root folder containing the pre-processed Minecraft wiki text files.
WIKI_PATH = "./minecraftwiki/minecraft_wiki_polished/"

# TEST_MODE: when True, only the files listed in TEST_TARGETS are indexed
# (fast development loop). SHOW_SOURCES: when True, the retrieved context is
# appended to every chat answer for debugging.
TEST_MODE = False
SHOW_SOURCES = False

# Subset of wiki files indexed while TEST_MODE is active.
TEST_TARGETS = [
    "Anvil.txt", "Trident.txt", "Chest.txt", "Diamond Pickaxe.txt",
    "Crafting Table.txt", "Furnace.txt", "Beacon.txt", "Enchanting Table.txt",
    "Smithing Table.txt"
]

# Retrieval / chunking parameters.
CONTEXT_SIZE = 8     # number of chunks retrieved per query
CHUNK_SIZE = 600     # characters per chunk
CHUNK_OVERLAP = 0    # characters shared between consecutive chunks
BATCH_SIZE = 50      # chunks per collection.add() call (embedding-API batching)

# --- External clients --------------------------------------------------------

# OpenAI client for chat completions; the key comes from the environment.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Persistent on-disk Chroma vector store plus the embedding function used for
# both document ingestion and query embedding.
chroma_client = chromadb.PersistentClient(path="./chroma_db")
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key=os.environ.get("OPENAI_API_KEY"),
    model_name="text-embedding-3-large"
)
|
|
| |
def chunk_text(text, chunk_size=CHUNK_SIZE, overlap=CHUNK_OVERLAP):
    """Split *text* into fixed-size character chunks.

    Args:
        text: String to split.
        chunk_size: Maximum number of characters per chunk; must be positive.
        overlap: Characters shared between consecutive chunks; must be
            strictly smaller than ``chunk_size``.

    Returns:
        List of chunk strings; whitespace-only pieces are dropped.

    Raises:
        ValueError: If ``chunk_size`` is not positive, or ``overlap`` is not
            smaller than ``chunk_size`` (the window would never advance and
            the loop would run forever).
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if overlap >= chunk_size:
        # Guard against an infinite loop: start = end - overlap would not move.
        raise ValueError("overlap must be smaller than chunk_size")
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        if chunk.strip():
            chunks.append(chunk)
        start = end - overlap
    return chunks
|
|
def process_single_file(filepath, filename):
    """Read one document and split it into chunks plus per-chunk metadata.

    PDFs are chunked per page (metadata carries the 1-based page number);
    ``.txt``/``.md`` files are chunked as a whole (``page`` is 0). Any other
    extension yields no output.

    Args:
        filepath: Full path of the file on disk.
        filename: Base name of the file (used for type detection and metadata).

    Returns:
        Tuple ``(chunks, metadatas)`` of parallel lists; both empty for
        unsupported types or read failures.
    """
    file_chunks = []
    file_metadatas = []
    try:
        if filename.endswith('.pdf'):
            with open(filepath, 'rb') as f:
                pdf_reader = PyPDF2.PdfReader(f)
                for page_num, page in enumerate(pdf_reader.pages, 1):
                    page_text = page.extract_text()
                    if page_text and page_text.strip():
                        for i, chunk in enumerate(chunk_text(page_text)):
                            file_chunks.append(chunk)
                            file_metadatas.append({
                                "filename": filename, "path": filepath, "page": page_num, "chunk_id": i, "source_type": "pdf"
                            })
        elif filename.endswith(('.txt', '.md')):
            with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
                text = f.read()
            if text.strip():
                for i, chunk in enumerate(chunk_text(text)):
                    file_chunks.append(chunk)
                    file_metadatas.append({
                        "filename": filename, "path": filepath, "page": 0, "chunk_id": i, "source_type": "text"
                    })
        return file_chunks, file_metadatas
    except Exception as e:
        # One broken file must not abort the whole indexing run, but the
        # failure should be visible instead of silently swallowed.
        print(f"⚠️ Error processing {filename}: {e}")
        return [], []
|
|
def load_documents_to_vectordb(root_path=WIKI_PATH):
    """(Re)build the Chroma ``documents`` collection from files under *root_path*.

    Walks the directory tree, chunks every supported file in parallel, and
    uploads the chunks to a freshly created collection in batches.

    Args:
        root_path: Directory tree to index.

    Returns:
        The populated Chroma collection, or ``None`` when *root_path* does
        not exist.
    """
    print(f"🔍 Starte Indexierung in: {root_path}")
    if not os.path.exists(root_path):
        print(f"⚠️ Pfad nicht gefunden: {root_path}")
        return None

    # Drop any stale collection so we always index from a clean slate.
    try:
        chroma_client.delete_collection(name="documents")
    except Exception:
        pass  # collection did not exist yet — nothing to delete

    collection = chroma_client.create_collection(name="documents", embedding_function=openai_ef)

    # Collect candidate files. In TEST_MODE only the whitelisted files count;
    # otherwise every non-hidden .txt/.md/.pdf file is indexed.
    all_files_to_process = []
    for current_root, _dirs, files in os.walk(root_path):
        for file in files:
            full_path = os.path.join(current_root, file)
            if TEST_MODE:
                if file in TEST_TARGETS:
                    all_files_to_process.append((full_path, file))
            elif file.endswith(('.txt', '.md', '.pdf')) and not file.startswith('.'):
                all_files_to_process.append((full_path, file))

    print(f"📂 Gesamt gefunden: {len(all_files_to_process)} Dateien.")

    # Read and chunk files concurrently — this stage is I/O bound.
    all_chunks = []
    all_metadatas = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_single_file, path, name)
                   for path, name in all_files_to_process]
        for future in concurrent.futures.as_completed(futures):
            chunks, metadatas = future.result()
            if chunks:
                all_chunks.extend(chunks)
                all_metadatas.extend(metadatas)

    # Upload in batches to stay within embedding-API request limits.
    total_chunks = len(all_chunks)
    for i in range(0, total_chunks, BATCH_SIZE):
        end_idx = min(i + BATCH_SIZE, total_chunks)
        try:
            collection.add(
                documents=all_chunks[i:end_idx],
                metadatas=all_metadatas[i:end_idx],
                ids=[f"doc_{j}" for j in range(i, end_idx)],
            )
        except Exception as e:
            print(f"Error Batch {i}: {e}")

    return collection
|
|
def get_relevant_context(query, collection, n_results=CONTEXT_SIZE):
    """Retrieve the *n_results* most similar chunks for *query* from Chroma.

    Args:
        query: Natural-language user question.
        collection: Chroma collection exposing ``query(query_texts, n_results)``.
        n_results: Number of chunks to retrieve.

    Returns:
        One string concatenating every hit, each prefixed with a
        ``--- Quelle: <filename> ---`` header; empty string on failure or
        when nothing is found.
    """
    try:
        results = collection.query(query_texts=[query], n_results=n_results)
        parts = []
        if results['documents']:
            for doc, metadata in zip(results['documents'][0], results['metadatas'][0]):
                parts.append(f"\n--- Quelle: {metadata['filename']} ---\n{doc}\n")
        return "".join(parts)
    except Exception as e:
        # Retrieval errors degrade to "no context" rather than crashing the
        # chat turn, but are logged so they can be diagnosed.
        print(f"⚠️ Context query failed: {e}")
        return ""
|
|
| |
# --- Startup: reuse the persisted collection when it already holds data; ----
# --- in TEST_MODE or for an empty/missing collection, rebuild the index.  ----
try:
    collection = chroma_client.get_collection(name="documents", embedding_function=openai_ef)
    if TEST_MODE or collection.count() == 0:
        collection = load_documents_to_vectordb()
    else:
        print(f"DB geladen ({collection.count()} Chunks).")
except Exception:
    # Collection does not exist yet (or the store is unreadable) -> full rebuild.
    collection = load_documents_to_vectordb()
|
|
| |
def response(message, history):
    """Stream a context-grounded assistant answer for *message*.

    Retrieves wiki context, builds a German system prompt around it and
    streams the model output. Yields progressively longer partial answers
    (Gradio streaming protocol); with SHOW_SOURCES enabled, the retrieved
    context is appended to the final yield.

    Args:
        message: Current user question.
        history: Prior chat turns as ``{"role", "content"}`` dicts.
    """
    context = get_relevant_context(message, collection, n_results=CONTEXT_SIZE)
    debug_info = f"\n\n---\n**🔍 GENUTZTER KONTEXT:**\n```text\n{context}\n```"

    # Without retrieved context, refuse explicitly instead of hallucinating.
    if not context.strip():
        msg = "Dazu habe ich in meinem aktuellen Kontext leider keine Informationen gefunden."
        if SHOW_SOURCES:
            yield msg + debug_info
        else:
            yield msg
        return

    system_prompt = f"""Du bist ein hilfreicher Minecraft-Experte. Deine Aufgabe ist es, Fragen basierend auf dem bereitgestellten KONTEXT zu beantworten.

KONTEXT AUS DATENBANK:
---------------------
{context}
---------------------

ANWEISUNGEN FÜR DEINE ANTWORT:

1. **ANALYSE**: Prüfe, ob der Nutzer nach einem Crafting-Rezept fragt oder eine allgemeine Frage stellt (z.B. Haltbarkeit, Fundort, Schaden).

2. **BEI CRAFTING-REZEPTEN**:
Suche im Kontext nach `=== Crafting Template ===` oder Rezept-Daten.

* **FALL A: Geformtes Rezept (Shaped)**
* Erkennbar daran, dass Zutaten spezifischen Slots zugewiesen sind (z.B. A1, B2, C3).
* Erstelle ZWINGEND eine **3x3 Markdown-Tabelle**.
* A1 ist oben links, C3 unten rechts. Leere Slots bleiben leer.

* **FALL B: Formloses Rezept (Shapeless)**
* Erkennbar daran, dass Zutaten gelistet sind, aber **keine Slot-Nummern** (wie A1, B2) davor stehen.
* Erstelle **KEINE Tabelle**.
* Liste die benötigten Zutaten als Stichpunkte auf.
* **WICHTIG:** Schreibe explizit dazu: *"Die Position im Crafting Table ist egal."*

* **FALL C: Nicht herstellbar**
* Wenn im Kontext steht "not craftable" oder Ähnliches, antworte: "Das Item kann nicht hergestellt werden" und erkläre kurz, wie man es stattdessen bekommt (z.B. Drop, Kiste).

3. **BEI ALLGEMEINEN FRAGEN**:
* Wenn der Nutzer nicht nach einem Rezept fragt (z.B. "Wie viel Haltbarkeit hat X?", "Was droppt Y?"), antworte in einem gut lesbaren Fließtext.
* Verwende Fettgedrucktes für Schlüsselbegriffe.

4. **WISSENS-GRENZE**:
* Nutze NUR die Informationen aus dem obigen Kontext. Wenn die Info dort fehlt, erfinde nichts, sondern sage, dass die Info in den Dateien fehlt.
"""

    # Replay the full conversation so the model keeps dialogue context.
    messages = [{"role": "system", "content": system_prompt}]
    for msg in history:
        messages.append({"role": msg["role"], "content": msg["content"]})
    messages.append({"role": "user", "content": message})

    # temperature=0 keeps recipe answers deterministic; stream for live UI.
    stream = client.chat.completions.create(
        model="gpt-4o-mini", messages=messages, temperature=0.0, stream=True
    )

    answer = ""
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            answer += chunk.choices[0].delta.content
            yield answer

    if SHOW_SOURCES:
        answer += debug_info
        yield answer
|
|
| |
def load_css():
    """Return the contents of ``./style.css`` (relative to the working dir).

    The encoding is pinned to UTF-8 so non-ASCII characters in the
    stylesheet load identically on every platform; the previous implicit
    locale encoding could fail or mis-decode on Windows.
    """
    with open("./style.css", "r", encoding="utf-8") as f:
        return f.read()
|
|
def chat_wrapper(message, history):
    """Bridge the Gradio widgets to the streaming ``response`` generator.

    Extends the chat history with the new user turn and an initially empty
    assistant turn, then re-emits the growing assistant text so the chatbot
    widget updates live. Each yield is ``("", updated_history)``; the empty
    first element clears the input textbox immediately.
    """
    updated = list(history)
    updated.append({"role": "user", "content": message})
    updated.append({"role": "assistant", "content": ""})
    for partial in response(message, history):
        updated[-1]["content"] = partial
        yield "", updated
|
|
| |
def main():
    """Build and launch the Gradio chat UI (header menus, chatbot, input row)."""
    css_content = load_css()
    theme = CustomTheme()

    # Client-side JS snippets: switch the CSS theme by setting a data-theme
    # attribute on <html>, which ./style.css keys its variables off.
    js_set_default = "(x) => { document.documentElement.setAttribute('data-theme', 'default'); }"
    js_set_night = "(x) => { document.documentElement.setAttribute('data-theme', 'night'); }"
    js_set_forest = "(x) => { document.documentElement.setAttribute('data-theme', 'forest'); }"

    with gr.Blocks(theme=theme, css=css_content, title="SteveGPT") as demo:

        # Per-session visibility flags for the two dropdown menus.
        menu_visible = gr.State(False)
        crafting_visible = gr.State(False)

        with gr.Column(elem_id="main-wrapper"):

            # Header: theme menu (left), spacer, crafting popup (right).
            with gr.Row(elem_id="header-row"):

                with gr.Column(scale=1, min_width=120, elem_id="theme-col"):
                    theme_btn = gr.Button("Themes", elem_id="theme-btn")

                    # Hidden until theme_btn is clicked.
                    with gr.Group(visible=False, elem_id="theme-menu-group") as theme_menu:
                        btn_default = gr.Button("Default", size="sm", elem_classes=["menu-btn"])
                        btn_night = gr.Button("Nacht", size="sm", elem_classes=["menu-btn"])
                        btn_forest = gr.Button("Wald", size="sm", elem_classes=["menu-btn"])

                # Flexible spacer between the two header columns.
                with gr.Column(scale=10):
                    pass

                with gr.Column(scale=1, min_width=120, elem_id="crafting-col"):
                    crafting_btn = gr.Button("Crafting", elem_id="crafting-btn")

                    # Static decorative 3x3 crafting-table overlay; the slot
                    # images are filled purely via CSS/JS, not from Python.
                    with gr.Group(visible=False, elem_id="crafting-popup") as crafting_menu:
                        gr.HTML("""
                        <div class="crafting-gui-container">
                            <div class="crafting-label">CRAFTING</div>
                            <div class="crafting-layout">
                                <div class="crafting-grid-3x3">
                                    <div class="mc-slot"><img src="" class="slot-item" alt=""></div>
                                    <div class="mc-slot"><img src="" class="slot-item" alt=""></div>
                                    <div class="mc-slot"><img src="" class="slot-item" alt=""></div>
                                    <div class="mc-slot"><img src="" class="slot-item" alt=""></div>
                                    <div class="mc-slot"><img src="" class="slot-item" alt=""></div>
                                    <div class="mc-slot"><img src="" class="slot-item" alt=""></div>
                                    <div class="mc-slot"><img src="" class="slot-item" alt=""></div>
                                    <div class="mc-slot"><img src="" class="slot-item" alt=""></div>
                                    <div class="mc-slot"><img src="" class="slot-item" alt=""></div>
                                </div>

                                <div class="crafting-arrow-container">
                                    <div class="arrow-shaft"></div>
                                    <div class="arrow-tip"></div>
                                </div>

                                <div class="crafting-output-slot">
                                    <div class="mc-slot large"><img src="" class="slot-item" alt=""></div>
                                </div>
                            </div>
                        </div>
                        """)

            # Main chat area; type="messages" matches the dict-based history
            # used by chat_wrapper/response.
            chatbot = gr.Chatbot(
                value=[],
                type="messages",
                elem_id="minecraft-chat",
                show_label=False,
                avatar_images=("./assets/avatar_images/human.png", "./assets/avatar_images/steve.png"),
                bubble_full_width=False,
            )

            # Footer: message input plus send button.
            with gr.Group(elem_id="footer-area"):
                with gr.Row(elem_id="input-row"):
                    msg_box = gr.Textbox(
                        placeholder="Was möchtest du craften?",
                        show_label=False, container=False,
                        elem_id="minecraft-input",
                        autofocus=True,
                        scale=1
                    )
                    send_btn = gr.Button("Senden", elem_id="minecraft-send-btn", scale=0)

        def toggle_menu(is_visible):
            # Flip both the stored flag and the menu's visibility in one go.
            return not is_visible, gr.update(visible=not is_visible)

        theme_btn.click(
            fn=toggle_menu,
            inputs=[menu_visible],
            outputs=[menu_visible, theme_menu]
        )

        # Theme buttons: run the JS theme switch, then close the menu.
        btn_default.click(None, [], [], js=js_set_default).then(
            fn=lambda: (False, gr.update(visible=False)), outputs=[menu_visible, theme_menu]
        )
        btn_night.click(None, [], [], js=js_set_night).then(
            fn=lambda: (False, gr.update(visible=False)), outputs=[menu_visible, theme_menu]
        )
        btn_forest.click(None, [], [], js=js_set_forest).then(
            fn=lambda: (False, gr.update(visible=False)), outputs=[menu_visible, theme_menu]
        )

        crafting_btn.click(
            fn=toggle_menu,
            inputs=[crafting_visible],
            outputs=[crafting_visible, crafting_menu]
        )

        # Both Enter and the send button trigger the streaming chat handler.
        msg_box.submit(chat_wrapper, inputs=[msg_box, chatbot], outputs=[msg_box, chatbot])
        send_btn.click(chat_wrapper, inputs=[msg_box, chatbot], outputs=[msg_box, chatbot])

    # allowed_paths lets Gradio serve the avatar images from ./assets.
    demo.launch(inbrowser=True, allowed_paths=["./assets"])
|
|
# Launch the app only when executed as a script (not on import).
if __name__ == "__main__":
    main()