# ontology_eval.py from __future__ import annotations from dataclasses import dataclass, field from enum import Enum, IntEnum from typing import Dict, List, Optional, Tuple from datetime import datetime # ============================================================================ # ONTOLOGIE-ANBINDUNG (an die in deiner Grafik gezeigten Klassen/Properties) # -------------------------------------------------------------------------- # Klassen (Auszug): ex:Person, ex:Gleis, ex:Bahnsteig, ex:Zug, ex:Gefahr, # ex:Videoüberwachung, ex:Sensor, ex:Alarmsystem, ex:Maßnahme # Objekt-Properties: ex:befindetSichIn, ex:erkennt, ex:stehtAuf, ex:löstAus, # ex:überwacht, ex:beobachtet, ex:meldet, ex:führtZu # Daten-Properties : ex:hatKonfidenz (xsd:float), ex:hatZeitstempel (xsd:dateTime), # ex:hatPosition (xsd:string), ex:hatBeschreibung (xsd:string) # ============================================================================ EX = "ex:" # einfacher Prefix (du kannst z.B. "http://example.org/rail#" verwenden) class Severity(IntEnum): NONE = 0 LOW = 1 MEDIUM = 2 HIGH = 3 CRITICAL = 4 class HazardLabel(str, Enum): PERSON_ON_TRACK = "PersonOnTrack" # ex:Person befindetSichIn ex:Gleis NEAR_EDGE_TRAIN = "NearEdgeWithTrain" # ex:Person stehtAuf ex:BahnsteigKante ∧ ex:Zug in Szene FALLEN_PERSON = "FallenPersonNearTrack" # ex:Person liegt/gestürzt nahe Gleis OBJECT_ON_TRACK = "ObjectOnTrack" # ex:Objekt befindetSichIn ex:Gleis SMOKE_FIRE = "SmokeOrFire" # ex:Rauch/Feuer als Gefahr CROWD_OVERFLOW = "CrowdOverflowOnTrack" # ex:Menschenmenge im Gleisbereich @dataclass class Observation: """Beobachtungen/Signale für eine Szene (alle Werte ∈ [0,1] sind Konfidenzen).""" # Kontext-Geometrie distance_to_edge_m: Optional[float] = None train_approaching: float = 0.0 # Detektor-Konfidenzen on_track_person: float = 0.0 fallen_person: float = 0.0 object_on_track: float = 0.0 smoke_or_fire: float = 0.0 crowd_on_track: float = 0.0 # Bias (Recall-Priorisierung) class_threshold_recall_bias: float = 0.35 # Zusatzinfos notes: Dict[str, float] = field(default_factory=dict) @dataclass class HazardDecision: severity: Severity score_0_100: int labels: List[HazardLabel] explanations: List[str] fired_rules: List[str] # --------------------------- REGELWERK --------------------------------------- def _passes(p: float, thr: float) -> bool: return p >= thr def evaluate(ob: Observation) -> HazardDecision: """Regelbasierte Bewertung mit erklärbarer Ausgabe (Ontologie-gedacht).""" thr = ob.class_threshold_recall_bias labels: List[HazardLabel] = [] explains: List[str] = [] fired: List[str] = [] score_terms: List[Tuple[Severity, float]] = [] # R1 — ex:Person ex:befindetSichIn ex:Gleis → Kritisch if _passes(ob.on_track_person, thr): labels.append(HazardLabel.PERSON_ON_TRACK) fired.append("R1_befindetSichIn_Gleis") explains.append(f"R1: Person im Gleis erkannt (p={ob.on_track_person:.2f}).") score_terms.append((Severity.CRITICAL, 0.85 + 0.15 * ob.on_track_person)) # R2 — Nahe Kante + Zug → Hoch if (ob.distance_to_edge_m is not None and ob.distance_to_edge_m <= 0.5) and _passes(ob.train_approaching, thr): labels.append(HazardLabel.NEAR_EDGE_TRAIN) fired.append("R2_stehtAuf_Bahnsteigkante_und_Zug") explains.append( f"R2: ≤0.5 m zur Kante (d={ob.distance_to_edge_m:.2f} m) & Zug (p={ob.train_approaching:.2f})." ) score_terms.append((Severity.HIGH, 0.75 + 0.25 * ob.train_approaching)) # R3 — Gestürzte Person nahe Kante/auf Gleis → Hoch/Kritisch if _passes(ob.fallen_person, thr): if (ob.distance_to_edge_m is not None and ob.distance_to_edge_m <= 1.0) or _passes(ob.on_track_person, thr): labels.append(HazardLabel.FALLEN_PERSON) fired.append("R3_fallenPerson_in_Gefahrenzone") explains.append(f"R3: Gestürzte Person (p={ob.fallen_person:.2f}).") sev = Severity.CRITICAL if _passes(ob.on_track_person, thr) else Severity.HIGH base = 0.80 if sev is Severity.CRITICAL else 0.70 score_terms.append((sev, base + 0.20 * ob.fallen_person)) # R4 — Objekt im Gleis → Mittel if _passes(ob.object_on_track, thr): labels.append(HazardLabel.OBJECT_ON_TRACK) fired.append("R4_Objekt_im_Gleis") explains.append(f"R4: Objekt im Gleis (p={ob.object_on_track:.2f}).") score_terms.append((Severity.MEDIUM, 0.60 + 0.30 * ob.object_on_track)) # R5 — Rauch/Feuer → Hoch if _passes(ob.smoke_or_fire, thr): labels.append(HazardLabel.SMOKE_FIRE) fired.append("R5_Rauch_oder_Feuer") explains.append(f"R5: Rauch/Feuer (p={ob.smoke_or_fire:.2f}).") score_terms.append((Severity.HIGH, 0.70 + 0.25 * ob.smoke_or_fire)) # R6 — Menschenmenge im Gleis → Kritisch if _passes(ob.crowd_on_track, thr): labels.append(HazardLabel.CROWD_OVERFLOW) fired.append("R6_Menschenmenge_im_Gleisbereich") explains.append(f"R6: Crowd im Gleis (p={ob.crowd_on_track:.2f}).") score_terms.append((Severity.CRITICAL, 0.80 + 0.20 * ob.crowd_on_track)) if not score_terms: return HazardDecision( severity=Severity.NONE, score_0_100=0, labels=[], explanations=["Keine Gefahrenrelation erfüllt."], fired_rules=[] ) sev_weights = {Severity.NONE:0.0, Severity.LOW:0.25, Severity.MEDIUM:0.55, Severity.HIGH:0.80, Severity.CRITICAL:1.0} best = max(score_terms, key=lambda t: sev_weights[t[0]] * t[1]) best_sev, best_p = best final_score = int(round(100 * sev_weights[best_sev] * best_p)) labels = list(dict.fromkeys(labels)) return HazardDecision(best_sev, final_score, labels, explains, fired) # --------------------------- RDF/TRIPLES ------------------------------------- @dataclass class OntologyContext: """IDs/Metadaten für Tripel (du kannst echte IRIs verwenden).""" person_id: str = "person1" sensor_id: str = "sensor1" video_system_id: str = "videoSys1" track_id: str = "gleis1" platform_id: str = "bahnsteig1" alarm_id: str = "alarm1" measure_id: str = "massnahme1" event_id: str = "event1" timestamp: datetime = field(default_factory=datetime.utcnow) position: Optional[str] = None # z.B. "x=123,y=45,cam=2" def _lit(value: str, dtype: str) -> str: # Turtle-ähnlicher Literal-Renderer return f"\"{value}\"^^{dtype}" def decision_to_triples(dec: HazardDecision, ob: Observation, ctx: OntologyContext) -> List[Tuple[str,str,str]]: """ Erzeugt RDF-ähnliche Tripel basierend auf der Ontologie aus deiner Grafik. Nur stdlib; Ausgabe als einfache (s, p, o)-Tupel (Turtle-artig). """ triples: List[Tuple[str,str,str]] = [] # Typisierungen (rdf:type) triples += [ (EX+ctx.person_id, "rdf:type", EX+"Person"), (EX+ctx.sensor_id, "rdf:type", EX+"Sensor"), (EX+ctx.video_system_id, "rdf:type", EX+"Videoüberwachung"), (EX+ctx.track_id, "rdf:type", EX+"Gleis"), (EX+ctx.platform_id, "rdf:type", EX+"Bahnsteig"), (EX+ctx.alarm_id, "rdf:type", EX+"Alarmsystem"), (EX+ctx.measure_id, "rdf:type", EX+"Maßnahme"), (EX+ctx.event_id, "rdf:type", EX+"Ereignis"), (EX+"gef1", "rdf:type", EX+"Gefahr"), ] # Überwachung/Beobachtungskette triples += [ (EX+ctx.video_system_id, EX+"überwacht", EX+ctx.platform_id), (EX+ctx.sensor_id, EX+"beobachtet", EX+ctx.platform_id), (EX+ctx.sensor_id, EX+"erkennt", EX+ctx.person_id), ] # Daten-Properties triples.append((EX+ctx.event_id, EX+"hatZeitstempel", _lit(ctx.timestamp.isoformat(), "xsd:dateTime"))) if ctx.position: triples.append((EX+ctx.person_id, EX+"hatPosition", _lit(ctx.position, "xsd:string"))) # Konfidenzen (nur wenn gesetzt) def add_conf(name: str, val: float): triples.append((EX+name, EX+"hatKonfidenz", _lit(f"{val:.3f}", "xsd:float"))) if ob.on_track_person: add_conf(ctx.person_id, ob.on_track_person) if ob.object_on_track: triples.append((EX+"obj1", "rdf:type", EX+"Objekt")) or add_conf("obj1", ob.object_on_track) if ob.smoke_or_fire: triples.append((EX+"smk1", "rdf:type", EX+"Unfall")) or add_conf("smk1", ob.smoke_or_fire) # Ontologische Kernaussagen je nach Label for lab in dec.labels: if lab == HazardLabel.PERSON_ON_TRACK: triples.append((EX+ctx.person_id, EX+"befindetSichIn", EX+ctx.track_id)) elif lab == HazardLabel.NEAR_EDGE_TRAIN: # approximiert: Person steht (nahe) auf Bahnsteigkante triples.append((EX+ctx.person_id, EX+"stehtAuf", EX+ctx.platform_id)) elif lab == HazardLabel.OBJECT_ON_TRACK: triples.append((EX+"obj1", EX+"befindetSichIn", EX+ctx.track_id)) elif lab == HazardLabel.CROWD_OVERFLOW: triples.append((EX+ctx.platform_id, EX+"istZugaenglich", _lit("false", "xsd:boolean"))) # Gefahr → löstAus → Alarm; Alarm → führtZu → Maßnahme triples += [ (EX+"gef1", EX+"hatBeschreibung", _lit(f"Severity={dec.severity.name}; Score={dec.score_0_100}", "xsd:string")), (EX+"gef1", EX+"löstAus", EX+ctx.alarm_id), (EX+ctx.alarm_id, EX+"führtZu", EX+ctx.measure_id), (EX+ctx.alarm_id, EX+"meldet", EX+"Polizei"), # optionaler Meldeweg ] return triples def triples_to_turtle(triples: List[Tuple[str,str,str]]) -> str: """Kleine Pretty-Printer-Hilfe für Logs/Datei-Export.""" lines = [] for s,p,o in triples: if not o.startswith(EX) and not o.startswith("\""): # Literale sind schon getaggt; ansonsten als Ressourcen belassen o = o lines.append(f"{s} {p} {o} .") return "\n".join(lines)