Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import gradio as gr | |
| import zipfile | |
| import tempfile | |
| import requests | |
| import urllib.parse | |
| import io | |
| from huggingface_hub import HfApi, login | |
| from PyPDF2 import PdfReader | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_groq import ChatGroq | |
| from dotenv import load_dotenv | |
| from langchain.docstore.document import Document | |
| # Load environment variables from .env file | |
| load_dotenv() | |
| # Load configuration from JSON file | |
| with open('config.json') as config_file: | |
| config = json.load(config_file) | |
| PERSIST_DIRECTORY = config["persist_directory"] | |
| CHUNK_SIZE = config["chunk_size"] | |
| CHUNK_OVERLAP = config["chunk_overlap"] | |
| EMBEDDING_MODEL_NAME = config["embedding_model"] | |
| LLM_MODEL_NAME = config["llm_model"] | |
| LLM_TEMPERATURE = config["llm_temperature"] | |
| GITLAB_API_URL = config["gitlab_api_url"] | |
| HF_SPACE_NAME = config["hf_space_name"] | |
| REPOSITORY_DIRECTORY = config["repository_directory"] | |
| GROQ_API_KEY = os.environ["GROQ_API_KEY"] | |
| HF_TOKEN = os.environ["HF_Token"] | |
| login(HF_TOKEN) | |
| api = HfApi() | |
| def load_project_id(json_file): | |
| with open(json_file, 'r') as f: | |
| data = json.load(f) | |
| return data['project_id'] | |
| def download_gitlab_repo(): | |
| print("Start the upload_gitRepository function") | |
| project_id = load_project_id('repository_ids.json') | |
| encoded_project_id = urllib.parse.quote_plus(project_id) | |
| # Define the URL to download the repository archive | |
| archive_url = f"{GITLAB_API_URL}/projects/{encoded_project_id}/repository/archive.zip" | |
| # Download the repository archive | |
| response = requests.get(archive_url) | |
| archive_bytes = io.BytesIO(response.content) | |
| # Retrieve the original file name from the response headers | |
| content_disposition = response.headers.get('content-disposition') | |
| if content_disposition: | |
| filename = content_disposition.split('filename=')[-1].strip('\"') | |
| else: | |
| filename = 'archive.zip' # Fallback to a default name if not found | |
| # Check if the file already exists in the repository | |
| existing_files = api.list_repo_files(repo_id=HF_SPACE_NAME, repo_type='space') | |
| target_path = f"{REPOSITORY_DIRECTORY}/{filename}" | |
| print(f"Target Path: '{target_path}'") | |
| print(f"Existing Files: {[repr(file) for file in existing_files]}") | |
| if target_path in existing_files: | |
| print(f"File '{target_path}' already exists in the repository. Skipping upload...") | |
| else: | |
| # Upload the ZIP file to the new folder in the Hugging Face space repository | |
| print("Uploading File to directory:") | |
| print(f"Archive Bytes: {repr(archive_bytes.getvalue())[:100]}") # Show a preview of bytes | |
| print(f"Target Path in Repo: '{target_path}'") | |
| api.upload_file( | |
| path_or_fileobj=archive_bytes, | |
| path_in_repo=target_path, | |
| repo_id=HF_SPACE_NAME, | |
| repo_type='space' | |
| ) | |
| print("Upload complete") | |
| def get_all_files_in_folder(temp_dir, partial_paths): | |
| print("inner method of get all files in folder") | |
| target_dir = os.path.join(temp_dir, partial_path) | |
| print(target_dir) | |
| for root, dirs, files in os.walk(target_dir): | |
| print(f"Files in current directory ({root}): {files}") | |
| for file in files: | |
| print(f"Processing file: {file}") | |
| all_files.append(os.path.join(root, file)) | |
| return all_files | |
| def get_file(temp_dir, file_path): | |
| full_path = os.path.join(temp_dir, file_path) | |
| return full_path | |
| def find_folder_path(root_directory, target_folder): | |
| for root, dirs, _ in os.walk(root_directory): | |
| if target_folder in dirs: | |
| folder_path = os.path.join(root, target_folder) | |
| return folder_path | |
| return | |
| def process_directory(directory, partial_paths=None, file_paths=None): | |
| all_texts = [] | |
| file_references = [] | |
| zip_files = [file for file in os.listdir(directory) if file.endswith('.zip')] | |
| if not zip_files: | |
| print("No zip file found in the directory.") | |
| return all_texts, file_references | |
| if len(zip_files) > 1: | |
| print("More than one zip file found.") | |
| return all_texts, file_references | |
| else: | |
| zip_file_path = os.path.join(directory, zip_files[0]) | |
| # Create a temporary directory for the zip file | |
| with tempfile.TemporaryDirectory() as tmpdirname: | |
| # Unzip the file into the temporary directory | |
| with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: | |
| zip_ref.extractall(tmpdirname) | |
| print(f"Extracted {zip_file_path} to {tmpdirname}") | |
| files = [] | |
| unzipped_root = os.listdir(tmpdirname) | |
| if len(unzipped_root) == 1 and os.path.isdir(os.path.join(tmpdirname, unzipped_root[0])): | |
| tmpsubdirpath= os.path.join(tmpdirname, unzipped_root[0]) | |
| else: | |
| tmpsubdirpath = tmpdirname | |
| if partial_paths: | |
| print("Go in partial_paths") | |
| for path in partial_paths: | |
| files += get_all_files_in_folder(tmpsubdirpath, partial_paths) | |
| else: | |
| print("wtf") | |
| for root, _, files_list in os.walk(tmpdirname): | |
| for file in files_list: | |
| files.append(os.path.join(root, file)) | |
| if file_paths: | |
| print("Go in normal paths") | |
| files += [get_file(tmpdirname, file_path) for file_path in file_paths] | |
| print(f"Total number of files: {len(files)}") | |
| for file_path in files: | |
| print(f"Paths of files: {files}") | |
| file_ext = os.path.splitext(file_path)[1] | |
| if os.path.getsize(file_path) == 0: | |
| print(f"Skipping an empty file: {file_path}") | |
| continue | |
| with open(file_path, 'rb') as f: | |
| if file_ext in ['.rst', '.md', '.txt', '.html', '.json', '.yaml', '.py']: | |
| text = f.read().decode('utf-8') | |
| print(f"Extracted text from {file_path}:\n{text[:50]}...\n") | |
| elif file_ext in ['.svg']: | |
| text = f"SVG file content from {file_path}" | |
| elif file_ext in ['.png', '.ico']: | |
| text = f"Image metadata from {file_path}" | |
| else: | |
| continue | |
| all_texts.append(text) | |
| file_references.append(file_path) | |
| print(f"Print the text for testing broooo {all_texts}") | |
| return all_texts, file_references | |
| # Split text into chunks | |
| def split_into_chunks(texts, references, chunk_size, chunk_overlap): | |
| text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) | |
| chunks = [] | |
| for text, reference in zip(texts, references): | |
| chunks.extend([Document(page_content=chunk, metadata={"source": reference}) for chunk in text_splitter.split_text(text)]) | |
| print(f"Total number of chunks: {len(chunks)}") | |
| return chunks | |
| # Setup Chroma | |
| def setup_chroma(chunks, model_name="sentence-transformers/all-mpnet-base-v2", persist_directory="chroma_data"): | |
| embedding_model = HuggingFaceEmbeddings(model_name=model_name) | |
| vectorstore = Chroma.from_documents(chunks, embedding=embedding_model, persist_directory=persist_directory) | |
| return vectorstore | |
| # Setup LLM | |
| def setup_llm(model_name, temperature, api_key): | |
| llm = ChatGroq(model=model_name, temperature=temperature, api_key=api_key) | |
| return llm | |
| def query_chroma(vectorstore, query, k): | |
| results = vectorstore.similarity_search(query, k=k) | |
| chunks_with_references = [(result.page_content, result.metadata["source"]) for result in results] | |
| # Print the chosen chunks and their sources to the console | |
| print("\nChosen chunks and their sources for the query:") | |
| for chunk, source in chunks_with_references: | |
| print(f"Source: {source}\nChunk: {chunk}\n") | |
| print("-" * 50) | |
| return chunks_with_references | |
| def rag_workflow(query): | |
| docs = query_chroma(vectorstore, query, k=10) | |
| context = "\n\n".join([doc for doc, _ in docs]) | |
| references = "\n".join([f"[{i+1}] {ref}" for i, (_, ref) in enumerate(docs)]) | |
| print(f"Context for the query:\n{context}\n") | |
| print(f"References for the query:\n{references}\n") | |
| prompt = f"""You are an expert python developer. Provide a clear and consice answer based only on the information in the retrieved context. | |
| The retrieved context contains source code and documenation of an api library. | |
| If no related Information is found from the context to answer the query, reply that you do not know. | |
| Context: | |
| {context} | |
| Query: | |
| {query} | |
| """ | |
| response = llm.invoke(prompt) | |
| return response.content, references | |
| def initialize(): | |
| global vectorstore, chunks, llm | |
| partial_paths = ['kadi-apy-master/docs/source/setup/', 'kadi-apy-master/kadi_apy/lib/','kadi-apy-master/docs/source/usage/'] | |
| file_paths = [] | |
| all_texts, file_references = process_directory(REPOSITORY_DIRECTORY, partial_paths, file_paths) | |
| chunks = split_into_chunks(all_texts, file_references, 512, 0) | |
| print(f"Total number of chunks: {len(chunks)}") | |
| # vectorstore = setup_chroma(chunks, EMBEDDING_MODEL_NAME, PERSIST_DIRECTORY) | |
| # llm = setup_llm(LLM_MODEL_NAME, LLM_TEMPERATURE, GROQ_API_KEY) | |
| initialize() | |
| # Gradio utils | |
| def check_input_text(text): | |
| if not text: | |
| gr.Warning("Please input a question.") | |
| raise TypeError | |
| return True | |
| def add_text(history, text): | |
| history = history + [(text, None)] | |
| yield history, "" | |
| import gradio as gr | |
| def bot_kadi(history): | |
| user_query = history[-1][0] | |
| response, references = rag_workflow(user_query) | |
| history[-1] = (user_query, response) | |
| # Format references for display with text passages | |
| formatted_references = "" | |
| docs = query_chroma(vectorstore, user_query, k=5) | |
| for i, (doc, ref) in enumerate(docs): | |
| formatted_references += f""" | |
| <div style="border: 1px solid #ddd; padding: 10px; margin-bottom: 10px; border-radius: 5px;"> | |
| <h3 style="margin-top: 0;">Reference {i+1}</h3> | |
| <p><strong>Source:</strong> {ref}</p> | |
| <button onclick="var elem = document.getElementById('text-{i}'); var button = this; if (elem.style.display === 'block') {{ elem.style.display = 'none'; button.innerHTML = '▶ show source text'; }} else {{ elem.style.display = 'block'; button.innerHTML = '▼ hide source text'; }}">{{'▶ show source text'}}</button> | |
| <div id="text-{i}" style="display: none;"> | |
| <p><strong>Text:</strong> {doc}</p> | |
| </div> | |
| </div> | |
| """ | |
| yield history, formatted_references | |
| def main(): | |
| with gr.Blocks() as demo: | |
| gr.Markdown("## Kadi4Mat - AI Chat-Bot") | |
| gr.Markdown("AI assistant for Kadi4Mat based on RAG architecture powered by LLM") | |
| with gr.Tab("Kadi4Mat - AI Assistant"): | |
| with gr.Row(): | |
| with gr.Column(scale=10): | |
| chatbot = gr.Chatbot([], elem_id="chatbot", label="Kadi Bot", bubble_full_width=False, show_copy_button=True) | |
| user_txt = gr.Textbox(label="Question", placeholder="Type in your question and press Enter or click Submit") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| submit_btn = gr.Button("Submit", variant="primary") | |
| with gr.Column(scale=1): | |
| clear_btn = gr.Button("Clear", variant="stop") | |
| gr.Examples( | |
| examples=[ | |
| "Who is working on Kadi4Mat?", | |
| "How do i install the Kadi-Apy library?", | |
| "How do i install the Kadi-Apy library for development?", | |
| "I need a method to upload a file to a record", | |
| ], | |
| inputs=user_txt, | |
| outputs=chatbot, | |
| fn=add_text, | |
| label="Try asking...", | |
| cache_examples=False, | |
| examples_per_page=3, | |
| ) | |
| with gr.Column(scale=3): | |
| with gr.Tab("References"): | |
| doc_citation = gr.HTML("<p>References used in answering the question will be displayed below.</p>") | |
| #user_txt.submit(check_input_text, user_txt, None).success(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(bot_kadi, [chatbot], [chatbot]) | |
| #submit_btn.click(check_input_text, user_txt, None).success(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(bot_kadi, [chatbot], [chatbot]) | |
| user_txt.submit(check_input_text, user_txt, None).success(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(bot_kadi, [chatbot], [chatbot, doc_citation]) | |
| submit_btn.click(check_input_text, user_txt, None).success(add_text, [chatbot, user_txt], [chatbot, user_txt]).then(bot_kadi, [chatbot], [chatbot, doc_citation]) | |
| clear_btn.click(lambda: None, None, chatbot, queue=False) | |
| demo.launch() | |
| if __name__ == "__main__": | |
| main() |