Spaces:
Build error
Build error
| import streamlit as st | |
| from PIL import Image, UnidentifiedImageError | |
| import io | |
| import zipfile | |
| from pypdf import PdfReader | |
| from pdf2image import convert_from_bytes | |
| import pytesseract | |
| import os | |
# Point Streamlit at its config directory through an environment variable.
# NOTE: `streamlit` is already imported at the top of this file, so this assignment runs after that import, not before it.
| os.environ["STREAMLIT_CONFIG_PATH"] = "/app/.streamlit" | |
| # Placeholder for TituLLMs integration | |
def process_text_with_titullm(text):
    """Stand-in for the TituLLM clean-up step; currently a pass-through.

    Args:
        text: Raw text produced by OCR.

    Returns:
        The same ``text`` object, unchanged.
    """
    processed = text
    return processed
def perform_ocr_and_postprocess(image, lang='ben'):
    """Run Tesseract on one image and pass the result through the LLM hook.

    Both the raw OCR text and the post-processed text are echoed to stdout.

    Args:
        image: Image handed to ``pytesseract.image_to_string``.
        lang: Tesseract language code; defaults to ``'ben'``.

    Returns:
        The post-processed text, or ``None`` when OCR or post-processing
        raised (the failure is reported through ``st.error``).
    """
    result = None
    try:
        raw_text = pytesseract.image_to_string(image, lang=lang)
        print(f"OCR Output:\n---\n{raw_text}\n---\n")
        cleaned_text = process_text_with_titullm(raw_text)
        print(f"LLM Post-processed Output:\n---\n{cleaned_text}\n---\n")
        result = cleaned_text
    except pytesseract.TesseractError as tess_err:
        # Tesseract-specific failures get a hint about the installation.
        st.error(
            f"Tesseract Error: {tess_err}. Ensure Tesseract and Bengali language pack are installed in your environment."
        )
    except Exception as err:
        # Anything else is logged to stdout and surfaced in the UI.
        print(f"An error occurred during OCR/post-processing: {err}")
        st.error(f"An error occurred during OCR: {err}")
    return result
def process_pdf_to_images(file_bytes):
    """Render a PDF, given as raw bytes, into a list of page images.

    Args:
        file_bytes: The PDF file content.

    Returns:
        A list with whatever ``pdf2image.convert_from_bytes`` produced;
        empty if the conversion raised (the error is shown via ``st.error``).
    """
    pages = []
    try:
        pages.extend(convert_from_bytes(file_bytes))
    except Exception as err:
        st.error(f"Error processing PDF with pdf2image: {err}")
    return pages
def process_zip_to_images(file_bytes):
    """Open every image stored in a ZIP archive given as raw bytes.

    Members are selected by file extension (case-insensitive); entries whose
    name starts with ``__MACOSX/`` are ignored. A member that cannot be opened
    is skipped with a ``st.warning``; an unreadable archive yields ``st.error``.

    Args:
        file_bytes: The ZIP file content.

    Returns:
        A list of PIL images, in the order the archive lists its members.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff')
    extracted = []
    try:
        with zipfile.ZipFile(io.BytesIO(file_bytes)) as archive:
            for entry in archive.namelist():
                if entry.startswith('__MACOSX/'):
                    continue
                if not entry.lower().endswith(image_suffixes):
                    continue
                try:
                    payload = archive.read(entry)
                    extracted.append(Image.open(io.BytesIO(payload)))
                except UnidentifiedImageError:
                    st.warning(
                        f"Could not identify or open image file '{entry}' in the ZIP. Skipping."
                    )
                except Exception as err:
                    st.warning(
                        f"Error processing image '{entry}' in ZIP: {err}. Skipping."
                    )
    except zipfile.BadZipFile:
        st.error("Invalid or corrupted ZIP file.")
    except Exception as err:
        st.error(f"Error processing ZIP file: {err}")
    return extracted
def _uploaded_file_identity(uploaded_file):
    """Return a value that changes when a different upload is received.

    The per-upload identifier is looked up as ``file_id`` first and ``id``
    second, because the attribute name differs between Streamlit releases
    (see the Streamlit ``UploadedFile`` documentation). If neither exists,
    the file name concatenated with its size is used as a best-effort key.
    """
    for attr_name in ("file_id", "id"):
        identity = getattr(uploaded_file, attr_name, None)
        if identity is not None:
            return identity
    return uploaded_file.name + str(uploaded_file.size)


def _load_images_from_upload(uploaded_file):
    """Turn an uploaded PDF, ZIP, or single image into a list of PIL images.

    Dispatches on the file extension. Returns an empty list when nothing
    could be opened; failures are reported through ``st.error``.
    """
    file_bytes = uploaded_file.getvalue()
    file_extension = os.path.splitext(uploaded_file.name)[1].lower()
    if file_extension == ".pdf":
        return process_pdf_to_images(file_bytes)
    if file_extension == ".zip":
        return process_zip_to_images(file_bytes)
    # Anything else is treated as a single image file.
    try:
        return [Image.open(io.BytesIO(file_bytes))]
    except UnidentifiedImageError:
        st.error(
            f"Could not identify or open the uploaded image file: {uploaded_file.name}"
        )
    except Exception as e:
        st.error(f"Error processing image file: {e}")
    return []


def main():
    """Run the Streamlit app: upload a file, page through its images, show OCR text.

    Session state keeps the decoded images, the selected page index, and the
    identity of the last processed upload so that reruns (for example after
    changing the page selector) do not convert the same file again.
    """
    # Streamlit rejects text areas shorter than this many pixels.
    min_text_area_height_px = 68

    st.set_page_config(layout="wide")
    st.title("Bengali OCR with LLM Post-processing (PDF & ZIP Support)")

    # Initialize session state variables
    if 'processed_images' not in st.session_state:
        st.session_state.processed_images = []
    if 'current_page_index' not in st.session_state:
        st.session_state.current_page_index = 0
    if 'file_id' not in st.session_state:  # To track if a new file is uploaded
        st.session_state.file_id = None

    uploaded_file = st.file_uploader(
        "Upload a Bengali image, multi-page PDF, or a ZIP file with images",
        type=["png", "jpg", "jpeg", "pdf", "zip"],
        key="file_uploader_key",
    )

    if uploaded_file is not None:
        current_file_id = _uploaded_file_identity(uploaded_file)
        new_file_uploaded = st.session_state.file_id != current_file_id
        if new_file_uploaded:
            st.session_state.file_id = current_file_id
            st.session_state.processed_images = []  # Reset images
            st.session_state.current_page_index = 0  # Reset page index
            print(f"New file uploaded: {uploaded_file.name}")

        # Process only if it's a new file or images aren't processed yet
        if new_file_uploaded or not st.session_state.processed_images:
            with st.spinner(f"Processing {uploaded_file.name}..."):
                st.session_state.processed_images = _load_images_from_upload(
                    uploaded_file)

    if st.session_state.processed_images:
        col1, col2 = st.columns(2)

        with col1:
            st.subheader("Uploaded Image Viewer")
            num_images = len(st.session_state.processed_images)
            if num_images > 1:
                # Page labels are 1-based for display; the stored index is 0-based.
                page_options = [f"Page {i + 1}" for i in range(num_images)]
                selected_page_str = st.selectbox(
                    "Select Page:",
                    options=page_options,
                    index=st.session_state.current_page_index,
                    key="page_selector",
                )
                st.session_state.current_page_index = page_options.index(
                    selected_page_str)

            # Display the current image
            current_image_to_display = st.session_state.processed_images[
                st.session_state.current_page_index
            ]
            st.image(
                current_image_to_display,
                use_container_width=True,
                caption=f"Displaying image {st.session_state.current_page_index + 1} of {num_images}",
            )

        with col2:
            st.subheader("Extracted Text")
            # Perform OCR on the currently displayed image
            if current_image_to_display:
                extracted_text = perform_ocr_and_postprocess(
                    current_image_to_display, lang='ben')
                if extracted_text is not None:
                    # Size the text area from the image's original pixel height,
                    # but never below the minimum Streamlit accepts, so a short
                    # image (e.g. a cropped line of text) does not raise.
                    image_height_px = current_image_to_display.size[1]
                    st.text_area(
                        "OCR Result",
                        value=extracted_text,
                        height=max(image_height_px, min_text_area_height_px),
                        key="extracted_text_area",
                    )
                else:
                    st.info(
                        "OCR could not be performed or returned no text for the current page.")
            else:
                st.info("No image selected or available for OCR.")
    elif uploaded_file is not None:
        # A file is present but yielded no images. This is the single place
        # the condition is reported, so the user sees one warning, not two.
        st.warning("No images found or processed from the uploaded file.")
if __name__ == "__main__":
    # --- Tesseract availability check ---
    # Tesseract is expected to be on the system PATH. The result is only
    # recorded here; the UI message is emitted after main() because main()
    # calls st.set_page_config(), which Streamlit documents as needing to be
    # the first Streamlit command on the page.
    tesseract_missing = False
    try:
        pytesseract.get_tesseract_version()
        print("Tesseract found by Pytesseract.")
    except pytesseract.TesseractNotFoundError:
        tesseract_missing = True
        print("Pytesseract could not find Tesseract.")

    # --- Streamlit App Execution ---
    main()

    if tesseract_missing:
        st.error(
            "Pytesseract could not find Tesseract. Ensure it's installed correctly in the Space (check your Dockerfile)."
        )