| import gradio as gr |
| import os |
| import io |
| |
| |
| import PyPDF2 |
| import docx |
| import pandas as pd |
| from typing import List, Tuple, Optional |
| import requests |
|
|
| |
| import pytesseract |
| import cv2 |
| import numpy as np |
| import pdfplumber |
| from pdf2image import convert_from_path |
| from PIL import Image |
|
|
| |
# SECURITY: the OpenRouter key is read from the environment rather than being
# embedded in source. A key that was ever committed to this file should be
# treated as leaked and revoked in the OpenRouter dashboard.
# An unset variable yields an empty string; the API then answers 401 and the
# caller surfaces "API Error: Invalid API key."
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")
|
|
|
|
def get_openrouter_completion(messages, max_tokens=600, temperature=0.7):
    """Request a chat completion from OpenRouter and return it as text.

    Args:
        messages: Iterable of chat messages. Only dicts that carry both a
            ``"role"`` and a ``"content"`` key are forwarded; anything else is
            skipped. Each content is stringified, stripped and cut to 10000
            characters.
        max_tokens: Requested completion budget; capped at 800 before sending.
        temperature: Sampling temperature; clamped to the range 0.1-1.0.

    Returns:
        The content of the first returned choice on success. Failures are
        reported in-band as a human-readable string (for example one starting
        with ``"API Error"`` or ``"Unexpected error"``); this function does
        not raise for HTTP or network problems.
    """
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "http://localhost:7860",
        "X-Title": "AI Chatbot",
    }

    formatted_messages = []
    for msg in messages:
        if not (isinstance(msg, dict) and "role" in msg and "content" in msg):
            continue
        content = str(msg["content"]).strip()
        # Keep very large file dumps from blowing up the request body.
        if len(content) > 10000:
            content = content[:10000] + "... [content truncated]"
        formatted_messages.append({"role": msg["role"], "content": content})

    json_data = {
        "model": "mistralai/mistral-7b-instruct-v0.1",
        "messages": formatted_messages,
        "max_tokens": min(max_tokens, 800),
        "temperature": max(0.1, min(temperature, 1.0)),
        "top_p": 0.9,
        "stream": False,
    }

    try:
        print("π Making API request to OpenRouter...")
        response = requests.post(url, headers=headers, json=json_data, timeout=30)
        print(f"π‘ Response status: {response.status_code}")

        if response.status_code == 400:
            try:
                error_details = response.json()
                print(f"β 400 Error details: {error_details}")
                error_msg = error_details.get('error', {}).get('message', 'Bad Request')
            except (ValueError, AttributeError):
                # ValueError: body is not JSON. AttributeError: JSON is not the
                # expected {"error": {...}} mapping. A bare ``except`` here
                # would also have swallowed KeyboardInterrupt/SystemExit.
                return "API Error: 400 Bad Request. Please check your API configuration."
            return f"API Error: {error_msg}. Please check the API key and request format."

        if response.status_code == 401:
            return "API Error: Invalid API key. Please check your OpenRouter API key."

        if response.status_code == 429:
            return "API Error: Rate limit exceeded. Please try again in a moment."

        if response.status_code != 200:
            return f"API Error {response.status_code}: {response.text[:200]}..."

        response_data = response.json()
        if "choices" in response_data and len(response_data["choices"]) > 0:
            return response_data["choices"][0]["message"]["content"]
        return "No response generated from API"

    except requests.exceptions.Timeout:
        return "β±οΈ Request timeout - please try again"
    except requests.exceptions.ConnectionError:
        return "π Connection error - check your internet connection"
    except Exception as e:
        # Boundary handler: callers expect a string, never an exception.
        print(f"β API Exception: {str(e)}")
        return f"Unexpected error: {str(e)}"
|
|
|
|
def get_fallback_response(message, file_content=""):
    """Build a canned reply for when the completion API cannot be used.

    Args:
        message: The user's message, echoed back in the reply.
        file_content: Text extracted from uploaded files, if any. When
            non-empty, the reply reports its length and a few keyword-based
            observations (pricing, plans, telecom terms).

    Returns:
        A human-readable explanation that the app is in fallback mode.
    """
    if not file_content:
        return f"I understand you said: '{message}'. I'm currently in fallback mode due to API connectivity issues. I can still process your files - try uploading a document and I'll extract its content for you."

    # Lower-case once; every keyword probe below reuses it.
    lowered = file_content.lower()

    parts = [
        "I can see you've uploaded files with content. Here's a basic analysis:\n\n",
        f"Content length: {len(file_content)} characters\n",
    ]

    if "price" in lowered or "$" in file_content:
        parts.append("β’ I notice pricing information in the uploaded content\n")
    if "plan" in lowered:
        parts.append("β’ I see plan-related information\n")
    if any(word in lowered for word in ("phone", "mobile", "data", "gb", "mb")):
        parts.append("β’ This appears to contain telecommunications/mobile plan information\n")

    parts.append(f"\nYou asked: '{message}'\n\n")
    parts.append("I'm currently in fallback mode due to API issues, but I can see your file content has been processed successfully. For full AI analysis, please check the API configuration.")

    return "".join(parts)
|
|
|
|
def extract_text_from_image(image_path: str) -> str:
    """Run Tesseract OCR over an image and return the recognised text.

    Args:
        image_path: Path of the image file to read. A non-string value is
            used directly as the image (the PDF OCR fallback in this file
            passes rendered page images).

    Returns:
        The recognised text with every whitespace run collapsed to a single
        space, ``"No text found in image"`` when OCR yields nothing, or a
        string starting with ``"Error extracting text from image:"`` when
        anything fails. Exceptions are never propagated.
    """
    try:
        if isinstance(image_path, str):
            # Context manager releases the file handle; convert() returns a
            # fully loaded copy that stays valid after the file is closed.
            with Image.open(image_path) as opened:
                image = opened.convert("RGB")
        elif hasattr(image_path, "convert"):
            image = image_path.convert("RGB")
        else:
            image = image_path

        # COLOR_RGB2BGR needs a 3-channel input; the RGB normalisation above
        # keeps RGBA, palette and grayscale images from failing here.
        img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
        gray = cv2.cvtColor(img_cv, cv2.COLOR_BGR2GRAY)

        # Fixed binary threshold before handing the page to Tesseract.
        _, thresh = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY)

        text = pytesseract.image_to_string(thresh)

        cleaned_text = ' '.join(text.split())
        return cleaned_text if cleaned_text else "No text found in image"

    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
|
|
|
|
def extract_text_from_pdf_advanced(file_path: str) -> str:
    """Extract text from a PDF, falling back to OCR when it has no text layer.

    The embedded text of each page is read with pdfplumber first. Only if no
    page yields any text are the pages rendered to images and passed through
    ``extract_text_from_image``.

    Args:
        file_path: Path of the PDF file.

    Returns:
        Page texts joined by blank lines, each prefixed with a
        ``--- Page N ---`` (or ``--- Page N (OCR) ---``) header;
        ``"No text could be extracted from this PDF"`` when neither route
        finds text; or a string starting with ``"Error processing PDF:"`` on
        failure. Exceptions are never propagated.
    """
    try:
        text_content = []
        with pdfplumber.open(file_path) as pdf:
            for page_num, page in enumerate(pdf.pages, start=1):
                page_text = page.extract_text()
                if page_text and page_text.strip():
                    text_content.append(f"--- Page {page_num} ---\n{page_text.strip()}")

        if text_content:
            return "\n\n".join(text_content)

        print("π No text found in PDF, trying OCR...")

        # The OCR helper reports problems and empty pages in-band; those
        # placeholder strings must not be mistaken for recognised page text,
        # otherwise a blank scan never reaches the "no text" result below.
        non_text_prefixes = ("Error", "No text found", "No meaningful text found")

        ocr_text = []
        for page_num, image in enumerate(convert_from_path(file_path), start=1):
            page_text = extract_text_from_image(image)
            if page_text and not page_text.startswith(non_text_prefixes):
                ocr_text.append(f"--- Page {page_num} (OCR) ---\n{page_text}")

        return "\n\n".join(ocr_text) if ocr_text else "No text could be extracted from this PDF"

    except Exception as e:
        return f"Error processing PDF: {str(e)}"
|
|
|
|
def _read_text_with_fallback(path):
    """Return ``(text, encoding)`` for the first encoding that decodes *path*, else ``None``."""
    # cp1252 is tried before latin-1 because latin-1 accepts every byte and
    # would otherwise make cp1252 unreachable.
    for encoding in ('utf-8', 'utf-16', 'cp1252', 'latin-1'):
        try:
            with open(path, 'r', encoding=encoding) as handle:
                return handle.read(), encoding
        except UnicodeError:
            # UnicodeError, not just UnicodeDecodeError: the utf-16 stream
            # decoder raises the plain base class when the file has no BOM.
            continue
    return None


def extract_text_from_file(file_path: str) -> str:
    """Extract text from a file, choosing the reader by file extension.

    Images go through OCR, PDFs through ``extract_text_from_pdf_advanced``,
    ``.docx`` through python-docx, ``.xlsx``/``.xls``/``.csv`` through pandas,
    and ``.txt`` or any unknown extension is read as text using a series of
    fallback encodings.

    Args:
        file_path: Path of the file, or an object exposing the path as a
            ``name`` attribute.

    Returns:
        The extracted text, or a human-readable message describing why
        nothing could be extracted (messages for failures start with
        ``"Error"``). Exceptions are never propagated.
    """
    if not file_path:
        return "No file path provided"

    actual_path = file_path.name if hasattr(file_path, 'name') else str(file_path)

    if not os.path.exists(actual_path):
        return f"File not found: {actual_path}"

    file_extension = os.path.splitext(actual_path)[1].lower()

    try:
        if file_extension in ('.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.gif'):
            return extract_text_from_image(actual_path)

        if file_extension == '.pdf':
            return extract_text_from_pdf_advanced(actual_path)

        if file_extension == '.docx':
            try:
                doc = docx.Document(actual_path)
                text = "".join(
                    f"{paragraph.text}\n"
                    for paragraph in doc.paragraphs
                    if paragraph.text.strip()
                )
                return text if text.strip() else "No text found in this Word document."
            except Exception as e:
                return f"Error reading Word document: {str(e)}"

        if file_extension in ('.xlsx', '.xls'):
            try:
                # Open the workbook once and close it deterministically,
                # instead of re-reading the file for every sheet.
                with pd.ExcelFile(actual_path) as excel_file:
                    all_text = "".join(
                        f"--- Sheet: {sheet_name} ---\n"
                        f"{excel_file.parse(sheet_name).to_string(index=False)}\n\n"
                        for sheet_name in excel_file.sheet_names
                    )
                return all_text if all_text.strip() else "No data found in this Excel file."
            except Exception as e:
                return f"Error reading Excel file: {str(e)}"

        if file_extension == '.csv':
            try:
                df = pd.read_csv(actual_path)
                return df.to_string(index=False)
            except Exception as e:
                return f"Error reading CSV file: {str(e)}"

        if file_extension == '.txt':
            try:
                decoded = _read_text_with_fallback(actual_path)
                if decoded is None:
                    return "Could not decode text file with any supported encoding."
                return decoded[0]
            except Exception as e:
                return f"Error reading text file: {str(e)}"

        # Unknown extension: make a best-effort attempt to read it as text.
        try:
            decoded = _read_text_with_fallback(actual_path)
            if decoded is None:
                return f"Unsupported file format: {file_extension}. Try converting to PDF, image, or text format."
            content, encoding = decoded
            return f"File read as text (encoding: {encoding}):\n{content}"
        except Exception as e:
            return f"Error reading file: {str(e)}"

    except Exception as e:
        return f"Error processing file: {str(e)}"
|
|
|
|
def _build_file_context(uploaded_files):
    """Return the extracted text of every uploaded file as one labelled string."""
    sections = []
    for i, file in enumerate(uploaded_files or []):
        try:
            if hasattr(file, 'name'):
                file_path = file.name
                file_name = os.path.basename(file_path)
            elif isinstance(file, str):
                file_path = file
                file_name = os.path.basename(file_path)
            else:
                file_path = str(file)
                file_name = f"file_{i+1}"

            content = extract_text_from_file(file_path)

            # The extractor reports failures in-band with an "Error" prefix.
            if content and not content.startswith("Error"):
                sections.append(f"\n\n--- Content from {file_name} ---\n{content}\n")
            else:
                sections.append(f"\n\n--- Error processing {file_name} ---\n{content}\n")

        except Exception as e:
            error_msg = f"Error processing file {i+1}: {str(e)}"
            sections.append(f"\n\n--- {error_msg} ---\n")
    return "".join(sections)


def respond(
    message: str,
    history: List[Tuple[str, str]],
    uploaded_files: Optional[List] = None,
    system_message: str = "You are a helpful AI assistant.",
    max_tokens: int = 512,
    temperature: float = 0.7,
):
    """Answer *message* through the OpenRouter completion helper.

    The text of any uploaded files is extracted and appended to the user's
    message as additional context. If the completion helper reports a failure,
    a canned fallback reply is returned instead.

    Args:
        message: The user's current message.
        history: Earlier ``(user, assistant)`` exchanges, oldest first.
            ``None`` is treated as no history.
        uploaded_files: File paths, or objects exposing a ``name`` path.
        system_message: System prompt placed first in the conversation.
        max_tokens: Completion budget passed to the completion helper.
        temperature: Sampling temperature passed to the completion helper.

    Returns:
        The assistant's reply as a string.
    """
    file_content = _build_file_context(uploaded_files)

    messages = [{"role": "system", "content": system_message}]

    for user_msg, assistant_msg in history or []:
        if user_msg:
            messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})

    current_message = message
    if file_content:
        current_message = f"{message}\n\nAdditional context from uploaded files:{file_content}"

    messages.append({"role": "user", "content": current_message})

    # Prefixes with which get_openrouter_completion signals a failure in-band.
    # "Unexpected error" is included so that a generic exception in the helper
    # is not shown to the user as if it were the model's answer.
    failure_prefixes = ("API Error", "Unexpected error", "β", "β±οΈ", "π")

    try:
        response = get_openrouter_completion(
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature
        )

        # Covers both an empty string and a null content from the API.
        if not response:
            return "Sorry, I couldn't generate a response."

        if response.startswith(failure_prefixes):
            print("π API failed, using fallback response...")
            return get_fallback_response(message, file_content)

        return response

    except Exception as e:
        print(f"β Exception in respond function: {str(e)}")
        return get_fallback_response(message, file_content)
|
|
|
|
| """ |
| ChatGPT-like interface with file upload support using Mistral AI via OpenRouter API |
| """ |
|
|
| |
# Gradio UI: chat window, message box and file upload on the left; generation
# settings on the right. Sending a message runs two chained steps: first the
# message is appended to the chat, then the reply is generated via respond().
with gr.Blocks(title="AI Chatbot with File Upload & Mistral AI", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# π€ AI Chatbot with Advanced File Upload & OCR (Powered by Mistral AI)")
    gr.Markdown("Upload files (PDF, DOCX, TXT, CSV, XLSX, Images) and chat with AI about their content! Uses Mistral AI for intelligent responses and includes OCR for images and scanned PDFs.")

    with gr.Row():
        with gr.Column(scale=3):
            # avatar_images is intentionally not set: Gradio expects image
            # file paths or URLs there, and none are available in this file.
            chatbot = gr.Chatbot(
                height=500,
                show_label=False,
            )

            with gr.Row():
                msg = gr.Textbox(
                    placeholder="Type your message here...",
                    show_label=False,
                    scale=4
                )
                send_btn = gr.Button("Send", variant="primary")

            file_upload = gr.Files(
                label="Upload Files (PDF, DOCX, TXT, CSV, XLSX, Images: PNG, JPG, etc.)",
                file_types=None,
                file_count="multiple"
            )

        with gr.Column(scale=1):
            gr.Markdown("### Settings")
            system_message = gr.Textbox(
                value="You are a helpful AI assistant powered by Mistral AI. You can analyze uploaded files and answer questions about their content. Provide detailed, accurate, and helpful responses.",
                label="System Message",
                lines=3
            )
            # Slider ranges match the limits get_openrouter_completion
            # enforces (max_tokens capped at 800, temperature clamped to
            # 0.1-1.0), so the UI does not offer values that are ignored.
            max_tokens = gr.Slider(
                minimum=50,
                maximum=800,
                value=512,
                step=50,
                label="Max Tokens"
            )
            temperature = gr.Slider(
                minimum=0.1,
                maximum=1.0,
                value=0.7,
                step=0.1,
                label="Temperature"
            )
            clear_btn = gr.Button("Clear Chat", variant="secondary")

    def user_message(message, history, files):
        """Clear the textbox and append the new message with a pending (None) reply."""
        history = history or []
        if not message or message.strip() == "":
            return "", history, files
        return "", history + [[message, None]], files

    def bot_response(history, files, system_msg, max_tok, temp):
        """Fill in the reply for the last chat entry if it is still pending."""
        if not history or history[-1][1] is not None:
            return history

        latest_message = history[-1][0]
        history[-1][1] = respond(latest_message, history[:-1], files, system_msg, max_tok, temp)
        return history

    def clear_chat():
        """Reset the chat history and the uploaded files."""
        return [], None

    # Pressing Enter in the textbox and clicking "Send" behave identically.
    for trigger in (msg.submit, send_btn.click):
        trigger(
            user_message,
            [msg, chatbot, file_upload],
            [msg, chatbot, file_upload]
        ).then(
            bot_response,
            [chatbot, file_upload, system_message, max_tokens, temperature],
            chatbot
        )

    clear_btn.click(clear_chat, outputs=[chatbot, file_upload])
|
|
|
|
# Start the Gradio server only when this file is run as a script, not when it
# is imported.
if __name__ == "__main__":
    demo.launch()
|
|