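# Spaces: Sleeping
# (The line above appears to be a status banner captured from the hosting
# page; it is kept as a comment because it is not part of the program.)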
import re

import gradio as gr
# =============================================================================
# VISION AGENT - HSE Compliance Inspection
# Author: David Fernandez | Applied AI Engineer | LangGraph Contributor
# =============================================================================
# Keyword catalogue that decides which HSE categories an observation touches.
# Each category id maps to:
#   "keywords": lower-case phrases looked up in the lower-cased observation,
#   "label":    category name shown in the report tables,
#   "icon":     emoji rendered in front of the label.
# Dict order is the order in which categories appear in the report.
HSE_RULES = {
    "ppe": {
        "keywords": [
            "hard hat", "hardhat", "helmet", "safety glasses", "goggles", "gloves",
            "high-vis", "hi-vis", "vest", "steel toe", "boots", "hearing protection",
            "earplugs", "face shield", "welding shield", "respirator",
        ],
        "label": "PPE Compliance",
        "icon": "🦺",
    },
    "housekeeping": {
        "keywords": [
            "spill", "obstruct", "blocked", "clutter", "debris", "leak", "drip",
            "dirty", "mess", "trash", "tools on floor", "hose", "trip hazard",
        ],
        "label": "Housekeeping",
        "icon": "🧹",
    },
    "fall_protection": {
        "keywords": [
            "guardrail", "railing", "harness", "lanyard", "scaffold", "elevated",
            "ladder", "toe board", "edge", "fall", "height", "platform", "unprotected",
        ],
        "label": "Fall Protection",
        "icon": "🪜",
    },
    "fire_safety": {
        "keywords": [
            "extinguisher", "exit", "flammable", "fire", "smoke", "sprinkler",
            "blocked exit", "cabinet", "containment", "berm", "ignition",
        ],
        "label": "Fire Safety",
        "icon": "🧯",
    },
    "electrical": {
        "keywords": [
            "lockout", "tagout", "loto", "exposed wire", "ground fault", "arc flash",
            "breaker", "energized", "cable", "conduit", "junction box", "panel",
        ],
        "label": "Electrical Safety",
        "icon": "⚡",
    },
}
# Severity escalation patterns, keyed by severity level and listed from most
# to least severe. Patterns are lower-case because they are applied to the
# lower-cased observation text. Several entries (for example
# "flammable.*open") are regular expressions, so consumers need regex
# matching (re.search); a plain substring test can never match those entries.
SEVERITY_RULES = {
    "critical": [
        r"without hard hat", r"no hard hat", r"missing guardrail", r"no guardrail",
        r"lockout tagout not followed", r"loto not followed", r"flammable.*open",
        r"no harness", r"arc flash", r"energized", r"without helmet",
    ],
    "major": [
        r"without safety glasses", r"no safety glasses", r"expired extinguisher",
        r"blocked exit", r"scaffold.*missing", r"no toe board", r"spill", r"stop work",
    ],
    "minor": [
        r"vegetation", r"drip tray", r"clutter", r"signage", r"cable tray cover",
        r"visitor.*without",
    ],
}
# Example observation that describes several unsafe conditions; offered as a
# one-click example in the UI.
DEMO_VIOLATION = (
    "FIELD OBSERVATION - Compressor Station Alpha, Turbine Hall\n"
    "During routine walkthrough at 0730, observed two contract workers grinding "
    "pipe flanges near Unit 2 compressor. "
    "One worker was not wearing safety glasses and neither had hearing protection "
    "despite noise levels exceeding 85 dB. "
    "Grinding sparks visible near open flammable storage cabinet. "
    "Guardrail on the elevated mezzanine platform (12ft) was missing a middle "
    "rail section. "
    "Fire extinguisher at station FE-07 had an expired inspection tag "
    "(last inspected 11 months ago). "
    "Oil drip tray under bearing housing was overflowing onto the walking surface."
)

# Example observation that describes a fully compliant site; offered as a
# one-click example in the UI.
DEMO_COMPLIANT = (
    "FIELD OBSERVATION - Compressor Station Beta, Control Room\n"
    "Pre-shift inspection at 0600. "
    "All personnel wearing required PPE including hard hats, safety glasses, "
    "high-vis vests, and steel-toe boots. "
    "Hearing protection verified for all workers entering the turbine hall. "
    "Emergency exits clear and properly illuminated. "
    "Fire extinguishers at stations FE-01 through FE-04 all within inspection dates. "
    "Housekeeping excellent - walkways clear, tool staging areas organized. "
    "Fall protection harnesses inspected and within certification. "
    "LOTO board current and accurate."
)
# Severity levels in the order they are tested. The first level that has a
# qualifying pattern wins, so the most severe applicable level is reported.
_SEVERITY_ORDER = ("critical", "major", "minor")

# Markdown badge shown in the "Severity" column for each level.
_SEVERITY_BADGES = {
    "critical": "🔴 CRITICAL",
    "major": "🟠 MAJOR",
    "minor": "🟡 MINOR",
    "observation": "🔵 OBSERVATION",
}

_MODEL_LINK = (
    "[hse-compliance-inspector]"
    "(https://huggingface.co/davidfertube/hse-compliance-inspector)"
)
_DATASET_LINK = (
    "[hse-safety-inspection-data]"
    "(https://huggingface.co/datasets/davidfertube/hse-safety-inspection-data)"
)


def _mentions(text, keyword):
    """Return True when *keyword* occurs in *text* starting at a word boundary.

    The look-behind stops a keyword from matching in the middle of a longer
    word ("hose" inside "those", "edge" inside "knowledge") while still
    accepting suffixed forms such as "hard hats" or "obstructed".
    """
    return re.search(r"(?<!\w)" + re.escape(keyword), text) is not None


def _classify_severity(text, matched_kw, severity_rules):
    """Return the most severe level whose pattern applies to this category.

    A pattern applies when it matches *text* (as a regular expression) and is
    related to one of the category's matched keywords, i.e. the keyword is
    contained in the pattern or the pattern in the keyword. A matching
    "critical" pattern applies to every category regardless of relatedness,
    so one critical hit escalates all flagged categories. When nothing
    applies the level is "observation".
    """
    for level in _SEVERITY_ORDER:
        for pattern in severity_rules.get(level, []):
            if not re.search(pattern, text):
                continue
            related = any(kw in pattern or pattern in kw for kw in matched_kw)
            if related or level == "critical":
                return level
    return "observation"


def _violation_report(findings):
    """Render the markdown report for *findings*, a list of (severity, row)."""
    severities = {severity for severity, _ in findings}
    if "critical" in severities:
        action = (
            "🔴 **STOP WORK** — Critical violations require immediate "
            "corrective action before work resumes."
        )
    elif "major" in severities:
        action = "🟠 **CORRECTIVE ACTION REQUIRED** — Address major findings within 24 hours."
    else:
        action = "🟡 **MONITOR** — Minor items noted for follow-up during next inspection."

    serious = sum(1 for severity, _ in findings if severity in ("critical", "major"))
    table = "\n".join(row for _, row in findings)
    lines = [
        "# 🚨 HSE VIOLATIONS DETECTED",
        "",
        f"## {action}",
        "",
        "---",
        "",
        "## Findings",
        "",
        "| Category | Severity | Indicators |",
        "|----------|----------|------------|",
        table,
        "",
        "## Violation Summary",
        "",
        f"- **Total Categories Flagged**: {len(findings)}",
        f"- **Critical/Major**: {serious}",
        "",
        "## Recommended Actions",
        "",
        "1. Document all findings in safety management system",
        "2. Issue corrective action notices for critical/major items",
        "3. Conduct follow-up inspection within 24-48 hours",
        "4. Brief crew on findings at next toolbox talk",
        "",
        "---",
        "",
        f"*Model: {_MODEL_LINK} | Dataset: {_DATASET_LINK}*",
    ]
    return "\n".join(lines) + "\n"


def _compliant_report(rules):
    """Render the markdown report used when no violation was detected."""
    checks = "\n".join(
        f"| {cat['icon']} {cat['label']} | ✅ Pass |" for cat in rules.values()
    )
    lines = [
        "# ✅ COMPLIANT — No Violations Detected",
        "",
        "All observed conditions meet HSE requirements.",
        "",
        "## Assessment",
        "",
        "| Check | Status |",
        "|-------|--------|",
        checks,
        "",
        "## Recommendation",
        "",
        "Continue routine monitoring. Document this compliant observation for audit records.",
        "",
        "---",
        "",
        f"*Model: {_MODEL_LINK}*",
    ]
    return "\n".join(lines) + "\n"


def inspect(observation_text, *, rules=None, severity_rules=None):
    """Analyze a field observation for HSE compliance violations.

    The observation is lower-cased, each category's keywords are searched in
    it, and every category with at least one hit is assigned a severity from
    the severity patterns (regular expressions). The result is a markdown
    report.

    Args:
        observation_text: Free-text observation. ``None``, empty or
            whitespace-only input yields a short prompt instead of a report.
        rules: Optional category catalogue with the same shape as
            ``HSE_RULES``; defaults to ``HSE_RULES``.
        severity_rules: Optional mapping of severity level to regex patterns
            with the same shape as ``SEVERITY_RULES``; defaults to
            ``SEVERITY_RULES``.

    Returns:
        A markdown string: the violation report when at least one category is
        rated minor, major or critical; otherwise the compliant report.
    """
    if not observation_text or not observation_text.strip():
        return "⚠️ Please enter a field observation to analyze."

    rules = HSE_RULES if rules is None else rules
    severity_rules = SEVERITY_RULES if severity_rules is None else severity_rules
    text = observation_text.lower()

    findings = []  # (severity, markdown table row), in catalogue order
    for cat in rules.values():
        matched_kw = [kw for kw in cat["keywords"] if _mentions(text, kw)]
        if not matched_kw:
            continue
        severity = _classify_severity(text, matched_kw, severity_rules)
        # Only the first three indicators are listed to keep the table narrow.
        indicators = ", ".join(matched_kw[:3])
        row = f"| {cat['icon']} {cat['label']} | {_SEVERITY_BADGES[severity]} | {indicators} |"
        findings.append((severity, row))

    # A keyword mention alone (severity "observation") is not a violation:
    # a text such as "all personnel wearing hard hats" must stay compliant.
    if all(severity == "observation" for severity, _ in findings):
        return _compliant_report(rules)
    return _violation_report(findings)
# Gradio UI: one text box in, one markdown report out, with the two demo
# observations offered as clickable examples.
#
# NOTE(review): the description and article below advertise "VLM-Powered"
# analysis and "Qwen2-VL vision-language understanding", but the `inspect`
# callback wired in here only does keyword/pattern matching on the text.
# Confirm whether this wording should stay.
#
# NOTE(review): the emoji in front of "Vision Agent" was unrecoverable in the
# captured source; the eye glyph is a best guess. Confirm against the
# original file.
demo = gr.Interface(
    fn=inspect,
    inputs=gr.Textbox(
        label="Field Observation",
        placeholder="Describe what you observed during the site walkthrough...",
        lines=12,
    ),
    outputs=gr.Markdown(label="HSE Assessment"),
    title="👁️ Vision Agent",
    description="""
## VLM-Powered HSE Compliance Inspection

**How to use:**
1. Click an example below to load a demo observation
2. Click "Submit" to see the HSE assessment
3. Or enter your own field observation text

**What it does:** Analyzes field observations for HSE violations across 5 categories — PPE, Housekeeping, Fall Protection, Fire Safety, and Electrical Safety — using Qwen2-VL vision-language understanding.
""",
    article="""
---
### How It Works
```
Field Observation → Qwen2-VL Analysis → Violation Detection → Severity Classification → Report
```
**Categories:** PPE 🦺 | Housekeeping 🧹 | Fall Protection 🪜 | Fire Safety 🧯 | Electrical ⚡

**Resources:** [Model](https://huggingface.co/davidfertube/hse-compliance-inspector) | [Dataset](https://huggingface.co/datasets/davidfertube/hse-safety-inspection-data) | [Portfolio](https://davidfernandez.dev)

*Built by David Fernandez — Applied AI Engineer | LangGraph & Pydantic Contributor*
""",
    examples=[
        [DEMO_VIOLATION],
        [DEMO_COMPLIANT],
    ],
    # Examples are cheap to compute, so they are run on click rather than cached.
    cache_examples=False,
)

# Start the server only when executed as a script, so importing this module
# (for tests or reuse of `inspect`) does not launch the app.
if __name__ == "__main__":
    demo.launch()