|
|
|
|
|
import os

# Redirect every Hugging Face cache location into /tmp so the app can run
# where the default home directory is not writable (e.g. containerized
# Streamlit deployments), and create each directory up front so the HF
# libraries never fail on a missing cache path.
_HF_CACHE_DIRS = {
    "HF_HOME": "/tmp/huggingface",
    "TRANSFORMERS_CACHE": "/tmp/transformers",
    "HF_HUB_CACHE": "/tmp/hfhub",
}
for _var, _path in _HF_CACHE_DIRS.items():
    os.environ[_var] = _path
    os.makedirs(_path, exist_ok=True)
|
|
|
|
|
import streamlit as st |
|
|
import json |
|
|
import re |
|
|
import fitz |
|
|
from langdetect import detect, DetectorFactory |
|
|
from googletrans import Translator |
|
|
from transformers import pipeline |
|
|
|
|
|
|
|
|
import joblib |
|
|
|
|
|
|
|
|
import os |
|
|
|
|
|
|
|
|
from huggingface_hub import login, HfApi |
|
|
|
|
|
# Make langdetect deterministic: without a fixed seed its guesses can vary
# between runs on short or ambiguous text.
DetectorFactory.seed = 0


# Module-level singletons shared by the helper functions below.
translator = Translator()  # googletrans client used by translate_text_google()

# Summarization pipeline (DistilBART fine-tuned on CNN/DailyMail), loaded
# once at import time; the first run downloads the model into the HF caches
# configured at the top of this file.
summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_text_from_pdf(pdf_file):
    """Extract plain text from an uploaded PDF.

    Opens the document directly from the upload's in-memory bytes via
    ``fitz.open(stream=...)``. The previous implementation wrote every
    upload to the single fixed path ``/tmp/temp.pdf``, which raced when
    two Streamlit sessions uploaded concurrently (one user could read —
    or delete — another user's file) and cost a needless disk round-trip.

    Args:
        pdf_file: Streamlit ``UploadedFile`` holding the PDF bytes.

    Returns:
        str: concatenated text of all pages, stripped of surrounding
        whitespace; "" if the PDF contains no extractable text.
    """
    parts = []
    # PyMuPDF accepts a bytes buffer; no temp file or cleanup required.
    with fitz.open(stream=pdf_file.getvalue(), filetype="pdf") as doc:
        for page in doc:
            parts.append(page.get_text("text"))
    return "".join(parts).strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def translate_text_google(text):
    """Translate *text* to English using the module-level googletrans client.

    The input is split into 5000-character chunks (a practical per-request
    limit for the service) and the translated pieces are joined with single
    spaces. Empty input short-circuits to "".
    """
    if not text:
        return ""

    limit = 5000
    translated_parts = [
        translator.translate(text[start:start + limit], dest='en').text
        for start in range(0, len(text), limit)
    ]
    return " ".join(translated_parts)
|
|
|
|
|
|
|
|
|
|
|
def safe_summarize(text, max_length=150, min_length=30):
    """Summarize *text* with the global pipeline, degrading gracefully.

    Inputs shorter than 10 words (or empty/None) are returned unchanged,
    as is the original text whenever the model call raises — a warning is
    shown in the UI instead of failing the whole run.
    """
    word_count = len(text.split()) if text else 0
    if word_count < 10:
        return text

    try:
        outputs = summarizer(
            text,
            max_length=max_length,
            min_length=min_length,
            do_sample=False,
        )
    except Exception as e:
        st.warning(f"⚠️ Summarization failed: {e}")
        return text

    return outputs[0]['summary_text']
|
|
|
|
|
|
|
|
|
|
|
def extract_entities(text): |
|
|
entities = {} |
|
|
|
|
|
|
|
|
|
|
|
pan_match = re.search(r"PAN (\w{5}\d{4}\w)", text) |
|
|
|
|
|
|
|
|
|
|
|
acc_match = re.search(r"Account Number (\w+)", text, re.IGNORECASE) |
|
|
|
|
|
|
|
|
|
|
|
penalty_match = re.search(r"INR ([\d,]+)", text, re.IGNORECASE) |
|
|
|
|
|
|
|
|
deactivate_match = re.search(r"\bdeactivat(?:e|ed|ion)\b", text, re.IGNORECASE) |
|
|
|
|
|
if pan_match: |
|
|
|
|
|
entities["PAN"] = pan_match.group(1) if pan_match else None |
|
|
|
|
|
if acc_match: |
|
|
entities["Account_Number"] = acc_match.group(1) if acc_match else None |
|
|
|
|
|
if penalty_match: |
|
|
|
|
|
entities["Penalty"] = penalty_match.group(1) if penalty_match else None |
|
|
|
|
|
if deactivate_match: |
|
|
entities["Deactivate"] = deactivate_match.group(0).lower() |
|
|
|
|
|
return entities |
|
|
|
|
|
|
|
|
|
|
|
def trigger_action(entities):
    """Map extracted entities to a suggested follow-up action string.

    Priority order: Penalty > Deactivate > Account_Number > nothing.

    Args:
        entities: dict produced by ``extract_entities``.

    Returns:
        str: human-readable action description.
    """
    account = entities.get('Account_Number', 'N/A')
    pan = entities.get('PAN', 'N/A')

    if "Penalty" in entities:
        return f"Penalty of {entities['Penalty']} recorded for account {account} (PAN: {pan})"
    elif "Deactivate" in entities:
        # Fixed user-facing typo: "Kindy" -> "Kindly".
        return f"Kindly Deactivate {account} as per request having (PAN: {pan})"
    elif "Account_Number" in entities:
        return f"Account {entities['Account_Number']} flagged for review."
    else:
        return "No action required"
|
|
|
|
|
|
|
|
|
|
|
def process_uploaded_pdf(pdf_file):
    """Run the full pipeline on one uploaded PDF.

    Steps: extract text -> detect language -> translate to English if
    needed -> summarize -> extract entities -> derive a suggested action.

    Args:
        pdf_file: Streamlit ``UploadedFile``.

    Returns:
        dict with file name, detected language, 500-char snippets of raw
        and translated text, summary, extracted entities, and the
        triggered action string.
    """
    raw_text = extract_text_from_pdf(pdf_file)

    # langdetect raises on empty input, which an image-only (scanned) PDF
    # produces. Defaulting to "en" skips translation and lets the rest of
    # the pipeline no-op gracefully instead of crashing.
    lang = detect(raw_text) if raw_text else "en"
    translated_text = translate_text_google(raw_text) if lang != "en" else raw_text

    summary = safe_summarize(translated_text)
    entities = extract_entities(translated_text)
    action_result = trigger_action(entities)

    def _snippet(s):
        # First 500 chars, with an ellipsis only when the text was cut.
        return s[:500] + ("..." if len(s) > 500 else "")

    return {
        "file_name": pdf_file.name,
        "detected_language": lang,
        "raw_text_snippet": _snippet(raw_text),
        "translated_text_snippet": _snippet(translated_text),
        "summary": summary,
        "entities": entities,
        "action_triggered": action_result,
    }
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Streamlit UI. Streamlit re-executes this script top-to-bottom on every
# interaction; the pipeline below only runs once a file has been uploaded.
# ---------------------------------------------------------------------------
st.title("PDF Document Processor")

st.write("Upload a PDF file to extract text, translate (if needed), summarize, identify key entities, and suggest actions.")


uploaded_file = st.file_uploader("Choose a PDF file", type="pdf")


if uploaded_file is not None:

    st.write("Processing PDF...")

    try:
        # Full pipeline: extract -> detect/translate -> summarize ->
        # entities -> action.
        processed_data = process_uploaded_pdf(uploaded_file)

        st.subheader("Processing Results:")

        st.write(f"**File Name:** {processed_data['file_name']}")

        st.write(f"**Detected Language:** {processed_data['detected_language']}")

        st.write(f"**Raw Text Snippet:** {processed_data['raw_text_snippet']}")

        st.write(f"**Translated Text Snippet:** {processed_data['translated_text_snippet']}")

        st.write(f"**Summary:** {processed_data['summary']}")

        st.write(f"**Extracted Entities:**")

        for key, value in processed_data['entities'].items():

            st.write(f"- {key}: {value}")

        st.write(f"**Action Triggered:** {processed_data['action_triggered']}")

    # Broad catch is deliberate at this top-level UI boundary: any failure
    # (PDF parse, translation, model call) is surfaced to the user instead
    # of crashing the app.
    except Exception as e:

        st.error(f"An error occurred during processing: {e}")