| import os |
| os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics" |
|
|
| import cv2 |
| import json |
| import tempfile |
| from PIL import Image |
|
|
| import gradio as gr |
| from ultralytics import YOLO |
| from openai import OpenAI |
| from docx import Document |
|
|
|
|
| |
| |
| |
|
|
| MODEL_PATHS = { |
| "Asphalt Pathologies Detection": "best_asphalt.pt", |
| "Concrete Pathologies Detection": "Concreate_defect_detection.pt", |
| "Facades Pathologies Detection": "Facades_defect_detection.pt" |
| } |
|
|
| models = { |
| name: YOLO(path) |
| for name, path in MODEL_PATHS.items() |
| } |
|
|
|
|
| |
| |
| |
|
|
| CATALOG_PATHS = { |
| "Asphalt Pathologies Detection": "knowledge/asphalt_catalog.json", |
| "Concrete Pathologies Detection": "knowledge/concrete_catalog.json", |
| "Facades Pathologies Detection": "knowledge/facades_catalog.json" |
| } |
|
|
|
|
| |
| |
| |
|
|
| XAI_API_KEY = os.getenv("XAI_API_KEY") |
|
|
| client = OpenAI( |
| api_key=XAI_API_KEY, |
| base_url="https://api.x.ai/v1" |
| ) if XAI_API_KEY else None |
|
|
|
|
| |
| |
| |
|
|
| def normalize_class_name(name): |
| name = str(name).upper().strip() |
| name = name.replace(".", "") |
| name = name.replace("-", " ") |
| name = name.replace("_", " ") |
| name = name.replace("/", " ") |
| name = " ".join(name.split()) |
| return name |
|
|
|
|
| def normalize_catalog_key(key): |
| return normalize_class_name(key) |
|
|
|
|
| |
| |
| |
|
|
| def get_report_title(): |
| return "Civil Infrastructure Pathology Inspection Report" |
|
|
|
|
| |
| |
| |
|
|
| def load_catalog(selected_model): |
| path = CATALOG_PATHS[selected_model] |
|
|
| if not os.path.exists(path): |
| return {} |
|
|
| with open(path, "r", encoding="utf-8") as f: |
| raw_catalog = json.load(f) |
|
|
| normalized_catalog = {} |
|
|
| for key, value in raw_catalog.items(): |
| normalized_key = normalize_catalog_key(key) |
| normalized_catalog[normalized_key] = value |
|
|
| return normalized_catalog |
|
|
|
|
| |
| |
| |
|
|
| def retrieve_knowledge_all(detections): |
| retrieved = [] |
|
|
| for d in detections: |
| selected_model = d["model"] |
| catalog = load_catalog(selected_model) |
|
|
| key = normalize_class_name(d["class"]) |
|
|
| if key in catalog: |
| item = catalog[key].copy() |
| item["source_model"] = selected_model |
| item["detected_class"] = d["class"] |
| item["confidence"] = d["confidence"] |
| item["bbox"] = d["bbox"] |
| retrieved.append(item) |
|
|
| else: |
| retrieved.append({ |
| "source_model": selected_model, |
| "detected_class": d["class"], |
| "confidence": d["confidence"], |
| "bbox": d["bbox"], |
| "name": d["class"], |
| "severity": "Not available", |
| "priority": "Not available", |
| "deadline": "Manual inspection required", |
| "risk": "Not available in RAG catalog", |
| "recommendation": "Manual civil engineering verification is required." |
| }) |
|
|
| return retrieved |
|
|
|
|
| |
| |
| |
|
|
| def get_detection_summary(results, selected_model): |
| detections = [] |
| model = models[selected_model] |
|
|
| for r in results: |
| if r.boxes is None: |
| continue |
|
|
| for box in r.boxes: |
| cls_id = int(box.cls[0]) |
| conf = float(box.conf[0]) |
| xyxy = box.xyxy[0].cpu().numpy().tolist() |
|
|
| raw_name = model.names[cls_id] |
| key = normalize_class_name(raw_name) |
|
|
| detections.append({ |
| "model": selected_model, |
| "model_file": MODEL_PATHS[selected_model], |
| "class": raw_name, |
| "key": key, |
| "confidence": round(conf, 3), |
| "bbox": [round(x, 2) for x in xyxy] |
| }) |
|
|
| return detections |
|
|
|
|
| def summarize_detections(detections): |
| if not detections: |
| return { |
| "total": 0, |
| "classes": {}, |
| "highest_confidence": None |
| } |
|
|
| classes = {} |
|
|
| for d in detections: |
| cls_key = d["key"] |
| classes[cls_key] = classes.get(cls_key, 0) + 1 |
|
|
| highest = max(detections, key=lambda x: x["confidence"]) |
|
|
| return { |
| "total": len(detections), |
| "classes": classes, |
| "highest_confidence": highest |
| } |
|
|
|
|
| |
| |
| |
|
|
| def basic_engineering_report(detections): |
| title = get_report_title() |
| summary = summarize_detections(detections) |
| retrieved_docs = retrieve_knowledge_all(detections) |
|
|
| if summary["total"] == 0: |
| return f""" |
| # {title} |
| |
| ## 1. Detection Result |
| No pathology was detected by any of the three YOLO models with the selected confidence threshold. |
| |
| ## 2. Engineering Interpretation |
| The inspected image/video does not show a clear defect detectable by the asphalt, concrete, or facade pathology models. This does not guarantee that the element is defect-free. |
| |
| ## 3. Recommended Action |
| - Repeat inspection with clearer images. |
| - Capture the surface perpendicular to the inspected element. |
| - Use manual visual inspection for confirmation. |
| - Reduce the confidence threshold if small or low-contrast defects are expected. |
| |
| ## 4. Disclaimer |
| This report is AI-assisted and must be verified by a qualified civil engineer. |
| """ |
|
|
| lines = [] |
|
|
| lines.append(f"# {title}") |
| lines.append("") |
| lines.append("## 1. AI Detection Summary") |
| lines.append("- Pipeline: All three YOLO models were applied automatically.") |
| lines.append(f"- Total detected defects: {summary['total']}") |
|
|
| model_counts = {} |
|
|
| for d in detections: |
| model_counts[d["model"]] = model_counts.get(d["model"], 0) + 1 |
|
|
| lines.append("") |
| lines.append("### Detection Count by Model") |
|
|
| for model_name, count in model_counts.items(): |
| lines.append(f"- {model_name}: {count} detected area(s)") |
|
|
| lines.append("") |
| lines.append("### Detection Count by Pathology Class") |
|
|
| for cls, count in summary["classes"].items(): |
| lines.append(f"- {cls.title()}: {count} detected area(s)") |
|
|
| lines.append("") |
| lines.append("## 2. RAG-Based Engineering Diagnosis") |
|
|
| grouped = {} |
|
|
| for item in retrieved_docs: |
| model_name = item.get("source_model", "Unknown model") |
| name = item.get("name", item.get("detected_class", "Unknown Pathology")) |
| group_key = f"{model_name} | {name}" |
| grouped.setdefault(group_key, []).append(item) |
|
|
| for group_key, items in grouped.items(): |
| model_name, name = group_key.split(" | ", 1) |
| info = items[0] |
| count = len(items) |
|
|
| lines.append("") |
| lines.append(f"### {name}") |
| lines.append(f"- Source model: {model_name}") |
| lines.append(f"- Detected quantity: {count}") |
| lines.append(f"- Severity: {info.get('severity', 'Not available')}") |
| lines.append(f"- Priority: {info.get('priority', 'Not available')}") |
| lines.append(f"- Recommended action period: {info.get('deadline', 'Not available')}") |
| lines.append(f"- Associated risk: {info.get('risk', 'Not available')}") |
| lines.append(f"- Recommended intervention: {info.get('recommendation', 'Manual inspection required')}") |
|
|
| if "causes" in info: |
| causes = info["causes"] |
|
|
| if isinstance(causes, list): |
| lines.append("- Probable causes: " + ", ".join(causes)) |
| else: |
| lines.append(f"- Probable causes: {causes}") |
|
|
| lines.append("") |
| lines.append("## 3. Field Verification Checklist") |
| lines.append("- Confirm all AI-detected pathologies by on-site inspection.") |
| lines.append("- Verify whether each detected defect belongs to asphalt, concrete, or facade pathology.") |
| lines.append("- Measure crack width, spalling depth, deformation, rut depth, delamination, moisture extent, or level difference where applicable.") |
| lines.append("- Record location, chainage/GPS coordinates, inspection date, and environmental condition.") |
| lines.append("- Capture georeferenced photographs before repair.") |
| lines.append("- Check drainage, water ingress, moisture source, corrosion signs, and loose material.") |
| lines.append("- Determine whether temporary safety measures such as warning signs, barriers, or cordoning are required.") |
|
|
| lines.append("") |
| lines.append("## 4. Professional Notes") |
| lines.append("- P1 defects require urgent safety management.") |
| lines.append("- P2 defects should be included in a structural maintenance or repair plan.") |
| lines.append("- P3 defects can generally be handled through routine preventive maintenance.") |
| lines.append("- Final decisions must be confirmed by a qualified civil/structural/building engineer.") |
|
|
| lines.append("") |
| lines.append("## 5. Disclaimer") |
| lines.append("This report is generated using AI-based image/video detection from three YOLO models and RAG-based engineering knowledge catalogs. It is intended for preliminary engineering support only and must not replace field inspection, testing, or professional engineering judgment.") |
|
|
| return "\n".join(lines) |
|
|
|
|
| |
| |
| |
|
|
| def grok_engineering_report(detections): |
| base_report = basic_engineering_report(detections) |
| retrieved_docs = retrieve_knowledge_all(detections) |
|
|
| if client is None: |
| return base_report + "\n\n⚠️ Grok API key not found. Add `XAI_API_KEY` in Hugging Face Space Secrets." |
|
|
| prompt = f""" |
| You are a professional civil engineering inspection expert. |
| |
| Generate a formal engineering inspection report using ONLY the provided RAG engineering knowledge. |
| |
| Inspection pipeline: |
| All three YOLO models were applied automatically. |
| |
| Models used: |
| {json.dumps(list(MODEL_PATHS.keys()), indent=2)} |
| |
| Detected YOLO defects: |
| {json.dumps(detections, indent=2)} |
| |
| Retrieved RAG engineering knowledge: |
| {json.dumps(retrieved_docs, indent=2)} |
| |
| Base local report: |
| {base_report} |
| |
| Use this structure: |
| 1. Project title |
| 2. AI detection summary |
| 3. Identified pathologies grouped by source model |
| 4. Probable causes |
| 5. Severity and priority |
| 6. Associated civil engineering risks |
| 7. Recommended repair method |
| 8. Recommended action timeframe |
| 9. Field verification checklist |
| 10. Disclaimer |
| |
| Important rules: |
| - Do not invent pathologies not detected by YOLO. |
| - Mention which model detected each pathology. |
| - Use the severity, priority, risk, deadline, and recommendation from the retrieved RAG knowledge. |
| - If information is missing, say that manual engineering verification is required. |
| - Keep the report professional, concise, and suitable for a civil engineering inspection document. |
| """ |
|
|
| try: |
| response = client.chat.completions.create( |
| model="grok-3-mini", |
| messages=[ |
| { |
| "role": "system", |
| "content": "You are a civil engineering pathology inspection expert." |
| }, |
| { |
| "role": "user", |
| "content": prompt |
| } |
| ], |
| temperature=0.2 |
| ) |
|
|
| return response.choices[0].message.content |
|
|
| except Exception as e: |
| return base_report + f"\n\n⚠️ Grok report generation failed. Error: {str(e)}" |
|
|
|
|
| |
| |
| |
|
|
| def create_word_report(report_text): |
| doc = Document() |
| doc.add_heading(get_report_title(), level=1) |
|
|
| for line in report_text.split("\n"): |
| line = line.strip() |
|
|
| if not line: |
| continue |
|
|
| if line.startswith("# "): |
| doc.add_heading(line.replace("# ", ""), level=1) |
|
|
| elif line.startswith("## "): |
| doc.add_heading(line.replace("## ", ""), level=2) |
|
|
| elif line.startswith("### "): |
| doc.add_heading(line.replace("### ", ""), level=3) |
|
|
| elif line.startswith("- "): |
| doc.add_paragraph(line.replace("- ", ""), style="List Bullet") |
|
|
| else: |
| doc.add_paragraph(line) |
|
|
| file_path = tempfile.NamedTemporaryFile( |
| suffix=".docx", |
| delete=False |
| ).name |
|
|
| doc.save(file_path) |
| return file_path |
|
|
|
|
| |
| |
| |
|
|
| def detect_image(image, conf): |
| if image is None: |
| return None, "Please upload an image.", "", None |
|
|
| all_detections = [] |
| annotated_image = image |
|
|
| for selected_model, model in models.items(): |
| results = model.predict( |
| source=annotated_image, |
| imgsz=640, |
| conf=conf, |
| save=False, |
| verbose=False |
| ) |
|
|
| detections = get_detection_summary(results, selected_model) |
| all_detections.extend(detections) |
|
|
| plotted = results[0].plot() |
| plotted = cv2.cvtColor(plotted, cv2.COLOR_BGR2RGB) |
| annotated_image = Image.fromarray(plotted) |
|
|
| retrieved_docs = retrieve_knowledge_all(all_detections) |
|
|
| report = grok_engineering_report(all_detections) |
| word_file = create_word_report(report) |
|
|
| output_json = { |
| "pipeline": "All three YOLO models were applied automatically", |
| "models_used": MODEL_PATHS, |
| "catalogs_used": CATALOG_PATHS, |
| "total_detections": len(all_detections), |
| "detections": all_detections, |
| "retrieved_rag_knowledge": retrieved_docs |
| } |
|
|
| return annotated_image, json.dumps(output_json, indent=2), report, word_file |
|
|
|
|
| |
| |
| |
|
|
| def detect_video(video_path, conf): |
| if video_path is None: |
| return None, "Please upload a video.", "", None |
|
|
| cap = cv2.VideoCapture(video_path) |
|
|
| if not cap.isOpened(): |
| return None, "Could not open video.", "", None |
|
|
| fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
| if fps <= 0: |
| fps = 20 |
|
|
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
| temp_output = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) |
| output_path = temp_output.name |
| temp_output.close() |
|
|
| fourcc = cv2.VideoWriter_fourcc(*"mp4v") |
| writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) |
|
|
| all_detections = [] |
| frame_count = 0 |
| process_every_n_frames = 3 |
| last_annotated = None |
|
|
| while True: |
| ret, frame = cap.read() |
|
|
| if not ret: |
| break |
|
|
| if frame_count % process_every_n_frames == 0: |
| annotated = frame.copy() |
|
|
| for selected_model, model in models.items(): |
| results = model.predict( |
| source=annotated, |
| imgsz=640, |
| conf=conf, |
| save=False, |
| verbose=False |
| ) |
|
|
| detections = get_detection_summary(results, selected_model) |
| all_detections.extend(detections) |
|
|
| annotated = results[0].plot() |
|
|
| last_annotated = annotated |
|
|
| else: |
| annotated = last_annotated if last_annotated is not None else frame |
|
|
| writer.write(annotated) |
| frame_count += 1 |
|
|
| cap.release() |
| writer.release() |
|
|
| retrieved_docs = retrieve_knowledge_all(all_detections) |
|
|
| report = grok_engineering_report(all_detections) |
| word_file = create_word_report(report) |
|
|
| output_json = { |
| "pipeline": "All three YOLO models were applied automatically", |
| "models_used": MODEL_PATHS, |
| "catalogs_used": CATALOG_PATHS, |
| "total_detections": len(all_detections), |
| "detections_first_100": all_detections[:100], |
| "retrieved_rag_knowledge_first_100": retrieved_docs[:100] |
| } |
|
|
| return output_path, json.dumps(output_json, indent=2), report, word_file |
|
|
|
|
| |
| |
| |
|
|
| with gr.Blocks(title="Civil Infrastructure Pathology Detection + RAG Engineering Report") as demo: |
|
|
| gr.Markdown(""" |
| # Civil Infrastructure Pathology Detection + RAG Engineering Report |
| |
| Upload an image or video. |
| |
| The system automatically runs all three YOLO models: |
| |
| - Asphalt pathologies detection |
| - Concrete pathologies detection |
| - Facades pathologies detection |
| |
| The detected defects are combined, RAG knowledge is retrieved from the related JSON catalogs, and one civil engineering inspection report is generated. |
| """) |
|
|
| conf = gr.Slider( |
| minimum=0.10, |
| maximum=0.90, |
| value=0.25, |
| step=0.05, |
| label="Confidence Threshold" |
| ) |
|
|
| with gr.Tab("Image Detection"): |
| image_input = gr.Image(type="pil", label="Upload Image") |
| image_output = gr.Image(type="pil", label="Detection Output") |
| image_json = gr.Textbox(label="Detection + RAG Data", lines=12) |
| image_report = gr.Markdown(label="Engineering Report") |
| image_docx = gr.File(label="Download Engineering Report as Word File") |
|
|
| image_btn = gr.Button("Run Image Detection Through All Models + Generate RAG Report") |
|
|
| image_btn.click( |
| fn=detect_image, |
| inputs=[image_input, conf], |
| outputs=[image_output, image_json, image_report, image_docx] |
| ) |
|
|
| with gr.Tab("Video Detection"): |
| video_input = gr.Video(label="Upload Video") |
| video_output = gr.Video(label="Detection Output Video") |
| video_json = gr.Textbox(label="Detection + RAG Data", lines=12) |
| video_report = gr.Markdown(label="Engineering Report") |
| video_docx = gr.File(label="Download Engineering Report as Word File") |
|
|
| video_btn = gr.Button("Run Video Detection Through All Models + Generate RAG Report") |
|
|
| video_btn.click( |
| fn=detect_video, |
| inputs=[video_input, conf], |
| outputs=[video_output, video_json, video_report, video_docx] |
| ) |
|
|
| gr.Markdown(""" |
| ## Notes |
| - The uploaded image/video is processed through all three models automatically. |
| - For video, every 3rd frame is processed to reduce runtime. |
| - Add `XAI_API_KEY` in Hugging Face Space Secrets to enable Grok-generated professional reporting. |
| - If no API key is added, the app still generates a local RAG-based engineering report. |
| - The report is AI-assisted and must be verified by a qualified civil engineer. |
| """) |
|
|
|
|
| if __name__ == "__main__": |
| demo.queue() |
| demo.launch( |
| server_name="0.0.0.0", |
| server_port=7860, |
| ssr_mode=False |
| ) |