# Hugging Face Space header (non-code residue from the web page):
# uploaded by RathodHarish — commit "Update app.py" (a1a1107, verified).
import gradio as gr
import mimetypes
from transformers import pipeline
import re
from PIL import Image
import pandas as pd
import fitz # PyMuPDF
from simple_salesforce import Salesforce
from dotenv import load_dotenv
import os
import json
import logging
import time
from datetime import datetime
import html
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
# Salesforce credentials are read from the environment (a local .env file is
# supported via python-dotenv); the fallback values are placeholders only and
# will not authenticate against a real org.
load_dotenv()
SF_USERNAME = os.getenv("SF_USERNAME", "your_username")
SF_PASSWORD = os.getenv("SF_PASSWORD", "your_password")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN", "your_token")
SF_INSTANCE_URL = os.getenv("SF_INSTANCE_URL", "https://smartauditchecklist-dev-ed.develop.my.salesforce.com")
# Initialize Salesforce
def init_salesforce():
    """Open a Salesforce session using the module-level credentials.

    Returns:
        The connected ``Salesforce`` client, or ``None`` when the
        connection attempt fails (the failure is logged, not raised).
    """
    try:
        connection = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_SECURITY_TOKEN,
            instance_url=SF_INSTANCE_URL,
        )
    except Exception as exc:
        logger.error(f"Salesforce Connection Error: {exc}")
        return None
    logger.info("Connected to Salesforce")
    return connection
def get_audit_result_fields(sf):
    """Return the API names of every field on the Audit_Result__c object.

    Args:
        sf: A connected ``simple_salesforce`` client.

    Returns:
        list[str]: field names, or an empty list when the describe call
        fails (the error is logged).
    """
    try:
        described = sf.Audit_Result__c.describe()
        return [field_info['name'] for field_info in described['fields']]
    except Exception as e:
        logger.error(f"Describe error: {e}")
        return []
def save_to_salesforce(sf, extracted_text, score, section_scores, issues, filename):
    """Persist one Audit_Result__c record, writing only fields that exist.

    Each candidate field is paired with a lazy builder so values are only
    computed for fields actually present on the object. Errors are logged,
    never raised — saving is best-effort.
    """
    if not sf:
        logger.warning("Salesforce not initialized, skipping save.")
        return
    try:
        available = get_audit_result_fields(sf)
        builders = [
            # 131072 chars is the Salesforce long-text-area field limit.
            ('Extracted_Text__c', lambda: extracted_text[:131072]),
            ('Overall_Score__c', lambda: str(score)),
            ('Section_Scores__c', lambda: json.dumps(section_scores)),
            ('Issues__c', lambda: "\n".join(issues) if issues else "No issues"),
            ('Checklist_Name__c', lambda: filename),
            # NOTE(review): datetime.utcnow() is deprecated in Python 3.12+;
            # behavior kept as-is to avoid changing the stored timestamp format.
            ('Upload_Date__c', lambda: datetime.utcnow().isoformat()),
        ]
        record = {name: build() for name, build in builders if name in available}
        if record:
            sf.Audit_Result__c.create(record)
            logger.info("Saved to Salesforce")
    except Exception as e:
        logger.error(f"Save error: {e}")
# Initialize OCR pipeline
# Load the TrOCR printed-text model once at import time. If loading fails
# (e.g. no network or missing weights), `ocr` is set to None and
# extract_text_from_image reports the condition instead of crashing.
try:
    ocr = pipeline("image-to-text", model="microsoft/trocr-base-printed")
    logger.info("OCR pipeline initialized")
except Exception as e:
    logger.error(f"OCR initialization failed: {e}")
    ocr = None  # sentinel checked by extract_text_from_image
def extract_text_from_pdf(file_path):
    """Extract all page text from a PDF file.

    Args:
        file_path: Path to the PDF on disk.

    Returns:
        tuple[str, float]: (extracted text, or an "Error: ..." message;
        elapsed seconds). Errors are reported in-band, never raised.
    """
    start_time = time.time()
    try:
        # Bug fix: the document handle was never closed (leaked on every
        # call, including the encrypted early-return). The context manager
        # guarantees closure on all paths. The encrypted path now also
        # reports real elapsed time instead of a hard-coded 0.
        with fitz.open(file_path) as pdf:
            if pdf.is_encrypted:
                return "Error: PDF is encrypted.", time.time() - start_time
            text = "\n".join(
                page_text for page_text in (page.get_text() for page in pdf) if page_text
            ).strip()
        return text if text else "Error: No text found in PDF.", time.time() - start_time
    except Exception as e:
        return f"Error: {e}", time.time() - start_time
def extract_text_from_excel(file_path):
    """Read an Excel workbook and return its contents as text.

    Args:
        file_path: Path ending in .xlsx or .xls.

    Returns:
        tuple[str, float]: (stringified DataFrame, or an "Error: ..."
        message; elapsed seconds).
    """
    started = time.time()
    try:
        # Pick the pandas engine from the extension; anything else is rejected.
        extension = os.path.splitext(file_path)[1].lower()
        engine = {'.xlsx': 'openpyxl', '.xls': 'xlrd'}.get(extension)
        if engine is None:
            return f"Error: Unsupported Excel file extension '{extension}'. Supported extensions are .xlsx and .xls.", time.time() - started
        frame = pd.read_excel(file_path, engine=engine)
        return frame.to_string(), time.time() - started
    except ImportError as e:
        # The chosen engine is an optional pandas dependency.
        return f"Error: Required engine not installed. Please install {engine} (e.g., 'pip install {engine}').", time.time() - started
    except Exception as e:
        return f"Error: {e}", time.time() - started
def extract_text_from_image(image):
    """Run the TrOCR pipeline over a PIL image.

    Args:
        image: A PIL image; it is normalised to RGB at 1024x1024 first.

    Returns:
        tuple[str, float]: (recognised text, or an "Error: ..." message;
        elapsed seconds).
    """
    started = time.time()
    try:
        if not ocr:
            # Model failed to load at import time — report, don't crash.
            return "Error: OCR pipeline not initialized.", 0
        prepared = image.convert("RGB").resize((1024, 1024))
        recognised = ocr(prepared)[0]['generated_text']
        return recognised.strip(), time.time() - started
    except Exception as e:
        return f"Error: {e}", time.time() - started
def get_color_from_score(score):
    """Map an overall score to a display colour: >=85 green, >=60 orange, else red."""
    for floor, colour in ((85, "green"), (60, "orange")):
        if score >= floor:
            return colour
    return "red"
def get_flag_color(issue):
    """Classify an issue string: red (critical) if it mentions 'Missing' or 'Error', orange otherwise."""
    return "red" if any(marker in issue for marker in ("Missing", "Error")) else "orange"
def sanitize_latex(text):
    """Sanitize text so it can be embedded in LaTeX without compile errors.

    Non-ASCII characters are dropped, then every LaTeX special character is
    escaped in a single regex pass.

    Bug fixes over the previous version: the old table keyed the backslash
    on ``r"\\\\"`` (two characters), so a lone backslash was never escaped;
    and sequential ``str.replace`` calls could re-process the backslashes
    that earlier escapes introduce. A single-pass substitution avoids both.
    The old 'é' and ligature entries were dead code — the ASCII encode
    stripped those characters before they could match — and are dropped.
    """
    if not text:
        return ""
    escapes = {
        "\\": r"\textbackslash{}",
        "&": r"\&",
        "%": r"\%",
        "$": r"\$",
        "#": r"\#",
        "_": r"\_",
        "{": r"\{",
        "}": r"\}",
        "~": r"\textasciitilde{}",
        "^": r"\textasciicircum{}",
    }
    # Drop everything outside ASCII first, as before.
    text = text.encode('ascii', 'ignore').decode('ascii')
    # One pass: each special character is replaced exactly once, so the
    # backslashes introduced by the escapes are never themselves escaped.
    return re.sub(r'[\\&%$#_{}~^]', lambda m: escapes[m.group()], text)
def find_na_context(text):
    """Locate the first whole-word 'N/A' or 'na' and return it with ~20 chars of context."""
    hit = re.search(r'\b(N/A|na)\b', text, re.IGNORECASE)
    if hit is None:
        return "'N/A' or 'na' (exact location not identified)"
    lo = max(0, hit.start() - 20)
    hi = min(len(text), hit.end() + 20)
    snippet = text[lo:hi].replace('\n', ' ')
    return f"'{hit.group()}' (in '{snippet}')"
def find_percentage_context(text):
    """Locate the first '0%' or '100%' and return it with ~20 chars of context."""
    hit = re.search(r'\b(0%|100%)', text)
    if hit is None:
        return "'0%' or '100%' (exact location not identified)"
    lo = max(0, hit.start() - 20)
    hi = min(len(text), hit.end() + 20)
    snippet = text[lo:hi].replace('\n', ' ')
    return f"'{hit.group()}' (in '{snippet}')"
def find_section_context(text, section):
    """Report where a section keyword appears in the text (case-insensitive).

    `section` is used as a regex pattern; callers pass plain words.
    """
    hit = re.search(section, text, re.IGNORECASE)
    if hit is None:
        return "but could not be located in the text"
    lo = max(0, hit.start() - 20)
    hi = min(len(text), hit.end() + 20)
    snippet = text[lo:hi].replace('\n', ' ')
    return f"as '{hit.group()}' (in '{snippet}')"
def extract_section_content(text, section, required_sections):
    """Return the text of `section` from its header up to the next section header.

    The end is the earliest match of any OTHER required section found after
    this section's header; if none follow, the content runs to the end of
    the text. Returns "" when the section is not present.

    NOTE(review): the forward scan skips ahead by the pattern length rather
    than the matched length — equivalent here because matches are
    case-insensitive literals of the same length as the pattern.
    """
    header = re.search(section, text, re.IGNORECASE)
    if header is None:
        return ""
    begin = header.start()
    tail_offset = begin + len(section)
    end = len(text)
    for other in required_sections:
        if other == section:
            continue
        nxt = re.search(other, text[tail_offset:], re.IGNORECASE)
        if nxt is not None:
            end = min(end, tail_offset + nxt.start())
    return text[begin:end].strip()
def generate_combined_download(extracted_text, score, section_scores, issues, filename):
    """Write a plain-text audit summary to output/combined_summary.txt.

    Args:
        extracted_text: Accepted for interface compatibility; the summary
            intentionally contains only scores and issues, not the raw text.
        score: Overall score (percentage).
        section_scores: Mapping of section title -> "Pass"/"Partial"/"Fail".
        issues: List of issue description strings.
        filename: Name of the analysed upload, shown in the report header.

    Returns:
        str | None: path of the written file, or None on failure (logged).
    """
    try:
        # Bug fix: the report previously hard-coded "Filename: (unknown)"
        # instead of interpolating the uploaded file's name.
        content = f"Audit Summary Report\n\nFilename: {filename}\nDate: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\nOverall Score: {score}%\n\nSection Scores:\n"
        for section, result in section_scores.items():
            content += f" {section}: {result}\n"
        content += "\nIssues:\n" + ("\n".join(issues) if issues else "No issues detected")
        os.makedirs("output", exist_ok=True)
        combined_file = "output/combined_summary.txt"
        # Explicit encoding: issue text may contain non-ASCII characters.
        with open(combined_file, "w", encoding="utf-8") as f:
            f.write(content)
        return combined_file
    except Exception as e:
        logger.error(f"Combined download generation error: {e}")
        return None
def analyze_checklist(file):
    """End-to-end analysis of an uploaded checklist file.

    Extracts text (PDF / Excel / image), scores it against four required
    sections, flags anomalies (N/A answers, 0%/100% values), saves the
    result to Salesforce best-effort, and builds an HTML report plus a
    downloadable text summary.

    Scoring: starts at 100; -15 per missing section, -10 once if any 'N/A'
    is found, -10 once if '0%'/'100%' is found; floored at 0.

    Args:
        file: Gradio file object (exposes a ``.name`` path) or None.

    Returns:
        tuple: (html_report: str, summary_file_path: str | None,
        show_download: bool) — matches the Gradio outputs wiring.
    """
    try:
        sf = init_salesforce()
        if not file:
            return "<div style='color:red;font-weight:bold;'>No file uploaded.</div>", None, False
        # File type is inferred from the extension, not the file content.
        file_type, _ = mimetypes.guess_type(file.name)
        extracted_text, process_time = None, 0
        filename = file.name.split("/")[-1]
        # Extract text based on file type
        if file_type == "application/pdf":
            extracted_text, process_time = extract_text_from_pdf(file.name)
        elif file_type in ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.ms-excel"]:
            extracted_text, process_time = extract_text_from_excel(file.name)
        elif file_type and file_type.startswith("image"):
            extracted_text, process_time = extract_text_from_image(Image.open(file.name))
        else:
            return "<div style='color:red;font-weight:bold;'>Unsupported file type.</div>", None, False
        # Extractors signal failure via an "Error..." string rather than raising.
        if extracted_text.startswith("Error"):
            return f"<div style='color:red;font-weight:bold;'>{extracted_text}</div>", None, False
        # Evaluate checklist
        issues, section_scores = [], {}
        score = 100  # start at 100 and deduct per finding
        required_sections = ["safety", "inspection", "remarks", "equipment"]
        initial_section_scores = {}
        missing_sections = []
        section_issues = {section.title(): [] for section in required_sections}  # Track issues per section
        # Step 1: Check for required sections and extract their content
        section_contents = {}
        for section in required_sections:
            section_title = section.title()
            if section not in extracted_text.lower():
                # Missing section: -15 points each.
                issues.append(f"Missing required section: {section_title}")
                initial_section_scores[section_title] = "Fail"
                missing_sections.append(section_title)
                score -= 15
                section_contents[section_title] = ""
            else:
                initial_section_scores[section_title] = "Pass"
                # Extract content for this section
                section_contents[section_title] = extract_section_content(extracted_text, section, required_sections)
            section_scores[section_title] = initial_section_scores[section_title]
        # Step 2: Check for missing answers (N/A) per section and overall
        # NOTE(review): the substring test also fires on words that merely
        # contain "na" (e.g. "maintenance"), while find_na_context uses word
        # boundaries — the flag and its context can disagree; confirm intended.
        na_detected = False
        na_context = ""
        if "N/A" in extracted_text or "na" in extracted_text.lower():
            issues.append("Missing answers detected (N/A found)")
            score -= 10
            na_detected = True
            na_context = find_na_context(extracted_text)
        # Step 3: Check for unusual percentage values per section and overall
        percentage_detected = False
        percentage_context = ""
        if re.search(r"\b0%|\b100%", extracted_text):
            issues.append("Unusual percentage values detected")
            score -= 10
            percentage_detected = True
            percentage_context = find_percentage_context(extracted_text)
        # Step 4: Adjust section scores based on issues within each section
        for section in required_sections:
            section_title = section.title()
            section_text = section_contents[section_title]
            if section_scores[section_title] == "Pass":  # Only adjust sections that are initially "Pass"
                section_na_detected = False
                section_percentage_detected = False
                section_na_context = ""
                section_percentage_context = ""
                # Check for N/A in this section
                if "N/A" in section_text or "na" in section_text.lower():
                    section_na_detected = True
                    section_na_context = find_na_context(section_text)
                    section_issues[section_title].append(f"N/A found in section: {section_na_context}")
                # Check for unusual percentages in this section
                if re.search(r"\b0%|\b100%", section_text):
                    section_percentage_detected = True
                    section_percentage_context = find_percentage_context(section_text)
                    section_issues[section_title].append(f"Unusual percentage in section: {section_percentage_context}")
                # Adjust to "Partial" if issues are found in this section
                if section_na_detected or section_percentage_detected:
                    section_scores[section_title] = "Partial"
        score = max(0, score)  # deductions never push the score below zero
        logger.info(f"Processing time: {process_time:.2f} seconds")
        # Save to Salesforce (best-effort; failures are logged, not raised)
        save_to_salesforce(sf, extracted_text, score, section_scores, issues, filename)
        # Generate HTML output with dynamic explanation
        # NOTE(review): this local name shadows the imported `html` module
        # (harmless here since the module is not used in this function).
        html = f"""
        <div style='font-family:Arial,sans-serif;padding:20px;max-width:800px;margin:auto;'>
        <h2 style='color:#2c3e50;'>📄 Audit Analysis: {sanitize_latex(filename)}</h2>
        <p><strong>Processing Time:</strong> {process_time:.2f} seconds</p>
        <h3 style='color:#34495e;'>📊 Overall Score</h3>
        <div style='font-size:24px;font-weight:bold;color:{get_color_from_score(score)};'>{score}%</div>
        <h3 style='color:#34495e;'>🧪 Section-wise Scorecard</h3>
        <div style='display:grid;grid-template-columns:repeat(auto-fit,minmax(150px,1fr));gap:10px;'>
        """
        for section, result in section_scores.items():
            color = "green" if result == "Pass" else "orange" if result == "Partial" else "red"
            html += f"""
            <div style='background:#f9f9f9;padding:10px;border:1px solid #ccc;border-radius:5px;text-align:center;'>
            <strong>{section}</strong><br>
            <span style='color:{color};font-weight:bold;'>{result}</span>
            </div>
            """
        html += "</div>"
        if issues:
            html += """
            <h3 style='color:#34495e;'>🚩 Issues Detected</h3>
            <table style='width:100%;border-collapse:collapse;'>
            <tr style='background:#ecf0f1;'>
            <th style='border:1px solid #ccc;padding:8px;'>Flag</th>
            <th style='border:1px solid #ccc;padding:8px;'>Reason</th>
            </tr>
            """
            for issue in issues:
                flag_color = get_flag_color(issue)
                flag_label = "Critical" if flag_color == "red" else "Warning"
                html += f"""
                <tr>
                <td style='border:1px solid #ccc;padding:8px;color:{flag_color};font-weight:bold;'>{flag_label}</td>
                <td style='border:1px solid #ccc;padding:8px;'>{issue}</td>
                </tr>
                """
            html += "</table>"
        else:
            html += "<h3 style='color:#27ae60;'>✅ No Issues Detected</h3>"
        # Add dynamic explanation section
        html += """
        <h3 style='color:#34495e;'>📋 Analysis Explanation</h3>
        <div style='background:#f9f9f9;padding:15px;border:1px solid #ccc;border-radius:5px;'>
        <p><strong>Section Scores Explanation:</strong></p>
        <ul>
        """
        for section in required_sections:
            section_title = section.title()
            initial_score = initial_section_scores[section_title]
            final_score = section_scores[section_title]
            section_context = find_section_context(extracted_text, section)
            explanation = f"<li><strong>{section_title}:</strong> The '{section}' section was "
            if initial_score == "Pass":
                explanation += f"found {section_context}, so it was initially marked as 'Pass'."
            else:
                explanation += "not found in the text, so it was initially marked as 'Fail'."
            if final_score == "Partial":
                reasons = section_issues[section_title]
                if reasons:
                    explanation += f" However, it was downgraded to 'Partial' because {', and '.join(reasons)}."
            html += explanation + "</li>"
        html += """
        </ul>
        <p><strong>Issues Explanation:</strong></p>
        <ul>
        """
        if na_detected:
            html += f"""
            <li><strong>Critical - Missing answers detected (N/A found):</strong> The text contains {na_context},
            which the application interprets as a potential missing answer. This is flagged as 'Critical' because
            missing answers can significantly impact the checklist's reliability.</li>
            """
        if percentage_detected:
            html += f"""
            <li><strong>Warning - Unusual percentage values detected:</strong> The text contains {percentage_context}.
            The application flags '0%' or '100%' as unusual, as they may indicate oversimplified or incorrect responses.
            This is marked as a 'Warning' to prompt further review.</li>
            """
        if not (na_detected or percentage_detected):
            html += "<li>No issues related to missing answers or unusual percentages were detected.</li>"
        # Explain missing sections
        if missing_sections:
            html += f"""
            <li><strong>Missing Sections:</strong> The following required sections were not found in the text:
            {', '.join(missing_sections)}. Each missing section deducts 15 points from the overall score.</li>
            """
        html += """
        </ul>
        <p><strong>Overall Score:</strong> Started at 100%.
        """
        deductions = []
        if missing_sections:
            deduction = len(missing_sections) * 15
            deductions.append(f"Deducted {deduction} points for {len(missing_sections)} missing section(s) ({', '.join(missing_sections)})")
        if na_detected:
            deductions.append("Deducted 10 points for 'N/A' detected")
        if percentage_detected:
            deductions.append("Deducted 10 points for unusual percentage values")
        if deductions:
            html += ", ".join(deductions) + f", resulting in a final score of {score}%."
        else:
            html += f"No deductions were applied, resulting in a final score of {score}%."
        html += """
        </p>
        </div>
        """
        html += "</div>"
        # Generate combined download file
        combined_file = generate_combined_download(extracted_text, score, section_scores, issues, filename)
        if not combined_file:
            html += "<div style='color:red;font-weight:bold;'>Warning: Failed to generate combined summary file.</div>"
        return html, combined_file, True
    except Exception as e:
        logger.error(f"Error in analyze_checklist: {e}", exc_info=True)
        return f"<div style='color:red;font-weight:bold;'>Error processing checklist: {str(e)}</div>", None, False
def clear_outputs():
    """Reset the UI: blank HTML, no download file, hidden download flag, cleared upload."""
    blank_html, no_file, hide_download, no_upload = "", None, False, None
    return blank_html, no_file, hide_download, no_upload
# Gradio UI: single-file upload, Submit/Clear buttons, an HTML results pane,
# and a download link that is only made visible after a successful analysis.
with gr.Blocks(theme=gr.themes.Soft(), css=".gr-button-primary {background: #f28c38 !important; color: white !important; border: none !important;}") as app:
    gr.Markdown("""
# Smart Audit Checklist Evaluator
Upload a checklist (PDF, Excel, or Image) to analyze for completeness, anomalies, and compliance.
Results are scored, flagged.
""")
    with gr.Row():
        with gr.Column(scale=2):
            file_input = gr.File(label="Upload Checklist (PDF, Excel, Image)", file_types=[".pdf", ".xlsx", ".xls", ".png", ".jpg"])
        with gr.Column(scale=1):
            submit_btn = gr.Button("Submit", variant="primary")
            clear_btn = gr.Button("Clear", variant="secondary")
    output_html = gr.HTML(label="Analysis Results")
    output_summary = gr.File(label="Download Summary", visible=False)
    # Hidden state carrying analyze_checklist's third return value (success flag).
    show_download = gr.State(value=False)
    # Run the analysis, then toggle the download component's visibility
    # from the returned flag in a follow-up step.
    submit_btn.click(
        fn=analyze_checklist,
        inputs=file_input,
        outputs=[output_html, output_summary, show_download]
    ).then(
        fn=lambda show: gr.update(visible=show),
        inputs=show_download,
        outputs=output_summary
    )
    # Clear resets all four outputs, then re-hides the download component.
    clear_btn.click(
        fn=clear_outputs,
        inputs=None,
        outputs=[output_html, output_summary, show_download, file_input]
    ).then(
        fn=lambda: gr.update(visible=False),
        inputs=None,
        outputs=output_summary
    )
app.launch()