blacksinisterx's picture
fix: keyframe images, video clips, evidence images, live stream webcam+URL, remove demo mode
fd50325 verified
"""
Reset MinIO buckets and test storage paths for DetectifAI.
This script ensures that all required MinIO buckets and storage paths
are properly configured for video processing.
"""
from minio import Minio
from minio.error import S3Error
import os
from datetime import datetime
from dotenv import load_dotenv
import logging
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# MinIO configuration
MINIO_CONFIG = {
"endpoint": os.getenv("MINIO_ENDPOINT", "s3.eu-central-003.backblazeb2.com"),
"access_key": os.getenv("MINIO_ACCESS_KEY", "00367479ffb7e4e0000000001"),
"secret_key": os.getenv("MINIO_SECRET_KEY", "K003opTvf92ijRj5dM7H1dgrlwcGTdA"),
"secure": os.getenv("MINIO_SECURE", "true").lower() == "true",
"region": os.getenv("MINIO_REGION", "eu-central-003")
}
# Bucket configuration with descriptions
BUCKETS = {
"detectifai-videos": {
"description": "Main bucket for video storage",
"prefixes": {
"original": "Original uploaded videos",
"compressed": "Compressed video versions"
}
},
"detectifai-keyframes": {
"description": "Storage for extracted video frames",
"prefixes": {
"keyframes": "Extracted keyframes and annotated frames"
}
}
}
def reset_minio_storage():
"""Reset and verify MinIO storage configuration"""
client = Minio(**MINIO_CONFIG)
print("Checking MinIO connection and buckets...")
for bucket_name, config in BUCKETS.items():
try:
# Check if bucket exists
found = client.bucket_exists(bucket_name)
if not found:
print(f"Creating bucket: {bucket_name}")
client.make_bucket(bucket_name)
# Test each prefix path
for prefix in config["prefixes"]:
test_object = f"{prefix}/test.txt"
test_data = f"Test data for {bucket_name}/{prefix}"
print(f"\nTesting path: {bucket_name}/{test_object}")
# Upload test object
test_bytes = bytes(test_data, 'utf-8')
from io import BytesIO
test_stream = BytesIO(test_bytes)
client.put_object(
bucket_name,
test_object,
test_stream,
len(test_bytes)
)
# Verify upload
try:
client.stat_object(bucket_name, test_object)
print(f"βœ… Test file uploaded successfully")
# Clean up test file
client.remove_object(bucket_name, test_object)
print(f"βœ… Test file removed")
except:
print(f"❌ Could not verify test file")
print(f"\nListing objects in {bucket_name}:")
objects = client.list_objects(bucket_name, recursive=True)
for obj in objects:
print(f"- {obj.object_name} (size: {obj.size} bytes)")
except S3Error as e:
print(f"❌ Error with bucket {bucket_name}: {e}")
continue
if __name__ == "__main__":
reset_minio_storage()