Spaces:
Runtime error
Runtime error
| import os | |
| import cv2 | |
| import numpy as np | |
| from PIL import Image | |
| import pytesseract | |
| import gradio as gr | |
| from pdf2image import convert_from_path | |
| import PyPDF2 | |
| from llama_index.core import VectorStoreIndex, Document | |
| from llama_index.embeddings.openai import OpenAIEmbedding | |
| from llama_index.llms.openai import OpenAI | |
| from llama_index.core import get_response_synthesizer | |
| from sentence_transformers import SentenceTransformer, util | |
| import logging | |
| from openai_tts_tool import generate_audio_and_text | |
| import tempfile | |
| # Set up logging configuration | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s') | |
| # Initialize global variables | |
| vector_index = None | |
| query_log = [] | |
| sentence_model = SentenceTransformer('all-MiniLM-L6-v2') | |
| # Define available languages for TTS | |
| AVAILABLE_LANGUAGES = [ | |
| "English", "Arabic", "German", "Marathi", "Kannada", | |
| "Filipino (Tagalog)", "French", "Gujarati", "Hindi", | |
| "Malayalam", "Tamil", "Telugu", "Urdu", "Sinhala" | |
| ] | |
| LANGUAGE_CODES = { | |
| "English": "en", "Arabic": "ar", "German": "de", | |
| "Marathi": "mr", "Kannada": "kn", "Filipino (Tagalog)": "tl", | |
| "French": "fr", "Gujarati": "gu", "Hindi": "hi", | |
| "Malayalam": "ml", "Tamil": "ta", "Telugu": "te", | |
| "Urdu": "ur", "Sinhala": "si" | |
| } | |
| # Get available languages for OCR | |
| try: | |
| langs = os.popen('tesseract --list-langs').read().split('\n')[1:-1] | |
| except: | |
| langs = ['eng'] # Fallback to English if tesseract isn't properly configured | |
| def create_temp_dir(): | |
| """Create temporary directory if it doesn't exist""" | |
| temp_dir = os.path.join(os.getcwd(), 'temp') | |
| if not os.path.exists(temp_dir): | |
| os.makedirs(temp_dir) | |
| return temp_dir | |
| def preprocess_image(image_path): | |
| """Preprocess the image for better OCR results""" | |
| img = cv2.imread(image_path) | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| gray = cv2.equalizeHist(gray) | |
| gray = cv2.GaussianBlur(gray, (5, 5), 0) | |
| processed_image = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, | |
| cv2.THRESH_BINARY, 11, 2) | |
| temp_dir = create_temp_dir() | |
| temp_filename = os.path.join(temp_dir, "processed_image.png") | |
| cv2.imwrite(temp_filename, processed_image) | |
| return temp_filename | |
| def extract_text_from_image(image_path, lang='eng'): | |
| """Extract text from image using OCR""" | |
| processed_image_path = preprocess_image(image_path) | |
| text = pytesseract.image_to_string(Image.open(processed_image_path), lang=lang) | |
| try: | |
| os.remove(processed_image_path) | |
| except: | |
| pass | |
| return text | |
| def extract_text_from_pdf(pdf_path, lang='eng'): | |
| """Extract text from PDF file""" | |
| text = "" | |
| temp_dir = create_temp_dir() | |
| try: | |
| with open(pdf_path, 'rb') as file: | |
| pdf_reader = PyPDF2.PdfReader(file) | |
| for page_num in range(len(pdf_reader.pages)): | |
| page = pdf_reader.pages[page_num] | |
| page_text = page.extract_text() | |
| if page_text.strip(): | |
| text += page_text | |
| else: | |
| images = convert_from_path(pdf_path, first_page=page_num + 1, last_page=page_num + 1) | |
| for image in images: | |
| temp_image_path = os.path.join(temp_dir, f'temp_image_{page_num}.png') | |
| image.save(temp_image_path, 'PNG') | |
| text += extract_text_from_image(temp_image_path, lang=lang) | |
| text += f"\n[OCR applied on page {page_num + 1}]\n" | |
| try: | |
| os.remove(temp_image_path) | |
| except: | |
| pass | |
| except Exception as e: | |
| return f"Error processing PDF: {str(e)}" | |
| return text | |
| def extract_text(file_path, lang='eng'): | |
| """Extract text from uploaded file""" | |
| file_ext = file_path.lower().split('.')[-1] | |
| if file_ext in ['pdf']: | |
| return extract_text_from_pdf(file_path, lang) | |
| elif file_ext in ['png', 'jpg', 'jpeg']: | |
| return extract_text_from_image(file_path, lang) | |
| else: | |
| return f"Unsupported file type: {file_ext}" | |
| def process_upload(api_key, files, lang): | |
| """Process uploaded files and create vector index""" | |
| global vector_index | |
| if not api_key: | |
| return "Please provide a valid OpenAI API Key." | |
| if not files: | |
| return "No files uploaded." | |
| documents = [] | |
| error_messages = [] | |
| image_heavy_docs = [] | |
| for file_path in files: | |
| try: | |
| text = extract_text(file_path, lang) | |
| if text.strip(): # Only add non-empty documents | |
| documents.append(Document(text=text)) | |
| else: | |
| error_messages.append(f"No text extracted from {os.path.basename(file_path)}") | |
| except Exception as e: | |
| error_message = f"Error processing file {os.path.basename(file_path)}: {str(e)}" | |
| logging.error(error_message) | |
| error_messages.append(error_message) | |
| if documents: | |
| try: | |
| embed_model = OpenAIEmbedding(model="text-embedding-3-large", api_key=api_key) | |
| vector_index = VectorStoreIndex.from_documents(documents, embed_model=embed_model) | |
| success_message = f"Successfully indexed {len(documents)} files." | |
| if error_messages: | |
| success_message += f"\nErrors: {'; '.join(error_messages)}" | |
| return success_message | |
| except Exception as e: | |
| return f"Error creating index: {str(e)}" | |
| else: | |
| return f"No valid documents were indexed. Errors: {'; '.join(error_messages)}" | |
| def query_app(query, model_name, use_similarity_check, api_key): | |
| """Process query and return response""" | |
| global vector_index, query_log | |
| if vector_index is None: | |
| return "No documents indexed yet. Please upload documents first." | |
| if not api_key: | |
| return "Please provide a valid OpenAI API Key." | |
| try: | |
| llm = OpenAI(model=model_name, api_key=api_key) | |
| response_synthesizer = get_response_synthesizer(llm=llm) | |
| query_engine = vector_index.as_query_engine(llm=llm, response_synthesizer=response_synthesizer) | |
| response = query_engine.query(query) | |
| return response.response | |
| except Exception as e: | |
| logging.error(f"Error during query processing: {e}") | |
| return f"Error during query processing: {str(e)}" | |
| def create_gradio_interface(): | |
| """Create and configure the Gradio interface""" | |
| with gr.Blocks(title="Document Processing and TTS App") as demo: | |
| gr.Markdown("# ๐ Document Processing, Text & Audio Generation App") | |
| with gr.Tab("๐ค Upload Documents"): | |
| api_key_input = gr.Textbox( | |
| label="Enter OpenAI API Key", | |
| placeholder="Paste your OpenAI API Key here", | |
| type="password" | |
| ) | |
| file_upload = gr.File(label="Upload Files", file_count="multiple", type="filepath") | |
| lang_dropdown = gr.Dropdown(choices=langs, label="Select OCR Language", value='eng') | |
| upload_button = gr.Button("Upload and Index") | |
| upload_status = gr.Textbox(label="Status", interactive=False) | |
| with gr.Tab("โ Ask a Question"): | |
| query_input = gr.Textbox(label="Enter your question") | |
| model_dropdown = gr.Dropdown( | |
| choices=["gpt-4o-mini", "gpt-4o", "gpt-4"], | |
| label="Select Model", | |
| value="gpt-4o-mini" | |
| ) | |
| similarity_checkbox = gr.Checkbox(label="Use Similarity Check", value=False) | |
| query_button = gr.Button("Ask") | |
| answer_output = gr.Textbox(label="Answer", interactive=False) | |
| with gr.Tab("๐ฃ๏ธ Generate Audio and Text"): | |
| text_input = gr.Textbox(label="Enter text for generation") | |
| voice_type = gr.Dropdown( | |
| choices=["alloy", "echo", "fable", "onyx", "nova", "shimmer"], | |
| label="Voice Type", | |
| value="alloy" | |
| ) | |
| voice_speed = gr.Slider( | |
| minimum=0.25, | |
| maximum=4.0, | |
| value=1.0, | |
| label="Voice Speed" | |
| ) | |
| language = gr.Dropdown( | |
| choices=AVAILABLE_LANGUAGES, | |
| label="Language for Audio and Script (Target Language)", | |
| value="English" | |
| ) | |
| translation_model = gr.Dropdown( | |
| choices=["gpt-4o-mini", "gpt-4o", "gpt-4"], | |
| label="Translation Model", | |
| value="gpt-4o-mini" | |
| ) | |
| output_option = gr.Radio( | |
| choices=["audio", "script_text", "both"], | |
| label="Output Option", | |
| value="both" | |
| ) | |
| generate_button = gr.Button("Generate") | |
| audio_output = gr.Audio(label="Generated Audio") | |
| script_output = gr.File(label="Script Text File") | |
| status_output = gr.Textbox(label="Status", interactive=False) | |
| # Wire up the components | |
| upload_button.click( | |
| fn=process_upload, | |
| inputs=[api_key_input, file_upload, lang_dropdown], | |
| outputs=[upload_status] | |
| ) | |
| query_button.click( | |
| fn=query_app, | |
| inputs=[query_input, model_dropdown, similarity_checkbox, api_key_input], | |
| outputs=[answer_output] | |
| ) | |
| answer_output.change( | |
| fn=lambda ans: ans, | |
| inputs=[answer_output], | |
| outputs=[text_input] | |
| ) | |
| def process_generation(*args): | |
| args = list(args) | |
| # Convert language name to code | |
| args[5] = LANGUAGE_CODES[args[5]] | |
| return generate_audio_and_text(*args) | |
| generate_button.click( | |
| fn=process_generation, | |
| inputs=[ | |
| api_key_input, text_input, translation_model, voice_type, | |
| voice_speed, language, output_option | |
| ], | |
| outputs=[audio_output, script_output, status_output] | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| demo = create_gradio_interface() | |
| demo.launch() |