Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import os | |
| import tempfile | |
| import shutil | |
| from chromadb_query import ChromaCollection | |
| from chromadb_upload import ChromaUploader | |
# Process-wide state shared by the handlers below. All three names stay None
# until initialize_chroma_components() assigns them:
#   chroma_collection - ChromaCollection instance used to answer queries
#   chroma_uploader   - ChromaUploader instance used to ingest PDFs
#   current_api_key   - the API key the two instances above were built with
chroma_collection = chroma_uploader = current_api_key = None
def initialize_chroma_components(api_key):
    """Create the ChromaDB query/upload helpers for ``api_key``.

    On success the module-level ``chroma_collection``, ``chroma_uploader`` and
    ``current_api_key`` are replaced together. On failure none of them is
    touched, so a previously initialized pair is never left associated with a
    key it was not built for.

    NOTE(review): this state (and ``OPENAI_API_KEY`` in ``os.environ``) is
    process-wide, so it is shared by every session served by this process.

    Args:
        api_key: OpenAI API key handed to both helpers.

    Returns:
        A status message. Callers detect failure by looking for the word
        "Error" in it, so the failure wording must keep that word.
    """
    global chroma_collection, chroma_uploader, current_api_key

    if not api_key:
        return "β Please provide an OpenAI API key"

    try:
        # Set the API key in environment
        os.environ["OPENAI_API_KEY"] = api_key

        db_path = "./db"
        os.makedirs(db_path, exist_ok=True)
        collection_name = "my_collection"

        # Build into locals first: if either constructor raises, the globals
        # below must keep describing the last *working* configuration.
        new_collection = ChromaCollection(collection_name, db_path, api_key)
        new_uploader = ChromaUploader(collection_name, db_path, api_key)
    except Exception as e:
        return f"β Error initializing components: {e}"

    chroma_collection = new_collection
    chroma_uploader = new_uploader
    current_api_key = api_key
    return "β ChromaDB components initialized successfully!"
def _describe_query_error(exc):
    """Map an exception raised while querying to a user-facing message."""
    detail = str(exc)
    lowered = detail.lower()
    if "connection" in lowered or "timeout" in lowered:
        return (
            f"β Connection error: {detail}\n"
            "π **Troubleshooting:**\n"
            "- Check your internet connection\n"
            "- Verify OpenAI API is accessible\n"
            "- Try again in a few moments"
        )
    if "api" in lowered and "key" in lowered:
        return (
            f"β API key error: {detail}\n"
            "π **Please check:**\n"
            "- Your API key is correct\n"
            "- Your OpenAI account has sufficient credits\n"
            "- The API key has the necessary permissions"
        )
    return f"β Error querying documents: {detail}"


def query_documents(api_key, query, progress=gr.Progress()):
    """Answer ``query`` from the document collection, reporting progress.

    Args:
        api_key: OpenAI API key; surrounding whitespace is ignored. It must
            start with ``sk-`` and be at least 20 characters long.
        query: The user's question. ``None`` and blank strings are rejected.
        progress: Gradio progress tracker. It has to be declared as a default
            argument for Gradio to inject it.

    Returns:
        A Markdown string: either the generated answer with a short note on
        how many chunks it was based on, or an explanatory error message.
        Exceptions are never propagated to the caller.
    """
    api_key = (api_key or "").strip()
    if not api_key:
        return "β Please provide an OpenAI API key"
    if not query or not query.strip():
        return "β Please enter a query"
    # Validate API key format
    if not api_key.startswith("sk-") or len(api_key) < 20:
        return "β Invalid OpenAI API key format. It should start with 'sk-' and be longer than 20 characters."

    try:
        # Step 1: (re)initialize when nothing is cached or the key changed.
        progress(0.1, desc="Initializing ChromaDB...")
        if chroma_collection is None or current_api_key != api_key:
            init_msg = initialize_chroma_components(api_key)
            if "Error" in init_msg:
                return init_msg

        # Step 2: Search for relevant documents
        progress(0.3, desc="Searching for relevant documents...")
        results = chroma_collection.query_collection([query], n_results=5)
        # One query was sent, so only the first result list matters. Treat a
        # missing first entry like an empty one instead of raising IndexError.
        per_query_documents = results['documents']
        top_documents = per_query_documents[0] if per_query_documents else []
        if not top_documents:
            return (
                "β No documents found in the collection.\n"
                "π **Next steps:**\n"
                "1. Go to the \"π Upload Documents\" tab\n"
                "2. Upload some PDF files first\n"
                "3. Come back and ask your question"
            )

        # Step 3: Generate answer using AI
        progress(0.7, desc="Generating AI-powered answer...")
        answer = chroma_collection.generate_answer(query, results)

        # generate_answer signals failure through its return text.
        if answer.startswith("Error generating answer"):
            return (
                f"β Error generating answer: {answer}\n"
                "π **Troubleshooting:**\n"
                "- Check your internet connection\n"
                "- Verify your OpenAI API key has credits\n"
                "- Try a simpler question\n"
                "- Wait a moment and try again"
            )

        # Step 4: Finalize response
        progress(1.0, desc="Answer ready!")

        # The total count is a nicety; fall back to a shorter note without it.
        chunk_count = len(top_documents)
        try:
            doc_count = chroma_collection.get_collection_count()
        except Exception:
            context_info = f"\n\n---\n*Answer based on {chunk_count} relevant document chunks*"
        else:
            context_info = f"\n\n---\n*Answer based on {chunk_count} relevant chunks from {doc_count} total documents*"

        return f"π€ **Answer:**\n\n{answer}{context_info}"
    except Exception as e:
        return _describe_query_error(e)
def _describe_upload_error(exc):
    """Map an exception raised while ingesting a PDF to a user-facing message."""
    detail = str(exc)
    lowered = detail.lower()
    if "connection" in lowered or "timeout" in lowered:
        return (
            f"β Connection error occurred: {detail}\n"
            "π **Troubleshooting:**\n"
            "- Check your internet connection\n"
            "- Verify OpenAI API is accessible\n"
            "- Try again in a few moments\n"
            "- If on Hugging Face, the service might be temporarily overloaded"
        )
    if "api" in lowered and "key" in lowered:
        return (
            f"β API key error: {detail}\n"
            "π **Please check:**\n"
            "- Your API key is correct and starts with 'sk-'\n"
            "- Your OpenAI account has sufficient credits\n"
            "- The API key has the necessary permissions"
        )
    return f"β Error processing PDF: {detail}"


def upload_pdf(api_key, pdf_file, progress=gr.Progress()):
    """Extract text from an uploaded PDF and add it to the collection.

    Args:
        api_key: OpenAI API key; surrounding whitespace is ignored. It must
            start with ``sk-`` and be at least 20 characters long.
        pdf_file: The upload as delivered by ``gr.File``. With
            ``type="filepath"`` (what the UI in this file declares) this is a
            plain path string; a file-like object exposing ``.name`` is
            accepted as well.
        progress: Gradio progress tracker. It has to be declared as a default
            argument for Gradio to inject it.

    Returns:
        A Markdown status string describing success or the failure reason.
        Exceptions are never propagated to the caller.
    """
    api_key = (api_key or "").strip()
    if not api_key:
        return "β Please provide an OpenAI API key"
    if pdf_file is None:
        return "β Please upload a PDF file"
    # Validate API key format
    if not api_key.startswith("sk-") or len(api_key) < 20:
        return "β Invalid OpenAI API key format. It should start with 'sk-' and be longer than 20 characters."

    try:
        # Step 1: (re)initialize when nothing is cached or the key changed.
        progress(0.1, desc="Initializing ChromaDB components...")
        if chroma_uploader is None or current_api_key != api_key:
            init_msg = initialize_chroma_components(api_key)
            if "Error" in init_msg:
                return init_msg

        # A str/PathLike upload *is* the path; reading ``.name`` on it would
        # raise AttributeError. Only file-like uploads carry the path there.
        if isinstance(pdf_file, (str, os.PathLike)):
            pdf_path = os.fspath(pdf_file)
        else:
            pdf_path = pdf_file.name
        pdf_name = os.path.basename(pdf_path)

        # Step 2: Read PDF file
        progress(0.2, desc="Reading PDF file...")
        with open(pdf_path, 'rb') as file:
            pdf_bytes = file.read()

        # Step 3: Extract text
        progress(0.4, desc="Extracting text from PDF...")
        pdf_text, pdf_lines = chroma_uploader.extract_text_from_pdf_bytes(pdf_bytes)
        if not pdf_text or not pdf_lines:
            return "β Could not extract text from the PDF file. Make sure it's a text-based PDF (not scanned images)."
        progress(0.6, desc=f"Processing {len(pdf_lines)} document chunks...")

        # Step 4: Add documents to ChromaDB
        progress(0.6, desc="Adding documents to ChromaDB...")

        # Adapter so the uploader can report progress positionally.
        def progress_callback(prog, desc):
            progress(prog, desc=desc)

        success = chroma_uploader.add_documents(pdf_lines, progress_callback=progress_callback)

        # Step 5: Complete
        progress(1.0, desc="Processing complete!")
        if not success:
            return (
                "β Failed to add documents to ChromaDB.\n"
                "π **Troubleshooting tips:**\n"
                "- Check your internet connection\n"
                "- Verify your OpenAI API key has credits\n"
                "- Try uploading a smaller PDF file\n"
                "- Wait a moment and try again (rate limits)"
            )

        # The total count is a nicety; report success even if it is unavailable.
        try:
            count = chroma_uploader.get_collection_count()
        except Exception:
            return f"β Successfully processed and added document chunks from '{pdf_name}'!"
        return f"β Successfully processed PDF!\n\nπ Added document chunks from '{pdf_name}'\nποΈ Total documents in collection: {count}"
    except Exception as e:
        return _describe_upload_error(e)
def test_api_key(api_key):
    """Check that ``api_key`` is accepted by the OpenAI API.

    The key is first validated locally (non-empty after stripping whitespace,
    starts with ``sk-``, at least 20 characters). If that passes, a minimal
    chat completion is requested to confirm the key works.

    Args:
        api_key: The key to test; ``None`` is treated like an empty string.

    Returns:
        A status message describing success or the category of failure.
        Exceptions are never propagated to the caller.
    """
    api_key = (api_key or "").strip()
    if not api_key:
        return "β Please provide an OpenAI API key"
    if not api_key.startswith("sk-") or len(api_key) < 20:
        return "β Invalid API key format. OpenAI keys should start with 'sk-' and be longer than 20 characters."

    try:
        # Imported lazily so the local format checks work without the package.
        from openai import OpenAI

        client = OpenAI(api_key=api_key)
        # Smallest useful request; only whether it succeeds matters.
        client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Hello"}],
            max_tokens=5,
        )
    except Exception as e:
        error_msg = str(e).lower()
        if "api" in error_msg and "key" in error_msg:
            return "β API key error: Invalid or expired API key. Please check your key and account credits."
        if "quota" in error_msg or "limit" in error_msg:
            return "β Quota/rate limit error: Your API key has reached its limit or you're out of credits."
        if "connection" in error_msg or "timeout" in error_msg:
            return "β Connection error: Unable to reach OpenAI API. Check your internet connection."
        return f"β Error testing API key: {e}"

    return "β API key is working! You can now upload documents and ask questions."


# Despite the ``test_`` prefix this is a UI handler, not a unit test; opt out
# of pytest collection so importing it into a test module does not error.
test_api_key.__test__ = False
| # Create Gradio interface | |
def create_interface():
    """Build the Gradio UI and return the ``gr.Blocks`` app (not yet launched).

    The page has a shared API-key field with a "test" button, an upload tab
    wired to ``upload_pdf``, a question tab wired to ``query_documents`` and a
    collapsible help section.
    """

    def show_status_then_run(button, status_markdown, handler, inputs, output):
        """On click, show ``status_markdown`` at once, then run ``handler``.

        The first step bypasses the queue so the placeholder appears
        immediately; the handler's return value then replaces it.
        """
        button.click(
            fn=lambda: gr.update(value=status_markdown),
            outputs=output,
            queue=False,
        ).then(
            fn=handler,
            inputs=inputs,
            outputs=output,
            show_progress=True,
        )

    with gr.Blocks(title="CV Document Q&A System", theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
            # CV Document Q&A System
            Upload your CV PDF document and ask questions about its content using AI-powered search and retrieval.
            **β οΈ Important:** You need to provide your own OpenAI API key to use this application.
            """
        )

        # API key input (masked via type="password")
        with gr.Row():
            with gr.Column(scale=4):
                # NOTE(review): initialize_chroma_components() keeps the key in
                # os.environ and a module global, so "not stored" holds only
                # in the sense that nothing is written to disk here - confirm
                # the wording is acceptable.
                api_key_input = gr.Textbox(
                    label="π OpenAI API Key",
                    placeholder="Enter your OpenAI API key (sk-...)",
                    type="password",
                    info="Your API key is not stored and is only used for this session",
                )
            with gr.Column(scale=1):
                test_key_button = gr.Button("π§ͺ Test API Key", variant="secondary")

        api_test_output = gr.Markdown(label="API Key Status", value="")
        show_status_then_run(
            test_key_button,
            "π§ͺ **Testing API key...**\n\n*Verifying your OpenAI API key and checking connectivity.*",
            test_api_key,
            [api_key_input],
            api_test_output,
        )

        with gr.Tabs():
            # Upload tab comes first: documents must exist before questions.
            with gr.Tab("π Upload Documents"):
                gr.Markdown("### Upload PDF documents to your knowledge base")
                pdf_upload = gr.File(
                    label="Upload PDF File",
                    file_types=[".pdf"],
                    type="filepath",
                )
                upload_button = gr.Button("π Process PDF", variant="primary", scale=1)

                # Status area, replaced by the handler's result
                with gr.Row():
                    upload_output = gr.Markdown(label="Upload Status", value="")

                show_status_then_run(
                    upload_button,
                    "β³ **Initializing upload process...**\n\n*Please wait while we process your PDF document.*",
                    upload_pdf,
                    [api_key_input, pdf_upload],
                    upload_output,
                )

            # Q&A tab
            with gr.Tab("π€ Ask Questions"):
                gr.Markdown("### Ask questions about your uploaded documents")
                query_input = gr.Textbox(
                    label="Your Question",
                    placeholder="Ask me anything about your documents...",
                    lines=3,
                )
                query_button = gr.Button("π Get Answer", variant="primary", scale=1)

                # Status area, replaced by the handler's result
                with gr.Row():
                    query_output = gr.Markdown(label="Answer", value="")

                show_status_then_run(
                    query_button,
                    "π€ **Searching for relevant information...**\n\n*Analyzing your question and finding the best answers from your documents.*",
                    query_documents,
                    [api_key_input, query_input],
                    query_output,
                )

        # Instructions
        with gr.Accordion("π How to Use & Troubleshooting", open=False):
            gr.Markdown(
                """
                ### Instructions:
                1. **Enter your OpenAI API Key** - Get one from [OpenAI's website](https://platform.openai.com/api-keys)
                2. **Test your API Key** - Click "π§ͺ Test API Key" to verify it's working
                3. **Upload PDF Documents** - Go to the "Upload Documents" tab and upload your CV PDF file
                4. **Ask Questions** - Switch to the "Ask Questions" tab and query your documents
                """
            )

    return demo
| # Launch the application | |
if __name__ == "__main__":
    # Build the UI, then serve it with the settings collected below.
    demo = create_interface()
    launch_options = {
        "server_name": "0.0.0.0",  # bind to every network interface
        "server_port": 7860,
        "share": True,  # also request a public Gradio share link
    }
    demo.launch(**launch_options)