| | |
| | """ |
| | Load every *.funscript in ./funscripts/ and register **each |
| | 10-second slice** as an individual pattern. |
| | Works even if the full `memory_manager` module isn’t present: a shim |
| | falls back to *settings.patterns* used by the main app. |
| | |
| | v3 change: magic-number constants have been moved into a small |
| | `Config` dataclass so they can be tweaked in one place without editing |
| | code. |
| | """ |
| | from __future__ import annotations |
| | import json, re, importlib |
| | from pathlib import Path |
| | from statistics import mean |
| | from dataclasses import dataclass |
| | import unittest |
| |
|
| | @dataclass |
| | class Config: |
| | segment_ms: int = 10_000 |
| | min_actions: int = 6 |
| |
|
| | CFG = Config() |
| |
|
| | |
| | |
| | |
| | |
| | try: |
| | import memory_manager as mem |
| | except ModuleNotFoundError: |
| | try: |
| | settings = importlib.import_module("app").settings |
| | except Exception: |
| | settings = None |
| |
|
| | class _MemShim: |
| | def top_patterns(self, n: int): |
| | if not settings: |
| | return [] |
| | return settings.patterns[:n] if n else settings.patterns |
| |
|
| | def add_pattern(self, name: str, |
| | triple: tuple[int, int, int], |
| | rating: int = 10): |
| | if not settings: |
| | return |
| | if any(p.get("name") == name for p in settings.patterns): |
| | return |
| | sp, dp, rng = triple |
| | settings.patterns.append({ |
| | "name": name, |
| | "sp_range": [max(0, sp - 5), min(100, sp + 5)], |
| | "dp_range": [max(0, dp - 5), min(100, dp + 5)], |
| | "rng": rng, |
| | "moods": ["Imported"], |
| | "score": rating, |
| | }) |
| |
|
| | mem = _MemShim() |
| |
|
| | |
| | FUNSCRIPT_DIR = (Path(__file__).parent if "__file__" in globals() |
| | else Path.cwd()) / "funscripts" |
| | FUNSCRIPT_DIR.mkdir(exist_ok=True) |
| |
|
| |
|
| | def _slug_to_title(fn: str) -> str: |
| | """Turn `video_clip_01.funscript` -> 'Video Clip 01'.""" |
| | stem = Path(fn).stem |
| | title = re.sub(r"[_\-]+", " ", stem) |
| | return " ".join(w if w.isupper() else w.capitalize() for w in title.split()) |
| |
|
| |
|
| | def _stats(actions: list[dict]) -> tuple[int, int, int]: |
| | """Return (speed %, depth %, range %) for one slice of actions.""" |
| | if not actions: |
| | return 50, 50, 100 |
| | positions = [a["pos"] / 10 for a in actions] |
| | span = max(positions) - min(positions) |
| | depth_pct = int(mean(positions)) |
| | strokes_per_min = (len(actions) // 2) * (60_000 / CFG.segment_ms) |
| | speed_pct = max(10, min(100, int(strokes_per_min * 2))) |
| | range_pct = int(span) |
| | return speed_pct, depth_pct, range_pct |
| |
|
| |
|
| | def _yield_segments(actions: list[dict]): |
| | """Yield consecutive slices of ~segment_ms length.""" |
| | if not actions: |
| | return |
| | start_idx = 0 |
| | start_time = actions[0]["at"] |
| | for i, act in enumerate(actions, 1): |
| | if act["at"] - start_time >= CFG.segment_ms: |
| | yield actions[start_idx:i] |
| | start_idx = i |
| | start_time = act["at"] |
| | if start_idx < len(actions): |
| | yield actions[start_idx:] |
| |
|
| |
|
| | def load_all(): |
| | """Scan FUNSCRIPT_DIR and register patterns via mem.add_pattern().""" |
| | for fs in FUNSCRIPT_DIR.glob("*.funscript"): |
| | try: |
| | data = json.loads(fs.read_text()) |
| | except Exception: |
| | continue |
| | actions = data.get("actions", []) |
| | if len(actions) < CFG.min_actions: |
| | continue |
| | for idx, segment in enumerate(_yield_segments(actions), 1): |
| | if len(segment) < CFG.min_actions: |
| | continue |
| | sp, dp, rng = _stats(segment) |
| | title = f"{_slug_to_title(fs.name)} – Seg {idx}" |
| | mem.add_pattern(title, (sp, dp, rng), rating=10) |
| | print("✅ Funscript import complete.") |
| |
|
| |
|
| | |
| | |
| | |
| | class _TestFunscriptLoader(unittest.TestCase): |
| | def test_stats_empty(self): |
| | self.assertEqual(_stats([]), (50, 50, 100)) |
| |
|
| | def test_stats_basic(self): |
| | acts = [ |
| | {"pos": 0, "at": 0}, |
| | {"pos": 1000, "at": 5000}, |
| | {"pos": 0, "at": 10_000}, |
| | {"pos": 1000, "at": 15_000}, |
| | ] |
| | sp, dp, rng = _stats(acts) |
| | self.assertTrue(10 <= sp <= 100) |
| | self.assertEqual(rng, 100) |
| |
|
| | def test_yield_segments(self): |
| | acts = [ |
| | {"pos": 0, "at": 0}, |
| | {"pos": 0, "at": 5000}, |
| | {"pos": 0, "at": 10_000}, |
| | {"pos": 0, "at": 15_000}, |
| | ] |
| | segs = list(_yield_segments(acts)) |
| | self.assertEqual(len(segs), 2) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | unittest.main(exit=False) |
| | load_all() |
| |
|