Spaces:
Sleeping
Sleeping
| """ | |
| core/consultant.py | |
| ------------------ | |
| Multi-modal RAG pipeline using SPECIALIST HuggingFace models per task. | |
| MODEL ROSTER | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| TASK MODEL WHY | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| Embeddings all-MiniLM-L6-v2 Fast, accurate semantic search | |
| Strategic Q&A Qwen/Qwen2.5-7B-Instruct Free tier, excellent reasoning | |
| Creative Writing microsoft/Phi-3.5-mini-instruct Free tier, strong creative output | |
| Poster / Image Gen black-forest-labs/FLUX.1-schnell State-of-the-art textβimage, fast | |
| Audio Narration TTSEngine (SpeechT5 local) β HF API fallback chain | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| Audio runs locally via core/tts_engine.py (SpeechT5 + HiFi-GAN). | |
| If unavailable, falls back to HF API: mms-tts-eng β VITS β fastspeech2. | |
| All other inference = HTTPS calls to HuggingFace API. | |
| """ | |
| import base64 | |
| import io | |
| import logging | |
| import os | |
| import re | |
| import textwrap | |
| import time | |
| from pathlib import Path | |
| import numpy as np | |
| os.environ["TOKENIZERS_PARALLELISM"] = "false" | |
| logger = logging.getLogger(__name__) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODEL REGISTRY | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Canonical HF model id per task.  The LLM tasks ("strategy"/"creative") also
# have hard-coded fallback chains in _chat_completion(); audio has its own
# AUDIO_MODEL_CHAIN below.
MODELS = {
    "embed": "sentence-transformers/all-MiniLM-L6-v2",
    "strategy": "Qwen/Qwen2.5-7B-Instruct",
    "creative": "microsoft/Phi-3.5-mini-instruct",
    "image": "black-forest-labs/FLUX.1-schnell",
    "video": "damo-vilab/text-to-video-ms-1.7b",
    "audio": "facebook/mms-tts-eng",
}
# -- RAG constants -------------------------------------------------------------
FAISS_INDEX_DIR = Path("data/faiss_index")  # on-disk home of index.faiss + texts.npy
CHUNK_SIZE = 400         # target chunk length in characters (not tokens)
CHUNK_OVERLAP = 50       # trailing chars of a chunk carried into the next one
TOP_K = 5                # default number of chunks retrieved per query
EMBED_BATCH = 32         # texts per embedding API call
MAX_INPUT_CHARS = 80_000 # hard cap on raw document size before chunking
# -- Audio constants -----------------------------------------------------------
AUDIO_SAMPLE_RATE = 16_000  # default sample rate (SpeechT5 native)
AUDIO_MAX_CHUNK = 250  # chars per chunk; matches SpeechT5 max_chunk_chars
AUDIO_SILENCE_SHORT = 0.20  # seconds of silence between sentences
# -- HF API fallback chain (used only when local TTSEngine fails) --------------
AUDIO_MODEL_CHAIN = [
    "facebook/mms-tts-eng",
    "espnet/kan-bayashi_ljspeech_vits",
    "facebook/fastspeech2-en-ljspeech",
]
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 1. TEXT CHUNKING | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Split *text* into overlapping chunks, respecting sentence boundaries.

    Sentences are greedily packed until adding one more would push a chunk
    past *chunk_size*; when a chunk is closed, its trailing *overlap*
    characters seed the next chunk so retrieval context survives the cut.
    Returns an empty list for empty/whitespace-only input.
    """
    if len(text) > MAX_INPUT_CHARS:
        logger.warning("Truncating input: %d β %d chars", len(text), MAX_INPUT_CHARS)
        text = text[:MAX_INPUT_CHARS]
    # Collapse runs of 3+ newlines left over from document extraction.
    text = re.sub(r"\n{3,}", "\n\n", text).strip()
    if not text:
        return []

    pieces: list[str] = []
    buf = ""
    for raw_sentence in re.split(r"(?<=[.!?])\s+", text):
        sent = raw_sentence.strip()
        if not sent:
            continue
        if buf and len(buf) + len(sent) + 1 > chunk_size:
            pieces.append(buf.strip())
            # Carry the tail of the closed chunk forward for continuity.
            if overlap:
                buf = buf[-overlap:].strip() + " " + sent
            else:
                buf = sent
        else:
            buf = f"{buf} {sent}".strip() if buf else sent
    if buf.strip():
        pieces.append(buf.strip())
    return pieces
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 2. EMBEDDINGS (HuggingFace Inference API β all-MiniLM-L6-v2) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def embed_texts(texts: list[str], hf_token: str) -> "np.ndarray":
    """Embed *texts* via the HF Inference API. Returns a float32 (N, 384) array.

    Batches of EMBED_BATCH texts are sent per call; a failed batch is replaced
    with zero vectors so positions stay aligned with the input list.
    """
    from huggingface_hub import InferenceClient
    client = InferenceClient(model=MODELS["embed"], token=hf_token)
    vectors: list[list[float]] = []
    for start in range(0, len(texts), EMBED_BATCH):
        batch = texts[start : start + EMBED_BATCH]
        try:
            raw = client.feature_extraction(batch)
            emb = np.array(raw, dtype=np.float32)
            # Token-level output (batch, seq, dim) is mean-pooled to (batch, dim).
            if emb.ndim == 3:
                emb = emb.mean(axis=1)
            vectors.extend(emb.tolist())
        except Exception as exc:
            logger.error("Embedding batch %d failed: %s", start // EMBED_BATCH, exc)
            # Fallback: zero vectors so index isn't broken
            vectors.extend([[0.0] * 384] * len(batch))
    return np.array(vectors, dtype=np.float32)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 3. FAISS VECTOR STORE | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class FAISSStore:
    """Persistent FAISS flat-L2 index with HF embedding.

    Vectors live in a ``faiss.IndexFlatL2``; the matching chunk texts are kept
    in a parallel list (row-aligned) and persisted as an object ``.npy`` file
    next to the index.  ``faiss`` is imported lazily inside each method so the
    module can be imported even when faiss is not installed.
    """
    def __init__(self, index_dir: Path = FAISS_INDEX_DIR, hf_token: str = "") -> None:
        # hf_token is forwarded to embed_texts() for HF Inference API calls.
        self.index_dir = Path(index_dir)
        self.hf_token = hf_token
        self.index_path = self.index_dir / "index.faiss"
        self.texts_path = self.index_dir / "texts.npy"
        self._index = None            # faiss.IndexFlatL2 | None, lazily created on first add
        self._chunks: list[str] = []  # chunk texts, row-aligned with the index
        self.index_dir.mkdir(parents=True, exist_ok=True)
    # -- Embedding helper ------------------------------------------------------
    def embed(self, texts: list[str]) -> "np.ndarray":
        """Embed *texts* with the module-level helper (float32, N x 384)."""
        return embed_texts(texts, self.hf_token)
    # -- Persistence -----------------------------------------------------------
    def load(self) -> None:
        """Load index and texts from disk if they exist.

        Any failure (corrupt files, missing faiss, ...) is logged and the
        store starts fresh rather than raising.
        """
        try:
            import faiss
            if self.index_path.exists() and self.texts_path.exists():
                self._index = faiss.read_index(str(self.index_path))
                # allow_pickle is required for object arrays of strings.
                self._chunks = np.load(str(self.texts_path), allow_pickle=True).tolist()
                logger.info("FAISSStore loaded: %d vectors", self._index.ntotal)
        except Exception as exc:
            logger.warning("FAISSStore load failed (%s) β starting fresh", exc)
            self._index, self._chunks = None, []
    def save(self) -> None:
        """Persist index and texts to disk (best-effort; failures are logged)."""
        try:
            import faiss
            if self._index is not None:
                faiss.write_index(self._index, str(self.index_path))
                np.save(str(self.texts_path), np.array(self._chunks, dtype=object))
        except Exception as exc:
            logger.warning("FAISSStore save failed: %s", exc)
    # -- Indexing --------------------------------------------------------------
    def add_texts(self, texts: list[str]) -> None:
        """Embed *texts*, append them to the index, and persist immediately."""
        import faiss
        if not texts:
            return
        vecs = self.embed(texts)
        dim = vecs.shape[1]
        if self._index is None:
            # First add determines the dimensionality of the index.
            self._index = faiss.IndexFlatL2(dim)
        self._index.add(vecs)
        self._chunks.extend(texts)
        self.save()
        logger.info("FAISSStore: added %d texts, total=%d", len(texts), self._index.ntotal)
    # -- Search ----------------------------------------------------------------
    def search(self, query: str, top_k: int = TOP_K) -> list[str]:
        """Return up to *top_k* chunk texts nearest to *query* (L2 distance)."""
        if self._index is None or self._index.ntotal == 0:
            return []
        q_vec = self.embed([query])
        _, idxs = self._index.search(q_vec, min(top_k, self._index.ntotal))
        # faiss pads missing results with -1; filter those out.
        return [self._chunks[i] for i in idxs[0] if i >= 0]
    def clear(self) -> None:
        """Drop the in-memory index and delete its on-disk files."""
        self._index, self._chunks = None, []
        for p in [self.index_path, self.texts_path]:
            if p.exists():
                p.unlink()
    def is_ready(self) -> bool:
        """True when at least one vector is indexed.  NOTE: a method, not a
        property β callers must invoke it with parentheses."""
        return self._index is not None and self._index.ntotal > 0
    def vector_count(self) -> int:
        """Number of vectors currently indexed (0 when no index exists)."""
        return self._index.ntotal if self._index else 0
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 4. LLM INFERENCE | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _chat_completion(
    messages: list[dict],
    hf_token: str,
    model_key: str,
    max_tokens: int = 1024,
    temperature: float = 0.4,
) -> str:
    """Run a chat completion, walking a per-task fallback chain of models.

    Returns the first successful completion text.  If every candidate model
    fails, returns a user-facing error string instead of raising, so the UI
    always has something to display.
    """
    from huggingface_hub import InferenceClient
    # Ordered fallback chains per task; unknown keys fall back to MODELS.
    fallbacks = {
        "strategy": [
            "Qwen/Qwen2.5-7B-Instruct",
            "mistralai/Mistral-7B-Instruct-v0.3",
            "microsoft/Phi-3.5-mini-instruct",
        ],
        "creative": [
            "microsoft/Phi-3.5-mini-instruct",
            "Qwen/Qwen2.5-7B-Instruct",
            "mistralai/Mistral-7B-Instruct-v0.3",
        ],
    }
    candidates = fallbacks.get(model_key, [MODELS[model_key]])
    last_exc = None
    for model_id in candidates:
        try:
            logger.info("Trying model: %s", model_id)
            client = InferenceClient(model=model_id, token=hf_token)
            resp = client.chat_completion(
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
            )
            return resp.choices[0].message.content.strip()
        except Exception as exc:
            logger.warning("Model %s failed: %s β trying next fallbackβ¦", model_id, exc)
            last_exc = exc
            time.sleep(1)  # brief pause before hitting the next candidate
    return (
        f"β οΈ **All models failed.** Last error: {last_exc}\n\n"
        "Please check:\n"
        "- Your HuggingFace token is valid (huggingface.co/settings/tokens)\n"
        "- You have internet connectivity\n"
        "- Try refreshing your token or upgrading to HF PRO"
    )
| # ββ 4a. Strategic Q&A ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def call_strategy_llm(
    question: str, context_chunks: list[str], hf_token: str,
    max_tokens: int = 1024, temperature: float = 0.4,
) -> str:
    """Answer an ESG strategy question grounded in retrieved context chunks."""
    if context_chunks:
        context = "\n\n---\n\n".join(context_chunks)
    else:
        context = "No documents indexed yet."
    # The system prompt pins the consultant persona and a fixed 4-section
    # answer structure so responses are comparable across queries.
    system_msg = textwrap.dedent("""
        You are an expert ESG strategic consultant for SPJIMR
        (SP Jain Institute of Management & Research), Mumbai.
        You specialise in campus sustainability, SDG alignment, circular economy,
        and ESG benchmarking for educational institutions.
        Always cite specific data points from the provided context.
        Structure every response with these four sections:
        1. π Key Findings
        2. π― Strategic Recommendations
        3. π Implementation Roadmap
        4. β οΈ Risks & Mitigation
    """).strip()
    user_msg = f"Context from SPJIMR sustainability documents:\n\n{context}\n\nQuestion: {question}"
    messages = [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": user_msg},
    ]
    return _chat_completion(
        messages=messages,
        hf_token=hf_token,
        model_key="strategy",
        max_tokens=max_tokens,
        temperature=temperature,
    )
| # ββ 4b. Creative Writing βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Shared system prompt for all creative-generation modes below.
_CREATIVE_SYSTEM = textwrap.dedent("""
    You are a creative director and sustainability communications expert for SPJIMR,
    SP Jain Institute of Management & Research, Mumbai.
    You craft compelling, data-driven ESG communications for Indian higher education audiences.
    Always ground your output in the provided sustainability context.
""").strip()
# Per-mode task instruction appended to the user message in call_creative_llm().
# Unknown modes fall back to "poster" there.
_MODE_INSTRUCTIONS: dict[str, str] = {
    "poster": (
        "Generate a detailed FLUX image generation prompt for a sustainability poster. "
        "Include: visual style, color palette, key visual elements, mood, and typography guidance. "
        "Make it striking, professional, and specific to SPJIMR's campus sustainability story. "
        "Output ONLY the image prompt β no explanation."
    ),
    "video": (
        "Write a cinematic director's brief for a 30β60 second sustainability video. "
        "Include: opening shot, narrative arc, key scenes, voiceover tone, music mood, and closing CTA. "
        "Ground it in SPJIMR's actual sustainability achievements from the context."
    ),
    "social": (
        "Create a complete social media content package including: "
        "a compelling caption (platform-appropriate length), 10β15 relevant hashtags, "
        "a visual brief describing the ideal image/graphic, and a CTA. "
        "Make it authentic, data-grounded, and optimised for engagement."
    ),
    "audio_script": (
        # The [EMPHASIS]/[PAUSE] cue tags requested here are stripped again
        # before TTS synthesis in generate_audio().
        "Write a polished narration script for audio/podcast production. "
        "Include natural speech patterns, emphasis cues [EMPHASIS], and pause markers [PAUSE]. "
        "Match the requested tone and duration. Ground facts in the provided context. "
        "Output ONLY the script β no stage directions or scene headings."
    ),
}
def call_creative_llm(
    brief: str, context_chunks: list[str], hf_token: str,
    mode: str = "poster", max_tokens: int = 700,
) -> str:
    """Produce creative ESG copy for *mode* (poster / video / social / audio_script)."""
    context = "\n\n---\n\n".join(context_chunks) if context_chunks else "No context available."
    # Unknown modes silently fall back to the poster instruction.
    task = _MODE_INSTRUCTIONS.get(mode, _MODE_INSTRUCTIONS["poster"])
    user_msg = (
        f"SPJIMR Sustainability Context:\n{context}\n\n"
        f"Creative Brief:\n{brief}\n\n"
        f"Task:\n{task}"
    )
    messages = [
        {"role": "system", "content": _CREATIVE_SYSTEM},
        {"role": "user", "content": user_msg},
    ]
    # Higher temperature than strategy Q&A: creative output benefits from it.
    return _chat_completion(
        messages=messages,
        hf_token=hf_token,
        model_key="creative",
        max_tokens=max_tokens,
        temperature=0.8,
    )
| # ββ 4c. Image Generation βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def generate_image(prompt: str, hf_token: str, width: int = 1024, height: int = 1024) -> bytes | None:
    """Render *prompt* with FLUX.1-schnell; returns PNG bytes, or None on failure."""
    try:
        from huggingface_hub import InferenceClient
        # Append a fixed style suffix so every poster shares the house look.
        enriched = f"{prompt}, sustainability theme, green campus, professional quality, high resolution, photorealistic"
        client = InferenceClient(model=MODELS["image"], token=hf_token)
        pil_image = client.text_to_image(enriched, width=width, height=height)
        png_buf = io.BytesIO()
        pil_image.save(png_buf, format="PNG")
        return png_buf.getvalue()
    except Exception as exc:
        logger.error("Image generation error: %s", exc)
        return None
# (video generation: see section 4e further below)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 4d. Audio Narration | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # | |
| # PRIMARY: core/tts_engine.py β local SpeechT5 + HiFi-GAN via Transformers | |
| # β’ High quality, dual-speaker podcast support, fully offline | |
| # β’ Lazy-loaded; model downloads once then cached | |
| # | |
| # FALLBACK: HuggingFace Inference API chain (no local deps required) | |
| # facebook/mms-tts-eng β espnet VITS β facebook/fastspeech2 | |
| # | |
| # generate_audio() tries primary first; if it raises, walks the API chain. | |
| # Raises RuntimeError with the real reason if everything fails so the UI | |
| # can show an actionable message instead of a generic fallback prompt. | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _split_audio_chunks(text: str, max_chars: int = AUDIO_MAX_CHUNK) -> list[str]: | |
| """Sentence-boundary chunking with word-level hard-split for long sentences.""" | |
| def hard_split(sentence: str, limit: int) -> list[str]: | |
| parts, current = [], "" | |
| for word in sentence.split(): | |
| candidate = f"{current} {word}".strip() if current else word | |
| if len(candidate) > limit and current: | |
| parts.append(current) | |
| current = word | |
| else: | |
| current = candidate | |
| if current: | |
| parts.append(current) | |
| return parts or [sentence] | |
| sentences = re.split(r"(?<=[.!?])\s+", text) | |
| chunks, current = [], "" | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if not sentence: | |
| continue | |
| subs = hard_split(sentence, max_chars) if len(sentence) > max_chars else [sentence] | |
| for sub in subs: | |
| if len(current) + len(sub) + 1 > max_chars and current: | |
| chunks.append(current.strip()) | |
| current = sub | |
| else: | |
| current = f"{current} {sub}".strip() if current else sub | |
| if current.strip(): | |
| chunks.append(current.strip()) | |
| return chunks | |
| def _audio_bytes_to_numpy(raw: bytes) -> tuple[np.ndarray, int]: | |
| """Decode audio bytes β (float32 array, sample_rate). Tries soundfile then stdlib wave.""" | |
| try: | |
| import soundfile as sf | |
| audio, sr = sf.read(io.BytesIO(raw), dtype="float32") | |
| if audio.ndim > 1: | |
| audio = audio.mean(axis=1) | |
| return audio, sr | |
| except Exception as sf_exc: | |
| pass | |
| try: | |
| import wave, struct | |
| with wave.open(io.BytesIO(raw)) as wf: | |
| sr = wf.getframerate() | |
| n_frames = wf.getnframes() | |
| n_ch = wf.getnchannels() | |
| sampwidth = wf.getsampwidth() | |
| frames = wf.readframes(n_frames) | |
| fmt = f"<{n_frames * n_ch}{({1:'b',2:'h',4:'i'}).get(sampwidth,'h')}" | |
| maxval = float(2 ** (8 * sampwidth - 1)) | |
| pcm = np.array(struct.unpack(fmt, frames), dtype=np.float32) / maxval | |
| if n_ch > 1: | |
| pcm = pcm.reshape(-1, n_ch).mean(axis=1) | |
| return pcm, sr | |
| except Exception as wave_exc: | |
| raise RuntimeError(f"Audio decode failed β sf: {sf_exc} | wave: {wave_exc}") | |
| def _numpy_to_wav_bytes(audio: np.ndarray, sample_rate: int) -> bytes: | |
| """Normalise float32 array β PCM-16 WAV bytes.""" | |
| max_val = np.abs(audio).max() | |
| if max_val > 1e-6: | |
| audio = audio / max_val * 0.95 | |
| try: | |
| import soundfile as sf | |
| buf = io.BytesIO() | |
| sf.write(buf, audio, sample_rate, format="WAV", subtype="PCM_16") | |
| buf.seek(0) | |
| return buf.read() | |
| except Exception: | |
| import wave, struct | |
| pcm = (audio * 32767).astype(np.int16) | |
| buf = io.BytesIO() | |
| with wave.open(buf, "wb") as wf: | |
| wf.setnchannels(1) | |
| wf.setsampwidth(2) | |
| wf.setframerate(sample_rate) | |
| wf.writeframes(struct.pack(f"<{len(pcm)}h", *pcm)) | |
| buf.seek(0) | |
| return buf.read() | |
# NOTE: a superseded commented-out draft of _generate_audio_local was removed
# here (it referenced hf_token out of scope); the live implementation follows.
def _generate_audio_local(script: str) -> bytes:
    """
    Primary path: synthesise via the local TTSEngine (SpeechT5).

    Cleans instruction cue tags, line-anchors [HOST]/[GUEST] speaker tags,
    then routes to dual-speaker podcast synthesis when speaker tags are
    present, otherwise to solo narration.  Any exception propagates so the
    caller can fall back to the HF API chain.
    """
    from core.tts_engine import TTSEngine, TTSConfig, TTSBackend

    # 1. Remove instruction tags ([PAUSE], [EMPHASIS], ...) but PROTECT the
    #    [HOST]/[GUEST] speaker tags (negative lookahead) so the engine never
    #    speaks the word "PAUSE" aloud.
    clean = re.sub(r"\[(?!(?:HOST|GUEST)\]).*?\]", "", script, flags=re.IGNORECASE)

    # 2. Collapse runs of spaces/tabs ONLY.  (FIX: the previous \s{2,} collapse
    #    ran after newline insertion and ate " \n[HOST]" sequences, undoing the
    #    line-anchoring below.)
    clean = re.sub(r"[ \t]{2,}", " ", clean)

    # 3. TTSEngine matches speaker tags with a line-anchored ^\[HOST\] pattern,
    #    so each tag must start its own line for voice switching to trigger.
    clean = re.sub(r"\s*(\[(?:HOST|GUEST)\])", r"\n\1", clean, flags=re.IGNORECASE)
    clean = clean.strip()

    # 4. Dual-speaker mode only when speaker tags survived cleaning.
    is_podcast = bool(re.search(r"\[(HOST|GUEST)\]", clean, re.IGNORECASE))

    # 5. Use the module constants (previously duplicated as literals 250/16000).
    engine = TTSEngine(TTSConfig(
        backend=TTSBackend.SPEECHT5,
        max_chunk_chars=AUDIO_MAX_CHUNK,
        sample_rate=AUDIO_SAMPLE_RATE,
    ))

    # 6. Route to the matching synthesis method.  (FIX: removed an unreachable
    #    duplicate of this routing block that sat after the returns.)
    if is_podcast:
        logger.info("Routing to synthesize_podcast (dual-speaker)")
        return engine.synthesize_podcast(clean)
    logger.info("Routing to synthesize (solo narration)")
    return engine.synthesize(clean)
def _generate_audio_api(script: str, hf_token: str) -> bytes:
    """
    Fallback path: HuggingFace Inference API chain.

    Tries each model in AUDIO_MODEL_CHAIN; a model counts as a success when
    at least one chunk synthesises (partial failures are tolerated).  Chunks
    are joined with AUDIO_SILENCE_SHORT seconds of silence.

    Raises:
        RuntimeError: if every model fails on every chunk.
    """
    from huggingface_hub import InferenceClient
    model_errors: list[str] = []
    chunks = _split_audio_chunks(script, max_chars=400)  # HF API supports longer chunks
    for model_id in AUDIO_MODEL_CHAIN:
        short = model_id.split("/")[-1]
        logger.info("generate_audio API fallback: trying %s", model_id)
        # FIX: create the client once per model instead of once per chunk.
        client = InferenceClient(model=model_id, token=hf_token)
        all_audio: list[np.ndarray] = []
        sample_rate = AUDIO_SAMPLE_RATE  # overwritten by the decoded rate below
        chunk_errors = 0
        for i, chunk in enumerate(chunks):
            chunk = chunk.strip()
            if not chunk:
                continue
            try:
                result = client.text_to_speech(chunk)
                raw = bytes(result) if isinstance(result, (bytes, bytearray)) else result.read()
                if len(raw) < 100:
                    raise RuntimeError(f"Response too small ({len(raw)} bytes)")
                audio_np, sr = _audio_bytes_to_numpy(raw)
                sample_rate = sr
                all_audio.append(audio_np)
                # Short silence between sentences/chunks.
                all_audio.append(np.zeros(int(sr * AUDIO_SILENCE_SHORT), dtype=np.float32))
                logger.info(" [%s] chunk %d/%d ok", short, i + 1, len(chunks))
            except Exception as exc:
                logger.warning(" [%s] chunk %d/%d failed: %s", short, i + 1, len(chunks), exc)
                chunk_errors += 1
        if all_audio and chunk_errors < len(chunks):
            logger.info("generate_audio: success via API model %s", short)
            return _numpy_to_wav_bytes(np.concatenate(all_audio), sample_rate)
        model_errors.append(f"{short}: {chunk_errors}/{len(chunks)} chunks failed")
    raise RuntimeError(
        "All HF API TTS models failed. " + " | ".join(model_errors) + "\n"
        "Check: HF token validity, internet connectivity, or HF PRO quota."
    )
def generate_audio(script: str, hf_token: str) -> bytes:
    """
    Convert a narration script to speech.

    Strategy:
      1. Try local TTSEngine (SpeechT5) β best quality, works offline, and
         supports [HOST]/[GUEST] dual-speaker podcast scripts
      2. Fall back to the HF Inference API chain if the local engine fails
         (single-speaker only, so ALL tags are stripped for that path)

    Raises:
        ValueError: if the script is empty after cleaning cue tags.
        RuntimeError: with full details if both paths fail.
    """
    # BUG FIX: the old code stripped ALL [...] tags here, which destroyed the
    # [HOST]/[GUEST] speaker tags before _generate_audio_local could see them,
    # so dual-speaker routing could never trigger.  Strip only instruction
    # cue tags ([PAUSE], [EMPHASIS], ...) and keep the speaker tags.
    clean = re.sub(r"\[(?!(?:HOST|GUEST)\]).*?\]", " ", script, flags=re.IGNORECASE)
    clean = re.sub(r"\s{2,}", " ", clean).strip()
    if not clean:
        raise ValueError("Audio script is empty after cleaning cue tags.")
    logger.info("generate_audio: %d chars β trying local TTSEngine firstβ¦", len(clean))
    # ββ Path 1: local SpeechT5 engine ββββββββββββββββββββββββββββββββββββββββ
    local_exc: Exception | None = None  # survives past the except block for Path 2
    try:
        result = _generate_audio_local(clean)
        logger.info("generate_audio: local TTSEngine succeeded")
        return result
    except Exception as _exc:
        local_exc = _exc  # capture before the except block deletes the binding
        logger.warning(
            "generate_audio: local TTSEngine failed (%s) β falling back to HF APIβ¦",
            local_exc,
        )
    # ββ Path 2: HF Inference API chain ββββββββββββββββββββββββββββββββββββββββ
    # The API path is single-speaker only: drop the speaker tags too.
    api_script = re.sub(r"\[.*?\]", " ", clean)
    api_script = re.sub(r"\s{2,}", " ", api_script).strip()
    try:
        result = _generate_audio_api(api_script, hf_token)
        logger.info("generate_audio: HF API fallback succeeded")
        return result
    except RuntimeError as api_exc:
        raise RuntimeError(
            f"Audio generation failed on both paths.\n"
            f"Local TTSEngine: {local_exc}\n"
            f"HF API chain: {api_exc}"
        ) from api_exc
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 4e. Video Generation | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Ordered text-to-video fallback chain tried by generate_video().
VIDEO_MODEL_CHAIN = [
    "BestWishYsh/Helios-Base",
    "Wan-AI/Wan2.2-T2V-A14B-Diffusers",
    "damo-vilab/text-to-video-ms-1.7b",
]
def generate_video(
    prompt: str, hf_token: str, num_frames: int = 16,
    timeout: int = 180, status_cb=None,
) -> bytes | None:
    """Generate a short video via the HF text-to-video model chain.

    Walks VIDEO_MODEL_CHAIN with up to 3 attempts per model, handling the
    common HTTP failure modes (503 model-loading, 401/403 bad token, 404
    missing model).  Returns raw video bytes (whatever container the model
    emits, typically MP4) or None if every model fails.
    NOTE(review): num_frames is accepted but never forwarded to the API β
    confirm whether the intent was to pass it to text_to_video.

    Args:
        prompt:    scene description; a fixed style suffix is appended.
        hf_token:  HuggingFace API token.
        num_frames: currently unused (see note above).
        timeout:   per-request timeout in seconds.
        status_cb: optional callable(str) for UI progress updates.
    """
    from huggingface_hub import InferenceClient
    def _status(msg: str):
        # Log, and best-effort forward to the UI callback.
        logger.info(msg)
        if status_cb:
            try: status_cb(msg)
            except Exception: pass
    enriched = f"{prompt.strip()}, green campus, sustainability, smooth cinematic motion, vibrant colors"
    for model_id in VIDEO_MODEL_CHAIN:
        # NOTE(review): split("/")[1] assumes a "org/name" id β would raise
        # IndexError for a bare model name (elsewhere the file uses [-1]).
        short = model_id.split("/")[1]
        _status(f"Trying {short}β¦")
        for attempt in range(1, 4):
            try:
                _status(f"Attempt {attempt}/3 β sending to {short} (timeout {timeout}s)β¦")
                client = InferenceClient(model=model_id, token=hf_token, timeout=timeout)
                result = client.text_to_video(enriched)
                # Normalise the result to raw bytes: bytes β file-like β
                # iterable of frames (encoded to MP4 via imageio if available).
                if isinstance(result, (bytes, bytearray)):
                    raw = bytes(result)
                elif hasattr(result, "read"):
                    raw = result.read()
                else:
                    try:
                        import imageio, tempfile, os as _os
                        frames = [np.array(f) for f in result]
                        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
                        writer = imageio.get_writer(tmp.name, fps=8)
                        for f in frames: writer.append_data(f)
                        writer.close()
                        raw = open(tmp.name, "rb").read()
                        _os.unlink(tmp.name)
                    except Exception as conv_exc:
                        logger.warning("Could not convert result type %s: %s", type(result), conv_exc)
                        raw = b""
                # Anything this small is an error page, not video data.
                if len(raw) > 500:
                    _status(f"β Got {len(raw):,} bytes from {short}")
                    return raw
                logger.warning("%s returned only %d bytes β skipping", short, len(raw))
                break  # give up on this model, move to the next one
            except Exception as exc:
                err_str = str(exc)
                if "503" in err_str or "loading" in err_str.lower():
                    # Model is cold-starting on HF β wait and retry same model.
                    wait = 40
                    _status(f"β³ {short} is loading, waiting {wait}sβ¦")
                    time.sleep(wait)
                elif "401" in err_str or "403" in err_str:
                    # Auth problems won't fix themselves β abort entirely.
                    _status("β Invalid HF token β please check your token in the sidebar.")
                    return None
                elif "404" in err_str:
                    _status(f"β οΈ {short} not found on router β trying next modelβ¦")
                    break
                else:
                    logger.warning("%s attempt %d failed: %s", short, attempt, exc)
                    if attempt < 3: time.sleep(5)
                    else: break
        time.sleep(2)  # small gap before trying the next model in the chain
    return None
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # 5. ESGConsultant β orchestration class | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
class ESGConsultant:
    """
    Multi-modal ESG intelligence platform β orchestrates the RAG store and
    every generation back-end.

      query()                β Qwen2.5-7B (+ fallback chain) β strategic text report
      creative_text()        β Phi-3.5-mini (+ fallback chain) β creative brief/copy
      create_image()         β FLUX.1-schnell β PNG image bytes
      create_audio()         β TTSEngine (local) β HF API β WAV bytes (mono)
      create_podcast_audio() β TTSEngine (local, dual-speaker) β WAV bytes
      create_video()         β HF text-to-video chain β video bytes
    """
    def __init__(self, hf_token: str, index_dir: Path = FAISS_INDEX_DIR):
        self.hf_token = hf_token
        self.store = FAISSStore(index_dir=index_dir, hf_token=hf_token)
        self.store.load()  # pick up any previously persisted index

    def update_token(self, hf_token: str) -> None:
        """Propagate a refreshed HF token to this object and its store."""
        self.hf_token = hf_token
        self.store.hf_token = hf_token

    def index_documents(self, text: str, reset: bool = False) -> int:
        """Chunk *text* and add it to the vector store; returns chunks added."""
        if reset:
            self.store.clear()
        chunks = chunk_text(text)
        if not chunks:
            return 0
        self.store.add_texts(chunks)
        return len(chunks)

    def query(self, question: str, top_k: int = TOP_K, max_tokens: int = 1024, temperature: float = 0.4) -> dict:
        """RAG pipeline: retrieve top-k chunks, then ask the strategy LLM.

        Returns {"answer": str, "sources": list[str], "chunks_used": int}.
        """
        # BUG FIX: is_ready is a method β the old `not self.store.is_ready`
        # tested the bound-method object (always truthy), so the "no
        # documents" warning could never fire.
        if not self.store.is_ready():
            return {"answer": "β οΈ No documents indexed. Upload files on the **π€ Data Ingestion** page.", "sources": [], "chunks_used": 0}
        retrieved = self.store.search(question, top_k=top_k)
        answer = call_strategy_llm(question, retrieved, self.hf_token, max_tokens, temperature)
        return {"answer": answer, "sources": retrieved, "chunks_used": len(retrieved)}

    def creative_text(self, brief: str, mode: str = "poster", top_k: int = TOP_K, max_tokens: int = 700) -> dict:
        """Creative generation grounded in retrieved context (if any is indexed)."""
        # BUG FIX: call is_ready() β the bound method was always truthy before.
        retrieved = self.store.search(brief, top_k=top_k) if self.store.is_ready() else []
        answer = call_creative_llm(brief, retrieved, self.hf_token, mode, max_tokens)
        return {"answer": answer, "sources": retrieved, "chunks_used": len(retrieved)}

    def create_image(self, prompt: str, width: int = 1024, height: int = 1024) -> bytes | None:
        """Render a poster image via FLUX; returns PNG bytes or None on failure."""
        return generate_image(prompt, self.hf_token, width, height)

    def create_audio(self, script: str) -> bytes:
        """
        Convert a narration script to speech (single speaker).

        Primary: local TTSEngine.synthesize() β SpeechT5 + HiFi-GAN
        Fallback: HF Inference API chain β mms-tts-eng β VITS β fastspeech2
        Strips [PAUSE]/[EMPHASIS] cue tags automatically.
        Raises RuntimeError with the real reason if all paths fail β catch it
        in your UI to display an actionable message.
        """
        return generate_audio(script, self.hf_token)

    def create_podcast_audio(self, script: str) -> bytes:
        """
        Convert a [HOST]/[GUEST]-tagged podcast script to dual-speaker audio.

        Uses TTSEngine.synthesize_podcast() directly (local SpeechT5 only β
        the HF API does not support dual-speaker synthesis).
        Script format expected:
            [HOST] Welcome to the SPJIMR ESG podcast...
            [GUEST] Thank you for having me...

        Raises:
            ValueError: if the script is empty.
            RuntimeError: if the local engine fails (no API fallback for
                dual-speaker β use create_audio() instead).
        """
        clean = re.sub(r"\s{2,}", " ", script).strip()
        if not clean:
            raise ValueError("Podcast script is empty.")
        try:
            from core.tts_engine import TTSEngine, TTSConfig, TTSBackend
            engine = TTSEngine(TTSConfig(
                backend=TTSBackend.SPEECHT5,
                max_chunk_chars=AUDIO_MAX_CHUNK,
                sample_rate=AUDIO_SAMPLE_RATE,
            ))
            return engine.synthesize_podcast(clean)
        except Exception as exc:
            raise RuntimeError(
                f"Podcast audio generation failed: {exc}\n"
                "Tip: use create_audio() for single-speaker fallback via HF API."
            ) from exc

    def _condense_for_video(self, long_brief: str) -> str:
        """Compress a long director's brief into one short text-to-video prompt.

        Uses the strategy LLM; on failure, falls back to the first
        reasonably-sized sentence of the brief, then to a hard truncation.
        """
        system = (
            "You are a video prompt engineer. "
            "Given a long director's brief, extract ONE concise sentence (max 150 characters) "
            "that describes the most visually striking scene. "
            "Output ONLY that sentence β no explanation, no punctuation beyond the sentence itself."
        )
        user = f"Director's brief:\n{long_brief[:1000]}\n\nOne-sentence video prompt:"
        try:
            condensed = _chat_completion(
                messages=[{"role": "system", "content": system}, {"role": "user", "content": user}],
                hf_token=self.hf_token, model_key="strategy", max_tokens=80, temperature=0.3,
            )
            # Keep only the first line, drop wrapping quotes, cap the length.
            return condensed.strip().strip('"\'\n').split("\n")[0][:200]
        except Exception as exc:
            logger.warning("Condense LLM failed (%s) β using fallback truncation", exc)
            for sentence in long_brief.replace("\n", " ").split("."):
                s = sentence.strip()
                if 20 < len(s) < 150:
                    return s
            return long_brief[:140].strip()

    def create_video(self, prompt: str, num_frames: int = 16, status_cb=None) -> bytes | None:
        """Generate a short video via the HF text-to-video model chain."""
        return generate_video(prompt, self.hf_token, num_frames=num_frames, status_cb=status_cb)

    def is_ready(self) -> bool:
        """True when the store holds at least one vector."""
        # BUG FIX: call the method β the old code returned the bound method
        # object itself, which is always truthy.
        return self.store.is_ready()

    def vector_count(self) -> int:
        """Number of vectors currently indexed."""
        # BUG FIX: call the method β the old code returned the method object.
        return self.store.vector_count()

    def reset_index(self) -> None:
        """Drop the index and delete its on-disk files."""
        self.store.clear()