Spaces:
Sleeping
Sleeping
| """ | |
| Unified storage backend for DarkMedia-X Studio. | |
| Local filesystem first, Cloudflare R2 fallback for cloud/serverless mode. | |
| """ | |
| import os | |
| import sys | |
| from pathlib import Path | |
| from typing import List, Dict, Any, Optional | |
# Cloudflare R2 support is optional: when the helper module cannot be
# imported, `_R2` is False and the import error is printed.
try:
    from backend.api.r2_storage import (
        r2_enabled,
        list_files,
        read_file as r2_read,
        upload_file as r2_upload,
    )
except ImportError as import_error:
    _R2 = False
    print("R2 Import Error:", import_error)
else:
    _R2 = True
def _is_r2() -> bool:
    """Report whether R2 is usable: the helper imported and ``r2_enabled()`` agrees."""
    if not _R2:
        return False
    return r2_enabled()
| class StoryStorage: | |
| """Abstracts story storage: local filesystem + Cloudflare R2 fallback.""" | |
| def __init__(self, stories_root: Path): | |
| self.root = stories_root | |
| # ------------------------------------------------------------------ | |
| # Read | |
| # ------------------------------------------------------------------ | |
| def read_text(self, rel_path: str) -> Optional[str]: | |
| """Read a story markdown file. Returns None if not found.""" | |
| local = self.root / rel_path | |
| if local.exists(): | |
| return local.read_text(encoding="utf-8") | |
| # Try R2 first (user preference) | |
| if _is_r2(): | |
| try: | |
| # Path on R2 is stories/{path} | |
| r2_path = f"stories/{rel_path.replace('\\', '/')}" | |
| data = r2_read(r2_path) | |
| return data.decode("utf-8") | |
| except Exception: | |
| pass | |
| return None | |
| # ------------------------------------------------------------------ | |
| # Write | |
| # ------------------------------------------------------------------ | |
| def write_text(self, rel_path: str, content: str) -> bool: | |
| """Write a story markdown file. Tries local then R2 then Blob.""" | |
| local = self.root / rel_path | |
| try: | |
| local.parent.mkdir(parents=True, exist_ok=True) | |
| local.write_text(content, encoding="utf-8") | |
| return True | |
| except Exception: | |
| pass | |
| if _is_r2(): | |
| try: | |
| r2_path = f"stories/{rel_path.replace('\\', '/')}" | |
| # We need a temp file for r2_upload or we use a bytes buffer | |
| # r2_storage expects a local path. Let's write locally if possible | |
| # or use a smarter r2_upload that handles strings. | |
| import tempfile | |
| with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as tmp: | |
| tmp.write(content) | |
| tmp_path = tmp.name | |
| r2_upload(r2_path, tmp_path, "text/markdown") | |
| os.unlink(tmp_path) | |
| return True | |
| except Exception: | |
| pass | |
| if _is_blob(): | |
| try: | |
| upload_blob(rel_path.replace("\\", "/"), content.encode("utf-8"), "text/markdown") | |
| return True | |
| except Exception: | |
| pass | |
| return False | |
| # ------------------------------------------------------------------ | |
| # List | |
| # ------------------------------------------------------------------ | |
| def list_stories(self) -> List[Dict[str, Any]]: | |
| """List all stories with metadata.""" | |
| stories: List[Dict[str, Any]] = [] | |
| seen = set() | |
| # Local scan | |
| try: | |
| if self.root.exists(): | |
| for dirpath, _, filenames in os.walk(str(self.root)): | |
| for filename in filenames: | |
| if not filename.endswith(".md") or filename.startswith("README") or filename == "music_prompt.md": | |
| continue | |
| file_path = Path(dirpath) / filename | |
| rel_path = file_path.relative_to(self.root) | |
| story_id = str(rel_path.parent).replace("\\", "/") | |
| if story_id in seen: continue | |
| seen.add(story_id) | |
| title = filename.replace(".md", "").replace("_", " ").strip() | |
| if title.lower() in {"story", "index", "readme"}: | |
| title = story_id.replace("_", " ").strip() | |
| image_count = self._count_images(story_id) | |
| total_scenes = self._count_scenes(rel_path) | |
| processed = self._has_video(story_id) | |
| word_count = self._get_word_count(rel_path) | |
| stories.append({ | |
| "id": story_id, | |
| "title": title, | |
| "path": str(rel_path).replace("\\", "/"), | |
| "category": file_path.parent.parent.name if file_path.parent.parent != self.root else "General", | |
| "processed": processed, | |
| "image_count": image_count, | |
| "total_scenes": total_scenes, | |
| "ready": image_count >= total_scenes, | |
| "word_count": word_count, | |
| "is_empty": word_count < 20, | |
| }) | |
| except Exception as e: | |
| print(f"Error scanning local stories: {e}") | |
| # R2 Scan | |
| if _is_r2(): | |
| try: | |
| # list_files returns full keys like "stories/Canada/Le Wendigo/story.md" | |
| keys = list_files(prefix="stories/") | |
| for key in keys: | |
| if not key.endswith(".md") or "README" in key or "music_prompt.md" in key: | |
| continue | |
| parts = key.split("/") | |
| if len(parts) < 3: continue | |
| # story_id = Category/Title | |
| story_id = "/".join(parts[1:-1]) | |
| if story_id in seen: continue | |
| seen.add(story_id) | |
| filename = parts[-1] | |
| title = filename.replace(".md", "").replace("_", " ").strip() | |
| if title.lower() in {"story", "index", "readme"}: | |
| title = story_id.split("/")[-1].replace("_", " ").strip() | |
| category = parts[1] | |
| image_count = self._count_images_r2(story_id) | |
| r2_content = self.read_text(key.replace("stories/", "")) or "" | |
| r2_word_count = len(r2_content.split()) | |
| stories.append({ | |
| "id": story_id, | |
| "title": title, | |
| "path": key.replace("stories/", ""), | |
| "category": category, | |
| "processed": False, | |
| "image_count": image_count, | |
| "total_scenes": 10, | |
| "ready": image_count >= 10, | |
| "word_count": r2_word_count, | |
| "is_empty": r2_word_count < 20, | |
| }) | |
| except Exception: | |
| pass | |
| return sorted(stories, key=lambda s: s["title"]) | |
| # ------------------------------------------------------------------ | |
| # Helpers | |
| # ------------------------------------------------------------------ | |
| def _count_images(self, story_id: str) -> int: | |
| img_dir = self.root / story_id / "assets" / "images" | |
| if not img_dir.exists(): | |
| img_dir = self.root / story_id / "images" | |
| if img_dir.exists(): | |
| return len([f for f in os.listdir(str(img_dir)) if f.lower().endswith((".png", ".jpg", ".jpeg"))]) | |
| return 0 | |
| def _count_images_r2(self, story_id: str) -> int: | |
| if not _is_r2(): | |
| return 0 | |
| try: | |
| # We look for stories/{story_id}/assets/images/ | |
| prefix = f"stories/{story_id}/assets/images/" | |
| keys = list_files(prefix=prefix) | |
| return len([k for k in keys if k.lower().endswith((".png", ".jpg", ".jpeg"))]) | |
| except Exception: | |
| return 0 | |
| def _count_scenes(self, rel_path: Path) -> int: | |
| local = self.root / rel_path | |
| if local.exists(): | |
| try: | |
| content = local.read_text(encoding="utf-8") | |
| import re | |
| matches = re.findall(r"^##\s*(?:Scene|Scène)\s*\d+", content, flags=re.MULTILINE | re.IGNORECASE) | |
| return len(matches) or 10 | |
| except Exception: | |
| pass | |
| return 10 | |
| def _get_word_count(self, rel_path: Path) -> int: | |
| local = self.root / rel_path | |
| if local.exists(): | |
| try: | |
| content = local.read_text(encoding="utf-8") | |
| return len(content.split()) | |
| except Exception: | |
| pass | |
| return 0 | |
| def _has_video(self, story_id: str) -> bool: | |
| story_dir = self.root / story_id | |
| if not story_dir.exists(): | |
| return False | |
| return (story_dir / "final_video.mp4").exists() or (story_dir / f"TT_{story_id.replace(' ', '_').replace('/', '_')}_final.mp4").exists() | |
| def exists(self, rel_path: str) -> bool: | |
| local = self.root / rel_path | |
| return local.exists() | |