# deepfake / app.py
# Provenance: Hugging Face Space file uploaded by tony133777
# ("Upload 12 files", commit 9907b42, verified)
import streamlit as st
import subprocess
import tempfile
import os
import shutil
import sys
import time
import re
import pandas as pd
import threading
import queue
from datetime import datetime
import graphviz
# Title and description
st.title("🎥 GenConViT Deepfake Detection")
st.markdown("**Upload a video and analyze it for deepfakes with AI models.**")

# File uploader
uploaded_file = st.file_uploader(
    "Upload a video file",
    type=["mp4", "avi", "mov", "mkv"],
    help="Supported formats: MP4, AVI, MOV, MKV",
)

# Video preview
if uploaded_file is not None:
    st.subheader("📼 Video Preview")
    # Write the upload to a named temp file so st.video can serve it by path.
    # Keep the uploaded container's own extension (the previous code hard-coded
    # '.mp4' even for AVI/MOV/MKV uploads, which could confuse MIME detection);
    # fall back to '.mp4' if the filename has no extension.
    suffix = os.path.splitext(uploaded_file.name)[1] or ".mp4"
    tfile = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    tfile.write(uploaded_file.getvalue())
    tfile.close()
    # Display the video
    st.video(tfile.name)
    # Rewind so the full file can be re-read when the pipeline runs
    uploaded_file.seek(0)

# Configuration widgets (top level so they exist on every rerun)
st.markdown("### ⚙️ Configuration")
col1, col2 = st.columns(2)
with col1:
    model_choice = st.selectbox("Select Model Variant", options=["Autoencoder (ED)", "Variational Autoencoder (VAE)", "Both"])
with col2:
    num_frames = st.number_input("Number of Frames to Extract", min_value=1, max_value=100, value=10)
def read_output(process, out_queue):
    """Drain *process*'s stdout line by line into *out_queue*.

    Each non-empty line is stripped of surrounding whitespace before being
    queued. Intended to run on a background thread so the child process
    never blocks on a full pipe; closes the stream when the child's stdout
    reaches EOF.
    """
    stream = process.stdout
    for raw_line in iter(stream.readline, ''):
        if not raw_line:
            continue
        out_queue.put(raw_line.strip())
    stream.close()
def read_error(process, err_queue):
    """Forward each stripped, non-empty stderr line from *process* to *err_queue*.

    Companion to read_output; meant for a daemon thread. Stops at EOF
    (readline returns '') and closes the stderr stream afterwards.
    """
    while True:
        raw_line = process.stderr.readline()
        if raw_line == '':
            break
        err_queue.put(raw_line.strip())
    process.stderr.close()
# Builds the live pipeline diagram shown while inference runs
def create_flowchart(stage=None, model_choice="Both"):
    """Return a Graphviz Digraph of the pipeline, highlighting progress.

    Stages that come before *stage* (in insertion order) are tinted green,
    *stage* itself is highlighted amber, and later stages keep their default
    palette. *model_choice* decides whether the ED path, the VAE path, or
    both branches appear between model selection and the final results.
    """
    graph = graphviz.Digraph('pipeline', graph_attr={'rankdir': 'LR', 'size': '15,10'})
    graph.attr(bgcolor='#f0f2f6')
    graph.attr('node', style='filled,rounded', shape='box', fontname='Arial', fontsize='12',
               padding='0.5', fixedsize='false', width='1.5')
    graph.attr('edge', arrowhead='vee', arrowsize='0.8', color='#555555')

    def _node(label, fill, border):
        # One pipeline stage: label plus its idle colour scheme.
        return {"label": label, "fillcolor": fill, "color": border, "done": False}

    stages = {
        "upload": _node("Upload\nVideo", "#ddeedd", "#336633"),
        "frames": _node("Extract\nFrames", "#eef2ff", "#336699"),
        "preprocessing": _node("Preprocess\nFrames", "#fff0ee", "#996633"),
        "model_selection": _node("Model\nSelection", "#fff8dc", "#cc9933"),
    }
    show_ed = model_choice in ("Both", "Autoencoder (ED)")
    show_vae = model_choice in ("Both", "Variational Autoencoder (VAE)")
    if show_ed:
        stages["ed_model"] = _node("Autoencoder\n(ED)", "#f0e68c", "#a67d3d")
        stages["ed_results"] = _node("ED\nResults", "#e0ffff", "#668b8b")
    if show_vae:
        stages["vae_model"] = _node("Variational\nAutoencoder\n(VAE)", "#ffb6c1", "#8b475d")
        stages["vae_results"] = _node("VAE\nResults", "#f0f8ff", "#6a5acd")
    stages["final_results"] = _node("Final\nResults", "#c0c0c0", "#555555")

    # Recolour completed stages green and the active stage amber.
    if stage:
        for key, details in stages.items():
            if key == stage:
                details["fillcolor"] = "#ffcc00"
                details["color"] = "#b8860b"
                break
            details["fillcolor"] = "#90ee90"
            details["color"] = "#006400"
            details["done"] = True

    for key, details in stages.items():
        graph.node(key, details["label"], fillcolor=details["fillcolor"], color=details["color"])

    # Backbone edges are always present; branch edges depend on model_choice.
    graph.edge("upload", "frames")
    graph.edge("frames", "preprocessing")
    graph.edge("preprocessing", "model_selection")
    if model_choice == "Both":
        graph.edge("model_selection", "ed_model")
        graph.edge("model_selection", "vae_model")
        graph.edge("ed_model", "ed_results")
        graph.edge("vae_model", "vae_results")
        graph.edge("ed_results", "final_results")
        graph.edge("vae_results", "final_results")
    elif model_choice == "Autoencoder (ED)":
        graph.edge("model_selection", "ed_model")
        graph.edge("ed_model", "ed_results")
        graph.edge("ed_results", "final_results")
    elif model_choice == "Variational Autoencoder (VAE)":
        graph.edge("model_selection", "vae_model")
        graph.edge("vae_model", "vae_results")
        graph.edge("vae_results", "final_results")
    return graph
# Pulls the numeric results out of prediction.py's console output
def parse_prediction(text):
    """Parse prediction.py's textual output into a result dict.

    Extracts the prediction probability, the FAKE/REAL label, per-class
    frame counts, and the reported wall-clock time. Missing fields fall
    back to None / "Unknown" / 0; any unexpected failure is reported via
    st.error and yields an "Error" placeholder dict.
    """
    try:
        def _search(pattern):
            return re.search(pattern, text)

        prob = _search(r'Prediction: ([\d\.]+)')
        label = _search(r'(FAKE|REAL)')
        counts = _search(r'Fake: (\d+) Real: (\d+)')
        elapsed = _search(r'--- ([\d\.]+) seconds ---')
        return {
            "probability": float(prob.group(1)) if prob else None,
            "classification": label.group(1) if label else "Unknown",
            "fake_count": int(counts.group(1)) if counts else 0,
            "real_count": int(counts.group(2)) if counts else 0,
            "process_time": float(elapsed.group(1)) if elapsed else None,
        }
    except Exception as e:
        st.error(f"Error parsing results: {str(e)}")
        return {
            "probability": None,
            "classification": "Error",
            "fake_count": 0,
            "real_count": 0,
            "process_time": None,
        }
# Pipeline with dynamic progress
def run_pipeline(video_file, model_choice, num_frames):
    """Run the full deepfake-detection pipeline on an uploaded video.

    Saves the upload into a scratch directory, launches ``prediction.py``
    as a subprocess, streams its stdout/stderr into the UI while advancing
    a Graphviz progress flowchart, then parses the "Prediction:" line and
    renders metrics, a details table, and a frame-classification chart.

    Args:
        video_file: Streamlit UploadedFile chosen by the user.
        model_choice: "Autoencoder (ED)", "Variational Autoencoder (VAE)",
            or "Both" — selects the --e/--v flags passed to prediction.py.
        num_frames: number of frames prediction.py should sample (--f).
    """
    # Placeholders so status text, flowchart, and log tail update in place
    status_text = st.empty()
    flowchart_area = st.empty()
    output_area = st.empty()
    # Scratch directory that prediction.py scans via --p; removed in finally
    temp_dir = tempfile.mkdtemp()
    video_path = os.path.join(temp_dir, video_file.name)
    try:
        # Static architecture image, shown above the live flowchart
        st.subheader("🔄 Pipeline Architecture")
        st.image("pipeline_architecture.png", caption="GenConViT Pipeline Architecture")
        # Initial flowchart with no stage highlighted yet
        flowchart_area.graphviz_chart(create_flowchart(None, model_choice))
        # Step 1: save the upload into the scratch directory
        status_text.info("Saving video to temporary directory...")
        flowchart_area.graphviz_chart(create_flowchart("upload", model_choice))
        with open(video_path, "wb") as f:
            f.write(video_file.getbuffer())
        time.sleep(1)  # brief pause so each stage is visible in the UI
        # Step 2: frame-extraction stage indicator
        status_text.info("Preparing to extract frames...")
        flowchart_area.graphviz_chart(create_flowchart("frames", model_choice))
        time.sleep(1)
        # Step 3: build the prediction.py command line
        model_flags = []
        if model_choice == "Autoencoder (ED)":
            model_flags = ["--e", "genconvit_ed_inference"]
        elif model_choice == "Variational Autoencoder (VAE)":
            model_flags = ["--v", "genconvit_vae_inference"]
        elif model_choice == "Both":
            model_flags = ["--e", "genconvit_ed_inference", "--v", "genconvit_vae_inference"]
        command = [sys.executable, "prediction.py", "--p", temp_dir, "--f", str(num_frames)] + model_flags
        status_text.info(f"Running command: {' '.join(command)}")
        # Step 4: preprocessing stage indicator
        flowchart_area.graphviz_chart(create_flowchart("preprocessing", model_choice))
        time.sleep(1)
        # Step 5: model-selection stage indicator
        flowchart_area.graphviz_chart(create_flowchart("model_selection", model_choice))
        time.sleep(1)
        # Step 6: launch inference with line-buffered text pipes
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
            universal_newlines=True
        )
        # Daemon reader threads keep both pipes drained so the child never
        # blocks on a full pipe buffer
        out_queue = queue.Queue()
        err_queue = queue.Queue()
        out_thread = threading.Thread(target=read_output, args=(process, out_queue))
        err_thread = threading.Thread(target=read_error, args=(process, err_queue))
        out_thread.daemon = True
        err_thread.daemon = True
        out_thread.start()
        err_thread.start()
        output_text = []
        # Matches tqdm-style progress lines, e.g. " 42%|####    | 5/12"
        tqdm_pattern = re.compile(r'(\d+)%\|.*?\| (\d+)/(\d+)')
        # Highlight whichever model branch runs first
        if model_choice == "Autoencoder (ED)" or model_choice == "Both":
            flowchart_area.graphviz_chart(create_flowchart("ed_model", model_choice))
        else:
            flowchart_area.graphviz_chart(create_flowchart("vae_model", model_choice))
        processing_started = False
        ed_done = False
        vae_done = False
        results_shown = False
        # Poll until the child has exited AND both queues are fully drained
        while process.poll() is None or not out_queue.empty() or not err_queue.empty():
            # Drain stdout, updating the rolling 10-line log tail
            while not out_queue.empty():
                line = out_queue.get()
                output_text.append(line)
                output_area.code("\n".join(output_text[-10:]))
                if "Loading..." in line and not processing_started:
                    processing_started = True
                    status_text.info("Loading video data...")
                # Surface tqdm frame progress in the status banner
                tqdm_match = tqdm_pattern.search(line)
                if tqdm_match:
                    current = int(tqdm_match.group(2))
                    total = int(tqdm_match.group(3))
                    status_text.info(f"Processing frame {current}/{total}")
                # Advance the flowchart the first time each model is mentioned
                if "genconvit_ed_inference" in line or "Autoencoder" in line:
                    if not ed_done and (model_choice == "Autoencoder (ED)" or model_choice == "Both"):
                        ed_done = True
                        flowchart_area.graphviz_chart(create_flowchart("ed_results", model_choice))
                        status_text.info("Processing with Autoencoder (ED)...")
                if "genconvit_vae_inference" in line or "Variational" in line:
                    if not vae_done and (model_choice == "Variational Autoencoder (VAE)" or model_choice == "Both"):
                        vae_done = True
                        flowchart_area.graphviz_chart(create_flowchart("vae_results", model_choice))
                        status_text.info("Processing with Variational Autoencoder (VAE)...")
                # A "Prediction:" line means inference finished
                if "Prediction:" in line and not results_shown:
                    results_shown = True
                    flowchart_area.graphviz_chart(create_flowchart("final_results", model_choice))
                    status_text.success("Processing complete! Analyzing results...")
            # Drain stderr into the same log, prefixed for visibility
            while not err_queue.empty():
                line = err_queue.get()
                output_text.append(f"ERROR: {line}")
                output_area.code("\n".join(output_text[-10:]))
            time.sleep(0.1)
        # Final flowchart update if the child never printed "Prediction:"
        if not results_shown:
            flowchart_area.graphviz_chart(create_flowchart("final_results", model_choice))
            status_text.success("Processing complete!")
        # Full output, hidden by default
        with st.expander("Show full processing log"):
            st.code("\n".join(output_text))
        # Step 7: parse the captured output and render the results
        if any("Prediction:" in line for line in output_text):
            prediction_line = next((line for line in output_text if "Prediction:" in line), None)
            time_line = next((line for line in output_text if "seconds" in line), None)
            if prediction_line:
                full_result = prediction_line
                if time_line:
                    full_result += "\n" + time_line
                results = parse_prediction(full_result)
                st.markdown("### 📊 Results")
                col1, col2, col3 = st.columns(3)
                with col1:
                    # Red banner for FAKE, green for anything else
                    if results["classification"] == "FAKE":
                        st.markdown(f"<div style='background-color:#ffcccc;padding:10px;border-radius:5px;'><h3 style='text-align:center;margin:0;'>⚠️ {results['classification']} ⚠️</h3></div>", unsafe_allow_html=True)
                    else:
                        st.markdown(f"<div style='background-color:#ccffcc;padding:10px;border-radius:5px;'><h3 style='text-align:center;margin:0;'>✅ {results['classification']} ✅</h3></div>", unsafe_allow_html=True)
                with col2:
                    # NOTE(review): a probability of exactly 0.0 is falsy and
                    # would display "N/A" — confirm whether that is intended
                    confidence = f"{results['probability']*100:.2f}%" if results['probability'] else "N/A"
                    st.metric("Confidence", confidence)
                with col3:
                    process_time = f"{results['process_time']:.2f}s" if results['process_time'] else "N/A"
                    st.metric("Processing Time", process_time)
                # Detailed results table
                details = {
                    "Metric": ["Video", "Model Used", "Frames Analyzed", "Fake Frames", "Real Frames", "Date/Time"],
                    "Value": [
                        video_file.name,
                        model_choice,
                        num_frames,
                        results["fake_count"],
                        results["real_count"],
                        datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                    ]
                }
                st.dataframe(pd.DataFrame(details), use_container_width=True)
                # Bar chart of per-frame classifications, when any were counted
                if results["fake_count"] > 0 or results["real_count"] > 0:
                    st.subheader("Frame Classification")
                    chart_data = pd.DataFrame({
                        "Category": ["Fake Frames", "Real Frames"],
                        "Count": [results["fake_count"], results["real_count"]]
                    })
                    st.bar_chart(chart_data, x="Category", y="Count")
        else:
            st.warning("No prediction results found in the output.")
    except Exception as e:
        status_text.error("Error occurred!")
        st.error(f"Pipeline failed: {str(e)}")
    finally:
        # Always remove the scratch directory, even on failure
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
# Kick off the analysis when the user clicks the button
if st.button("Run Inference"):
    if uploaded_file is None:
        st.warning("Please upload a video file first!")
    else:
        st.markdown("### 🔄 Pipeline Execution")
        run_pipeline(uploaded_file, model_choice, num_frames)

# Footer
st.markdown("---")
st.markdown("Built with Streamlit | © 2025")