# funscript_loader.py – v3 (Config dataclass edition – **shim-fixed**)
"""
Load every *.funscript in ./funscripts/ and register **each 10-second slice**
as an individual pattern.

Works even if the full `memory_manager` module isn't present: a shim falls
back to *settings.patterns* used by the main app.

v3 change: magic-number constants have been moved into a small `Config`
dataclass so they can be tweaked in one place without editing code.
"""
from __future__ import annotations

import importlib
import json
import re
import unittest
from dataclasses import dataclass
from pathlib import Path
from statistics import mean


@dataclass
class Config:
    """Tunable constants used by the loader (see the module-level ``CFG``)."""

    segment_ms: int = 10_000  # length of each slice (ms)
    min_actions: int = 6      # ignore slices with fewer actions


CFG = Config()

# ------------------------------------------------------------------
# memory-manager shim (lets the loader work even if memory_manager
# isn't present and we're just running inside the app module)
# ------------------------------------------------------------------
try:
    import memory_manager as mem
except ModuleNotFoundError:
    try:
        settings = importlib.import_module("app").settings
    except Exception:
        # Best-effort: if the app cannot be imported or has no ``settings``
        # attribute, the shim below silently becomes a no-op.
        settings = None

    class _MemShim:
        """Minimal drop-in replacement backed by ``settings.patterns``."""

        def top_patterns(self, n: int):
            """Return the first *n* patterns, or all of them when *n* is falsy."""
            if not settings:
                return []
            return settings.patterns[:n] if n else settings.patterns

        def add_pattern(self, name: str, triple: tuple[int, int, int],
                        rating: int = 10):
            """Append a pattern dict built from ``(speed, depth, range)``.

            Does nothing when no settings object is available or when a
            pattern with the same name is already registered.
            """
            if not settings:
                return
            if any(p.get("name") == name for p in settings.patterns):
                return
            sp, dp, rng = triple
            settings.patterns.append({
                "name": name,
                # Speed and depth are stored as a +/-5 window clamped to 0-100.
                "sp_range": [max(0, sp - 5), min(100, sp + 5)],
                "dp_range": [max(0, dp - 5), min(100, dp + 5)],
                "rng": rng,
                "moods": ["Imported"],
                "score": rating,
            })

    mem = _MemShim()

# ------------------------------------------------------------------
FUNSCRIPT_DIR = (Path(__file__).parent if "__file__" in globals()
                 else Path.cwd()) / "funscripts"
FUNSCRIPT_DIR.mkdir(exist_ok=True)


def _slug_to_title(fn: str) -> str:
    """Turn `video_clip_01.funscript` -> 'Video Clip 01'."""
    stem = Path(fn).stem
    title = re.sub(r"[_\-]+", " ", stem)
    # Words that are already all-uppercase (e.g. acronyms) are left untouched.
    return " ".join(w if w.isupper() else w.capitalize() for w in title.split())


def _stats(actions: list[dict]) -> tuple[int, int, int]:
    """Return (speed %, depth %, range %) for one slice of actions.

    An empty slice yields the neutral default ``(50, 50, 100)``.  Speed is
    derived from the number of actions assuming the slice lasts
    ``CFG.segment_ms`` and is clamped to 10-100.
    """
    if not actions:
        return 50, 50, 100
    # NOTE(review): dividing by 10 assumes "pos" is on a 0-1000 scale (the
    # self-test below uses 1000 as the maximum) - confirm against the
    # funscript files actually being imported.
    positions = [a["pos"] / 10 for a in actions]  # 0-100
    span = max(positions) - min(positions)
    depth_pct = int(mean(positions))
    # Two actions (one up, one down) are counted as a single stroke.
    strokes_per_min = (len(actions) // 2) * (60_000 / CFG.segment_ms)
    speed_pct = max(10, min(100, int(strokes_per_min * 2)))
    range_pct = int(span)
    return speed_pct, depth_pct, range_pct


def _yield_segments(actions: list[dict]):
    """Yield consecutive slices of ~segment_ms length.

    *actions* must be ordered by ascending ``"at"``.  A slice is closed by
    (and includes) the first action that lies at least ``CFG.segment_ms``
    after the slice's start time; any remaining tail is yielded last.
    """
    if not actions:
        return
    start_idx = 0
    start_time = actions[0]["at"]
    for end_idx, act in enumerate(actions, 1):
        if act["at"] - start_time >= CFG.segment_ms:
            yield actions[start_idx:end_idx]
            start_idx = end_idx
            start_time = act["at"]
    if start_idx < len(actions):
        yield actions[start_idx:]


def _is_number(value: object) -> bool:
    """Return True for int/float values, excluding bool."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _extract_actions(data: object) -> list[dict]:
    """Return the well-formed actions of decoded funscript JSON, sorted by time.

    Anything that is not a dict carrying numeric ``"at"`` and ``"pos"`` values
    is dropped.  A root that is not a JSON object, or an ``"actions"`` value
    that is not a list, yields an empty list instead of raising.
    """
    if not isinstance(data, dict):
        return []
    raw = data.get("actions")
    if not isinstance(raw, list):
        return []
    valid = [
        a for a in raw
        if isinstance(a, dict)
        and _is_number(a.get("at"))
        and _is_number(a.get("pos"))
    ]
    # _yield_segments relies on ascending timestamps.
    valid.sort(key=lambda a: a["at"])
    return valid


def load_all():
    """Scan FUNSCRIPT_DIR and register patterns via mem.add_pattern().

    Unreadable or malformed files are reported and skipped.  Files and
    slices with fewer than ``CFG.min_actions`` actions are ignored.
    """
    # Sorted so that registration order does not depend on the filesystem.
    for fs in sorted(FUNSCRIPT_DIR.glob("*.funscript")):
        try:
            data = json.loads(fs.read_text(encoding="utf-8"))
        except (OSError, ValueError, RecursionError) as exc:
            # ValueError covers both JSON and Unicode decode errors;
            # RecursionError guards against pathologically nested JSON.
            print(f"⚠️ Skipping {fs.name}: {exc}")
            continue
        actions = _extract_actions(data)
        if len(actions) < CFG.min_actions:
            continue
        base_title = _slug_to_title(fs.name)
        # Segment numbers count every slice, including ones skipped below.
        for idx, segment in enumerate(_yield_segments(actions), 1):
            if len(segment) < CFG.min_actions:
                continue
            sp, dp, rng = _stats(segment)
            title = f"{base_title} – Seg {idx}"
            mem.add_pattern(title, (sp, dp, rng), rating=10)
    print("✅ Funscript import complete.")


# ------------------------------------------------------------------
# Minimal self-test
# ------------------------------------------------------------------
class _TestFunscriptLoader(unittest.TestCase):
    def test_stats_empty(self):
        self.assertEqual(_stats([]), (50, 50, 100))

    def test_stats_basic(self):
        acts = [
            {"pos": 0, "at": 0},
            {"pos": 1000, "at": 5000},
            {"pos": 0, "at": 10_000},
            {"pos": 1000, "at": 15_000},
        ]
        sp, dp, rng = _stats(acts)
        self.assertTrue(10 <= sp <= 100)
        self.assertEqual(rng, 100)

    def test_yield_segments(self):
        acts = [
            {"pos": 0, "at": 0},
            {"pos": 0, "at": 5000},
            {"pos": 0, "at": 10_000},
            {"pos": 0, "at": 15_000},
        ]
        segs = list(_yield_segments(acts))
        self.assertEqual(len(segs), 2)

    def test_extract_actions_malformed(self):
        self.assertEqual(_extract_actions([1, 2, 3]), [])
        self.assertEqual(_extract_actions({"actions": None}), [])

    def test_extract_actions_sorted(self):
        data = {"actions": [{"at": 20, "pos": 5}, "junk", {"at": 10, "pos": 7}]}
        self.assertEqual(
            _extract_actions(data),
            [{"at": 10, "pos": 7}, {"at": 20, "pos": 5}],
        )


if __name__ == "__main__":
    unittest.main(exit=False)
    load_all()