# (file-viewer residue: file size 5,234 bytes, revision 70a50c3)
# funscript_loader.py – v3 (Config dataclass edition – **shim-fixed**)
"""
Load every *.funscript in ./funscripts/ and register **each
10-second slice** as an individual pattern.
Works even if the full `memory_manager` module isn’t present: a shim
falls back to *settings.patterns* used by the main app.
v3 change: magic-number constants have been moved into a small
`Config` dataclass so they can be tweaked in one place without editing
code.
"""
from __future__ import annotations
import json, re, importlib
from pathlib import Path
from statistics import mean
from dataclasses import dataclass
import unittest
@dataclass
class Config:
    """Tunable constants for the loader, gathered in one place."""

    # Length of each slice, in milliseconds.
    segment_ms: int = 10_000
    # Slices holding fewer actions than this are ignored.
    min_actions: int = 6


# Module-wide settings instance read by the loader functions.
CFG = Config()
# ------------------------------------------------------------------
# memory-manager shim (lets the loader work even if memory_manager
# isn't present and we’re just running inside the app module)
# ------------------------------------------------------------------
try:
    import memory_manager as mem
except ModuleNotFoundError:
    # No memory_manager module: fall back to the `settings` object exposed
    # by the `app` module, if that can be imported.
    try:
        settings = importlib.import_module("app").settings
    except Exception:
        # Best effort: any failure leaves the shim as a no-op.
        settings = None

    class _MemShim:
        """Minimal drop-in replacement offering top_patterns/add_pattern.

        Both methods work on the module-level ``settings`` object and do
        nothing useful when it is falsy.
        """

        def top_patterns(self, n: int):
            """Return the first *n* items of ``settings.patterns``.

            A falsy *n* returns the list itself; without settings the
            result is an empty list.
            """
            if settings:
                stored = settings.patterns
                return stored[:n] if n else stored
            return []

        def add_pattern(self, name: str,
                        triple: tuple[int, int, int],
                        rating: int = 10):
            """Append a pattern built from *triple* unless *name* is taken.

            *triple* is unpacked as (sp, dp, rng).  The stored entry keeps
            a +/-5 window around sp and dp, clamped to 0..100, the raw
            rng, the mood tag "Imported" and *rating* as its score.
            """
            if not settings:
                return
            for existing in settings.patterns:
                if existing.get("name") == name:
                    return  # already registered under this name

            def _band(centre):
                # +/-5 window around centre, kept inside 0..100
                return [max(0, centre - 5), min(100, centre + 5)]

            sp, dp, rng = triple
            entry = {
                "name": name,
                "sp_range": _band(sp),
                "dp_range": _band(dp),
                "rng": rng,
                "moods": ["Imported"],
                "score": rating,
            }
            settings.patterns.append(entry)

    mem = _MemShim()
# ------------------------------------------------------------------
# Directory scanned for *.funscript files: a "funscripts" folder next to
# this module, or under the current working directory when __file__ is
# not defined.
if "__file__" in globals():
    FUNSCRIPT_DIR = Path(__file__).parent / "funscripts"
else:
    FUNSCRIPT_DIR = Path.cwd() / "funscripts"
# Created at import time so the folder is always present.
FUNSCRIPT_DIR.mkdir(exist_ok=True)
def _slug_to_title(fn: str) -> str:
"""Turn `video_clip_01.funscript` -> 'Video Clip 01'."""
stem = Path(fn).stem
title = re.sub(r"[_\-]+", " ", stem)
return " ".join(w if w.isupper() else w.capitalize() for w in title.split())
def _stats(actions: list[dict]) -> tuple[int, int, int]:
"""Return (speed %, depth %, range %) for one slice of actions."""
if not actions:
return 50, 50, 100
positions = [a["pos"] / 10 for a in actions] # 0-100
span = max(positions) - min(positions)
depth_pct = int(mean(positions))
strokes_per_min = (len(actions) // 2) * (60_000 / CFG.segment_ms)
speed_pct = max(10, min(100, int(strokes_per_min * 2)))
range_pct = int(span)
return speed_pct, depth_pct, range_pct
def _yield_segments(actions: list[dict]):
"""Yield consecutive slices of ~segment_ms length."""
if not actions:
return
start_idx = 0
start_time = actions[0]["at"]
for i, act in enumerate(actions, 1):
if act["at"] - start_time >= CFG.segment_ms:
yield actions[start_idx:i]
start_idx = i
start_time = act["at"]
if start_idx < len(actions):
yield actions[start_idx:]
def load_all():
    """Scan FUNSCRIPT_DIR and register patterns via mem.add_pattern().

    Every ``*.funscript`` file is parsed as JSON, its ``"actions"`` list
    is cut into slices by ``_yield_segments`` and each slice holding at
    least ``CFG.min_actions`` actions is registered under the title
    ``"<File Title> – Seg <n>"`` with a rating of 10.

    Files are skipped, never fatal, when they cannot be read or decoded,
    when their top level is not a JSON object with an ``"actions"`` list,
    when they hold fewer than ``CFG.min_actions`` actions, or when an
    action lacks the ``"at"``/``"pos"`` keys.  Unreadable and malformed
    files are reported on stdout.
    """
    # Sorted so that patterns are registered in a reproducible order.
    for fs in sorted(FUNSCRIPT_DIR.glob("*.funscript")):
        try:
            # utf-8-sig decodes plain UTF-8 and also strips a leading BOM,
            # which json.loads would otherwise reject.
            data = json.loads(fs.read_text(encoding="utf-8-sig"))
        except (OSError, ValueError, RecursionError) as exc:
            # ValueError covers JSONDecodeError and UnicodeDecodeError;
            # RecursionError covers pathologically nested JSON.
            print(f"Skipping {fs.name}: {exc}")
            continue

        actions = data.get("actions") if isinstance(data, dict) else None
        if not isinstance(actions, list) or len(actions) < CFG.min_actions:
            continue
        if not all(isinstance(a, dict) and "at" in a and "pos" in a
                   for a in actions):
            # Checked up front so a bad entry cannot abort the import
            # half-way through a file.
            print(f"Skipping {fs.name}: malformed action entries")
            continue

        base_title = _slug_to_title(fs.name)
        for idx, segment in enumerate(_yield_segments(actions), 1):
            if len(segment) < CFG.min_actions:
                continue
            sp, dp, rng = _stats(segment)
            title = f"{base_title} – Seg {idx}"
            mem.add_pattern(title, (sp, dp, rng), rating=10)
    print("✅ Funscript import complete.")
# ------------------------------------------------------------------
# Minimal self-test
# ------------------------------------------------------------------
class _TestFunscriptLoader(unittest.TestCase):
    """Smoke tests for the slice statistics and the slicing generator."""

    # Timestamps shared by the fixtures below (ms).
    _TIMES = (0, 5000, 10_000, 15_000)

    def test_stats_empty(self):
        """An empty slice yields the documented defaults."""
        expected = (50, 50, 100)
        self.assertEqual(_stats([]), expected)

    def test_stats_basic(self):
        """Alternating extreme positions give a full range and a clamped speed."""
        levels = (0, 1000, 0, 1000)
        acts = [{"pos": p, "at": t} for p, t in zip(levels, self._TIMES)]
        sp, _, rng = _stats(acts)
        self.assertTrue(10 <= sp <= 100)
        self.assertEqual(rng, 100)

    def test_yield_segments(self):
        """Four actions spanning 15 s are cut into two slices."""
        acts = [{"pos": 0, "at": t} for t in self._TIMES]
        count = sum(1 for _ in _yield_segments(acts))
        self.assertEqual(count, 2)
# Script entry point: run the self-tests, then import the funscripts.
# NOTE(review): the original indentation was lost; both statements are
# assumed to sit under the guard — confirm against the upstream file.
if __name__ == "__main__":
    # exit=False keeps the interpreter alive so load_all() still runs.
    unittest.main(exit=False)
    load_all()
# (end of funscript_loader.py)