"""Smart Audit Checklist Evaluator — Gradio app (HuggingFace Space)."""
import html
import json
import logging
import mimetypes
import os
import re
import time
from datetime import datetime, timezone

import fitz  # PyMuPDF
import gradio as gr
import pandas as pd
from dotenv import load_dotenv
from PIL import Image
from simple_salesforce import Salesforce
from transformers import pipeline
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables from a local .env file, if present.
load_dotenv()
# Salesforce credentials; the second arguments are placeholder defaults used
# only when the corresponding environment variables are unset.
SF_USERNAME = os.getenv("SF_USERNAME", "your_username")
SF_PASSWORD = os.getenv("SF_PASSWORD", "your_password")
SF_SECURITY_TOKEN = os.getenv("SF_SECURITY_TOKEN", "your_token")
SF_INSTANCE_URL = os.getenv("SF_INSTANCE_URL", "https://smartauditchecklist-dev-ed.develop.my.salesforce.com")
# Initialize Salesforce
def init_salesforce():
    """Open a Salesforce session from the env-var credentials.

    Returns the connection object, or None when the login fails.
    """
    try:
        connection = Salesforce(
            username=SF_USERNAME,
            password=SF_PASSWORD,
            security_token=SF_SECURITY_TOKEN,
            instance_url=SF_INSTANCE_URL
        )
    except Exception as e:
        logger.error(f"Salesforce Connection Error: {e}")
        return None
    logger.info("Connected to Salesforce")
    return connection
def get_audit_result_fields(sf):
    """Return the API names of every field on Audit_Result__c ([] on error)."""
    try:
        described = sf.Audit_Result__c.describe()
        return [entry['name'] for entry in described['fields']]
    except Exception as e:
        logger.error(f"Describe error: {e}")
        return []
def save_to_salesforce(sf, extracted_text, score, section_scores, issues, filename):
    """Persist one audit result as an Audit_Result__c record.

    Only fields that actually exist on the org (per describe()) are populated,
    so the create call never fails on an unknown field. Silently skips when
    the Salesforce connection is unavailable. Best-effort: errors are logged,
    never raised to the caller.
    """
    if not sf:
        logger.warning("Salesforce not initialized, skipping save.")
        return
    try:
        available_fields = get_audit_result_fields(sf)
        record = {}
        if 'Extracted_Text__c' in available_fields:
            # Salesforce long text areas cap at 131,072 characters.
            record['Extracted_Text__c'] = extracted_text[:131072]
        if 'Overall_Score__c' in available_fields:
            record['Overall_Score__c'] = str(score)
        if 'Section_Scores__c' in available_fields:
            record['Section_Scores__c'] = json.dumps(section_scores)
        if 'Issues__c' in available_fields:
            record['Issues__c'] = "\n".join(issues) if issues else "No issues"
        if 'Checklist_Name__c' in available_fields:
            record['Checklist_Name__c'] = filename
        if 'Upload_Date__c' in available_fields:
            # Timezone-aware UTC timestamp. datetime.utcnow() is deprecated
            # and produced a naive ISO string with no UTC offset, which
            # Salesforce may interpret in the org's local timezone.
            record['Upload_Date__c'] = datetime.now(timezone.utc).isoformat()
        if record:
            sf.Audit_Result__c.create(record)
            logger.info("Saved to Salesforce")
    except Exception as e:
        logger.error(f"Save error: {e}")
# Initialize OCR pipeline once at import time.
# TrOCR (printed-text variant) via the transformers image-to-text pipeline.
try:
    ocr = pipeline("image-to-text", model="microsoft/trocr-base-printed")
    logger.info("OCR pipeline initialized")
except Exception as e:
    logger.error(f"OCR initialization failed: {e}")
    # extract_text_from_image checks for None and fails gracefully.
    ocr = None
def extract_text_from_pdf(file_path):
    """Extract text from every page of a PDF.

    Returns (text, elapsed_seconds). On failure `text` is an "Error: ..."
    string so callers can detect it with str.startswith("Error").
    """
    start_time = time.time()
    try:
        pdf = fitz.open(file_path)
        try:
            if pdf.is_encrypted:
                return "Error: PDF is encrypted.", 0
            chunks = []
            for page in pdf:
                page_text = page.get_text()
                if page_text:
                    chunks.append(page_text)
            text = "\n".join(chunks).strip()
            elapsed_time = time.time() - start_time
            return text if text else "Error: No text found in PDF.", elapsed_time
        finally:
            # Release the document handle; the original leaked it on every
            # path (including the encrypted early return).
            pdf.close()
    except Exception as e:
        return f"Error: {e}", time.time() - start_time
def extract_text_from_excel(file_path):
    """Dump an Excel workbook's first sheet as text.

    Returns (text, elapsed_seconds); on failure text is an "Error: ..." string.
    Supports .xlsx (openpyxl) and .xls (xlrd) only.
    """
    start_time = time.time()
    try:
        # Pick the pandas engine from the file extension.
        engine = None
        file_extension = os.path.splitext(file_path)[1].lower()
        engine = {'.xlsx': 'openpyxl', '.xls': 'xlrd'}.get(file_extension)
        if engine is None:
            return f"Error: Unsupported Excel file extension '{file_extension}'. Supported extensions are .xlsx and .xls.", time.time() - start_time
        frame = pd.read_excel(file_path, engine=engine)
        return frame.to_string(), time.time() - start_time
    except ImportError:
        return f"Error: Required engine not installed. Please install {engine} (e.g., 'pip install {engine}').", time.time() - start_time
    except Exception as e:
        return f"Error: {e}", time.time() - start_time
def extract_text_from_image(image):
    """Run OCR over a PIL image.

    Returns (text, elapsed_seconds); on failure text is an "Error: ..." string.
    """
    start_time = time.time()
    try:
        if not ocr:
            return "Error: OCR pipeline not initialized.", 0
        # Normalize to RGB at a fixed size before handing to the pipeline.
        prepared = image.convert("RGB").resize((1024, 1024))
        prediction = ocr(prepared)
        recognized = prediction[0]['generated_text'].strip()
        return recognized, time.time() - start_time
    except Exception as e:
        return f"Error: {e}", time.time() - start_time
def get_color_from_score(score):
    """Map a numeric score to a traffic-light color (green/orange/red)."""
    if score < 60:
        return "red"
    return "green" if score >= 85 else "orange"
def get_flag_color(issue):
    """Red for critical issues (missing sections / errors), orange otherwise."""
    return "red" if any(keyword in issue for keyword in ("Missing", "Error")) else "orange"
def sanitize_latex(text):
    """Sanitize text to prevent LaTeX compilation errors.

    Strips non-ASCII characters, then escapes every LaTeX special character
    in a single regex pass. A single pass is essential: the previous chain of
    sequential str.replace calls never escaped a lone backslash (its pattern
    was the two-character string ``\\\\``) and risked re-mangling escapes
    introduced by earlier replacements.
    """
    if not text:
        return ""
    # Drop anything outside ASCII first (the old per-char accent map was
    # dead code because this stripping ran before it).
    ascii_text = text.encode('ascii', 'ignore').decode('ascii')
    specials = {
        '&': r'\&',
        '%': r'\%',
        '$': r'\$',
        '#': r'\#',
        '_': r'\_',
        '{': r'\{',
        '}': r'\}',
        '~': r'\textasciitilde{}',
        '^': r'\textasciicircum{}',
        '\\': r'\textbackslash{}',
    }
    # One pass so replacement text is never itself re-escaped.
    return re.sub(r'[&%$#_{}~^\\]', lambda m: specials[m.group()], ascii_text)
def find_na_context(text):
    """Find the context of 'N/A' or 'na' in the text."""
    hit = re.search(r'\b(N/A|na)\b', text, re.IGNORECASE)
    if hit is None:
        return "'N/A' or 'na' (exact location not identified)"
    lo = max(0, hit.start() - 20)
    hi = min(len(text), hit.end() + 20)
    snippet = text[lo:hi].replace('\n', ' ')
    return f"'{hit.group()}' (in '{snippet}')"
def find_percentage_context(text):
    """Find the context of '0%' or '100%' in the text."""
    hit = re.search(r'\b(0%|100%)', text)
    if hit is None:
        return "'0%' or '100%' (exact location not identified)"
    lo = max(0, hit.start() - 20)
    hi = min(len(text), hit.end() + 20)
    snippet = text[lo:hi].replace('\n', ' ')
    return f"'{hit.group()}' (in '{snippet}')"
def find_section_context(text, section):
    """Find the context of a section header in the text (section is used as a regex)."""
    hit = re.search(section, text, re.IGNORECASE)
    if hit is None:
        return "but could not be located in the text"
    lo = max(0, hit.start() - 20)
    hi = min(len(text), hit.end() + 20)
    snippet = text[lo:hi].replace('\n', ' ')
    return f"as '{hit.group()}' (in '{snippet}')"
def extract_section_content(text, section, required_sections):
    """Extract the content of a section from the text until the next section or end."""
    header = re.search(section, text, re.IGNORECASE)
    if header is None:
        return ""
    begin = header.start()
    # Everything after this section's header; scan it for the next header.
    tail_offset = begin + len(section)
    tail = text[tail_offset:]
    end = len(text)
    for other in required_sections:
        if other == section:
            continue
        nxt = re.search(other, tail, re.IGNORECASE)
        if nxt is not None:
            end = min(end, tail_offset + nxt.start())
    return text[begin:end].strip()
def generate_combined_download(extracted_text, score, section_scores, issues, filename):
    """Write a plain-text audit summary and return its path (None on failure).

    Only the summary is written; `extracted_text` is intentionally excluded
    from the download. The `filename` parameter was previously accepted but
    never used — the report now records the actual upload name.
    """
    try:
        content = (
            "Audit Summary Report\n\n"
            f"Filename: {filename}\n"
            f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            f"Overall Score: {score}%\n\n"
            "Section Scores:\n"
        )
        for section, result in section_scores.items():
            content += f" {section}: {result}\n"
        content += "\nIssues:\n" + ("\n".join(issues) if issues else "No issues detected")
        os.makedirs("output", exist_ok=True)
        combined_file = "output/combined_summary.txt"
        with open(combined_file, "w", encoding="utf-8") as f:
            f.write(content)
        return combined_file
    except Exception as e:
        logger.error(f"Combined download generation error: {e}")
        return None
def analyze_checklist(file):
    """Analyze an uploaded checklist file and build an HTML report.

    Args:
        file: Gradio file wrapper exposing a ``.name`` path, or None.

    Returns:
        Tuple of (html_report, summary_file_path_or_None, download_visible_flag).
    """
    try:
        sf = init_salesforce()
        if not file:
            return "<div style='color:red;font-weight:bold;'>No file uploaded.</div>", None, False
        file_type, _ = mimetypes.guess_type(file.name)
        extracted_text, process_time = None, 0
        filename = file.name.split("/")[-1]
        # Extract text based on file type
        if file_type == "application/pdf":
            extracted_text, process_time = extract_text_from_pdf(file.name)
        elif file_type in ["application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "application/vnd.ms-excel"]:
            extracted_text, process_time = extract_text_from_excel(file.name)
        elif file_type and file_type.startswith("image"):
            extracted_text, process_time = extract_text_from_image(Image.open(file.name))
        else:
            return "<div style='color:red;font-weight:bold;'>Unsupported file type.</div>", None, False
        if extracted_text.startswith("Error"):
            return f"<div style='color:red;font-weight:bold;'>{extracted_text}</div>", None, False

        # Word-boundary patterns, kept consistent with find_na_context /
        # find_percentage_context. The previous plain substring test
        # ("na" in text.lower()) flagged essentially every document, because
        # common words like "analysis" or "maintenance" contain "na".
        na_pattern = re.compile(r'\b(N/A|na)\b', re.IGNORECASE)
        pct_pattern = re.compile(r"\b0%|\b100%")

        # Evaluate checklist
        issues, section_scores = [], {}
        score = 100
        required_sections = ["safety", "inspection", "remarks", "equipment"]
        initial_section_scores = {}
        missing_sections = []
        section_issues = {section.title(): [] for section in required_sections}  # Track issues per section

        # Step 1: Check for required sections and extract their content
        section_contents = {}
        for section in required_sections:
            section_title = section.title()
            if section not in extracted_text.lower():
                issues.append(f"Missing required section: {section_title}")
                initial_section_scores[section_title] = "Fail"
                missing_sections.append(section_title)
                score -= 15  # each missing section costs 15 points
                section_contents[section_title] = ""
            else:
                initial_section_scores[section_title] = "Pass"
                # Extract content for this section
                section_contents[section_title] = extract_section_content(extracted_text, section, required_sections)
            section_scores[section_title] = initial_section_scores[section_title]

        # Step 2: Check for missing answers (N/A) anywhere in the document
        na_detected = False
        na_context = ""
        if na_pattern.search(extracted_text):
            issues.append("Missing answers detected (N/A found)")
            score -= 10
            na_detected = True
            na_context = find_na_context(extracted_text)

        # Step 3: Check for unusual percentage values anywhere in the document
        percentage_detected = False
        percentage_context = ""
        if pct_pattern.search(extracted_text):
            issues.append("Unusual percentage values detected")
            score -= 10
            percentage_detected = True
            percentage_context = find_percentage_context(extracted_text)

        # Step 4: Downgrade "Pass" sections to "Partial" when they contain issues
        for section in required_sections:
            section_title = section.title()
            section_text = section_contents[section_title]
            if section_scores[section_title] == "Pass":  # Only adjust sections that are initially "Pass"
                section_na_detected = False
                section_percentage_detected = False
                section_na_context = ""
                section_percentage_context = ""
                # Check for N/A in this section (word-boundary match, see above)
                if na_pattern.search(section_text):
                    section_na_detected = True
                    section_na_context = find_na_context(section_text)
                    section_issues[section_title].append(f"N/A found in section: {section_na_context}")
                # Check for unusual percentages in this section
                if pct_pattern.search(section_text):
                    section_percentage_detected = True
                    section_percentage_context = find_percentage_context(section_text)
                    section_issues[section_title].append(f"Unusual percentage in section: {section_percentage_context}")
                # Adjust to "Partial" if issues are found in this section
                if section_na_detected or section_percentage_detected:
                    section_scores[section_title] = "Partial"
        score = max(0, score)
        logger.info(f"Processing time: {process_time:.2f} seconds")
        # Save to Salesforce
        save_to_salesforce(sf, extracted_text, score, section_scores, issues, filename)

        # Build the HTML report. Named report_html so it does not shadow the
        # stdlib `html` module imported at the top of the file.
        report_html = f"""
        <div style='font-family:Arial,sans-serif;padding:20px;max-width:800px;margin:auto;'>
        <h2 style='color:#2c3e50;'>📄 Audit Analysis: {sanitize_latex(filename)}</h2>
        <p><strong>Processing Time:</strong> {process_time:.2f} seconds</p>
        <h3 style='color:#34495e;'>📊 Overall Score</h3>
        <div style='font-size:24px;font-weight:bold;color:{get_color_from_score(score)};'>{score}%</div>
        <h3 style='color:#34495e;'>🧪 Section-wise Scorecard</h3>
        <div style='display:grid;grid-template-columns:repeat(auto-fit,minmax(150px,1fr));gap:10px;'>
        """
        for section, result in section_scores.items():
            color = "green" if result == "Pass" else "orange" if result == "Partial" else "red"
            report_html += f"""
            <div style='background:#f9f9f9;padding:10px;border:1px solid #ccc;border-radius:5px;text-align:center;'>
            <strong>{section}</strong><br>
            <span style='color:{color};font-weight:bold;'>{result}</span>
            </div>
            """
        report_html += "</div>"
        if issues:
            report_html += """
            <h3 style='color:#34495e;'>🚩 Issues Detected</h3>
            <table style='width:100%;border-collapse:collapse;'>
            <tr style='background:#ecf0f1;'>
            <th style='border:1px solid #ccc;padding:8px;'>Flag</th>
            <th style='border:1px solid #ccc;padding:8px;'>Reason</th>
            </tr>
            """
            for issue in issues:
                flag_color = get_flag_color(issue)
                flag_label = "Critical" if flag_color == "red" else "Warning"
                report_html += f"""
                <tr>
                <td style='border:1px solid #ccc;padding:8px;color:{flag_color};font-weight:bold;'>{flag_label}</td>
                <td style='border:1px solid #ccc;padding:8px;'>{issue}</td>
                </tr>
                """
            report_html += "</table>"
        else:
            report_html += "<h3 style='color:#27ae60;'>✅ No Issues Detected</h3>"
        # Add dynamic explanation section
        report_html += """
        <h3 style='color:#34495e;'>📋 Analysis Explanation</h3>
        <div style='background:#f9f9f9;padding:15px;border:1px solid #ccc;border-radius:5px;'>
        <p><strong>Section Scores Explanation:</strong></p>
        <ul>
        """
        for section in required_sections:
            section_title = section.title()
            initial_score = initial_section_scores[section_title]
            final_score = section_scores[section_title]
            section_context = find_section_context(extracted_text, section)
            explanation = f"<li><strong>{section_title}:</strong> The '{section}' section was "
            if initial_score == "Pass":
                explanation += f"found {section_context}, so it was initially marked as 'Pass'."
            else:
                explanation += "not found in the text, so it was initially marked as 'Fail'."
            if final_score == "Partial":
                reasons = section_issues[section_title]
                if reasons:
                    explanation += f" However, it was downgraded to 'Partial' because {', and '.join(reasons)}."
            report_html += explanation + "</li>"
        report_html += """
        </ul>
        <p><strong>Issues Explanation:</strong></p>
        <ul>
        """
        if na_detected:
            report_html += f"""
            <li><strong>Critical - Missing answers detected (N/A found):</strong> The text contains {na_context},
            which the application interprets as a potential missing answer. This is flagged as 'Critical' because
            missing answers can significantly impact the checklist's reliability.</li>
            """
        if percentage_detected:
            report_html += f"""
            <li><strong>Warning - Unusual percentage values detected:</strong> The text contains {percentage_context}.
            The application flags '0%' or '100%' as unusual, as they may indicate oversimplified or incorrect responses.
            This is marked as a 'Warning' to prompt further review.</li>
            """
        if not (na_detected or percentage_detected):
            report_html += "<li>No issues related to missing answers or unusual percentages were detected.</li>"
        # Explain missing sections
        if missing_sections:
            report_html += f"""
            <li><strong>Missing Sections:</strong> The following required sections were not found in the text:
            {', '.join(missing_sections)}. Each missing section deducts 15 points from the overall score.</li>
            """
        report_html += """
        </ul>
        <p><strong>Overall Score:</strong> Started at 100%.
        """
        deductions = []
        if missing_sections:
            deduction = len(missing_sections) * 15
            deductions.append(f"Deducted {deduction} points for {len(missing_sections)} missing section(s) ({', '.join(missing_sections)})")
        if na_detected:
            deductions.append("Deducted 10 points for 'N/A' detected")
        if percentage_detected:
            deductions.append("Deducted 10 points for unusual percentage values")
        if deductions:
            report_html += ", ".join(deductions) + f", resulting in a final score of {score}%."
        else:
            report_html += f"No deductions were applied, resulting in a final score of {score}%."
        report_html += """
        </p>
        </div>
        """
        report_html += "</div>"
        # Generate combined download file
        combined_file = generate_combined_download(extracted_text, score, section_scores, issues, filename)
        if not combined_file:
            report_html += "<div style='color:red;font-weight:bold;'>Warning: Failed to generate combined summary file.</div>"
        return report_html, combined_file, True
    except Exception as e:
        logger.error(f"Error in analyze_checklist: {e}", exc_info=True)
        return f"<div style='color:red;font-weight:bold;'>Error processing checklist: {str(e)}</div>", None, False
def clear_outputs():
    """Return blank values for (html, summary file, download flag, file input)."""
    return ("", None, False, None)
# --- Gradio UI -------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft(), css=".gr-button-primary {background: #f28c38 !important; color: white !important; border: none !important;}") as app:
    gr.Markdown("""
# Smart Audit Checklist Evaluator
Upload a checklist (PDF, Excel, or Image) to analyze for completeness, anomalies, and compliance.
Results are scored, flagged.
""")
    with gr.Row():
        with gr.Column(scale=2):
            file_input = gr.File(label="Upload Checklist (PDF, Excel, Image)", file_types=[".pdf", ".xlsx", ".xls", ".png", ".jpg"])
        with gr.Column(scale=1):
            submit_btn = gr.Button("Submit", variant="primary")
            clear_btn = gr.Button("Clear", variant="secondary")
    output_html = gr.HTML(label="Analysis Results")
    # Hidden until a summary file is successfully generated.
    output_summary = gr.File(label="Download Summary", visible=False)
    # Boolean flag returned by analyze_checklist; drives download visibility.
    show_download = gr.State(value=False)
    # Run the analysis, then reveal the download widget only on success.
    submit_btn.click(
        fn=analyze_checklist,
        inputs=file_input,
        outputs=[output_html, output_summary, show_download]
    ).then(
        fn=lambda show: gr.update(visible=show),
        inputs=show_download,
        outputs=output_summary
    )
    # Reset all outputs and re-hide the download widget.
    clear_btn.click(
        fn=clear_outputs,
        inputs=None,
        outputs=[output_html, output_summary, show_download, file_input]
    ).then(
        fn=lambda: gr.update(visible=False),
        inputs=None,
        outputs=output_summary
    )
app.launch()