# RAG demo: ChromaDB + OpenAI + Gradio
# NOTE(review): the original capture began with Hugging Face Spaces page
# residue ("Spaces:" / "Runtime error"); converted to a comment header so
# the file parses as Python.
| import os | |
| from uuid import uuid4 | |
| import gradio as gr | |
| import chromadb | |
| from chromadb.config import Settings | |
| from pypdf import PdfReader | |
| from openai import OpenAI | |
# Module-level ChromaDB handles, initialized lazily by ingest_document():
# `chroma_client` is the persistent on-disk client, `chroma_collection` is
# the active "docs" collection (both stay None until a document is ingested,
# which answer_question() uses to detect "nothing indexed yet").
chroma_client = None
chroma_collection = None
def get_text_from_file(file_path: str) -> str:
    """Extract plain text from a ``.txt`` or ``.pdf`` file.

    Args:
        file_path: Path to the uploaded document; the extension (matched
            case-insensitively) decides how it is read.

    Returns:
        The file's text; for PDFs, each page's extracted text followed by
        a newline (pages with no extractable text contribute just "\n").

    Raises:
        ValueError: If the extension is neither ``.txt`` nor ``.pdf``.
    """
    lowered = file_path.lower()
    if lowered.endswith(".txt"):
        # Tolerate odd bytes in user uploads rather than failing ingestion.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as handle:
            return handle.read()
    if lowered.endswith(".pdf"):
        reader = PdfReader(file_path)
        return "".join(
            (page.extract_text() or "") + "\n" for page in reader.pages
        )
    raise ValueError("Unsupported file type. Please upload .txt or .pdf.")
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200):
    """Split *text* into overlapping character chunks.

    Args:
        text: Source text; surrounding whitespace is stripped first.
        chunk_size: Maximum characters per chunk (must exceed ``overlap``).
        overlap: Characters shared between consecutive chunks, to avoid
            cutting a relevant passage exactly at a chunk boundary.

    Returns:
        A list of non-empty, stripped chunk strings ([] for blank input).

    Raises:
        ValueError: If ``overlap >= chunk_size`` (the window could never
            advance).
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    text = text.strip()
    if not text:
        return []
    chunks = []
    start = 0
    n = len(text)
    while start < n:
        end = min(start + chunk_size, n)
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        # BUG FIX: the original set `start = end - overlap` unconditionally,
        # so once `end` hit the end of the text, `start` stayed pinned at
        # n - overlap and the loop re-emitted the same tail chunk forever
        # (infinite loop for any text longer than chunk_size). Stop once
        # the final chunk has been taken.
        if end >= n:
            break
        start = end - overlap
    return chunks
def embed_texts(texts, openai_key: str):
    """Embed a list of strings with OpenAI's ``text-embedding-3-small``.

    Args:
        texts: Sequence of strings to embed; output order matches input.
        openai_key: API key used to construct a fresh OpenAI client.

    Returns:
        A list of embedding vectors, one per input text.
    """
    api = OpenAI(api_key=openai_key)
    result = api.embeddings.create(
        model="text-embedding-3-small",
        input=texts,
    )
    vectors = []
    for item in result.data:
        vectors.append(item.embedding)
    return vectors
def ingest_document(openai_key: str, file_path: str):
    """Index the uploaded document into ChromaDB for later retrieval.

    Reads the file, chunks it, embeds the chunks via OpenAI, and stores
    them in a fresh persistent "docs" collection. Always returns a
    user-facing status string (errors are reported, not raised, so the
    Gradio status box always gets a message).

    Args:
        openai_key: OpenAI API key entered in the UI.
        file_path: Path of the uploaded .txt/.pdf file, or None.

    Returns:
        A human-readable success or error message.
    """
    global chroma_client, chroma_collection
    if not openai_key:
        return "Please enter your OpenAI API key first."
    if file_path is None:
        return "Please upload a .txt or .pdf document."
    try:
        text = get_text_from_file(file_path)
    except Exception as e:
        return f"Error reading file: {e}"
    if not text.strip():
        return "No text could be extracted from the document."
    chunks = chunk_text(text)
    if not chunks:
        return "Document has no readable text to index."
    # BUG FIX: embed *before* touching the existing collection. The
    # original deleted and re-created "docs" first, so a failed OpenAI
    # call replaced a working index with an empty one; now a failure
    # leaves any previously ingested document intact and queryable.
    try:
        embeddings = embed_texts(chunks, openai_key)
    except Exception as e:
        return f"Error creating embeddings: {e}"
    # Lazily create the persistent ChromaDB client on first ingestion.
    if chroma_client is None:
        chroma_client = chromadb.PersistentClient(path="chroma_db")
    # Re-create a fresh collection for this session/document.
    try:
        chroma_client.delete_collection("docs")
    except Exception:
        pass  # Best-effort: the collection may simply not exist yet.
    chroma_collection = chroma_client.create_collection("docs")
    chroma_collection.add(
        ids=[str(uuid4()) for _ in chunks],
        documents=chunks,
        embeddings=embeddings,
    )
    return f"Document ingested successfully with {len(chunks)} chunks."
def answer_question(openai_key: str, question: str):
    """Answer *question* with RAG over the previously ingested document.

    Embeds the question, retrieves the closest chunks from ChromaDB, and
    asks the chat model to answer using only that retrieved context.

    Args:
        openai_key: OpenAI API key entered in the UI.
        question: The user's question (may be empty/blank).

    Returns:
        The model's answer, or a user-facing status/error string.
    """
    global chroma_collection
    # Guard clauses: report missing prerequisites instead of raising.
    if not openai_key:
        return "Please enter your OpenAI API key first."
    if chroma_collection is None:
        return "Please upload and ingest a document first."
    if not question or not question.strip():
        return "Please enter a question."
    query = question.strip()
    # Embed the question with the same model used for the document chunks.
    try:
        query_vector = embed_texts([query], openai_key)[0]
    except Exception as e:
        return f"Error creating question embedding: {e}"
    # Pull the most similar chunks out of the index.
    try:
        hits = chroma_collection.query(
            query_embeddings=[query_vector],
            n_results=4,
        )
    except Exception as e:
        return f"Error querying ChromaDB: {e}"
    documents = hits.get("documents", [])
    if not documents or not documents[0]:
        return "No relevant context found in the document index."
    context = "\n\n".join(documents[0])
    # Ground the chat model strictly in the retrieved context.
    client = OpenAI(api_key=openai_key)
    system_message = (
        "You are a helpful assistant that answers questions using ONLY the "
        "provided context. If the answer is not in the context, say you don't know."
    )
    user_message = f"Context:\n{context}\n\nQuestion: {query}"
    try:
        completion = client.chat.completions.create(
            model="gpt-4.1-mini",
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": user_message},
            ],
            temperature=0.2,
        )
        # Keep the attribute access inside the try so a malformed response
        # is reported the same way as a failed API call (as the original did).
        reply = completion.choices[0].message.content
    except Exception as e:
        return f"Error calling OpenAI Chat Completion: {e}"
    return reply
def build_interface():
    """Assemble the Gradio Blocks UI and wire its event handlers.

    Returns:
        The constructed ``gr.Blocks`` demo, ready for ``.launch()``.
    """
    with gr.Blocks() as demo:
        gr.Markdown("## RAG Demo: ChromaDB + OpenAI + Gradio")
        key_box = gr.Textbox(
            label="OpenAI API Key",
            type="password",
            placeholder="Enter your OpenAI key (sk-...)",
        )
        with gr.Row():
            doc_file = gr.File(
                label="Upload a .txt or .pdf document (drag & drop)",
                file_types=[".txt", ".pdf"],
                type="filepath",
            )
            ingest_btn = gr.Button("Ingest Document")
        status_box = gr.Textbox(
            label="Ingestion Status",
            interactive=False,
        )
        question_box = gr.Textbox(
            label="Ask a question about the ingested document",
            placeholder="Type your question here...",
        )
        answer_box = gr.Textbox(
            label="Answer",
            lines=8,
        )
        ask_btn = gr.Button("Ask")
        # Buttons call straight into the module-level RAG helpers.
        ingest_btn.click(
            fn=ingest_document,
            inputs=[key_box, doc_file],
            outputs=status_box,
        )
        ask_btn.click(
            fn=answer_question,
            inputs=[key_box, question_box],
            outputs=answer_box,
        )
    return demo
if __name__ == "__main__":
    # Build the UI and start the local Gradio server.
    build_interface().launch()