"""
Unified storage backend for DarkMedia-X Studio.
Local filesystem first, Cloudflare R2 fallback for cloud/serverless mode.
"""
import os
import re
import sys
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

# Try to import R2 helper
try:
    from backend.api.r2_storage import (
        r2_enabled,
        list_files,
        read_file as r2_read,
        upload_file as r2_upload,
    )
    _R2 = True
except ImportError as e:
    _R2 = False
    print("R2 Import Error:", e)

# Every story object on R2 lives under this key prefix.
_R2_PREFIX = "stories/"
# File extensions (lower-case) counted as story images.
_IMAGE_SUFFIXES = (".png", ".jpg", ".jpeg")
# Scene count assumed when a story has no recognisable scene headings.
_DEFAULT_SCENE_COUNT = 10
# Stories with fewer words than this are flagged as empty.
_MIN_WORDS = 20
# File stems that carry no title information; the story id is used instead.
_GENERIC_TITLES = {"story", "index", "readme"}
# Markdown heading such as "## Scene 3" / "## Scène 3".
_SCENE_HEADING = re.compile(
    r"^##\s*(?:Scene|Scène)\s*\d+", flags=re.MULTILINE | re.IGNORECASE
)


def _is_r2() -> bool:
    """Return True when the R2 helper was imported and reports itself enabled."""
    return _R2 and r2_enabled()


def _r2_key(rel_path) -> str:
    """Build the R2 object key (``stories/<path>``) for a story-relative path.

    Backslashes are normalised to forward slashes so Windows-style paths map
    to the same key. The replacement is done outside any f-string because a
    backslash inside an f-string expression is a SyntaxError before
    Python 3.12.
    """
    return _R2_PREFIX + str(rel_path).replace("\\", "/")


class StoryStorage:
    """Abstracts story storage: local filesystem + Cloudflare R2 fallback."""

    def __init__(self, stories_root: Path):
        """Remember *stories_root*, the local directory that holds all stories."""
        self.root = stories_root

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------
    def read_text(self, rel_path: str) -> Optional[str]:
        """Read a story markdown file. Returns None if not found.

        The local file under ``self.root`` wins when it exists; R2 is only
        consulted as a fallback, and any R2 failure is reported as "not found".
        """
        local = self.root / rel_path
        # is_file() rather than exists(): a directory is not a readable story.
        if local.is_file():
            return local.read_text(encoding="utf-8")

        # Local miss: fall back to R2 when it is configured.
        if _is_r2():
            try:
                return r2_read(_r2_key(rel_path)).decode("utf-8")
            except Exception:
                # Best effort: the exception types raised by the R2 helper are
                # not visible here, and a missing object must map to None.
                return None
        return None

    # ------------------------------------------------------------------
    # Write
    # ------------------------------------------------------------------
    def write_text(self, rel_path: str, content: str) -> bool:
        """Write a story markdown file. Tries local first, then R2.

        Returns True as soon as one backend accepted the content and False
        when every available backend failed.
        """
        local = self.root / rel_path
        try:
            local.parent.mkdir(parents=True, exist_ok=True)
            local.write_text(content, encoding="utf-8")
            return True
        except (OSError, ValueError):
            # Local storage unusable for this path; fall through to R2.
            pass

        if _is_r2():
            return self._upload_to_r2(rel_path, content)
        return False

    def _upload_to_r2(self, rel_path: str, content: str) -> bool:
        """Upload *content* to R2 through a temporary file; True on success."""
        tmp_path = None
        try:
            # r2_upload is given a local file path, so the content is staged
            # in a temp file that is closed (and therefore flushed) first.
            with tempfile.NamedTemporaryFile(
                mode="w", suffix=".md", delete=False, encoding="utf-8"
            ) as tmp:
                tmp_path = tmp.name
                tmp.write(content)
            r2_upload(_r2_key(rel_path), tmp_path, "text/markdown")
            return True
        except Exception as e:
            print(f"Error uploading story to R2: {e}")
            return False
        finally:
            # Always remove the staging file, even when the upload raised.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass

    # ------------------------------------------------------------------
    # List
    # ------------------------------------------------------------------
    def list_stories(self) -> List[Dict[str, Any]]:
        """List all stories with metadata.

        Local stories are collected first, then R2 stories whose id has not
        been seen locally. Each backend is scanned best-effort: a failure is
        printed and whatever was collected so far is kept. The result is
        sorted by title.
        """
        stories: List[Dict[str, Any]] = []
        seen: Set[str] = set()

        # Local scan
        try:
            self._scan_local(stories, seen)
        except Exception as e:
            print(f"Error scanning local stories: {e}")

        # R2 scan
        if _is_r2():
            try:
                self._scan_r2(stories, seen)
            except Exception as e:
                print(f"Error scanning R2 stories: {e}")

        return sorted(stories, key=lambda s: s["title"])

    def _scan_local(self, stories: List[Dict[str, Any]], seen: Set[str]) -> None:
        """Append one entry per local story directory to *stories*."""
        if not self.root.exists():
            return
        for dirpath, _, filenames in os.walk(str(self.root)):
            for filename in filenames:
                if (
                    not filename.endswith(".md")
                    or filename.startswith("README")
                    or filename == "music_prompt.md"
                ):
                    continue

                file_path = Path(dirpath) / filename
                rel_path = file_path.relative_to(self.root)
                # The containing directory identifies the story; only the
                # first markdown file found in it is reported.
                story_id = str(rel_path.parent).replace("\\", "/")
                if story_id in seen:
                    continue
                seen.add(story_id)

                title = filename.replace(".md", "").replace("_", " ").strip()
                if title.lower() in _GENERIC_TITLES:
                    # NOTE(review): the local scan falls back to the whole
                    # story id, while the R2 scan uses only its last segment.
                    title = story_id.replace("_", " ").strip()

                image_count = self._count_images(story_id)
                total_scenes = self._count_scenes(rel_path)
                word_count = self._get_word_count(rel_path)
                grandparent = file_path.parent.parent
                stories.append({
                    "id": story_id,
                    "title": title,
                    "path": str(rel_path).replace("\\", "/"),
                    "category": grandparent.name if grandparent != self.root else "General",
                    "processed": self._has_video(story_id),
                    "image_count": image_count,
                    "total_scenes": total_scenes,
                    "ready": image_count >= total_scenes,
                    "word_count": word_count,
                    "is_empty": word_count < _MIN_WORDS,
                })

    def _scan_r2(self, stories: List[Dict[str, Any]], seen: Set[str]) -> None:
        """Append one entry per R2 story not already in *seen* to *stories*."""
        # list_files returns full keys like "stories/Canada/Le Wendigo/story.md"
        for key in list_files(prefix=_R2_PREFIX):
            if not key.endswith(".md") or "README" in key or "music_prompt.md" in key:
                continue
            parts = key.split("/")
            if len(parts) < 3:
                continue

            # story_id = Category/Title
            story_id = "/".join(parts[1:-1])
            if story_id in seen:
                continue
            seen.add(story_id)

            title = parts[-1].replace(".md", "").replace("_", " ").strip()
            if title.lower() in _GENERIC_TITLES:
                title = story_id.split("/")[-1].replace("_", " ").strip()

            # Strip only the leading prefix; str.replace would also remove a
            # later "stories/" occurring inside the path itself.
            rel_path = key[len(_R2_PREFIX):] if key.startswith(_R2_PREFIX) else key

            image_count = self._count_images_r2(story_id)
            word_count = len((self.read_text(rel_path) or "").split())
            stories.append({
                "id": story_id,
                "title": title,
                "path": rel_path,
                "category": parts[1],
                "processed": False,
                "image_count": image_count,
                "total_scenes": _DEFAULT_SCENE_COUNT,
                "ready": image_count >= _DEFAULT_SCENE_COUNT,
                "word_count": word_count,
                "is_empty": word_count < _MIN_WORDS,
            })

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _count_images(self, story_id: str) -> int:
        """Count image files in the story's ``assets/images`` (or ``images``) dir."""
        img_dir = self.root / story_id / "assets" / "images"
        if not img_dir.exists():
            img_dir = self.root / story_id / "images"
        if img_dir.exists():
            return sum(
                1 for name in os.listdir(str(img_dir))
                if name.lower().endswith(_IMAGE_SUFFIXES)
            )
        return 0

    def _count_images_r2(self, story_id: str) -> int:
        """Count image objects under ``stories/<story_id>/assets/images/`` on R2."""
        if not _is_r2():
            return 0
        try:
            keys = list_files(prefix=f"{_R2_PREFIX}{story_id}/assets/images/")
            return sum(1 for k in keys if k.lower().endswith(_IMAGE_SUFFIXES))
        except Exception:
            # Best effort: an R2 listing failure counts as "no images".
            return 0

    def _count_scenes(self, rel_path: Path) -> int:
        """Count scene headings in a local story file.

        Falls back to the default scene count when the file is missing,
        unreadable, or contains no scene heading.
        """
        local = self.root / rel_path
        if local.exists():
            try:
                content = local.read_text(encoding="utf-8")
                return len(_SCENE_HEADING.findall(content)) or _DEFAULT_SCENE_COUNT
            except Exception:
                pass
        return _DEFAULT_SCENE_COUNT

    def _get_word_count(self, rel_path: Path) -> int:
        """Return the whitespace-separated word count of a local file, or 0."""
        local = self.root / rel_path
        if local.exists():
            try:
                return len(local.read_text(encoding="utf-8").split())
            except Exception:
                pass
        return 0

    def _has_video(self, story_id: str) -> bool:
        """Return True when the story directory contains a final video file."""
        story_dir = self.root / story_id
        if not story_dir.exists():
            return False
        flat_id = story_id.replace(" ", "_").replace("/", "_")
        return (
            (story_dir / "final_video.mp4").exists()
            or (story_dir / f"TT_{flat_id}_final.mp4").exists()
        )

    def exists(self, rel_path: str) -> bool:
        """Return True when *rel_path* exists under the local stories root."""
        return (self.root / rel_path).exists()