Spaces:
Build error
Build error
| # Importiert die notwendigen Module und Klassen aus LangChain | |
| #from langchain.document_loaders import JSONLoader | |
| from langchain_community.document_loaders import JSONLoader | |
| # Beispiel für den korrekten Dateipfad | |
| file_path = "search_results.json" | |
| # Definiert das JQ-Schema, das angibt, wie die JSON-Daten verarbeitet werden sollen | |
| jq_schema = '.[]' | |
| # Initialisiert den JSONLoader mit dem angegebenen Dateipfad und JQ-Schema | |
| # text_content=False bedeutet, dass der Inhalt der JSON-Datei nicht als reiner Text geladen wird, | |
| # sondern dass die Struktur der JSON-Daten beibehalten wird | |
| loader = JSONLoader(file_path=file_path, jq_schema=jq_schema, text_content=False) | |
| # Importiere die notwendigen Module und Klassen | |
| from langchain.schema import Document | |
| from langchain.text_splitter import CharacterTextSplitter | |
| import json | |
| import tiktoken | |
| # Funktion zur Berechnung der Anzahl der Tokens | |
| def num_tokens_from_string(string: str) -> int: | |
| """Gibt die Anzahl der Tokens in einem Textstring zurück.""" | |
| # Verwendet das 'cl100k_base' Encoding, um die Anzahl der Tokens zu berechnen | |
| encoding = tiktoken.get_encoding('cl100k_base') | |
| # Kodiert den String und berechnet die Länge der Tokens | |
| num_tokens = len(encoding.encode(string)) | |
| return num_tokens | |
| # Definiere den Textsplitter | |
| text_splitter = CharacterTextSplitter( | |
| separator="\n\n", # Trennt den Text bei doppelten Zeilenumbrüchen | |
| chunk_size=400, # Maximale Größe der Textblöcke: 400 Zeichen | |
| chunk_overlap=100, # Überlappung zwischen Chunks: 100 Zeichen | |
| length_function=len, # Verwendet die Standardlen-Funktion von Python zur Längenberechnung | |
| is_separator_regex=False, # Trenner ist kein regulärer Ausdruck | |
| ) | |
| # Lädt die Dokumente aus der JSON-Datei | |
| docs = loader.load() | |
| # Teilt die Dokumente in Chunks auf | |
| documents = [] # Initialisiert eine leere Liste, um die aufgeteilten Dokumente zu speichern | |
| for d in docs: | |
| try: | |
| # Dekodiert den JSON-Inhalt des Dokuments in ein Wörterbuch | |
| page_content_dict = json.loads(d.page_content) | |
| # Konvertiert das Wörterbuch in einen JSON-String mit richtiger Unicode-Darstellung | |
| json_string = json.dumps(page_content_dict, ensure_ascii=False) | |
| # Teilt den JSON-String in Chunks auf | |
| chunks = text_splitter.split_text(json_string) | |
| for i, chunk in enumerate(chunks): | |
| # Erstellt ein neues Dokument für jeden Chunk mit Metadaten | |
| documents.append(Document(page_content=chunk, metadata={'chunk': i, **d.metadata})) | |
| except json.JSONDecodeError as e: | |
| # Gibt einen Fehler aus, wenn das Dekodieren des JSON fehlschlägt | |
| print(f"JSON-Dekodierungsfehler für Dokument {d.metadata}: {e}") | |
| continue | |
| # Gibt die Anzahl der Dokumente und ein Beispiel-Dokument zur Überprüfung aus | |
| print(f"Gesamtanzahl der Dokumente: {len(documents)}") | |
| print(f"""\nBeispiel-Inhalt der Seite:\n{documents[0].page_content}\n""") | |
| import requests | |
| from langchain.vectorstores import Chroma | |
| from langchain.embeddings import SentenceTransformerEmbeddings | |
| # Verwende ein vortrainiertes Modell für mehrsprachige Einbettungen | |
| embedding_function = SentenceTransformerEmbeddings(model_name="paraphrase-multilingual-MiniLM-L12-v2") | |
| # Erstelle eine Chroma-Datenbank mit den Dokumenten und der Einbettungsfunktion | |
| db = Chroma.from_documents(documents, embedding_function) | |
| from dotenv import load_dotenv | |
| import os | |
| import gradio as gr | |
| import json | |
| import csv | |
| import os | |
| import requests | |
| from langchain_huggingface import HuggingFaceEmbeddings # Aktualisierter Import | |
| # Verfügbare Modelle | |
| MODELS = [ | |
| {'name': 'mixtral-8x7b-instruct', 'value': 0}, | |
| {'name': 'meta-llama-3-70b-instruct', 'value': 1} | |
| ] | |
| # Globale Variable für den API-Schlüssel | |
| API_KEY = None | |
| # Funktion zum Senden der Nachricht an KISSKI | |
| def send_message_to_kisski(selected_model, messages): | |
| API_ENDPOINT = 'https://chat-ai.academiccloud.de/v1/chat/completions' | |
| headers = { | |
| 'Content-Type': 'application/json', | |
| 'Authorization': f'Bearer {API_KEY}' | |
| } | |
| payload = { | |
| 'model': selected_model, | |
| 'messages': messages | |
| } | |
| try: | |
| response = requests.post(API_ENDPOINT, headers=headers, json=payload) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.exceptions.RequestException as e: | |
| print(f"Fehler beim Senden der Anfrage an KISSKI: {e}") | |
| return None | |
| # Funktion zum Abrufen von Informationen aus der Datenbank | |
| #def get_retrieved_info(query): | |
| # results = db.similarity_search(query, k=1) | |
| # if results: | |
| # return json.loads(results[0].page_content) | |
| #return "Keine relevanten Informationen gefunden." | |
| # Funktion zum Abrufen von Informationen aus der Datenbank | |
| def get_retrieved_info(query): | |
| results = db.similarity_search(query, k=3) # Ähnlichkeitssuche in der Datenbank, hier auf 3 belassen oder erhöhen | |
| json_objects = [] # Initialisiere eine Liste zur Speicherung der abgerufenen JSON-Objekte | |
| if results: | |
| for result in results: | |
| try: | |
| # Versuche, das `page_content` des Ergebnisses zu dekodieren | |
| json_object = json.loads(result.page_content) | |
| json_objects.append(json_object) # Füge das JSON-Objekt zur Liste hinzu | |
| except json.JSONDecodeError as e: | |
| print(f"JSON-Dekodierungsfehler für das Dokument mit Metadaten {result.metadata}: {e}") | |
| if json_objects: | |
| return json_objects # Gibt eine Liste von JSON-Objekten zurück | |
| return [] # Gibt eine leere Liste zurück, wenn keine relevanten Informationen gefunden werden | |
| # Funktion zum Formatieren der abgerufenen Informationen | |
| def format_retrieved_info(retrieved_info_list): | |
| if not retrieved_info_list or isinstance(retrieved_info_list, str): | |
| return retrieved_info_list | |
| formatted_info_list = [] # Initialisiere eine Liste zur Speicherung der formatierten Informationen | |
| for index, retrieved_info in enumerate(retrieved_info_list, start=1): | |
| title = retrieved_info.get("title_full", "Titel nicht verfügbar") # Titel abrufen | |
| authors = ", ".join(retrieved_info.get("author", [])) or "Autor(en) nicht verfügbar" # Autoren abrufen | |
| publish_date = ", ".join(retrieved_info.get("publishDate", [])) or "Veröffentlichungsdatum nicht verfügbar" # Veröffentlichungsdatum abrufen | |
| urls = "\n".join(retrieved_info.get("url", [])) or "URL nicht verfügbar" # URLs abrufen | |
| topics = ", ".join(retrieved_info.get("topic", [])) or "Themen nicht verfügbar" # Themen abrufen | |
| languages = ", ".join(retrieved_info.get("language", [])) or "Sprache(n) nicht verfügbar" # Sprachen abrufen | |
| formats = ", ".join(retrieved_info.get("format", [])) or "Format(e) nicht verfügbar" # Formate abrufen | |
| publisher = ", ".join(retrieved_info.get("publisher", [])) or "Verlag nicht verfügbar" # Verlag abrufen | |
| # Hier wird die ISBN-Liste flach gemacht | |
| isbn_list = retrieved_info.get("isbn", []) | |
| flattened_isbn = ", ".join([item if isinstance(item, str) else ", ".join(item) for item in isbn_list]) | |
| isbn = flattened_isbn or "ISBN nicht verfügbar" # ISBN abrufen | |
| physical = ", ".join(retrieved_info.get("physical", [])) or "Physische Beschreibung nicht verfügbar" # Physische Beschreibung abrufen | |
| catalog_id = retrieved_info.get("id", "ID nicht verfügbar") # Katalog-ID abrufen | |
| # Erstelle die formatierte Ausgabe | |
| formatted_info = ( | |
| f"{index}. Titel: {title}\n" | |
| f" Autor(en): {authors}\n" | |
| f" Veröffentlichungsdatum: {publish_date}\n" | |
| f" URL(s):\n {urls}\n" | |
| f" Themen: {topics}\n" | |
| f" Sprache(n): {languages}\n" | |
| f" Format(e): {formats}\n" | |
| f" Verlag: {publisher}\n" | |
| f" ISBN: {isbn}\n" | |
| f" Physische Beschreibung: {physical}\n" | |
| f" <a href='https://katalog.ulb.hhu.de/Record/{catalog_id}' target='_blank'>Katalog Link</a>\n" | |
| ) | |
| formatted_info_list.append(formatted_info) # Füge die formatierte Information zur Liste hinzu | |
| return "\n\n".join(formatted_info_list) # Gibt die formatierten Informationen als einen einzigen String zurück | |
| # Funktion zum Speichern der Interaktion zwischen Benutzer und Bot | |
| def save_interaction(user_input, bot_response, file_format='json'): | |
| interaction = { | |
| 'user_input': user_input, | |
| 'bot_response': bot_response | |
| } | |
| if file_format == 'json': | |
| with open('interactions.json', 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(interaction, ensure_ascii=False) + '\n') # Speichert als JSON | |
| elif file_format == 'csv': | |
| file_exists = os.path.isfile('interactions.csv') | |
| with open('interactions.csv', 'a', encoding='utf-8', newline='') as f: | |
| writer = csv.DictWriter(f, fieldnames=interaction.keys()) | |
| if not file_exists: | |
| writer.writeheader() # Schreibe die Header, wenn die Datei neu ist | |
| writer.writerow(interaction) # Speichert als CSV | |
| # Funktion zum Chatten mit dem Bot | |
| def chat_with_bot(model_choice, user_input): | |
| if not API_KEY: | |
| return "API-Schlüssel ist erforderlich. Bitte geben Sie Ihren API-Schlüssel ein." | |
| if model_choice is None: | |
| return "Modellwahl ist erforderlich. Bitte wähle ein Modell aus der Dropdown-Liste." | |
| if not user_input: | |
| return "Bitte geben Sie eine Frage oder Anweisung ein." | |
| try: | |
| model_choice = int(model_choice) # Versucht, die Modellwahl in eine Zahl umzuwandeln | |
| except ValueError: | |
| return "Ungültige Modellwahl. Bitte wähle eine Zahl zwischen 0 und 3." | |
| if model_choice < 0 or model_choice >= len(MODELS): | |
| return "Ungültige Modellwahl. Bitte wähle eine Zahl zwischen 0 und 3." | |
| selected_model = MODELS[model_choice]['name'] # Wählt das Modell basierend auf der Auswahl | |
| retrieved_info = get_retrieved_info(user_input) # Holt relevante Informationen aus der Datenbank | |
| if isinstance(retrieved_info, list) and retrieved_info: # Überprüfen, ob retrieved_info eine Liste ist und nicht leer | |
| formatted_info = format_retrieved_info(retrieved_info) # Formatiert die abgerufenen Informationen | |
| else: | |
| formatted_info = "Keine relevanten Informationen gefunden." # Fehlerbehandlung | |
| # Erstellt die Nachricht für den KISSKI API-Aufruf | |
| messages = [ | |
| { | |
| "role": "system", | |
| "content": ( | |
| "Du bist ein KI-Assistent, der ausschließlich auf den bereitgestellten Informationen basiert. " | |
| "Bitte antworte nur mit Informationen aus den abgerufenen Dokumenten und ignoriere alle anderen " | |
| "erlernten Daten oder allgemeinen Wissensquellen. " | |
| "Wenn die Antwort nicht im angegebenen Kontext steht, erkläre dies klar." | |
| ), | |
| }, | |
| { | |
| "role": "user", | |
| "content": user_input | |
| }, | |
| { | |
| "role": "assistant", | |
| "content": formatted_info # Verwende die formatierten Informationen | |
| } | |
| ] | |
| kisski_response = send_message_to_kisski(selected_model, messages) # Sendet die Anfrage an KISSKI | |
| if kisski_response and kisski_response.get('choices'): | |
| bot_response = f"Antwort auf Ihre Frage: {kisski_response['choices'][0]['message']['content']}\n\n" | |
| else: | |
| bot_response = "Keine Antwort von KISSKI erhalten." | |
| # Füge die abgerufenen Informationen zur Antwort hinzu | |
| full_response = f"{bot_response}<br><br><strong>Zusätzliche Informationen:</strong><br>{formatted_info}" | |
| save_interaction(user_input, full_response, file_format='json') # Speichert die Interaktion | |
| return full_response # Rückgabe der vollständigen Antwort | |
| # Gradio-Interface zur Benutzerinteraktion | |
| def gradio_interface(): | |
| with gr.Blocks(css=".footer {display: none;}") as demo: # CSS zur Ausblendung der Fußzeile | |
| # Header mit Markdown | |
| gr.Markdown( | |
| """ | |
| <h1 style='text-align: center;'>Chatbot über Klimawandel</h1> | |
| <p style='text-align: center;'>Wähle ein Modell und stelle deine Frage über den Klimawandel. Der Chatbot antwortet basierend auf abgerufenen Informationen aus der Ulb-Katalog!</p> | |
| """, | |
| elem_id="header" | |
| ) | |
| with gr.Column(): | |
| # API-Schlüssel Eingabefeld | |
| api_key_input = gr.Textbox(label="API-Schlüssel", type="password", placeholder="Geben Sie Ihren API-Schlüssel ein", lines=1) | |
| api_key_button = gr.Button("API-Schlüssel speichern", variant="primary") | |
| # Chat-Interface (anfangs versteckt) | |
| with gr.Column(visible=False) as chat_interface: | |
| model_dropdown = gr.Dropdown(choices=[(model['name'], str(model['value'])) for model in MODELS], label="Modellwahl", value="0") | |
| text_input = gr.Textbox(lines=5, label="Benutzereingabe", placeholder="Stelle deine Frage hier...") | |
| output = gr.HTML(label="Antwort des Bots", elem_id="output") | |
| submit_button = gr.Button("Los geht's", variant="primary") | |
| # Verbindung der Eingaben mit der Chat-Funktion | |
| submit_button.click(fn=chat_with_bot, inputs=[model_dropdown, text_input], outputs=output) | |
| # Funktion zur Anzeige des Chat-Interfaces nach Eingabe des API-Schlüssels | |
| def show_chat_interface(api_key): | |
| global API_KEY | |
| API_KEY = api_key # Setze den globalen API-Schlüssel | |
| return ( | |
| gr.update(visible=False), # Versteckt das API-Schlüssel-Eingabefeld | |
| gr.update(visible=False), # Versteckt den API-Schlüssel-Button | |
| gr.update(visible=True) # Zeigt das Chat-Interface | |
| ) | |
| api_key_button.click(fn=show_chat_interface, inputs=api_key_input, outputs=[api_key_input, api_key_button, chat_interface]) | |
| demo.launch(debug=True, share=True) # Startet die Gradio-Oberfläche | |
| if __name__ == "__main__": | |
| gradio_interface() # Hauptfunktion zum Ausführen des Gradio-Interfaces |