File size: 11,428 Bytes
4689c2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
import struct
from typing import Optional
import json
import os
import re
from datetime import datetime

def write_wav_text_chunk(in_path: str, out_path: str, text: str,

                         fourcc: bytes = b'json', encoding: str = 'utf-8') -> None:
    """

    Insert (or replace) a custom RIFF chunk in a WAV file to hold an arbitrary string.

    - in_path:  source WAV path

    - out_path: destination WAV path (can be the same as in_path for in-place write)

    - text:     the string to store (e.g., JSON)

    - fourcc:   4-byte chunk ID; default b'json'

    - encoding: encoding for the string payload; default 'utf-8'



    Notes:

      * Keeps all original chunks as-is; if a chunk with the same fourcc exists,

        its payload is replaced; otherwise a new chunk is appended at the end.

      * Pads the chunk to even length per RIFF rules.

      * Supports standard little-endian RIFF/WAVE (not RF64 or RIFX).

    """
    data = open(in_path, 'rb').read()
    if len(data) < 12 or data[:4] not in (b'RIFF',) or data[8:12] != b'WAVE':
        raise ValueError("Not a standard little-endian RIFF/WAVE file (RF64/RIFX not supported).")
    if len(fourcc) != 4 or not all(32 <= b <= 126 for b in fourcc):
        raise ValueError("fourcc must be 4 printable ASCII bytes (e.g., b'json').")

    payload = text.encode(encoding)

    # Parse existing chunks
    pos = 12  # after 'RIFF' + size (4+4) and 'WAVE' (4)
    n = len(data)
    chunks = []  # list[(cid: bytes, payload: bytes)]
    while pos + 8 <= n:
        cid = data[pos:pos+4]
        size = struct.unpack_from('<I', data, pos+4)[0]
        start, end = pos + 8, pos + 8 + size
        if end > n:
            raise ValueError("Corrupt WAV: chunk size exceeds file length.")
        chunks.append((cid, data[start:end]))
        pos = end + (size & 1)  # pad to even

    # Replace existing or append new
    replaced = False
    new_chunks = []
    for cid, cdata in chunks:
        if cid == fourcc and not replaced:
            new_chunks.append((cid, payload))
            replaced = True
        else:
            new_chunks.append((cid, cdata))
    if not replaced:
        new_chunks.append((fourcc, payload))  # append at the end (often after 'data')

    # Rebuild RIFF body
    out_parts = [b'WAVE']
    for cid, cdata in new_chunks:
        out_parts.append(cid)
        out_parts.append(struct.pack('<I', len(cdata)))
        out_parts.append(cdata)
        if len(cdata) & 1:
            out_parts.append(b'\x00')  # pad to even size
    body = b''.join(out_parts)
    riff = b'RIFF' + struct.pack('<I', len(body)) + body

    with open(out_path, 'wb') as f:
        f.write(riff)


def read_wav_text_chunk(path: str, fourcc: bytes = b'json', encoding: str = 'utf-8') -> Optional[str]:
    """

    Read and return the string stored in a custom RIFF chunk from a WAV file.

    Returns None if the chunk isn't present.



    - path:     WAV file path

    - fourcc:   4-byte chunk ID to look for (default b'json')

    - encoding: decoding used for the stored bytes (default 'utf-8')

    """
    data = open(path, 'rb').read()
    if len(data) < 12 or data[:4] not in (b'RIFF',) or data[8:12] != b'WAVE':
        raise ValueError("Not a standard little-endian RIFF/WAVE file (RF64/RIFX not supported).")
    if len(fourcc) != 4:
        raise ValueError("fourcc must be 4 bytes.")

    pos = 12
    n = len(data)
    while pos + 8 <= n:
        cid = data[pos:pos+4]
        size = struct.unpack_from('<I', data, pos+4)[0]
        start, end = pos + 8, pos + 8 + size
        if end > n:
            raise ValueError("Corrupt WAV: chunk size exceeds file length.")
        if cid == fourcc:
            raw = data[start:end]
            return raw.decode(encoding, errors='strict')
        pos = end + (size & 1)

    return None

def _write_mp3_text_tag(path: str, text: str, tag_key: str = "WanGP") -> None:
    try:
        from mutagen.id3 import ID3, ID3NoHeaderError, TXXX
    except Exception as exc:
        raise RuntimeError("mutagen is required for mp3 metadata") from exc
    try:
        tag = ID3(path)
    except ID3NoHeaderError:
        tag = ID3()
    for key in list(tag.keys()):
        frame = tag.get(key)
        if isinstance(frame, TXXX) and frame.desc == tag_key:
            del tag[key]
    tag.add(TXXX(encoding=3, desc=tag_key, text=[text]))
    tag.save(path)


def _read_mp3_text_tag(path: str, tag_key: str = "WanGP") -> Optional[str]:
    try:
        from mutagen.id3 import ID3, ID3NoHeaderError, TXXX, COMM
    except Exception:
        return None
    try:
        tag = ID3(path)
    except ID3NoHeaderError:
        return None
    for frame in tag.getall("TXXX"):
        if isinstance(frame, TXXX) and frame.desc == tag_key:
            if frame.text:
                return frame.text[0]
    for frame in tag.getall("COMM"):
        if isinstance(frame, COMM) and frame.desc == tag_key:
            return frame.text[0] if frame.text else None
    return None


def save_audio_metadata(path, configs):
    ext = os.path.splitext(path)[1].lower()
    payload = json.dumps(configs)
    if ext == ".mp3":
        _write_mp3_text_tag(path, payload)
    elif ext == ".wav":
        write_wav_text_chunk(path, path, payload)
    else:
        raise ValueError(f"Unsupported audio metadata format: {ext}")


def read_audio_metadata(path):
    ext = os.path.splitext(path)[1].lower()
    if ext == ".mp3":
        raw = _read_mp3_text_tag(path)
    elif ext == ".wav":
        raw = read_wav_text_chunk(path)
    else:
        return None
    if not raw:
        return None
    return json.loads(raw)


_CREATION_KEYS = (
    "creation_date",
    "creation_datetime",
    "created_at",
    "created_on",
    "creation_timestamp",
    "created_timestamp",
)
_DATE_KEY_PARTS = ("date", "time", "created", "timestamp")
_DATE_KEY_EXCLUDE = ("generation_time", "pause_seconds", "duration_seconds", "video_length")


def _parse_datetime_value(value):
    if value is None:
        return None
    if isinstance(value, datetime):
        return value
    if isinstance(value, (int, float)):
        if 1900 <= value <= 3000:
            try:
                return datetime(int(value), 1, 1)
            except Exception:
                return None
        if value <= 0:
            return None
        try:
            return datetime.fromtimestamp(float(value))
        except Exception:
            return None

    text = str(value).strip()
    if not text:
        return None
    if re.fullmatch(r"\d{10}(\.\d+)?", text) or re.fullmatch(r"\d{13}", text):
        try:
            ts = float(text)
            if ts > 1_000_000_000_000:
                ts /= 1000.0
            return datetime.fromtimestamp(ts)
        except Exception:
            pass
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    if re.match(r"^\d{4}:\d{2}:\d{2}\s", text):
        text = text.replace(":", "-", 2)

    for fmt in (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d",
        "%Y/%m/%d %H:%M:%S",
        "%Y/%m/%d",
        "%Y-%m-%d-%Hh%Mm%Ss",
        "%Y%m%d",
        "%Y",
    ):
        try:
            return datetime.strptime(text, fmt)
        except Exception:
            pass
    try:
        return datetime.fromisoformat(text)
    except Exception:
        return None


def _iter_tag_values(value):
    if value is None:
        return
    if hasattr(value, "text"):
        txt = value.text
        if isinstance(txt, (list, tuple)):
            for item in txt:
                yield item
        else:
            yield txt
        return
    if isinstance(value, (list, tuple, set)):
        for item in value:
            yield from _iter_tag_values(item)
        return
    yield value


def extract_creation_datetime_from_metadata(metadata):
    if not isinstance(metadata, dict):
        return None
    for key in _CREATION_KEYS:
        dt = _parse_datetime_value(metadata.get(key))
        if dt is not None:
            return dt

    extra_info = metadata.get("extra_info")
    if isinstance(extra_info, dict):
        for key in _CREATION_KEYS:
            dt = _parse_datetime_value(extra_info.get(key))
            if dt is not None:
                return dt

    for source in (metadata, extra_info if isinstance(extra_info, dict) else {}):
        for key, value in source.items():
            lkey = str(key).strip().lower()
            if any(part in lkey for part in _DATE_KEY_EXCLUDE):
                continue
            if not any(part in lkey for part in _DATE_KEY_PARTS):
                continue
            dt = _parse_datetime_value(value)
            if dt is not None:
                return dt
    return None


def _extract_native_audio_datetime(path):
    try:
        from mutagen import File
    except Exception:
        return None
    try:
        audio = File(path, easy=False)
    except Exception:
        return None
    tags = None if audio is None else getattr(audio, "tags", None)
    if tags is None:
        return None

    if hasattr(tags, "getall"):
        for frame_name in ("TDRC", "TDEN", "TORY", "TYER", "TDAT", "TIME"):
            try:
                frames = tags.getall(frame_name)
            except Exception:
                frames = []
            for frame in frames:
                for item in _iter_tag_values(frame):
                    dt = _parse_datetime_value(item)
                    if dt is not None:
                        return dt
        try:
            txxx_frames = tags.getall("TXXX")
        except Exception:
            txxx_frames = []
        for frame in txxx_frames:
            desc = str(getattr(frame, "desc", "")).lower()
            if not any(part in desc for part in _DATE_KEY_PARTS):
                continue
            for item in _iter_tag_values(frame):
                dt = _parse_datetime_value(item)
                if dt is not None:
                    return dt

    items = tags.items() if hasattr(tags, "items") else []
    for key, value in items:
        lkey = str(key).lower()
        if any(part in lkey for part in _DATE_KEY_EXCLUDE):
            continue
        if not any(part in lkey for part in _DATE_KEY_PARTS + ("icrd", "\xa9day", "year")):
            continue
        for item in _iter_tag_values(value):
            dt = _parse_datetime_value(item)
            if dt is not None:
                return dt
    return None


def _get_file_creation_datetime(path):
    # For uploaded files, preserving browser-provided lastModified maps naturally to mtime.
    return datetime.fromtimestamp(os.path.getmtime(path))


def resolve_audio_creation_datetime(path, wangp_metadata=None):
    metadata = wangp_metadata
    if metadata is None:
        try:
            metadata = read_audio_metadata(path)
        except Exception:
            metadata = None

    dt = extract_creation_datetime_from_metadata(metadata)
    if dt is not None:
        return dt

    dt = _extract_native_audio_datetime(path)
    if dt is not None:
        return dt

    return _get_file_creation_datetime(path)