bahngleis-detektor / app_original_backup.py
Migjomatic's picture
Deutsche Fassung
a5ab0c0
raw
history blame
27.4 kB
import streamlit as st
import cv2
import os
import tempfile
import requests
import base64
import subprocess
import json
from io import BytesIO
from PIL import Image
import numpy as np
from dotenv import load_dotenv
from ontology_eval import Observation, evaluate, OntologyContext, decision_to_triples, triples_to_turtle, Severity
# Try to import local models, fall back gracefully if not available
try:
from local_models import get_local_model_manager
LOCAL_MODELS_AVAILABLE = True
except ImportError as e:
LOCAL_MODELS_AVAILABLE = False
print(f"Local models not available: {e}")
def get_local_model_manager():
return None
# Load environment variables
load_dotenv()
def load_settings():
"""Load settings from JSON file"""
try:
with open('settings.json', 'r') as f:
return json.load(f)
except FileNotFoundError:
return {}
# Local models configuration
LOCAL_MODELS_ENABLED = LOCAL_MODELS_AVAILABLE
REMOTE_MODELS_ENABLED = True # Always allow remote API as fallback
# Initialize local model manager
@st.cache_resource
def initialize_local_models():
"""Initialize local model manager"""
return get_local_model_manager()
# Hugging Face models for vision-language tasks (kept for compatibility)
AVAILABLE_MODELS = {
"microsoft/kosmos-2-patch14-224": "Kosmos-2",
"Salesforce/blip-image-captioning-large": "BLIP Image Captioning",
"microsoft/DialoGPT-medium": "DialoGPT",
"microsoft/git-large-coco": "GIT Large COCO",
"nlpconnect/vit-gpt2-image-captioning": "ViT-GPT2"
}
def repair_video_with_ffmpeg(input_path, output_path):
"""
Repair corrupted video by moving moov atom to the beginning
"""
try:
# Try to fix the video using FFmpeg
cmd = [
'ffmpeg',
'-i', input_path,
'-c', 'copy',
'-movflags', 'faststart',
'-avoid_negative_ts', 'make_zero',
'-y', # Overwrite output file
output_path
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300 # 5 minute timeout
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError):
return False
def extract_frames_from_video(video_file, fps=1):
"""
Extract frames from video at specified FPS (default 1 frame per second)
Automatically handles corrupted videos by attempting repair with FFmpeg
"""
frames = []
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_file:
tmp_file.write(video_file.read())
tmp_file_path = tmp_file.name
repaired_path = None
try:
# First attempt: try to open video directly
cap = cv2.VideoCapture(tmp_file_path)
# Check if video opened successfully and has frames
if not cap.isOpened() or cap.get(cv2.CAP_PROP_FRAME_COUNT) == 0:
cap.release()
# Second attempt: try to repair the video with FFmpeg
st.warning("Video appears corrupted (moov atom issue). Attempting repair...")
with tempfile.NamedTemporaryFile(delete=False, suffix='_repaired.mp4') as repaired_file:
repaired_path = repaired_file.name
if repair_video_with_ffmpeg(tmp_file_path, repaired_path):
st.success("Video repair successful! Processing frames...")
cap = cv2.VideoCapture(repaired_path)
else:
st.error("Failed to repair video. FFmpeg may not be installed or video is severely corrupted.")
return frames
# Extract video properties
video_fps = cap.get(cv2.CAP_PROP_FPS)
if video_fps <= 0:
video_fps = 30 # Default fallback FPS
frame_interval = int(video_fps / fps) if video_fps > fps else 1
frame_count = 0
extracted_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
if frame_count % frame_interval == 0:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(frame_rgb)
frames.append({
'frame': pil_image,
'timestamp': frame_count / video_fps,
'frame_number': extracted_count
})
extracted_count += 1
frame_count += 1
cap.release()
finally:
# Clean up temporary files
if os.path.exists(tmp_file_path):
os.unlink(tmp_file_path)
if repaired_path and os.path.exists(repaired_path):
os.unlink(repaired_path)
return frames
def image_to_base64(image):
"""Convert PIL image to base64 string"""
buffer = BytesIO()
image.save(buffer, format="PNG")
img_str = base64.b64encode(buffer.getvalue()).decode()
return img_str
def process_image_locally(image, prompt, model_name, local_manager):
"""
Process image using local models
"""
try:
if model_name == "Person on Track Detector":
# Special handling for person-on-track detection
result = local_manager.person_on_track_detector.detect_person_on_track(image)
return {"person_on_track_detection": result}
else:
caption = local_manager.generate_caption(model_name, image, prompt)
return {"generated_text": caption}
except Exception as e:
return {"error": f"Local processing failed: {str(e)}"}
def query_huggingface_api(image, prompt, model_name, api_token):
"""
Query Hugging Face API with image and prompt
"""
API_URL = f"https://api-inference.huggingface.co/models/{model_name}"
headers = {"Authorization": f"Bearer {api_token}"}
# Convert image to base64
img_base64 = image_to_base64(image)
# Prepare payload based on model type
if "blip" in model_name.lower():
# For BLIP models, send image directly
buffer = BytesIO()
image.save(buffer, format="PNG")
response = requests.post(
API_URL,
headers=headers,
files={"file": buffer.getvalue()}
)
else:
# For other vision-language models
payload = {
"inputs": {
"image": img_base64,
"text": prompt
}
}
response = requests.post(API_URL, headers=headers, json=payload)
if response.status_code == 200:
return response.json()
else:
return {"error": f"API request failed: {response.status_code} - {response.text}"}
def analyze_scene_with_ontology(scene_description, use_ontology=True):
"""
Analyze scene description using ontology-based evaluation
Returns classification and explanation
"""
if not use_ontology:
return {
"severity": "NONE",
"severity_icon": "βœ…",
"score": 0,
"explanation": "Ontology-based analysis skipped",
"ontology_used": False,
"raw_description": scene_description
}
# Extract relevant information from scene description for ontology
scene_lower = scene_description.lower().strip() if scene_description else ""
# Initialize observation based on scene analysis
obs = Observation()
# Analyze scene for ontology features
person_words = ['person', 'people', 'man', 'woman', 'boy', 'girl', 'human', 'individual', 'someone']
track_words = ['track', 'tracks', 'rail', 'rails', 'railway', 'railroad']
platform_words = ['platform', 'station', 'bahnsteig']
danger_words = ['fallen', 'lying', 'down', 'accident', 'emergency']
fire_words = ['fire', 'smoke', 'flames', 'burning']
crowd_words = ['crowd', 'many people', 'group', 'mehrere personen']
safe_words = ['no people', 'empty', 'clear', 'safe', 'nobody', 'without people']
# Set observation values based on keyword analysis
person_mentions = sum(1 for word in person_words if word in scene_lower)
track_mentions = sum(1 for word in track_words if word in scene_lower)
platform_mentions = sum(1 for word in platform_words if word in scene_lower)
danger_mentions = sum(1 for word in danger_words if word in scene_lower)
fire_mentions = sum(1 for word in fire_words if word in scene_lower)
crowd_mentions = sum(1 for word in crowd_words if word in scene_lower)
safe_mentions = sum(1 for word in safe_words if word in scene_lower)
# Person on track detection (but not if explicitly safe)
if person_mentions > 0 and track_mentions > 0 and safe_mentions == 0:
# Check if person is actually on the tracks vs just mentioned
on_track_indicators = ['on track', 'on the track', 'on rails', 'on the rails', 'standing on', 'walking on']
on_track_specific = sum(1 for phrase in on_track_indicators if phrase in scene_lower)
if on_track_specific > 0:
obs.on_track_person = min(0.8, 0.6 + on_track_specific * 0.1)
elif person_mentions > 0 and track_mentions > 0:
# General co-occurrence but less confident - need stronger evidence
near_indicators = ['near', 'close to', 'next to', 'beside', 'by the']
near_mentions = sum(1 for phrase in near_indicators if phrase in scene_lower)
if near_mentions > 0:
# Person near tracks but not necessarily on them - lower confidence
obs.on_track_person = min(0.4, 0.25 + near_mentions * 0.05)
else:
# Just mention of person and tracks together - very low confidence
obs.on_track_person = min(0.3, 0.2 + (person_mentions + track_mentions) * 0.02)
# Fallen person detection
if person_mentions > 0 and danger_mentions > 0:
obs.fallen_person = min(0.7, 0.4 + danger_mentions * 0.1)
# Fire/smoke detection
if fire_mentions > 0:
obs.smoke_or_fire = min(0.8, 0.5 + fire_mentions * 0.15)
# Crowd detection
if crowd_mentions > 0 and (track_mentions > 0 or platform_mentions > 0):
obs.crowd_on_track = min(0.7, 0.4 + crowd_mentions * 0.1)
# Generic object detection (if no person but something mentioned on tracks)
if track_mentions > 0 and person_mentions == 0 and any(word in scene_lower for word in ['object', 'item', 'thing', 'debris']):
obs.object_on_track = 0.6
# Evaluate using ontology
decision = evaluate(obs)
# Map severity to icons and colors
severity_mapping = {
Severity.NONE: {"icon": "βœ…", "color": "green"},
Severity.LOW: {"icon": "🟒", "color": "lightgreen"},
Severity.MEDIUM: {"icon": "🟠", "color": "orange"},
Severity.HIGH: {"icon": "⚠️", "color": "red"},
Severity.CRITICAL: {"icon": "🚨", "color": "darkred"}
}
severity_info = severity_mapping[decision.severity]
return {
"severity": decision.severity.name,
"severity_icon": severity_info["icon"],
"severity_color": severity_info["color"],
"score": decision.score_0_100,
"labels": [label.value for label in decision.labels],
"explanations": decision.explanations,
"fired_rules": decision.fired_rules,
"ontology_used": True,
"raw_description": scene_description,
"observation": obs,
"decision": decision
}
def main():
st.set_page_config(
page_title="Video Frame Analyzer",
page_icon="πŸŽ₯",
layout="wide"
)
st.title("πŸŽ₯ Video Frame Analyzer with Local AI Models")
st.markdown("Upload a video, provide a prompt, and analyze each frame using local AI models (CNN or Transformer)")
# Load settings and initialize local models
settings = load_settings()
# Initialize local models if enabled
local_manager = None
local_models_available = False
if LOCAL_MODELS_ENABLED:
try:
local_manager = initialize_local_models()
local_models_available = True
st.success("πŸ€– Local AI models initialized successfully!")
except Exception as e:
st.warning(f"Local AI models not available: {str(e)}")
st.info("πŸ’‘ Install AI packages: `pip install torch torchvision transformers accelerate sentencepiece`")
local_models_available = False
else:
st.info("πŸ’‘ Local AI models not installed. Install with: `pip install torch torchvision transformers accelerate sentencepiece`")
# Sidebar for configuration
with st.sidebar:
st.header("Configuration")
# Model type selection
available_options = []
if local_models_available:
available_options.append("Local Models")
if REMOTE_MODELS_ENABLED:
available_options.append("Remote API")
if not available_options:
available_options = ["Remote API"] # Fallback
model_type = st.radio(
"Model Type",
available_options,
help="Choose between local AI models or remote Hugging Face API"
)
if model_type == "Local Models" and local_models_available:
# Local model selection
available_local_models = local_manager.get_available_models()
selected_model = st.selectbox(
"Select Local Model",
options=available_local_models,
help="Choose between CNN (fast) or Transformer (detailed) models"
)
# Show model info
model_info = local_manager.get_model_info()
if selected_model in model_info:
with st.expander("Model Information"):
st.write(f"**Description:** {model_info[selected_model]['description']}")
st.write(f"**Strengths:** {model_info[selected_model]['strengths']}")
st.write(f"**Size:** {model_info[selected_model]['size']}")
api_token = None # Not needed for local models
else:
# Remote API configuration
default_token = settings.get('hugging_face_api_token', '')
api_token = st.text_input(
"Hugging Face API Token",
value=default_token,
type="password",
help="Get your token from https://huggingface.co/settings/tokens or save in settings.json"
)
# Remote model selection
selected_model = st.selectbox(
"Select Model",
options=list(AVAILABLE_MODELS.keys()),
format_func=lambda x: AVAILABLE_MODELS[x]
)
# Frame extraction rate
fps = st.slider(
"Frames per second to extract",
min_value=0.1,
max_value=5.0,
value=1.0,
step=0.1
)
# Ontology settings
st.subheader("Ontology Analysis")
use_ontology = st.checkbox(
"Enable Ontology Analysis",
value=True,
help="Use ontology-based classification (NONE/LOW/MEDIUM/HIGH/CRITICAL)"
)
if not use_ontology:
st.info("πŸ”„ Ontology analysis disabled - showing raw model output only")
# Main content area
col1, col2 = st.columns([1, 1])
with col1:
st.header("Input")
# Video upload
video_file = st.file_uploader(
"Upload Video",
type=['mp4', 'avi', 'mov', 'mkv'],
help="Upload a video file to analyze"
)
# Prompt input (conditional based on model)
if model_type == "Local Models" and local_models_available and selected_model == "Person on Track Detector":
# Person on Track Detector works automatically
st.info("πŸ€– Person on Track Detector works automatically - no prompt needed!")
prompt = "automatic" # Set automatic prompt
else:
# Regular models need user prompt
prompt = st.text_area(
"Analysis Prompt",
placeholder="Describe what you see in the image...",
help="Enter the prompt to analyze each frame"
)
# Process button
process_button = st.button("Video auswΓ€hlen", type="primary")
with col2:
st.header("Results")
results_container = st.container()
# Processing logic
if process_button and video_file and (prompt or (model_type == "Local Models" and selected_model == "Person on Track Detector")) and (api_token or model_type == "Local Models"):
with st.spinner("Processing video..."):
# Extract frames
frames = extract_frames_from_video(video_file, fps)
if not frames:
st.error("No frames could be extracted from the video")
return
st.success(f"Extracted {len(frames)} frames from video")
# Process each frame
results = []
progress_bar = st.progress(0)
for i, frame_data in enumerate(frames):
with st.spinner(f"Analyzing frame {i+1}/{len(frames)}..."):
# Process frame based on model type
if model_type == "Local Models" and local_models_available:
result = process_image_locally(
frame_data['frame'],
prompt,
selected_model,
local_manager
)
else:
result = query_huggingface_api(
frame_data['frame'],
prompt,
selected_model,
api_token
)
# Extract scene description for ontology analysis
scene_description = ""
if 'person_on_track_detection' in result:
# For person detection results, use the analysis text
scene_description = result['person_on_track_detection'].get('detailed_analysis', {}).get('scene_description', '')
elif 'generated_text' in result:
scene_description = result['generated_text']
elif isinstance(result, list) and len(result) > 0 and 'generated_text' in result[0]:
scene_description = result[0]['generated_text']
# Apply ontology analysis
ontology_analysis = analyze_scene_with_ontology(scene_description, use_ontology)
results.append({
'frame_number': frame_data['frame_number'],
'timestamp': frame_data['timestamp'],
'image': frame_data['frame'],
'result': result,
'ontology_analysis': ontology_analysis
})
progress_bar.progress((i + 1) / len(frames))
# Display results
with results_container:
st.subheader("Analysis Results")
for result_data in results:
ontology = result_data['ontology_analysis']
severity_icon = ontology.get('severity_icon', 'βœ…')
severity = ontology.get('severity', 'NONE')
# Create expander title with severity indicator
expander_title = f"{severity_icon} {severity} - Frame {result_data['frame_number']} (t={result_data['timestamp']:.1f}s)"
with st.expander(expander_title):
col_img, col_text = st.columns([1, 2])
with col_img:
st.image(
result_data['image'],
caption=f"Frame {result_data['frame_number']}",
use_container_width=True
)
with col_text:
# Display ontology analysis first if enabled
if ontology.get('ontology_used', False):
# Severity display with color
severity_color = ontology.get('severity_color', 'green')
st.markdown(f"**Safety Assessment:** :{severity_color}[{severity_icon} {severity}]")
# Score display
if ontology.get('score', 0) > 0:
st.metric("Risk Score", f"{ontology['score']}/100")
# Show explanations if available
if ontology.get('explanations'):
st.write("**Ontology Analysis:**")
for explanation in ontology['explanations']:
st.write(f"β€’ {explanation}")
# Show fired rules if available
if ontology.get('fired_rules'):
with st.expander("Technical Details"):
st.write("**Triggered Rules:**")
for rule in ontology['fired_rules']:
st.code(rule)
if ontology.get('labels'):
st.write("**Detected Hazard Labels:**")
for label in ontology['labels']:
st.code(label)
st.divider()
# Display original model results
st.write("**Model Output:**")
if 'error' in result_data['result']:
st.error(f"Error: {result_data['result']['error']}")
elif 'person_on_track_detection' in result_data['result']:
# Handle person-on-track detection results
detection = result_data['result']['person_on_track_detection']
people_count = detection.get('people_count', 0)
confidence = detection.get('confidence', 0)
analysis = detection.get('analysis', 'No analysis')
person_on_track = detection.get('person_on_track', False)
st.write(f"**Detection Analysis:** {analysis}")
# Show metrics
col1, col2 = st.columns(2)
with col1:
st.metric("πŸ‘₯ People Detected", people_count)
with col2:
st.metric("πŸ“Š Model Confidence", f"{confidence:.0%}")
else:
if 'generated_text' in result_data['result']:
# Handle direct generated_text response (local models)
st.write(f"*{result_data['result']['generated_text']}*")
elif isinstance(result_data['result'], list) and len(result_data['result']) > 0:
# Handle list responses (common for captioning models)
if 'generated_text' in result_data['result'][0]:
st.write(f"*{result_data['result'][0]['generated_text']}*")
else:
st.json(result_data['result'][0])
else:
st.json(result_data['result'])
elif process_button:
if not video_file:
st.error("Please upload a video file")
if not prompt and not (model_type == "Local Models" and selected_model == "Person on Track Detector"):
st.error("Please enter an analysis prompt")
if not api_token and model_type == "Remote API":
st.error("Please provide your Hugging Face API token for remote models")
if model_type == "Local Models" and not local_models_available:
st.error("Local models failed to initialize. Check your installation.")
# Instructions
with st.expander("How to use"):
st.markdown("""
## Local AI Models (Recommended)
1. **Upload a video**: Choose a video file (MP4, AVI, MOV, or MKV)
2. **Select model type**: Choose "Local Models" for offline processing
3. **Choose AI model**:
- **CNN (BLIP)**: Fast, good for object detection (~1.2GB)
- **Transformer (ViT-GPT2)**: Detailed descriptions (~1.8GB)
4. **Enter a prompt**: Describe what you want the AI to analyze
5. **Adjust frame rate**: Set frames per second to extract (default: 1 fps)
6. **Click Process**: Frames are processed locally on your machine
## Remote API Models (Optional)
1. **Get API token**: Visit [Hugging Face Settings](https://huggingface.co/settings/tokens)
2. **Select "Remote API"** in model type
3. **Enter token** and select remote model
## Video Support Features
- **Automatic corruption repair**: Handles videos with corrupted moov atoms
- **FFmpeg integration**: Auto-repairs problematic video files
- **Multiple formats**: MP4, AVI, MOV, MKV support
## Requirements
- **Python packages**: torch, transformers, accelerate (see requirements.txt)
- **Optional**: FFmpeg for video repair (download from https://ffmpeg.org)
- **Storage**: ~3GB for both local models
## Example Prompts
- "Describe what you see in this image"
- "Count the number of people in this scene"
- "What objects are visible in this frame?"
- "Describe the emotions and actions in this scene"
- "What is the main activity happening here?"
""")
if __name__ == "__main__":
main()