# ddeeds / funscript_loader.py
# sudotheworld's picture
# Upload 56 files
# 70a50c3 verified
# funscript_loader.py – v3 (Config dataclass edition – **shim-fixed**)
"""
Load every *.funscript in ./funscripts/ and register **each
10-second slice** as an individual pattern.
Works even if the full `memory_manager` module isn’t present: a shim
falls back to *settings.patterns* used by the main app.
v3 change: magic-number constants have been moved into a small
`Config` dataclass so they can be tweaked in one place without editing
code.
"""
from __future__ import annotations
import json, re, importlib
from pathlib import Path
from statistics import mean
from dataclasses import dataclass
import unittest
@dataclass
class Config:
    """Tunable loader settings, gathered in one place."""

    # Length of each slice, in milliseconds.
    segment_ms: int = 10_000
    # Scripts and slices holding fewer actions than this are ignored.
    min_actions: int = 6


# Module-wide settings instance read by the loader functions.
CFG = Config()
# ------------------------------------------------------------------
# memory-manager shim (lets the loader work even if memory_manager
# isn't present and we’re just running inside the app module)
# ------------------------------------------------------------------
try:
    import memory_manager as mem
except ModuleNotFoundError:
    # No memory_manager module: fall back to the main app's settings object,
    # or to None when the app module cannot be imported or has no settings.
    try:
        settings = importlib.import_module("app").settings
    except Exception:
        settings = None

    class _MemShim:
        """Minimal drop-in replacement backed by `settings.patterns`."""

        def top_patterns(self, n: int):
            """Return the first *n* stored patterns, or all of them if *n* is falsy."""
            if not settings:
                return []
            stored = settings.patterns
            if n:
                return stored[:n]
            return stored

        def add_pattern(self, name: str,
                        triple: tuple[int, int, int],
                        rating: int = 10):
            """Append a pattern built from *triple* unless *name* already exists.

            *triple* is (speed, depth, range); speed and depth are stored as
            a +/-5 window clipped to 0..100, the range is stored unchanged.
            Does nothing when no settings object is available.
            """
            if not settings:
                return
            for existing in settings.patterns:
                if existing.get("name") == name:
                    return

            def window(center):
                # +/-5 around the centre value, clipped to the 0..100 scale.
                return [max(0, center - 5), min(100, center + 5)]

            speed, depth, rng = triple
            entry = {
                "name": name,
                "sp_range": window(speed),
                "dp_range": window(depth),
                "rng": rng,
                "moods": ["Imported"],
                "score": rating,
            }
            settings.patterns.append(entry)

    mem = _MemShim()
# ------------------------------------------------------------------
# Scripts live in a "funscripts" folder next to this file; when __file__ is
# not defined, the current working directory is used as the base instead.
if "__file__" in globals():
    FUNSCRIPT_DIR = Path(__file__).parent / "funscripts"
else:
    FUNSCRIPT_DIR = Path.cwd() / "funscripts"
# Create the folder on first use so the glob in load_all() always has a target.
FUNSCRIPT_DIR.mkdir(exist_ok=True)
def _slug_to_title(fn: str) -> str:
"""Turn `video_clip_01.funscript` -> 'Video Clip 01'."""
stem = Path(fn).stem
title = re.sub(r"[_\-]+", " ", stem)
return " ".join(w if w.isupper() else w.capitalize() for w in title.split())
def _stats(actions: list[dict]) -> tuple[int, int, int]:
"""Return (speed %, depth %, range %) for one slice of actions."""
if not actions:
return 50, 50, 100
positions = [a["pos"] / 10 for a in actions] # 0-100
span = max(positions) - min(positions)
depth_pct = int(mean(positions))
strokes_per_min = (len(actions) // 2) * (60_000 / CFG.segment_ms)
speed_pct = max(10, min(100, int(strokes_per_min * 2)))
range_pct = int(span)
return speed_pct, depth_pct, range_pct
def _yield_segments(actions: list[dict]):
"""Yield consecutive slices of ~segment_ms length."""
if not actions:
return
start_idx = 0
start_time = actions[0]["at"]
for i, act in enumerate(actions, 1):
if act["at"] - start_time >= CFG.segment_ms:
yield actions[start_idx:i]
start_idx = i
start_time = act["at"]
if start_idx < len(actions):
yield actions[start_idx:]
def _read_actions(path: Path) -> list[dict]:
    """Return the usable actions stored in *path*, ordered by timestamp.

    A warning is printed and an empty list returned when the file cannot be
    read or is not valid JSON. An empty list is also returned when the JSON
    root is not an object or its "actions" value is not a list. Entries that
    are not dicts with numeric "at" and "pos" values are dropped.
    """
    try:
        # utf-8-sig decodes plain UTF-8 and also strips a leading BOM, which
        # json.loads would otherwise reject.
        data = json.loads(path.read_text(encoding="utf-8-sig"))
    except (OSError, ValueError) as exc:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        print(f"⚠️ Skipping {path.name}: {exc}")
        return []
    raw = data.get("actions") if isinstance(data, dict) else None
    if not isinstance(raw, list):
        return []
    usable = [
        a for a in raw
        if isinstance(a, dict)
        and isinstance(a.get("at"), (int, float))
        and isinstance(a.get("pos"), (int, float))
    ]
    # The slicing logic relies on ascending timestamps.
    usable.sort(key=lambda a: a["at"])
    return usable


def load_all():
    """Scan FUNSCRIPT_DIR and register patterns via mem.add_pattern().

    Every ``*.funscript`` file is split into slices; each slice with at
    least ``CFG.min_actions`` actions is registered under the title
    "<File Title> – Seg <n>", where <n> is the slice's 1-based position in
    the file (skipped slices still consume a number). Files are processed in
    sorted order so registration order is deterministic.
    """
    for fs in sorted(FUNSCRIPT_DIR.glob("*.funscript")):
        actions = _read_actions(fs)
        if len(actions) < CFG.min_actions:
            continue
        base_title = _slug_to_title(fs.name)
        for idx, segment in enumerate(_yield_segments(actions), 1):
            if len(segment) < CFG.min_actions:
                continue
            sp, dp, rng = _stats(segment)
            title = f"{base_title} – Seg {idx}"
            mem.add_pattern(title, (sp, dp, rng), rating=10)
    print("✅ Funscript import complete.")
# ------------------------------------------------------------------
# Minimal self-test
# ------------------------------------------------------------------
class _TestFunscriptLoader(unittest.TestCase):
    """Smoke tests for the slice statistics and the segmenter."""

    @staticmethod
    def _timeline(positions):
        """Build one action per position, spaced 5000 ms apart from t=0."""
        return [
            {"pos": pos, "at": step * 5000}
            for step, pos in enumerate(positions)
        ]

    def test_stats_empty(self):
        # An empty slice falls back to the neutral default triple.
        expected = (50, 50, 100)
        self.assertEqual(_stats([]), expected)

    def test_stats_basic(self):
        # Alternating 0 / 1000 positions span the full range.
        speed, _depth, span = _stats(self._timeline([0, 1000, 0, 1000]))
        self.assertTrue(10 <= speed <= 100)
        self.assertEqual(span, 100)

    def test_yield_segments(self):
        # Four actions over 15 s split into two slices at the default window.
        segments = list(_yield_segments(self._timeline([0, 0, 0, 0])))
        self.assertEqual(len(segments), 2)
if __name__ == "__main__":
    # Run the self-tests first; exit=False stops unittest from calling
    # sys.exit() so execution continues to the import step below.
    unittest.main(exit=False)
    # NOTE(review): the source's indentation was lost; load_all() is assumed
    # to sit inside this guard — confirm it is not meant to run on import.
    load_all()