RAG-Chatbot / app.py
samanehilchi's picture
Update app.py
b0d4bce verified
# Importiert die notwendigen Module und Klassen aus LangChain
#from langchain.document_loaders import JSONLoader
from langchain_community.document_loaders import JSONLoader
# Beispiel für den korrekten Dateipfad
file_path = "search_results.json"
# Definiert das JQ-Schema, das angibt, wie die JSON-Daten verarbeitet werden sollen
jq_schema = '.[]'
# Initialisiert den JSONLoader mit dem angegebenen Dateipfad und JQ-Schema
# text_content=False bedeutet, dass der Inhalt der JSON-Datei nicht als reiner Text geladen wird,
# sondern dass die Struktur der JSON-Daten beibehalten wird
loader = JSONLoader(file_path=file_path, jq_schema=jq_schema, text_content=False)
# Importiere die notwendigen Module und Klassen
from langchain.schema import Document
from langchain.text_splitter import CharacterTextSplitter
import json
import tiktoken
# Funktion zur Berechnung der Anzahl der Tokens
def num_tokens_from_string(string: str) -> int:
"""Gibt die Anzahl der Tokens in einem Textstring zurück."""
# Verwendet das 'cl100k_base' Encoding, um die Anzahl der Tokens zu berechnen
encoding = tiktoken.get_encoding('cl100k_base')
# Kodiert den String und berechnet die Länge der Tokens
num_tokens = len(encoding.encode(string))
return num_tokens
# Definiere den Textsplitter
text_splitter = CharacterTextSplitter(
separator="\n\n", # Trennt den Text bei doppelten Zeilenumbrüchen
chunk_size=400, # Maximale Größe der Textblöcke: 400 Zeichen
chunk_overlap=100, # Überlappung zwischen Chunks: 100 Zeichen
length_function=len, # Verwendet die Standardlen-Funktion von Python zur Längenberechnung
is_separator_regex=False, # Trenner ist kein regulärer Ausdruck
)
# Lädt die Dokumente aus der JSON-Datei
docs = loader.load()
# Teilt die Dokumente in Chunks auf
documents = [] # Initialisiert eine leere Liste, um die aufgeteilten Dokumente zu speichern
for d in docs:
try:
# Dekodiert den JSON-Inhalt des Dokuments in ein Wörterbuch
page_content_dict = json.loads(d.page_content)
# Konvertiert das Wörterbuch in einen JSON-String mit richtiger Unicode-Darstellung
json_string = json.dumps(page_content_dict, ensure_ascii=False)
# Teilt den JSON-String in Chunks auf
chunks = text_splitter.split_text(json_string)
for i, chunk in enumerate(chunks):
# Erstellt ein neues Dokument für jeden Chunk mit Metadaten
documents.append(Document(page_content=chunk, metadata={'chunk': i, **d.metadata}))
except json.JSONDecodeError as e:
# Gibt einen Fehler aus, wenn das Dekodieren des JSON fehlschlägt
print(f"JSON-Dekodierungsfehler für Dokument {d.metadata}: {e}")
continue
# Gibt die Anzahl der Dokumente und ein Beispiel-Dokument zur Überprüfung aus
print(f"Gesamtanzahl der Dokumente: {len(documents)}")
print(f"""\nBeispiel-Inhalt der Seite:\n{documents[0].page_content}\n""")
import requests
from langchain.vectorstores import Chroma
from langchain.embeddings import SentenceTransformerEmbeddings
# Verwende ein vortrainiertes Modell für mehrsprachige Einbettungen
embedding_function = SentenceTransformerEmbeddings(model_name="paraphrase-multilingual-MiniLM-L12-v2")
# Erstelle eine Chroma-Datenbank mit den Dokumenten und der Einbettungsfunktion
db = Chroma.from_documents(documents, embedding_function)
from dotenv import load_dotenv
import os
import gradio as gr
import json
import csv
import os
import requests
from langchain_huggingface import HuggingFaceEmbeddings # Aktualisierter Import
# Verfügbare Modelle
MODELS = [
{'name': 'mixtral-8x7b-instruct', 'value': 0},
{'name': 'meta-llama-3-70b-instruct', 'value': 1}
]
# Globale Variable für den API-Schlüssel
API_KEY = None
# Funktion zum Senden der Nachricht an KISSKI
def send_message_to_kisski(selected_model, messages):
API_ENDPOINT = 'https://chat-ai.academiccloud.de/v1/chat/completions'
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {API_KEY}'
}
payload = {
'model': selected_model,
'messages': messages
}
try:
response = requests.post(API_ENDPOINT, headers=headers, json=payload)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
print(f"Fehler beim Senden der Anfrage an KISSKI: {e}")
return None
# Funktion zum Abrufen von Informationen aus der Datenbank
#def get_retrieved_info(query):
# results = db.similarity_search(query, k=1)
# if results:
# return json.loads(results[0].page_content)
#return "Keine relevanten Informationen gefunden."
# Funktion zum Abrufen von Informationen aus der Datenbank
def get_retrieved_info(query):
results = db.similarity_search(query, k=3) # Ähnlichkeitssuche in der Datenbank, hier auf 3 belassen oder erhöhen
json_objects = [] # Initialisiere eine Liste zur Speicherung der abgerufenen JSON-Objekte
if results:
for result in results:
try:
# Versuche, das `page_content` des Ergebnisses zu dekodieren
json_object = json.loads(result.page_content)
json_objects.append(json_object) # Füge das JSON-Objekt zur Liste hinzu
except json.JSONDecodeError as e:
print(f"JSON-Dekodierungsfehler für das Dokument mit Metadaten {result.metadata}: {e}")
if json_objects:
return json_objects # Gibt eine Liste von JSON-Objekten zurück
return [] # Gibt eine leere Liste zurück, wenn keine relevanten Informationen gefunden werden
# Funktion zum Formatieren der abgerufenen Informationen
def format_retrieved_info(retrieved_info_list):
if not retrieved_info_list or isinstance(retrieved_info_list, str):
return retrieved_info_list
formatted_info_list = [] # Initialisiere eine Liste zur Speicherung der formatierten Informationen
for index, retrieved_info in enumerate(retrieved_info_list, start=1):
title = retrieved_info.get("title_full", "Titel nicht verfügbar") # Titel abrufen
authors = ", ".join(retrieved_info.get("author", [])) or "Autor(en) nicht verfügbar" # Autoren abrufen
publish_date = ", ".join(retrieved_info.get("publishDate", [])) or "Veröffentlichungsdatum nicht verfügbar" # Veröffentlichungsdatum abrufen
urls = "\n".join(retrieved_info.get("url", [])) or "URL nicht verfügbar" # URLs abrufen
topics = ", ".join(retrieved_info.get("topic", [])) or "Themen nicht verfügbar" # Themen abrufen
languages = ", ".join(retrieved_info.get("language", [])) or "Sprache(n) nicht verfügbar" # Sprachen abrufen
formats = ", ".join(retrieved_info.get("format", [])) or "Format(e) nicht verfügbar" # Formate abrufen
publisher = ", ".join(retrieved_info.get("publisher", [])) or "Verlag nicht verfügbar" # Verlag abrufen
# Hier wird die ISBN-Liste flach gemacht
isbn_list = retrieved_info.get("isbn", [])
flattened_isbn = ", ".join([item if isinstance(item, str) else ", ".join(item) for item in isbn_list])
isbn = flattened_isbn or "ISBN nicht verfügbar" # ISBN abrufen
physical = ", ".join(retrieved_info.get("physical", [])) or "Physische Beschreibung nicht verfügbar" # Physische Beschreibung abrufen
catalog_id = retrieved_info.get("id", "ID nicht verfügbar") # Katalog-ID abrufen
# Erstelle die formatierte Ausgabe
formatted_info = (
f"{index}. Titel: {title}\n"
f" Autor(en): {authors}\n"
f" Veröffentlichungsdatum: {publish_date}\n"
f" URL(s):\n {urls}\n"
f" Themen: {topics}\n"
f" Sprache(n): {languages}\n"
f" Format(e): {formats}\n"
f" Verlag: {publisher}\n"
f" ISBN: {isbn}\n"
f" Physische Beschreibung: {physical}\n"
f" <a href='https://katalog.ulb.hhu.de/Record/{catalog_id}' target='_blank'>Katalog Link</a>\n"
)
formatted_info_list.append(formatted_info) # Füge die formatierte Information zur Liste hinzu
return "\n\n".join(formatted_info_list) # Gibt die formatierten Informationen als einen einzigen String zurück
# Funktion zum Speichern der Interaktion zwischen Benutzer und Bot
def save_interaction(user_input, bot_response, file_format='json'):
interaction = {
'user_input': user_input,
'bot_response': bot_response
}
if file_format == 'json':
with open('interactions.json', 'a', encoding='utf-8') as f:
f.write(json.dumps(interaction, ensure_ascii=False) + '\n') # Speichert als JSON
elif file_format == 'csv':
file_exists = os.path.isfile('interactions.csv')
with open('interactions.csv', 'a', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=interaction.keys())
if not file_exists:
writer.writeheader() # Schreibe die Header, wenn die Datei neu ist
writer.writerow(interaction) # Speichert als CSV
# Funktion zum Chatten mit dem Bot
def chat_with_bot(model_choice, user_input):
if not API_KEY:
return "API-Schlüssel ist erforderlich. Bitte geben Sie Ihren API-Schlüssel ein."
if model_choice is None:
return "Modellwahl ist erforderlich. Bitte wähle ein Modell aus der Dropdown-Liste."
if not user_input:
return "Bitte geben Sie eine Frage oder Anweisung ein."
try:
model_choice = int(model_choice) # Versucht, die Modellwahl in eine Zahl umzuwandeln
except ValueError:
return "Ungültige Modellwahl. Bitte wähle eine Zahl zwischen 0 und 3."
if model_choice < 0 or model_choice >= len(MODELS):
return "Ungültige Modellwahl. Bitte wähle eine Zahl zwischen 0 und 3."
selected_model = MODELS[model_choice]['name'] # Wählt das Modell basierend auf der Auswahl
retrieved_info = get_retrieved_info(user_input) # Holt relevante Informationen aus der Datenbank
if isinstance(retrieved_info, list) and retrieved_info: # Überprüfen, ob retrieved_info eine Liste ist und nicht leer
formatted_info = format_retrieved_info(retrieved_info) # Formatiert die abgerufenen Informationen
else:
formatted_info = "Keine relevanten Informationen gefunden." # Fehlerbehandlung
# Erstellt die Nachricht für den KISSKI API-Aufruf
messages = [
{
"role": "system",
"content": (
"Du bist ein KI-Assistent, der ausschließlich auf den bereitgestellten Informationen basiert. "
"Bitte antworte nur mit Informationen aus den abgerufenen Dokumenten und ignoriere alle anderen "
"erlernten Daten oder allgemeinen Wissensquellen. "
"Wenn die Antwort nicht im angegebenen Kontext steht, erkläre dies klar."
),
},
{
"role": "user",
"content": user_input
},
{
"role": "assistant",
"content": formatted_info # Verwende die formatierten Informationen
}
]
kisski_response = send_message_to_kisski(selected_model, messages) # Sendet die Anfrage an KISSKI
if kisski_response and kisski_response.get('choices'):
bot_response = f"Antwort auf Ihre Frage: {kisski_response['choices'][0]['message']['content']}\n\n"
else:
bot_response = "Keine Antwort von KISSKI erhalten."
# Füge die abgerufenen Informationen zur Antwort hinzu
full_response = f"{bot_response}<br><br><strong>Zusätzliche Informationen:</strong><br>{formatted_info}"
save_interaction(user_input, full_response, file_format='json') # Speichert die Interaktion
return full_response # Rückgabe der vollständigen Antwort
# Gradio-Interface zur Benutzerinteraktion
def gradio_interface():
with gr.Blocks(css=".footer {display: none;}") as demo: # CSS zur Ausblendung der Fußzeile
# Header mit Markdown
gr.Markdown(
"""
<h1 style='text-align: center;'>Chatbot über Klimawandel</h1>
<p style='text-align: center;'>Wähle ein Modell und stelle deine Frage über den Klimawandel. Der Chatbot antwortet basierend auf abgerufenen Informationen aus der Ulb-Katalog!</p>
""",
elem_id="header"
)
with gr.Column():
# API-Schlüssel Eingabefeld
api_key_input = gr.Textbox(label="API-Schlüssel", type="password", placeholder="Geben Sie Ihren API-Schlüssel ein", lines=1)
api_key_button = gr.Button("API-Schlüssel speichern", variant="primary")
# Chat-Interface (anfangs versteckt)
with gr.Column(visible=False) as chat_interface:
model_dropdown = gr.Dropdown(choices=[(model['name'], str(model['value'])) for model in MODELS], label="Modellwahl", value="0")
text_input = gr.Textbox(lines=5, label="Benutzereingabe", placeholder="Stelle deine Frage hier...")
output = gr.HTML(label="Antwort des Bots", elem_id="output")
submit_button = gr.Button("Los geht's", variant="primary")
# Verbindung der Eingaben mit der Chat-Funktion
submit_button.click(fn=chat_with_bot, inputs=[model_dropdown, text_input], outputs=output)
# Funktion zur Anzeige des Chat-Interfaces nach Eingabe des API-Schlüssels
def show_chat_interface(api_key):
global API_KEY
API_KEY = api_key # Setze den globalen API-Schlüssel
return (
gr.update(visible=False), # Versteckt das API-Schlüssel-Eingabefeld
gr.update(visible=False), # Versteckt den API-Schlüssel-Button
gr.update(visible=True) # Zeigt das Chat-Interface
)
api_key_button.click(fn=show_chat_interface, inputs=api_key_input, outputs=[api_key_input, api_key_button, chat_interface])
demo.launch(debug=True, share=True) # Startet die Gradio-Oberfläche
if __name__ == "__main__":
gradio_interface() # Hauptfunktion zum Ausführen des Gradio-Interfaces