# Streamlit app for GenConViT deepfake detection (Hugging Face Space UI).
| import streamlit as st | |
| import subprocess | |
| import tempfile | |
| import os | |
| import shutil | |
| import sys | |
| import time | |
| import re | |
| import pandas as pd | |
| import threading | |
| import queue | |
| from datetime import datetime | |
| import graphviz | |
# --- Page header -------------------------------------------------------------
st.title("🎥 GenConViT Deepfake Detection")
st.markdown("**Upload a video and analyze it for deepfakes with AI models.**")

# --- Video upload ------------------------------------------------------------
uploaded_file = st.file_uploader(
    "Upload a video file",
    type=["mp4", "avi", "mov", "mkv"],
    help="Supported formats: MP4, AVI, MOV, MKV",
)

# --- Video preview -----------------------------------------------------------
if uploaded_file is not None:
    st.subheader("📼 Video Preview")
    # Keep the upload's real extension so the browser player receives the
    # correct container format (previously hard-coded to '.mp4' even for
    # .avi/.mov/.mkv uploads).
    suffix = os.path.splitext(uploaded_file.name)[1] or ".mp4"
    tfile = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    tfile.write(uploaded_file.getvalue())
    tfile.close()
    # NOTE(review): delete=False leaks one temp file per rerun; Streamlit has
    # no teardown hook here, so this mirrors the original behavior — consider
    # tracking/cleaning old previews via st.session_state.
    st.video(tfile.name)
    # Rewind so the same buffer can be re-read by the inference pipeline.
    uploaded_file.seek(0)

# --- Configuration -----------------------------------------------------------
st.markdown("### ⚙️ Configuration")
col1, col2 = st.columns(2)
with col1:
    model_choice = st.selectbox(
        "Select Model Variant",
        options=["Autoencoder (ED)", "Variational Autoencoder (VAE)", "Both"],
    )
with col2:
    num_frames = st.number_input(
        "Number of Frames to Extract", min_value=1, max_value=100, value=10
    )
| # Create function to read process output | |
def read_output(process, out_queue):
    """Pump the child process's stdout into *out_queue*, one stripped line per entry.

    Intended to run on a daemon thread so the pipe never fills and blocks the
    child. Closes the stream once EOF ('' from readline) is reached.
    """
    stream = process.stdout
    while True:
        raw = stream.readline()
        if raw == '':
            break
        out_queue.put(raw.strip())
    stream.close()
def read_error(process, err_queue):
    """Pump the child process's stderr into *err_queue*, one stripped line per entry.

    Mirror of read_output for the stderr pipe; run on a daemon thread and
    closes the stream at EOF.
    """
    stream = process.stderr
    while True:
        raw = stream.readline()
        if raw == '':
            break
        err_queue.put(raw.strip())
    stream.close()
| # Function to create and update the dynamic flowchart | |
def create_flowchart(stage=None, model_choice="Both"):
    """Render the pipeline as a Graphviz Digraph with per-stage highlighting.

    stage: key of the node currently executing (None = nothing started yet).
           Every node declared before *stage* is colored green ("done"), the
           current node amber, and the rest keep their idle palette.
    model_choice: which model branch(es) to draw ("Autoencoder (ED)",
                  "Variational Autoencoder (VAE)" or "Both").
    Returns the graphviz.Digraph, ready for st.graphviz_chart.
    """
    graph = graphviz.Digraph('pipeline', graph_attr={'rankdir': 'LR', 'size': '15,10'})
    graph.attr(bgcolor='#f0f2f6')
    graph.attr('node', style='filled,rounded', shape='box', fontname='Arial',
               fontsize='12', padding='0.5', fixedsize='false', width='1.5')
    graph.attr('edge', arrowhead='vee', arrowsize='0.8', color='#555555')

    want_ed = model_choice in ("Both", "Autoencoder (ED)")
    want_vae = model_choice in ("Both", "Variational Autoencoder (VAE)")

    # Insertion order matters: the done-marking pass below treats everything
    # declared before the active stage as completed.
    stages = {
        "upload": {"label": "Upload\nVideo", "fillcolor": "#ddeedd", "color": "#336633", "done": False},
        "frames": {"label": "Extract\nFrames", "fillcolor": "#eef2ff", "color": "#336699", "done": False},
        "preprocessing": {"label": "Preprocess\nFrames", "fillcolor": "#fff0ee", "color": "#996633", "done": False},
        "model_selection": {"label": "Model\nSelection", "fillcolor": "#fff8dc", "color": "#cc9933", "done": False},
    }
    if want_ed:
        stages["ed_model"] = {"label": "Autoencoder\n(ED)", "fillcolor": "#f0e68c", "color": "#a67d3d", "done": False}
        stages["ed_results"] = {"label": "ED\nResults", "fillcolor": "#e0ffff", "color": "#668b8b", "done": False}
    if want_vae:
        stages["vae_model"] = {"label": "Variational\nAutoencoder\n(VAE)", "fillcolor": "#ffb6c1", "color": "#8b475d", "done": False}
        stages["vae_results"] = {"label": "VAE\nResults", "fillcolor": "#f0f8ff", "color": "#6a5acd", "done": False}
    stages["final_results"] = {"label": "Final\nResults", "fillcolor": "#c0c0c0", "color": "#555555", "done": False}

    # Recolor: green for finished stages, amber for the one in flight.
    if stage:
        for key, details in stages.items():
            if key == stage:
                details["fillcolor"] = "#ffcc00"
                details["color"] = "#b8860b"
                break
            details["fillcolor"] = "#90ee90"
            details["color"] = "#006400"
            details["done"] = True

    for key, details in stages.items():
        graph.node(key, details["label"], fillcolor=details["fillcolor"], color=details["color"])

    # Shared trunk plus the branch edges for the selected model(s), kept in
    # the same declaration order as the original layout.
    branch_edges = {
        "Both": [("model_selection", "ed_model"), ("model_selection", "vae_model"),
                 ("ed_model", "ed_results"), ("vae_model", "vae_results"),
                 ("ed_results", "final_results"), ("vae_results", "final_results")],
        "Autoencoder (ED)": [("model_selection", "ed_model"),
                             ("ed_model", "ed_results"),
                             ("ed_results", "final_results")],
        "Variational Autoencoder (VAE)": [("model_selection", "vae_model"),
                                          ("vae_model", "vae_results"),
                                          ("vae_results", "final_results")],
    }
    trunk = [("upload", "frames"), ("frames", "preprocessing"),
             ("preprocessing", "model_selection")]
    for tail, head in trunk + branch_edges.get(model_choice, []):
        graph.edge(tail, head)
    return graph
| # Function to parse prediction results | |
def parse_prediction(text):
    """Extract structured results from the raw prediction.py output.

    text: concatenated stdout lines containing the prediction summary.
    Returns a dict with keys 'probability' (float or None),
    'classification' ('FAKE'/'REAL'/'Unknown', or 'Error' on parse failure),
    'fake_count'/'real_count' (ints, 0 when absent) and
    'process_time' (float seconds or None).
    """
    try:
        prob = re.search(r'Prediction: ([\d\.]+)', text)
        label = re.search(r'(FAKE|REAL)', text)
        counts = re.search(r'Fake: (\d+) Real: (\d+)', text)
        secs = re.search(r'--- ([\d\.]+) seconds ---', text)
        return {
            "probability": float(prob.group(1)) if prob else None,
            "classification": label.group(1) if label else "Unknown",
            "fake_count": int(counts.group(1)) if counts else 0,
            "real_count": int(counts.group(2)) if counts else 0,
            "process_time": float(secs.group(1)) if secs else None,
        }
    except Exception as e:
        st.error(f"Error parsing results: {str(e)}")
        return {
            "probability": None,
            "classification": "Error",
            "fake_count": 0,
            "real_count": 0,
            "process_time": None,
        }
| # Pipeline with dynamic progress | |
| def run_pipeline(video_file, model_choice, num_frames): | |
| # Create placeholders for dynamic updates | |
| status_text = st.empty() | |
| flowchart_area = st.empty() # Revert back to single placeholder | |
| output_area = st.empty() | |
| # Create temporary directory (same as before) | |
| temp_dir = tempfile.mkdtemp() | |
| video_path = os.path.join(temp_dir, video_file.name) | |
| try: | |
| # Remove two column layout - back to single column | |
| # Static Pipeline Architecture image - display it *above* the dynamic flowchart now | |
| st.subheader("🔄 Pipeline Architecture") | |
| st.image("pipeline_architecture.png", caption="GenConViT Pipeline Architecture") | |
| # Dynamic Flowchart Area (single column now) | |
| # flowchart_area = st.empty() # Already defined above | |
| # No need for 'with col2:' or column specific code | |
| # Initial flowchart | |
| flowchart_area.graphviz_chart(create_flowchart(None, model_choice)) | |
| # Step 1: Save video (same as before) | |
| status_text.info("Saving video to temporary directory...") | |
| flowchart_area.graphviz_chart(create_flowchart("upload", model_choice)) | |
| with open(video_path, "wb") as f: | |
| f.write(video_file.getbuffer()) | |
| time.sleep(1) | |
| # Step 2: Prepare for frame extraction (same as before) | |
| status_text.info("Preparing to extract frames...") | |
| flowchart_area.graphviz_chart(create_flowchart("frames", model_choice)) | |
| time.sleep(1) | |
| # Step 3: Prepare command (same as before) | |
| model_flags = [] | |
| if model_choice == "Autoencoder (ED)": | |
| model_flags = ["--e", "genconvit_ed_inference"] | |
| elif model_choice == "Variational Autoencoder (VAE)": | |
| model_flags = ["--v", "genconvit_vae_inference"] | |
| elif model_choice == "Both": | |
| model_flags = ["--e", "genconvit_ed_inference", "--v", "genconvit_vae_inference"] | |
| command = [sys.executable, "prediction.py", "--p", temp_dir, "--f", str(num_frames)] + model_flags | |
| status_text.info(f"Running command: {' '.join(command)}") | |
| # Step 4: Show preprocessing stage (same as before) | |
| flowchart_area.graphviz_chart(create_flowchart("preprocessing", model_choice)) | |
| time.sleep(1) | |
| # Step 5: Show model selection stage (same as before) | |
| flowchart_area.graphviz_chart(create_flowchart("model_selection", model_choice)) | |
| time.sleep(1) | |
| # Step 6: Run inference with dynamic output (same as before) | |
| process = subprocess.Popen( | |
| command, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| bufsize=1, | |
| universal_newlines=True | |
| ) | |
| # Create queues and threads to read output (same as before) | |
| out_queue = queue.Queue() | |
| err_queue = queue.Queue() | |
| out_thread = threading.Thread(target=read_output, args=(process, out_queue)) | |
| err_thread = threading.Thread(target=read_error, args=(process, err_queue)) | |
| out_thread.daemon = True | |
| err_thread.daemon = True | |
| out_thread.start() | |
| err_thread.start() | |
| # Process output dynamically (same as before) | |
| output_text = [] | |
| tqdm_pattern = re.compile(r'(\d+)%\|.*?\| (\d+)/(\d+)') | |
| # Model processing stage | |
| if model_choice == "Autoencoder (ED)" or model_choice == "Both": | |
| flowchart_area.graphviz_chart(create_flowchart("ed_model", model_choice)) | |
| else: | |
| flowchart_area.graphviz_chart(create_flowchart("vae_model", model_choice)) | |
| processing_started = False | |
| ed_done = False | |
| vae_done = False | |
| results_shown = False | |
| while process.poll() is None or not out_queue.empty() or not err_queue.empty(): | |
| # Process stdout (same as before) | |
| while not out_queue.empty(): | |
| line = out_queue.get() | |
| output_text.append(line) | |
| # Show the user what's happening | |
| output_area.code("\n".join(output_text[-10:])) | |
| # Check for specific progress indicators | |
| if "Loading..." in line and not processing_started: | |
| processing_started = True | |
| status_text.info("Loading video data...") | |
| # Check for tqdm progress | |
| tqdm_match = tqdm_pattern.search(line) | |
| if tqdm_match: | |
| current = int(tqdm_match.group(2)) | |
| total = int(tqdm_match.group(3)) | |
| status_text.info(f"Processing frame {current}/{total}") | |
| # Look for specific model outputs | |
| if "genconvit_ed_inference" in line or "Autoencoder" in line: | |
| if not ed_done and (model_choice == "Autoencoder (ED)" or model_choice == "Both"): | |
| ed_done = True | |
| flowchart_area.graphviz_chart(create_flowchart("ed_results", model_choice)) | |
| status_text.info("Processing with Autoencoder (ED)...") | |
| if "genconvit_vae_inference" in line or "Variational" in line: | |
| if not vae_done and (model_choice == "Variational Autoencoder (VAE)" or model_choice == "Both"): | |
| vae_done = True | |
| flowchart_area.graphviz_chart(create_flowchart("vae_results", model_choice)) | |
| status_text.info("Processing with Variational Autoencoder (VAE)...") | |
| # Check for prediction results | |
| if "Prediction:" in line and not results_shown: | |
| results_shown = True | |
| flowchart_area.graphviz_chart(create_flowchart("final_results", model_choice)) | |
| status_text.success("Processing complete! Analyzing results...") | |
| # Process stderr (same as before) | |
| while not err_queue.empty(): | |
| line = err_queue.get() | |
| output_text.append(f"ERROR: {line}") | |
| output_area.code("\n".join(output_text[-10:])) | |
| time.sleep(0.1) | |
| # Final update if not already done (same as before) | |
| if not results_shown: | |
| flowchart_area.graphviz_chart(create_flowchart("final_results", model_choice)) | |
| status_text.success("Processing complete!") | |
| # Full output (hidden by default) (same as before) | |
| with st.expander("Show full processing log"): | |
| st.code("\n".join(output_text)) | |
| # Step 7: Parse and display results nicely (same as before) | |
| if any("Prediction:" in line for line in output_text): | |
| prediction_line = next((line for line in output_text if "Prediction:" in line), None) | |
| time_line = next((line for line in output_text if "seconds" in line), None) | |
| if prediction_line: | |
| full_result = prediction_line | |
| if time_line: | |
| full_result += "\n" + time_line | |
| results = parse_prediction(full_result) | |
| # Display nicely formatted results | |
| st.markdown("### 📊 Results") | |
| col1, col2, col3 = st.columns(3) # Reverted back to single column names | |
| with col1: | |
| # Determine color based on classification | |
| if results["classification"] == "FAKE": | |
| st.markdown(f"<div style='background-color:#ffcccc;padding:10px;border-radius:5px;'><h3 style='text-align:center;margin:0;'>⚠️ {results['classification']} ⚠️</h3></div>", unsafe_allow_html=True) | |
| else: | |
| st.markdown(f"<div style='background-color:#ccffcc;padding:10px;border-radius:5px;'><h3 style='text-align:center;margin:0;'>✅ {results['classification']} ✅</h3></div>", unsafe_allow_html=True) | |
| with col2: | |
| confidence = f"{results['probability']*100:.2f}%" if results['probability'] else "N/A" | |
| st.metric("Confidence", confidence) | |
| with col3: | |
| process_time = f"{results['process_time']:.2f}s" if results['process_time'] else "N/A" | |
| st.metric("Processing Time", process_time) | |
| # Create a dataframe for the detailed results | |
| details = { | |
| "Metric": ["Video", "Model Used", "Frames Analyzed", "Fake Frames", "Real Frames", "Date/Time"], | |
| "Value": [ | |
| video_file.name, | |
| model_choice, | |
| num_frames, | |
| results["fake_count"], | |
| results["real_count"], | |
| datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| ] | |
| } | |
| st.dataframe(pd.DataFrame(details), use_container_width=True) | |
| # Visualization of the results | |
| if results["fake_count"] > 0 or results["real_count"] > 0: | |
| st.subheader("Frame Classification") | |
| chart_data = pd.DataFrame({ | |
| "Category": ["Fake Frames", "Real Frames"], | |
| "Count": [results["fake_count"], results["real_count"]] | |
| }) | |
| st.bar_chart(chart_data, x="Category", y="Count") | |
| else: | |
| st.warning("No prediction results found in the output.") | |
| except Exception as e: | |
| status_text.error("Error occurred!") | |
| st.error(f"Pipeline failed: {str(e)}") | |
| finally: | |
| # Cleanup (same as before) | |
| if os.path.exists(temp_dir): | |
| shutil.rmtree(temp_dir) | |
# --- Run button & footer -----------------------------------------------------
if st.button("Run Inference"):
    if uploaded_file is None:
        st.warning("Please upload a video file first!")
    else:
        st.markdown("### 🔄 Pipeline Execution")
        run_pipeline(uploaded_file, model_choice, num_frames)

st.markdown("---")
st.markdown("Built with Streamlit | © 2025")