darkmedia-x-api / backend /api /utils /storage.py
cybermedia's picture
Upload folder using huggingface_hub
343eed9 verified
"""
Unified storage backend for DarkMedia-X Studio.
Local filesystem first, Cloudflare R2 fallback for cloud/serverless mode.
"""
import os
import sys
from pathlib import Path
from typing import List, Dict, Any, Optional
# Try to import R2 helper
try:
from backend.api.r2_storage import r2_enabled, list_files, read_file as r2_read, upload_file as r2_upload
_R2 = True
except ImportError as e:
_R2 = False
print("R2 Import Error:", e)
def _is_r2() -> bool:
return _R2 and r2_enabled()
class StoryStorage:
"""Abstracts story storage: local filesystem + Cloudflare R2 fallback."""
def __init__(self, stories_root: Path):
self.root = stories_root
# ------------------------------------------------------------------
# Read
# ------------------------------------------------------------------
def read_text(self, rel_path: str) -> Optional[str]:
"""Read a story markdown file. Returns None if not found."""
local = self.root / rel_path
if local.exists():
return local.read_text(encoding="utf-8")
# Try R2 first (user preference)
if _is_r2():
try:
# Path on R2 is stories/{path}
r2_path = f"stories/{rel_path.replace('\\', '/')}"
data = r2_read(r2_path)
return data.decode("utf-8")
except Exception:
pass
return None
# ------------------------------------------------------------------
# Write
# ------------------------------------------------------------------
def write_text(self, rel_path: str, content: str) -> bool:
"""Write a story markdown file. Tries local then R2 then Blob."""
local = self.root / rel_path
try:
local.parent.mkdir(parents=True, exist_ok=True)
local.write_text(content, encoding="utf-8")
return True
except Exception:
pass
if _is_r2():
try:
r2_path = f"stories/{rel_path.replace('\\', '/')}"
# We need a temp file for r2_upload or we use a bytes buffer
# r2_storage expects a local path. Let's write locally if possible
# or use a smarter r2_upload that handles strings.
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as tmp:
tmp.write(content)
tmp_path = tmp.name
r2_upload(r2_path, tmp_path, "text/markdown")
os.unlink(tmp_path)
return True
except Exception:
pass
if _is_blob():
try:
upload_blob(rel_path.replace("\\", "/"), content.encode("utf-8"), "text/markdown")
return True
except Exception:
pass
return False
# ------------------------------------------------------------------
# List
# ------------------------------------------------------------------
def list_stories(self) -> List[Dict[str, Any]]:
"""List all stories with metadata."""
stories: List[Dict[str, Any]] = []
seen = set()
# Local scan
try:
if self.root.exists():
for dirpath, _, filenames in os.walk(str(self.root)):
for filename in filenames:
if not filename.endswith(".md") or filename.startswith("README") or filename == "music_prompt.md":
continue
file_path = Path(dirpath) / filename
rel_path = file_path.relative_to(self.root)
story_id = str(rel_path.parent).replace("\\", "/")
if story_id in seen: continue
seen.add(story_id)
title = filename.replace(".md", "").replace("_", " ").strip()
if title.lower() in {"story", "index", "readme"}:
title = story_id.replace("_", " ").strip()
image_count = self._count_images(story_id)
total_scenes = self._count_scenes(rel_path)
processed = self._has_video(story_id)
word_count = self._get_word_count(rel_path)
stories.append({
"id": story_id,
"title": title,
"path": str(rel_path).replace("\\", "/"),
"category": file_path.parent.parent.name if file_path.parent.parent != self.root else "General",
"processed": processed,
"image_count": image_count,
"total_scenes": total_scenes,
"ready": image_count >= total_scenes,
"word_count": word_count,
"is_empty": word_count < 20,
})
except Exception as e:
print(f"Error scanning local stories: {e}")
# R2 Scan
if _is_r2():
try:
# list_files returns full keys like "stories/Canada/Le Wendigo/story.md"
keys = list_files(prefix="stories/")
for key in keys:
if not key.endswith(".md") or "README" in key or "music_prompt.md" in key:
continue
parts = key.split("/")
if len(parts) < 3: continue
# story_id = Category/Title
story_id = "/".join(parts[1:-1])
if story_id in seen: continue
seen.add(story_id)
filename = parts[-1]
title = filename.replace(".md", "").replace("_", " ").strip()
if title.lower() in {"story", "index", "readme"}:
title = story_id.split("/")[-1].replace("_", " ").strip()
category = parts[1]
image_count = self._count_images_r2(story_id)
r2_content = self.read_text(key.replace("stories/", "")) or ""
r2_word_count = len(r2_content.split())
stories.append({
"id": story_id,
"title": title,
"path": key.replace("stories/", ""),
"category": category,
"processed": False,
"image_count": image_count,
"total_scenes": 10,
"ready": image_count >= 10,
"word_count": r2_word_count,
"is_empty": r2_word_count < 20,
})
except Exception:
pass
return sorted(stories, key=lambda s: s["title"])
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _count_images(self, story_id: str) -> int:
img_dir = self.root / story_id / "assets" / "images"
if not img_dir.exists():
img_dir = self.root / story_id / "images"
if img_dir.exists():
return len([f for f in os.listdir(str(img_dir)) if f.lower().endswith((".png", ".jpg", ".jpeg"))])
return 0
def _count_images_r2(self, story_id: str) -> int:
if not _is_r2():
return 0
try:
# We look for stories/{story_id}/assets/images/
prefix = f"stories/{story_id}/assets/images/"
keys = list_files(prefix=prefix)
return len([k for k in keys if k.lower().endswith((".png", ".jpg", ".jpeg"))])
except Exception:
return 0
def _count_scenes(self, rel_path: Path) -> int:
local = self.root / rel_path
if local.exists():
try:
content = local.read_text(encoding="utf-8")
import re
matches = re.findall(r"^##\s*(?:Scene|Scène)\s*\d+", content, flags=re.MULTILINE | re.IGNORECASE)
return len(matches) or 10
except Exception:
pass
return 10
def _get_word_count(self, rel_path: Path) -> int:
local = self.root / rel_path
if local.exists():
try:
content = local.read_text(encoding="utf-8")
return len(content.split())
except Exception:
pass
return 0
def _has_video(self, story_id: str) -> bool:
story_dir = self.root / story_id
if not story_dir.exists():
return False
return (story_dir / "final_video.mp4").exists() or (story_dir / f"TT_{story_id.replace(' ', '_').replace('/', '_')}_final.mp4").exists()
def exists(self, rel_path: str) -> bool:
local = self.root / rel_path
return local.exists()