import os

# ✅ Always use /tmp for Hugging Face cache in Spaces
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/transformers"
os.environ["HF_HUB_CACHE"] = "/tmp/hfhub"

# Create dirs (now in /tmp, which is writable)
os.makedirs(os.environ["HF_HOME"], exist_ok=True)
os.makedirs(os.environ["TRANSFORMERS_CACHE"], exist_ok=True)
os.makedirs(os.environ["HF_HUB_CACHE"], exist_ok=True)

import streamlit as st
import json
import re
import fitz  # PyMuPDF
from langdetect import detect, DetectorFactory
from googletrans import Translator
from transformers import pipeline

# for model serialization
import joblib

# for Hugging Face Space authentication to upload files
from huggingface_hub import login, HfApi

# Make language detection deterministic across runs
DetectorFactory.seed = 0

# -------------------------
# Initialize Translator & Summarizer
# -------------------------
# Note: Initializing models here will load them when the app starts.
# Consider caching or lazy loading for performance in production
# (see the sketch at the end of this file).
translator = Translator()
summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")

# -------------------------
# Extract text from PDF
# -------------------------
def extract_text_from_pdf(pdf_file):
    text = ""
    # ✅ Save the uploaded file into /tmp (writable in Spaces/Docker)
    temp_file_path = os.path.join("/tmp", "temp.pdf")
    with open(temp_file_path, "wb") as f:
        f.write(pdf_file.getvalue())
    try:
        with fitz.open(temp_file_path) as doc:
            for page in doc:
                text += page.get_text("text")
    finally:
        # Ensure the temporary file is removed
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)
    return text.strip()

# -------------------------
# Translate text to English using Google Translate
# -------------------------
def translate_text_google(text):
    if not text:
        return ""
    max_chunk = 5000  # Google Translate handles large text, but splitting is safer
    chunks = [text[i:i + max_chunk] for i in range(0, len(text), max_chunk)]
    translations = []
    for chunk in chunks:
        translated = translator.translate(chunk, dest='en')
        translations.append(translated.text)
    return " ".join(translations)

# -------------------------
# Summarize text safely
# -------------------------
def safe_summarize(text, max_length=150, min_length=30):
    if not text or len(text.split()) < 10:
        return text  # too short to summarize
    try:
        summary = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
        return summary[0]['summary_text']
    except Exception as e:
        st.warning(f"⚠️ Summarization failed: {e}")
        return text

# -------------------------
# Extract entities
# -------------------------
def extract_entities(text):
    entities = {}

    # PAN format: AAAAA9999A
    # pan_match = re.search(r"\b[A-Z]{5}\d{4}[A-Z]\b", text, re.IGNORECASE)
    pan_match = re.search(r"PAN (\w{5}\d{4}\w)", text)

    # Account Number
    # acc_match = re.search(r"account\s*number\s*[:\-]?\s*([A-Za-z0-9]+)", text, re.IGNORECASE)
    acc_match = re.search(r"Account Number (\w+)", text, re.IGNORECASE)

    # Penalty amount (the commented pattern accepts 'penalty' or 'penalties';
    # the active pattern captures the figure following "INR")
    # penalty_match = re.search(r"\bpenalt(?:y|ies)\s*[:\-]?\s*([\d,]+)", text, re.IGNORECASE)
    penalty_match = re.search(r"INR ([\d,]+)", text, re.IGNORECASE)

    # Deactivation keywords
    deactivate_match = re.search(r"\bdeactivat(?:e|ed|ion)\b", text, re.IGNORECASE)

    if pan_match:
        # entities["PAN"] = pan_match.group(0).upper()
        entities["PAN"] = pan_match.group(1)
    if acc_match:
        entities["Account_Number"] = acc_match.group(1)
    if penalty_match:
        # entities["Penalty"] = penalty_match.group(1).replace(",", "")
        entities["Penalty"] = penalty_match.group(1)
    if deactivate_match:
        entities["Deactivate"] = deactivate_match.group(0).lower()

    return entities

# -------------------------
# Trigger actions
# -------------------------
def trigger_action(entities):
    if "Penalty" in entities:
        return f"Penalty of {entities['Penalty']} recorded for account {entities.get('Account_Number', 'N/A')} (PAN: {entities.get('PAN', 'N/A')})"
    elif "Deactivate" in entities:
        return f"Kindly deactivate account {entities.get('Account_Number', 'N/A')} as per request (PAN: {entities.get('PAN', 'N/A')})"
    elif "Account_Number" in entities:
        return f"Account {entities['Account_Number']} flagged for review."
    else:
        return "No action required."

# -------------------------
# Process single PDF (adapted for Streamlit's file uploader)
# -------------------------
def process_uploaded_pdf(pdf_file):
    raw_text = extract_text_from_pdf(pdf_file)
    lang = detect(raw_text)
    translated_text = translate_text_google(raw_text) if lang != "en" else raw_text
    summary = safe_summarize(translated_text)
    entities = extract_entities(translated_text)
    action_result = trigger_action(entities)
    result = {
        "file_name": pdf_file.name,
        "detected_language": lang,
        "raw_text_snippet": raw_text[:500] + ("..." if len(raw_text) > 500 else ""),
        "translated_text_snippet": translated_text[:500] + ("..." if len(translated_text) > 500 else ""),
        "summary": summary,
        "entities": entities,
        "action_triggered": action_result,
    }
    return result

# -------------------------
# Streamlit UI
# -------------------------
st.title("PDF Document Processor")
st.write("Upload a PDF file to extract text, translate (if needed), summarize, identify key entities, and suggest actions.")

uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")

if uploaded_file is not None:
    st.write("Processing PDF...")
    try:
        processed_data = process_uploaded_pdf(uploaded_file)
        st.subheader("Processing Results:")
        st.write(f"**File Name:** {processed_data['file_name']}")
        st.write(f"**Detected Language:** {processed_data['detected_language']}")
        st.write(f"**Raw Text Snippet:** {processed_data['raw_text_snippet']}")
        st.write(f"**Translated Text Snippet:** {processed_data['translated_text_snippet']}")
        st.write(f"**Summary:** {processed_data['summary']}")
        st.write("**Extracted Entities:**")
        for key, value in processed_data['entities'].items():
            st.write(f"- {key}: {value}")
        st.write(f"**Action Triggered:** {processed_data['action_triggered']}")
    except Exception as e:
        st.error(f"An error occurred during processing: {e}")
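
# -------------------------
# Optional: cached model loading (a sketch, assuming Streamlit >= 1.18; not wired
# into the app above). The note near the top suggests caching or lazy loading:
# st.cache_resource keeps a single pipeline instance across Streamlit script
# reruns, so the model is loaded once per process instead of on every
# interaction. `get_summarizer` is a hypothetical helper name; to use it,
# replace the module-level `summarizer = pipeline(...)` line above with:
#
# @st.cache_resource
# def get_summarizer():
#     return pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
#
# summarizer = get_summarizer()
# -------------------------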