VeuReu committed on
Commit
c854a70
·
verified ·
1 Parent(s): 075d9b3

Upload 4 files

Browse files
Files changed (1) hide show
  1. scripts/initial_identity_encoding.py +340 -0
scripts/initial_identity_encoding.py ADDED
@@ -0,0 +1,340 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # identity_encoding.py (updated to use libs/*)
2
+ # Veureu — Identity Encoder (faces, voices, scenarios)
3
+ # -----------------------------------------------------------------------------
4
+ # This script replaces the original `identity_encoding.py` but **reuses**
5
+ # as much as possible the functions already present in `libs/`.
6
+ # It respects the project's path structure (identities/*, scenarios, chroma_db,
7
+ # results) and maintains the classic pipeline:
8
+ # 1) index_faces (ChromaDB)
9
+ # 2) identity_features.csv
10
+ # 3) index_voices (ChromaDB)
11
+ # 4) scenarios_descriptions.csv
12
+ # 5) index_scenarios (ChromaDB)
13
+ # -----------------------------------------------------------------------------
14
+ from __future__ import annotations
15
+ import argparse
16
+ import csv
17
+ import logging
18
+ import sys
19
+ import uuid
20
+ from dataclasses import dataclass
21
+ from pathlib import Path
22
+ from typing import Any, Dict, Iterable, List, Optional, Tuple
23
+
24
+ # ============================ LOGGING ========================================
25
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
26
+ log = logging.getLogger("identity_encoding")
27
+
28
+ # ============================ DEPENDENCIES ===================================
29
+ # ChromaDB (persistente)
30
+ try:
31
+ import chromadb
32
+ from chromadb.config import Settings # noqa: F401
33
+ except Exception as e:
34
+ chromadb = None # type: ignore
35
+ log.error("No se pudo importar chromadb: %s", e)
36
+
37
+ from libs.vision_tools_salamandra import FaceAnalyzer
38
+ from collections import Counter
39
+
40
+ # Audio: reuse get_embedding from the existing pipeline
41
+ from libs.audio_tools_ana_2 import VoiceEmbedder
42
+ from libs.vision_tools_salamandra import FaceOfImageEmbedding
43
+
44
+ # Optional
45
+ try:
46
+ import numpy as np
47
+ except Exception:
48
+ np = None # type: ignore
49
+
50
+ # ============================ UTILITIES =====================================
51
+ IMG_EXT = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}
52
+ AUD_EXT = {".wav", ".mp3", ".flac", ".m4a", ".ogg"}
53
+
54
+
55
def list_files(root: Path, exts: Iterable[str]) -> List[Path]:
    """Recursively collect files under *root* whose lowercased suffix is in *exts*.

    Returns an empty list when *root* does not exist.
    """
    base = Path(root)
    if not base.exists():
        return []
    matches: List[Path] = []
    for candidate in base.rglob('*'):
        if candidate.suffix.lower() in exts:
            matches.append(candidate)
    return matches
60
+
61
+
62
def ensure_chroma(db_dir: Path):
    """Create and return a persistent ChromaDB client rooted at *db_dir*.

    Creates *db_dir* if needed.

    Raises:
        RuntimeError: when the chromadb package could not be imported.
    """
    if chromadb is None:
        raise RuntimeError("chromadb no instalado. pip install chromadb")
    db_dir.mkdir(parents=True, exist_ok=True)

    # BUGFIX: chromadb >= 0.4 removed the legacy `chroma_db_impl` Settings
    # key, so Client(Settings(chroma_db_impl="duckdb+parquet", ...)) raises
    # on current installs. PersistentClient(path=...) is the supported way
    # to get an on-disk client; keep the legacy constructor as a fallback
    # for very old chromadb versions that predate PersistentClient.
    try:
        return chromadb.PersistentClient(path=str(db_dir))
    except AttributeError:
        return chromadb.Client(Settings(
            chroma_db_impl="duckdb+parquet",
            persist_directory=str(db_dir),
        ))
73
+
74
+ # ============================ 1) INDEX FACES =================================
75
def build_faces_index(faces_dir: Path, client, collection_name: str = "index_faces",
                      deepface_model: str = 'Facenet512', drop: bool = True) -> int:
    """Embed every face image under faces_dir/<identity>/ into a ChromaDB collection.

    Each stored record carries the identity (folder name) and the source image
    path as metadata. Prints a human-readable summary when done.

    Args:
        faces_dir: root directory with one sub-directory per identity.
        client: ChromaDB client (as returned by ensure_chroma).
        collection_name: target collection name.
        deepface_model: model name forwarded to FaceOfImageEmbedding.
        drop: when True, an existing collection is deleted and rebuilt.

    Returns:
        Number of embeddings stored.
    """
    # Idempotency: optionally rebuild the collection from scratch.
    if collection_name in [c.name for c in client.list_collections()] and drop:
        client.delete_collection(name=collection_name)
    col = client.get_or_create_collection(name=collection_name)

    be = FaceOfImageEmbedding(deepface_model=deepface_model)
    count = 0
    registered_identities = set()  # so identities are reported once each

    faces_root = Path(faces_dir)
    for ident_dir in sorted(faces_root.iterdir() if faces_root.exists() else []):
        if not ident_dir.is_dir():
            continue
        ident = ident_dir.name
        for img_path in list_files(ident_dir, IMG_EXT):
            embeddings = be.encode_image(img_path)
            # BUGFIX: guard both None AND an empty list — the original only
            # checked `is None` and then indexed embeddings[0], which raises
            # IndexError when the encoder returns [].
            if not embeddings:
                log.warning("No face embedding in %s", img_path)
                continue

            # Flatten so each stored embedding is a flat list of floats.
            batch = embeddings if isinstance(embeddings[0], list) else [embeddings]
            for emb in batch:
                uid = str(uuid.uuid4())
                col.add(ids=[uid], embeddings=[emb],
                        metadatas=[{"identity": ident, "path": str(img_path)}])
                count += 1
                registered_identities.add(ident)

    # Final CLI summary (runtime strings preserved verbatim).
    print("Ha acabado de crear la base de datos.")
    print(f"Total de embeddings guardados: {count}")
    print("Identidades registradas:")
    for name in sorted(registered_identities):
        print(f" - {name}")

    log.info("index_faces => %d embeddings", count)
    return count
112
+
113
+ # ===================== 2) IDENTITY FEATURES CSV ==============================
114
+
115
def aggregate_face_attributes(faces_dir: Path, out_csv: Path) -> int:
    """Scan faces_dir/<identity>/ images and write a per-identity CSV of attributes.

    Uses FaceAnalyzer (imported at module level — the original re-imported it
    locally, which was redundant) to extract gender/age per image; the majority
    value across an identity's samples is kept. An optional free-text context
    file at faces_dir/../context/<identity>.txt is included when present.

    Args:
        faces_dir: root directory with one sub-directory per identity.
        out_csv: destination CSV path (parent directories are created).

    Returns:
        Number of identities written; 0 when faces_dir is missing.
    """
    analyzer = FaceAnalyzer()

    rows: List[Dict[str, Any]] = []

    faces_dir = Path(faces_dir)
    if not faces_dir.exists() or not faces_dir.is_dir():
        log.error("El directorio de caras no existe: %s", faces_dir)
        return 0

    def most_common(lst, default="unknown"):
        # Majority vote; `default` when no samples were analyzed.
        return Counter(lst).most_common(1)[0][0] if lst else default

    # Iterate over each identity folder.
    for ident_dir in sorted(faces_dir.iterdir()):
        if not ident_dir.is_dir():
            continue
        ident = ident_dir.name
        attrs: List[Dict[str, Any]] = []

        log.info("Procesando identidad: %s", ident)

        for img_path in sorted(list_files(ident_dir, IMG_EXT)):
            try:
                data = analyzer.analyze_image(str(img_path))
                if data:
                    attrs.append(data)
            except Exception as e:
                log.warning("Error procesando imagen %s: %s", img_path, e)

        genders = [a.get("gender", "unknown") for a in attrs]
        ages = [a.get("age", "unknown") for a in attrs]

        # Optional per-identity context: identities/context/<name>.txt
        context_txt = faces_dir.parent / "context" / f"{ident}.txt"
        identity_context = context_txt.read_text(encoding="utf-8").strip() if context_txt.exists() else ""

        rows.append({
            "identity": ident,
            "samples": len(attrs),
            "gender": most_common(genders),
            "age_bucket": most_common(ages),
            "identity_context": identity_context,
        })

        log.info("Procesados %d atributos para %s", len(attrs), ident)

    # Write the CSV. BUGFIX: always emit the full 5-column header — the
    # original fell back to only ["identity", "identity_context"] when no
    # identities were found, giving downstream consumers an unstable schema.
    out_csv = Path(out_csv)
    out_csv.parent.mkdir(parents=True, exist_ok=True)
    fieldnames = ["identity", "samples", "gender", "age_bucket", "identity_context"]
    with out_csv.open("w", newline='', encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    log.info("CSV generado correctamente: %s", out_csv)
    return len(rows)
178
+
179
+ # ============================ 3) INDEX VOICES =================================
180
+ from pydub import AudioSegment # agregar al inicio de tu archivo junto a otros imports
181
+
182
def build_voices_index(voices_dir: Path, client, collection_name: str = "index_voices", drop: bool = True) -> int:
    """Embed every audio clip under voices_dir/<identity>/ into ChromaDB.

    Clips that fail to load directly are re-encoded to PCM WAV via pydub
    (written alongside the original with a "_fixed" suffix) and retried once.

    Returns the number of embeddings stored.
    """
    if collection_name in [c.name for c in client.list_collections()] and drop:
        client.delete_collection(name=collection_name)
    col = client.get_or_create_collection(name=collection_name)

    embedder = VoiceEmbedder()
    stored = 0

    root = Path(voices_dir)
    identity_dirs = sorted(root.iterdir()) if root.exists() else []
    for ident_dir in identity_dirs:
        if not ident_dir.is_dir():
            continue
        identity = ident_dir.name
        for clip in list_files(ident_dir, AUD_EXT):
            # First attempt: embed the file as-is.
            try:
                emb = embedder.embed(clip)
            except Exception as err:
                log.warning("Error leyendo audio %s: %s. Intentando reconvertir...", clip, err)
                # Automatic re-encode to a compatible WAV, then retry once.
                try:
                    audio = AudioSegment.from_file(clip)
                    converted = clip.with_name(clip.stem + "_fixed.wav")
                    audio.export(converted, format="wav")
                    log.info("Archivo convertido a WAV compatible: %s", converted)
                    emb = embedder.embed(converted)
                except Exception as err2:
                    log.error("No se pudo generar embedding tras reconversión para %s: %s", clip, err2)
                    continue  # skip this file
            if emb is None:
                log.warning("No voice embedding en %s", clip)
                continue
            col.add(ids=[str(uuid.uuid4())], embeddings=[emb],
                    metadatas=[{"identity": identity, "path": str(clip)}])
            stored += 1

    log.info("index_voices => %d embeddings", stored)
    return stored
219
+
220
+ # ============================ 4) SCENARIOS ==================================
221
@dataclass
class VisionClient:
    """Stub vision backend; replace with a real LLM/vision provider if desired."""
    provider: str = "none"  # which backend to use; "none" means placeholder mode

    def describe(self, image_path: str, prompt: str) -> str:
        # No model wired up: return a deterministic placeholder description
        # built from the file name and the caller's prompt.
        filename = Path(image_path).name
        return f"Automatic description (placeholder) for {filename}. {prompt}"
228
+
229
+
230
class TextEmbedder:
    """Text embeddings with Sentence-Transformers if available; fallback to
    TF-IDF (scikit-learn); final fallback to fixed zero vectors when neither
    optional dependency is installed.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        # kind: "sbert" when Sentence-Transformers loaded, else "tfidf".
        self.kind = "tfidf"
        self.model = None
        self.vectorizer = None
        try:
            from sentence_transformers import SentenceTransformer
            self.model = SentenceTransformer(model_name)
            self.kind = "sbert"
        except Exception:
            # BUGFIX: the original imported sklearn unconditionally here, so a
            # missing scikit-learn made __init__ raise even though encode()
            # already provided a dependency-free zero-vector fallback.
            try:
                from sklearn.feature_extraction.text import TfidfVectorizer
                self.vectorizer = TfidfVectorizer(max_features=768)
            except Exception:
                self.vectorizer = None

    def fit(self, texts: List[str]):
        """Fit the TF-IDF vocabulary (no-op for the sbert / zero-vector backends)."""
        if self.vectorizer is not None:
            self.vectorizer.fit(texts)

    def encode(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding (list of floats) per input text."""
        if self.model is not None:
            arr = self.model.encode(texts, convert_to_numpy=True)
            return arr.astype(float).tolist()
        if self.vectorizer is not None:
            # BUGFIX: transform() before fit() raised NotFittedError; fit on
            # the fly so encode() is usable without an explicit fit() call.
            try:
                X = self.vectorizer.transform(texts)
            except Exception:
                self.vectorizer.fit(texts)
                X = self.vectorizer.transform(texts)
            return X.toarray().astype(float).tolist()
        # Last-resort deterministic fallback: fixed-size zero vectors.
        return [[0.0] * 128 for _ in texts]
252
+
253
+
254
def build_scenarios_descriptions(scenarios_dir: Path, out_csv: Path, vision: VisionClient,
                                 sample_per_scenario: int = 12) -> Tuple[int, List[Dict[str, Any]]]:
    """Describe up to *sample_per_scenario* images per scenario folder and dump
    the aggregated descriptions to a CSV.

    Returns (number of scenarios, the row dicts that were written).
    """
    rows: List[Dict[str, Any]] = []
    root = Path(scenarios_dir)
    scenario_dirs = sorted(root.iterdir()) if root.exists() else []
    for scen_dir in scenario_dirs:
        if not scen_dir.is_dir():
            continue
        scen = scen_dir.name
        descriptions: List[str] = []
        for img in list_files(scen_dir, IMG_EXT)[:sample_per_scenario]:
            text = vision.describe(str(img), prompt="Describe location, time period, lighting, and atmosphere without mentioning people or time of day.")
            if text:
                descriptions.append(text)
        if not descriptions:
            descriptions = [f"Scenario {scen} (no images)"]
        rows.append({"scenario": scen, "descriptions": " \n".join(descriptions)})

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    with out_csv.open("w", newline='', encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=["scenario", "descriptions"])
        writer.writeheader()
        writer.writerows(rows)
    log.info("scenarios_descriptions => %s", out_csv)
    return len(rows), rows
277
+
278
+
279
def build_scenarios_index(client, rows: List[Dict[str, Any]], embedder: TextEmbedder,
                          collection_name: str = "index_scenarios", drop: bool = True) -> int:
    """Embed scenario descriptions and store them in a ChromaDB collection.

    The scenario name doubles as the document id. Fits the embedder on the
    description texts before encoding. Returns len(rows).
    """
    texts = [row["descriptions"] for row in rows]
    embedder.fit(texts)
    vectors = embedder.encode(texts)

    if collection_name in [c.name for c in client.list_collections()] and drop:
        client.delete_collection(name=collection_name)
    col = client.get_or_create_collection(name=collection_name)

    for row, vec in zip(rows, vectors):
        col.add(ids=[row["scenario"]], embeddings=[vec],
                metadatas=[{"scenario": row["scenario"]}])
    log.info("index_scenarios => %d descriptions", len(rows))
    return len(rows)
293
+
294
+ # ================================ CLI ========================================
295
+
296
def main():
    """CLI entry point: build the face and voice indices in ChromaDB.

    Steps 2 (identity features CSV), 4 (scenario descriptions) and 5
    (scenario index) are currently disabled below; uncomment to restore
    the full five-step pipeline.
    """
    ap = argparse.ArgumentParser(description="Veureu — Build identity/scenario indices and CSVs")
    ap.add_argument('--faces_dir', default='identities/faces', help='Root directory of face images per identity')
    ap.add_argument('--voices_dir', default='identities/voices', help='Root directory of voice clips per identity')
    ap.add_argument('--scenarios_dir', default='scenarios', help='Root directory of scenario folders with images')
    ap.add_argument('--db_dir', default='chroma_db', help='ChromaDB persistence directory')
    ap.add_argument('--out_dir', default='results', help='Output directory for CSVs')
    ap.add_argument('--drop_collections', action='store_true', help='Delete collections if they exist before rebuilding')
    ap.add_argument('--deepface_model', default='Facenet512', help='DeepFace model to use as fallback')
    ap.add_argument('--scenario_samples', type=int, default=12, help='Number of images per scenario to describe')

    args = ap.parse_args()

    faces_dir = Path(args.faces_dir)
    voices_dir = Path(args.voices_dir)
    # BUGFIX: removed stray debug `print(voices_dir)` left in the original.
    scenarios_dir = Path(args.scenarios_dir)  # used only by the disabled steps below
    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    client = ensure_chroma(Path(args.db_dir))

    # 1) Faces index
    build_faces_index(faces_dir, client, collection_name="index_faces",
                      deepface_model=args.deepface_model, drop=args.drop_collections)

    # 2) Identity features CSV (disabled)
    # id_csv = out_dir / 'identity_features.csv'
    # aggregate_face_attributes(faces_dir, id_csv)

    # 3) Voices index
    build_voices_index(voices_dir, client, collection_name="index_voices", drop=args.drop_collections)

    # 4) Scenarios descriptions (disabled)
    # vision = VisionClient()
    # scen_csv = out_dir / 'scenarios_descriptions.csv'
    # _, scen_rows = build_scenarios_descriptions(scenarios_dir, scen_csv, vision, sample_per_scenario=args.scenario_samples)

    # 5) Scenarios index (disabled)
    # embedder = TextEmbedder()
    # build_scenarios_index(client, scen_rows, embedder, collection_name="index_scenarios", drop=args.drop_collections)

    log.info("✅ Identity encoding completed.")
337
+
338
+
339
# NOTE(review): the extra '--video' check presumably lets another tool invoke
# this file with video-pipeline arguments without triggering the encoder —
# confirm the intent with the pipeline that passes --video.
if __name__ == '__main__' and '--video' not in sys.argv:
    main()