File size: 6,528 Bytes
cdfaada
0fa1c31
2709ca0
 
9a7ca89
 
 
 
 
 
 
 
 
 
 
0fa1c31
2709ca0
 
 
0fa1c31
 
2709ca0
 
 
0fa1c31
 
cdfaada
 
 
 
 
 
1ad3242
cdfaada
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import os

# Use /tmp (always writable in Hugging Face Spaces & Docker).
# NOTE: these variables must be set BEFORE transformers / huggingface_hub
# are imported, otherwise the libraries fall back to their default caches.
_CACHE_DIRS = {
    "HF_HOME": "/tmp/huggingface",
    "TRANSFORMERS_CACHE": "/tmp/transformers",  # legacy alias still read by older transformers
    "HF_HUB_CACHE": "/tmp/hfhub",
}
for _var, _path in _CACHE_DIRS.items():
    os.environ[_var] = _path
    # Create the directory if it does not exist yet.
    os.makedirs(_path, exist_ok=True)

# Standard library
import json
import re

# Third-party
import fitz  # PyMuPDF
import joblib  # model serialization (not used in this file's visible code)
import streamlit as st
from googletrans import Translator
from huggingface_hub import HfApi, login  # Space auth for uploading files
from langdetect import DetectorFactory, detect
from transformers import pipeline

# Make language detection deterministic across runs.
DetectorFactory.seed = 0

# -------------------------
# Initialize Translator & Summarizer
# -------------------------
# Note: initializing models here loads them at app start-up.
# Consider st.cache_resource or lazy loading for performance in production.
translator = Translator()
summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")

# -------------------------
# Extract text from PDF
# -------------------------
def extract_text_from_pdf(pdf_file):
    """Extract plain text from an uploaded PDF.

    Args:
        pdf_file: a Streamlit ``UploadedFile`` (anything exposing ``getvalue()``
            that returns the raw PDF bytes).

    Returns:
        The concatenated text of all pages, stripped of surrounding whitespace.
    """
    # Open directly from the in-memory bytes instead of writing a fixed
    # "temp.pdf" file: the fixed name was race-prone when two Streamlit
    # sessions processed uploads concurrently, and the disk round-trip
    # was unnecessary.
    text = ""
    with fitz.open(stream=pdf_file.getvalue(), filetype="pdf") as doc:
        for page in doc:
            text += page.get_text("text")
    return text.strip()

# -------------------------
# Translate text to English using Google Translate
# -------------------------
def translate_text_google(text):
    """Translate ``text`` to English, chunking to stay within API limits.

    Returns an empty string for empty/None input; otherwise the translated
    chunks joined with single spaces.
    """
    if not text:
        return ""

    # Google Translate copes with large payloads, but splitting is safer.
    chunk_size = 5000
    pieces = []
    start = 0
    while start < len(text):
        segment = text[start:start + chunk_size]
        pieces.append(translator.translate(segment, dest='en').text)
        start += chunk_size
    return " ".join(pieces)

# -------------------------
# Summarize text safely
# -------------------------
def safe_summarize(text, max_length=150, min_length=30):
    """Summarize ``text`` with the global pipeline, falling back to the input.

    Inputs shorter than 10 words (or empty/None) are returned unchanged;
    any summarizer failure is surfaced as a Streamlit warning and the
    original text is returned instead of raising.
    """
    word_count = len(text.split()) if text else 0
    if word_count < 10:
        return text  # too short to summarize
    try:
        result = summarizer(text, max_length=max_length, min_length=min_length, do_sample=False)
    except Exception as exc:
        st.warning(f"⚠️ Summarization failed: {exc}")
        return text
    return result[0]['summary_text']

# -------------------------
# Extract entities
# -------------------------
def extract_entities(text):
    """Pull PAN, account number, penalty amount and a deactivation flag
    out of free text via case-insensitive regex search.

    Returns a dict containing only the keys that were found.
    """
    # Table of (key, pattern); search order matches insertion order.
    patterns = {
        # PAN format: AAAAA9999A
        "PAN": r"\b[A-Z]{5}\d{4}[A-Z]\b",
        "Account_Number": r"account\s*number\s*[:\-]?\s*([A-Za-z0-9]+)",
        # Accepts 'penalty' or 'penalties'
        "Penalty": r"\bpenalt(?:y|ies)\s*[:\-]?\s*([\d,]+)",
        # Deactivation keywords: deactivate / deactivated / deactivation
        "Deactivate": r"\bdeactivat(?:e|ed|ion)\b",
    }

    found = {}
    for key, pattern in patterns.items():
        match = re.search(pattern, text, re.IGNORECASE)
        if match is None:
            continue
        if key == "PAN":
            found[key] = match.group(0).upper()
        elif key == "Deactivate":
            found[key] = match.group(0).lower()
        elif key == "Penalty":
            found[key] = match.group(1).replace(",", "")
        else:
            found[key] = match.group(1)
    return found

 # Trigger actions
# -------------------------
def trigger_action(entities):
    if "Penalty" in entities:
        return f"Penalty of {entities['Penalty']} recorded for account {entities.get('Account_Number', 'N/A')} (PAN: {entities.get('PAN', 'N/A')})"
    elif "Deactivate" in entities:
        return f"Kindy Deactivate {entities.get('Account_Number', 'N/A')} as per request having (PAN: {entities.get('PAN', 'N/A')})"
    elif "Account_Number" in entities:
        return f"Account {entities['Account_Number']} flagged for review."
    else:
        return "No action required"

# -------------------------
# Process single PDF - adapted for Streamlit FileUploader
# -------------------------
def process_uploaded_pdf(pdf_file):
    """Run the full pipeline on one uploaded PDF.

    Steps: extract text -> detect language -> translate to English if
    needed -> summarize -> extract entities -> derive an action.

    Args:
        pdf_file: Streamlit ``UploadedFile`` for a PDF.

    Returns:
        dict with file name, detected language, 500-char text snippets,
        summary, entities, and the triggered action.
    """
    raw_text = extract_text_from_pdf(pdf_file)

    # langdetect raises LangDetectException on empty input (e.g. a
    # scanned/image-only PDF yields no text), so guard before detecting.
    lang = detect(raw_text) if raw_text else "unknown"
    translated_text = translate_text_google(raw_text) if lang != "en" else raw_text
    summary = safe_summarize(translated_text)
    entities = extract_entities(translated_text)
    action_result = trigger_action(entities)

    def _snippet(text, limit=500):
        # First `limit` chars, with an ellipsis only when truncated.
        return text[:limit] + ("..." if len(text) > limit else "")

    return {
        "file_name": pdf_file.name,
        "detected_language": lang,
        "raw_text_snippet": _snippet(raw_text),
        "translated_text_snippet": _snippet(translated_text),
        "summary": summary,
        "entities": entities,
        "action_triggered": action_result,
    }


# -------------------------
# Streamlit UI: upload a PDF and render the pipeline results.
# -------------------------
st.title("PDF Document Processor")
st.write("Upload a PDF file to extract text, translate (if needed), summarize, identify key entities, and suggest actions.")

uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")

if uploaded_file is not None:
    st.write("Processing PDF...")
    try:
        processed_data = process_uploaded_pdf(uploaded_file)

        st.subheader("Processing Results:")

        # Simple scalar fields rendered as "**Label:** value" lines.
        scalar_fields = [
            ("File Name", "file_name"),
            ("Detected Language", "detected_language"),
            ("Raw Text Snippet", "raw_text_snippet"),
            ("Translated Text Snippet", "translated_text_snippet"),
            ("Summary", "summary"),
        ]
        for label, key in scalar_fields:
            st.write(f"**{label}:** {processed_data[key]}")

        st.write("**Extracted Entities:**")
        for entity_name, entity_value in processed_data['entities'].items():
            st.write(f"- {entity_name}: {entity_value}")

        st.write(f"**Action Triggered:** {processed_data['action_triggered']}")

    except Exception as e:
        st.error(f"An error occurred during processing: {e}")