File size: 5,234 Bytes
70a50c3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# funscript_loader.py – v3  (Config dataclass edition – **shim-fixed**)
"""
Load every *.funscript in ./funscripts/ and register **each
10-second slice** as an individual pattern.
Works even if the full `memory_manager` module isn’t present: a shim
falls back to *settings.patterns* used by the main app.

v3 change: magic-number constants have been moved into a small
`Config` dataclass so they can be tweaked in one place without editing
code.
"""
from __future__ import annotations
import json, re, importlib
from pathlib import Path
from statistics import mean
from dataclasses import dataclass
import unittest

@dataclass
class Config:
    segment_ms: int = 10_000  # length of each slice (ms)
    min_actions: int = 6      # ignore slices with fewer actions

CFG = Config()

# ------------------------------------------------------------------
# memory-manager shim (lets the loader work even if memory_manager
# isn't present and we’re just running inside the app module)
# ------------------------------------------------------------------
try:
    import memory_manager as mem
except ModuleNotFoundError:
    try:
        settings = importlib.import_module("app").settings
    except Exception:
        settings = None

    class _MemShim:               # minimal drop-in replacement
        def top_patterns(self, n: int):
            if not settings:
                return []
            return settings.patterns[:n] if n else settings.patterns

        def add_pattern(self, name: str,
                        triple: tuple[int, int, int],
                        rating: int = 10):
            if not settings:
                return
            if any(p.get("name") == name for p in settings.patterns):
                return
            sp, dp, rng = triple
            settings.patterns.append({
                "name": name,
                "sp_range": [max(0, sp - 5), min(100, sp + 5)],
                "dp_range": [max(0, dp - 5), min(100, dp + 5)],
                "rng": rng,
                "moods": ["Imported"],
                "score": rating,
            })

    mem = _MemShim()

# ------------------------------------------------------------------
FUNSCRIPT_DIR = (Path(__file__).parent if "__file__" in globals()
                 else Path.cwd()) / "funscripts"
FUNSCRIPT_DIR.mkdir(exist_ok=True)


def _slug_to_title(fn: str) -> str:
    """Turn `video_clip_01.funscript` -> 'Video Clip 01'."""
    stem = Path(fn).stem
    title = re.sub(r"[_\-]+", " ", stem)
    return " ".join(w if w.isupper() else w.capitalize() for w in title.split())


def _stats(actions: list[dict]) -> tuple[int, int, int]:
    """Return (speed %, depth %, range %) for one slice of actions."""
    if not actions:
        return 50, 50, 100
    positions = [a["pos"] / 10 for a in actions]          # 0-100
    span = max(positions) - min(positions)
    depth_pct = int(mean(positions))
    strokes_per_min = (len(actions) // 2) * (60_000 / CFG.segment_ms)
    speed_pct = max(10, min(100, int(strokes_per_min * 2)))
    range_pct = int(span)
    return speed_pct, depth_pct, range_pct


def _yield_segments(actions: list[dict]):
    """Yield consecutive slices of ~segment_ms length."""
    if not actions:
        return
    start_idx = 0
    start_time = actions[0]["at"]
    for i, act in enumerate(actions, 1):
        if act["at"] - start_time >= CFG.segment_ms:
            yield actions[start_idx:i]
            start_idx = i
            start_time = act["at"]
    if start_idx < len(actions):
        yield actions[start_idx:]


def load_all():
    """Scan FUNSCRIPT_DIR and register patterns via mem.add_pattern()."""
    for fs in FUNSCRIPT_DIR.glob("*.funscript"):
        try:
            data = json.loads(fs.read_text())
        except Exception:
            continue
        actions = data.get("actions", [])
        if len(actions) < CFG.min_actions:
            continue
        for idx, segment in enumerate(_yield_segments(actions), 1):
            if len(segment) < CFG.min_actions:
                continue
            sp, dp, rng = _stats(segment)
            title = f"{_slug_to_title(fs.name)} – Seg {idx}"
            mem.add_pattern(title, (sp, dp, rng), rating=10)
    print("✅ Funscript import complete.")


# ------------------------------------------------------------------
# Minimal self-test
# ------------------------------------------------------------------
class _TestFunscriptLoader(unittest.TestCase):
    def test_stats_empty(self):
        self.assertEqual(_stats([]), (50, 50, 100))

    def test_stats_basic(self):
        acts = [
            {"pos": 0, "at": 0},
            {"pos": 1000, "at": 5000},
            {"pos": 0, "at": 10_000},
            {"pos": 1000, "at": 15_000},
        ]
        sp, dp, rng = _stats(acts)
        self.assertTrue(10 <= sp <= 100)
        self.assertEqual(rng, 100)

    def test_yield_segments(self):
        acts = [
            {"pos": 0, "at": 0},
            {"pos": 0, "at": 5000},
            {"pos": 0, "at": 10_000},
            {"pos": 0, "at": 15_000},
        ]
        segs = list(_yield_segments(acts))
        self.assertEqual(len(segs), 2)


if __name__ == "__main__":
    unittest.main(exit=False)
    load_all()