# WISE_Energy — src/core/consultant.py
"""
core/consultant.py
------------------
Multi-modal RAG pipeline using SPECIALIST HuggingFace models per task.
MODEL ROSTER
─────────────────────────────────────────────────────────────────────
TASK MODEL WHY
─────────────────────────────────────────────────────────────────────
Embeddings all-MiniLM-L6-v2 Fast, accurate semantic search
Strategic Q&A Qwen/Qwen2.5-7B-Instruct Free tier, excellent reasoning
Creative Writing microsoft/Phi-3.5-mini-instruct Free tier, strong creative output
Poster / Image Gen black-forest-labs/FLUX.1-schnell State-of-the-art text→image, fast
Audio Narration TTSEngine (SpeechT5 local) β†’ HF API fallback chain
─────────────────────────────────────────────────────────────────────
Audio runs locally via core/tts_engine.py (SpeechT5 + HiFi-GAN).
If unavailable, falls back to HF API: mms-tts-eng β†’ VITS β†’ fastspeech2.
All other inference = HTTPS calls to HuggingFace API.
"""
import base64
import io
import logging
import os
import re
import textwrap
import time
from pathlib import Path
import numpy as np
os.environ["TOKENIZERS_PARALLELISM"] = "false"
logger = logging.getLogger(__name__)
# ══════════════════════════════════════════════════════════════════════════════
# MODEL REGISTRY
# ══════════════════════════════════════════════════════════════════════════════
# Task → HF model-id registry.  "strategy" and "creative" additionally have
# fallback chains inside _chat_completion(); "audio" is the first entry of
# AUDIO_MODEL_CHAIN below.
MODELS = {
    "embed": "sentence-transformers/all-MiniLM-L6-v2",
    "strategy": "Qwen/Qwen2.5-7B-Instruct",
    "creative": "microsoft/Phi-3.5-mini-instruct",
    "image": "black-forest-labs/FLUX.1-schnell",
    "video": "damo-vilab/text-to-video-ms-1.7b",
    "audio": "facebook/mms-tts-eng",
}
# ── RAG constants ──────────────────────────────────────────────────────────────
FAISS_INDEX_DIR = Path("data/faiss_index")  # on-disk location of the FAISS index
CHUNK_SIZE = 400        # target characters per retrieval chunk
CHUNK_OVERLAP = 50      # trailing chars of a chunk carried into the next one
TOP_K = 5               # default number of chunks retrieved per query
EMBED_BATCH = 32        # texts per embedding API request
MAX_INPUT_CHARS = 80_000  # hard cap on document size before chunking
# ── Audio constants ───────────────────────────────────────────────────────────
AUDIO_SAMPLE_RATE = 16_000  # default sample rate (SpeechT5 native)
AUDIO_MAX_CHUNK = 250       # chars per chunk — matches SpeechT5 max_chunk_chars
AUDIO_SILENCE_SHORT = 0.20  # seconds of silence inserted between sentences
# ── HF API fallback chain (used only when local TTSEngine fails) ───────────────
AUDIO_MODEL_CHAIN = [
    "facebook/mms-tts-eng",
    "espnet/kan-bayashi_ljspeech_vits",
    "facebook/fastspeech2-en-ljspeech",
]
# ══════════════════════════════════════════════════════════════════════════════
# 1. TEXT CHUNKING
# ══════════════════════════════════════════════════════════════════════════════
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> list[str]:
    """Break *text* into overlapping, sentence-aligned chunks.

    Oversized inputs are truncated to MAX_INPUT_CHARS (with a warning) and
    runs of three or more newlines are collapsed before splitting.  Each new
    chunk is seeded with the trailing ``overlap`` characters of its
    predecessor so retrieval context carries across chunk boundaries.
    Blank input yields an empty list.
    """
    if len(text) > MAX_INPUT_CHARS:
        logger.warning("Truncating input: %d β†’ %d chars", len(text), MAX_INPUT_CHARS)
        text = text[:MAX_INPUT_CHARS]
    text = re.sub(r"\n{3,}", "\n\n", text).strip()
    if not text:
        return []
    pieces: list[str] = []
    buffer = ""
    for raw in re.split(r"(?<=[.!?])\s+", text):
        sentence = raw.strip()
        if not sentence:
            continue
        if buffer and len(buffer) + len(sentence) + 1 > chunk_size:
            pieces.append(buffer.strip())
            # Seed the next chunk with the tail of the previous one.
            if overlap:
                buffer = buffer[-overlap:].strip() + " " + sentence
            else:
                buffer = sentence
        elif buffer:
            buffer = f"{buffer} {sentence}"
        else:
            buffer = sentence
    if buffer.strip():
        pieces.append(buffer.strip())
    return pieces
# ══════════════════════════════════════════════════════════════════════════════
# 2. EMBEDDINGS (HuggingFace Inference API β€” all-MiniLM-L6-v2)
# ══════════════════════════════════════════════════════════════════════════════
def embed_texts(texts: list[str], hf_token: str) -> "np.ndarray":
    """Embed *texts* via the HF Inference API; returns a float32 (N, 384) matrix.

    Requests go out in EMBED_BATCH-sized batches.  Token-level responses of
    shape (batch, seq, dim) are mean-pooled down to (batch, dim).  A failed
    batch is replaced by zero vectors so the index keeps a consistent row
    count instead of breaking.
    """
    from huggingface_hub import InferenceClient

    client = InferenceClient(model=MODELS["embed"], token=hf_token)
    vectors: list[list[float]] = []
    for start in range(0, len(texts), EMBED_BATCH):
        batch = texts[start : start + EMBED_BATCH]
        try:
            raw = client.feature_extraction(batch)
            mat = np.array(raw, dtype=np.float32)
            if mat.ndim == 3:  # token embeddings → mean-pool over the sequence axis
                mat = mat.mean(axis=1)
            vectors.extend(mat.tolist())
        except Exception as exc:
            logger.error("Embedding batch %d failed: %s", start // EMBED_BATCH, exc)
            # Fallback: zero vectors so index isn't broken
            vectors.extend([[0.0] * 384] * len(batch))
    return np.array(vectors, dtype=np.float32)
# ══════════════════════════════════════════════════════════════════════════════
# 3. FAISS VECTOR STORE
# ══════════════════════════════════════════════════════════════════════════════
class FAISSStore:
    """Persistent FAISS flat-L2 index with HF embedding.

    Vectors live in a faiss ``IndexFlatL2``; the matching raw text chunks are
    kept in a parallel Python list and persisted next to the index file.
    """

    def __init__(self, index_dir: Path = FAISS_INDEX_DIR, hf_token: str = "") -> None:
        self.index_dir = Path(index_dir)
        self.hf_token = hf_token
        self.index_path = self.index_dir / "index.faiss"
        self.texts_path = self.index_dir / "texts.npy"
        self._index = None                # lazily-created faiss.IndexFlatL2
        self._chunks: list[str] = []      # row i of the index embeds _chunks[i]
        self.index_dir.mkdir(parents=True, exist_ok=True)

    # ── Embedding helper ───────────────────────────────────────────────────────
    def embed(self, texts: list[str]) -> "np.ndarray":
        """Embed *texts* with the store's HF token."""
        return embed_texts(texts, self.hf_token)

    # ── Persistence ────────────────────────────────────────────────────────────
    def load(self) -> None:
        """Load index and texts from disk if they exist."""
        try:
            import faiss

            if self.index_path.exists() and self.texts_path.exists():
                self._index = faiss.read_index(str(self.index_path))
                self._chunks = np.load(str(self.texts_path), allow_pickle=True).tolist()
                logger.info("FAISSStore loaded: %d vectors", self._index.ntotal)
        except Exception as exc:
            logger.warning("FAISSStore load failed (%s) β€” starting fresh", exc)
            self._index, self._chunks = None, []

    def save(self) -> None:
        """Persist index and texts to disk (best-effort; failures are logged)."""
        try:
            import faiss

            if self._index is None:
                return
            faiss.write_index(self._index, str(self.index_path))
            np.save(str(self.texts_path), np.array(self._chunks, dtype=object))
        except Exception as exc:
            logger.warning("FAISSStore save failed: %s", exc)

    # ── Indexing ───────────────────────────────────────────────────────────────
    def add_texts(self, texts: list[str]) -> None:
        """Embed *texts*, append them to the index, and persist."""
        import faiss

        if not texts:
            return
        vecs = self.embed(texts)
        if self._index is None:
            self._index = faiss.IndexFlatL2(vecs.shape[1])
        self._index.add(vecs)
        self._chunks.extend(texts)
        self.save()
        logger.info("FAISSStore: added %d texts, total=%d", len(texts), self._index.ntotal)

    # ── Search ─────────────────────────────────────────────────────────────────
    def search(self, query: str, top_k: int = TOP_K) -> list[str]:
        """Return up to *top_k* chunks nearest to *query* (empty if no index)."""
        if self._index is None or self._index.ntotal == 0:
            return []
        k = min(top_k, self._index.ntotal)
        _, idxs = self._index.search(self.embed([query]), k)
        return [self._chunks[i] for i in idxs[0] if i >= 0]

    def clear(self) -> None:
        """Drop the in-memory index and delete the on-disk artifacts."""
        self._index, self._chunks = None, []
        for path in (self.index_path, self.texts_path):
            if path.exists():
                path.unlink()

    @property
    def is_ready(self) -> bool:
        """True once at least one vector has been indexed."""
        return self._index is not None and self._index.ntotal > 0

    @property
    def vector_count(self) -> int:
        """Number of vectors currently in the index (0 when unbuilt)."""
        return self._index.ntotal if self._index else 0
# ══════════════════════════════════════════════════════════════════════════════
# 4. LLM INFERENCE
# ══════════════════════════════════════════════════════════════════════════════
def _chat_completion(
    messages: list[dict],
    hf_token: str,
    model_key: str,
    max_tokens: int = 1024,
    temperature: float = 0.4,
) -> str:
    """Run a chat completion, walking a per-task fallback chain of models.

    Returns the first successful completion; never raises — when every
    candidate fails, a user-facing error string is returned instead so the UI
    can display it directly.
    """
    from huggingface_hub import InferenceClient

    # Ordered candidate models per task; unknown keys fall back to the registry.
    chains = {
        "strategy": [
            "Qwen/Qwen2.5-7B-Instruct",
            "mistralai/Mistral-7B-Instruct-v0.3",
            "microsoft/Phi-3.5-mini-instruct",
        ],
        "creative": [
            "microsoft/Phi-3.5-mini-instruct",
            "Qwen/Qwen2.5-7B-Instruct",
            "mistralai/Mistral-7B-Instruct-v0.3",
        ],
    }
    last_exc = None
    for model_id in chains.get(model_key, [MODELS[model_key]]):
        try:
            logger.info("Trying model: %s", model_id)
            client = InferenceClient(model=model_id, token=hf_token)
            response = client.chat_completion(
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
            )
            return response.choices[0].message.content.strip()
        except Exception as exc:
            logger.warning("Model %s failed: %s β€” trying next fallback…", model_id, exc)
            last_exc = exc
            time.sleep(1)  # brief pause before the next candidate
    return (
        f"⚠️ **All models failed.** Last error: {last_exc}\n\n"
        "Please check:\n"
        "- Your HuggingFace token is valid (huggingface.co/settings/tokens)\n"
        "- You have internet connectivity\n"
        "- Try refreshing your token or upgrading to HF PRO"
    )
# ── 4a. Strategic Q&A ──────────────────────────────────────────────────────────
def call_strategy_llm(
    question: str, context_chunks: list[str], hf_token: str,
    max_tokens: int = 1024, temperature: float = 0.4,
) -> str:
    """Answer a strategic ESG question grounded in the retrieved chunks.

    Delegates to _chat_completion() with the "strategy" model chain; the
    retrieved chunks are joined into a single context block for the prompt.
    """
    if context_chunks:
        context = "\n\n---\n\n".join(context_chunks)
    else:
        context = "No documents indexed yet."
    system_msg = textwrap.dedent("""
        You are an expert ESG strategic consultant for SPJIMR
        (SP Jain Institute of Management & Research), Mumbai.
        You specialise in campus sustainability, SDG alignment, circular economy,
        and ESG benchmarking for educational institutions.
        Always cite specific data points from the provided context.
        Structure every response with these four sections:
        1. πŸ“Š Key Findings
        2. 🎯 Strategic Recommendations
        3. πŸ“ˆ Implementation Roadmap
        4. ⚠️ Risks & Mitigation
    """).strip()
    user_msg = f"Context from SPJIMR sustainability documents:\n\n{context}\n\nQuestion: {question}"
    messages = [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": user_msg},
    ]
    return _chat_completion(
        messages=messages,
        hf_token=hf_token,
        model_key="strategy",
        max_tokens=max_tokens,
        temperature=temperature,
    )
# ── 4b. Creative Writing ───────────────────────────────────────────────────────
# System prompt shared by every creative-generation mode below.
_CREATIVE_SYSTEM = textwrap.dedent("""
You are a creative director and sustainability communications expert for SPJIMR,
SP Jain Institute of Management & Research, Mumbai.
You craft compelling, data-driven ESG communications for Indian higher education audiences.
Always ground your output in the provided sustainability context.
""").strip()
# Per-mode task instructions appended to the user message by call_creative_llm().
# Unknown modes fall back to "poster".
_MODE_INSTRUCTIONS: dict[str, str] = {
    "poster": (
        "Generate a detailed FLUX image generation prompt for a sustainability poster. "
        "Include: visual style, color palette, key visual elements, mood, and typography guidance. "
        "Make it striking, professional, and specific to SPJIMR's campus sustainability story. "
        "Output ONLY the image prompt β€” no explanation."
    ),
    "video": (
        "Write a cinematic director's brief for a 30–60 second sustainability video. "
        "Include: opening shot, narrative arc, key scenes, voiceover tone, music mood, and closing CTA. "
        "Ground it in SPJIMR's actual sustainability achievements from the context."
    ),
    "social": (
        "Create a complete social media content package including: "
        "a compelling caption (platform-appropriate length), 10–15 relevant hashtags, "
        "a visual brief describing the ideal image/graphic, and a CTA. "
        "Make it authentic, data-grounded, and optimised for engagement."
    ),
    "audio_script": (
        "Write a polished narration script for audio/podcast production. "
        "Include natural speech patterns, emphasis cues [EMPHASIS], and pause markers [PAUSE]. "
        "Match the requested tone and duration. Ground facts in the provided context. "
        "Output ONLY the script β€” no stage directions or scene headings."
    ),
}
def call_creative_llm(
    brief: str, context_chunks: list[str], hf_token: str,
    mode: str = "poster", max_tokens: int = 700,
) -> str:
    """Generate mode-specific creative output grounded in retrieved context.

    *mode* selects an entry from _MODE_INSTRUCTIONS ("poster", "video",
    "social", "audio_script"); unknown modes fall back to "poster".
    """
    if context_chunks:
        context = "\n\n---\n\n".join(context_chunks)
    else:
        context = "No context available."
    task = _MODE_INSTRUCTIONS.get(mode, _MODE_INSTRUCTIONS["poster"])
    user_msg = (
        f"SPJIMR Sustainability Context:\n{context}\n\n"
        f"Creative Brief:\n{brief}\n\n"
        f"Task:\n{task}"
    )
    messages = [
        {"role": "system", "content": _CREATIVE_SYSTEM},
        {"role": "user", "content": user_msg},
    ]
    return _chat_completion(
        messages=messages, hf_token=hf_token, model_key="creative",
        max_tokens=max_tokens, temperature=0.8,
    )
# ── 4c. Image Generation ───────────────────────────────────────────────────────
def generate_image(prompt: str, hf_token: str, width: int = 1024, height: int = 1024) -> bytes | None:
    """Render *prompt* with FLUX.1-schnell; returns PNG bytes or None on failure.

    The prompt is enriched with fixed sustainability/quality modifiers before
    submission.  Errors are logged and swallowed, never raised.
    """
    try:
        from huggingface_hub import InferenceClient

        client = InferenceClient(model=MODELS["image"], token=hf_token)
        enriched = f"{prompt}, sustainability theme, green campus, professional quality, high resolution, photorealistic"
        pil_image = client.text_to_image(enriched, width=width, height=height)
        out = io.BytesIO()
        pil_image.save(out, format="PNG")
        return out.getvalue()
    except Exception as exc:
        logger.error("Image generation error: %s", exc)
        return None
# ── 4e. Video Generation ───────────────────────────────────────────────────────
# ══════════════════════════════════════════════════════════════════════════════
# 4d. Audio Narration
# ══════════════════════════════════════════════════════════════════════════════
#
# PRIMARY: core/tts_engine.py β€” local SpeechT5 + HiFi-GAN via Transformers
# β€’ High quality, dual-speaker podcast support, fully offline
# β€’ Lazy-loaded; model downloads once then cached
#
# FALLBACK: HuggingFace Inference API chain (no local deps required)
# facebook/mms-tts-eng β†’ espnet VITS β†’ facebook/fastspeech2
#
# generate_audio() tries primary first; if it raises, walks the API chain.
# Raises RuntimeError with the real reason if everything fails so the UI
# can show an actionable message instead of a generic fallback prompt.
# ─────────────────────────────────────────────────────────────────────────────
def _split_audio_chunks(text: str, max_chars: int = AUDIO_MAX_CHUNK) -> list[str]:
    """Split *text* into TTS-sized chunks at sentence boundaries.

    Sentences longer than *max_chars* are hard-split on word boundaries so no
    chunk exceeds the limit (except a single unbreakable word).
    """

    def split_long(sentence: str, limit: int) -> list[str]:
        # Greedy word-packing: emit a part whenever adding a word would overflow.
        parts: list[str] = []
        acc = ""
        for word in sentence.split():
            tentative = f"{acc} {word}".strip() if acc else word
            if len(tentative) > limit and acc:
                parts.append(acc)
                acc = word
            else:
                acc = tentative
        if acc:
            parts.append(acc)
        return parts or [sentence]

    chunks: list[str] = []
    acc = ""
    for raw in re.split(r"(?<=[.!?])\s+", text):
        sentence = raw.strip()
        if not sentence:
            continue
        pieces = split_long(sentence, max_chars) if len(sentence) > max_chars else [sentence]
        for piece in pieces:
            if acc and len(acc) + len(piece) + 1 > max_chars:
                chunks.append(acc.strip())
                acc = piece
            else:
                acc = f"{acc} {piece}".strip() if acc else piece
    if acc.strip():
        chunks.append(acc.strip())
    return chunks
def _audio_bytes_to_numpy(raw: bytes) -> tuple[np.ndarray, int]:
"""Decode audio bytes β†’ (float32 array, sample_rate). Tries soundfile then stdlib wave."""
try:
import soundfile as sf
audio, sr = sf.read(io.BytesIO(raw), dtype="float32")
if audio.ndim > 1:
audio = audio.mean(axis=1)
return audio, sr
except Exception as sf_exc:
pass
try:
import wave, struct
with wave.open(io.BytesIO(raw)) as wf:
sr = wf.getframerate()
n_frames = wf.getnframes()
n_ch = wf.getnchannels()
sampwidth = wf.getsampwidth()
frames = wf.readframes(n_frames)
fmt = f"<{n_frames * n_ch}{({1:'b',2:'h',4:'i'}).get(sampwidth,'h')}"
maxval = float(2 ** (8 * sampwidth - 1))
pcm = np.array(struct.unpack(fmt, frames), dtype=np.float32) / maxval
if n_ch > 1:
pcm = pcm.reshape(-1, n_ch).mean(axis=1)
return pcm, sr
except Exception as wave_exc:
raise RuntimeError(f"Audio decode failed β€” sf: {sf_exc} | wave: {wave_exc}")
def _numpy_to_wav_bytes(audio: np.ndarray, sample_rate: int) -> bytes:
"""Normalise float32 array β†’ PCM-16 WAV bytes."""
max_val = np.abs(audio).max()
if max_val > 1e-6:
audio = audio / max_val * 0.95
try:
import soundfile as sf
buf = io.BytesIO()
sf.write(buf, audio, sample_rate, format="WAV", subtype="PCM_16")
buf.seek(0)
return buf.read()
except Exception:
import wave, struct
pcm = (audio * 32767).astype(np.int16)
buf = io.BytesIO()
with wave.open(buf, "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(sample_rate)
wf.writeframes(struct.pack(f"<{len(pcm)}h", *pcm))
buf.seek(0)
return buf.read()
# def _generate_audio_local(script: str) -> bytes:
# """
# Primary path: synthesise via local TTSEngine (SpeechT5).
# Raises any exception so the caller can fall back to the API chain.
# """
# # 1. Detect if it's a podcast BEFORE cleaning tags
# is_podcast = bool(re.search(r"\[(HOST|GUEST)\]", script, re.IGNORECASE))
# if is_podcast:
# # Removes [PAUSE], [EMPHASIS], etc., but keeps [HOST] and [GUEST]
# clean = re.sub(r"\[(?!(HOST|GUEST)\]).*?\]", " ", script, flags=re.IGNORECASE)
# else:
# clean = re.sub(r"\[.*?\]", " ", script)
# clean = re.sub(r"\s{2,}", " ", clean).strip()
# try:
# from core.tts_engine import TTSEngine, TTSConfig, TTSBackend
# engine = TTSEngine(TTSConfig(
# backend = TTSBackend.SPEECHT5,
# max_chunk_chars= AUDIO_MAX_CHUNK,
# sample_rate = AUDIO_SAMPLE_RATE, ))
# if is_podcast:
# logger.info("Routing to synthesize_podcast (Dual-Speaker)")
# return engine.synthesize_podcast(clean)
# else:
# logger.info("Routing to synthesize (Solo Narration)")
# return engine.synthesize(clean)
# except Exception as local_exc:
# # Fallback to API (Note: API only supports Solo)
# logger.warning("Local engine failed, falling back to API (Solo only)")
# # We must strip ALL tags for the API fallback
# api_clean = re.sub(r"\[.*?\]", " ", clean)
# return _generate_audio_api(api_clean, hf_token)
def _generate_audio_local(script: str) -> bytes:
    """Primary TTS path: synthesise *script* via the local SpeechT5 engine.

    Cleans LLM cue tags, then routes to dual-speaker podcast synthesis when
    [HOST]/[GUEST] tags are present, otherwise to solo narration.  Raises on
    any engine failure so generate_audio() can fall back to the HF API chain.

    Fixes over the previous version:
      * removed the unreachable duplicate routing code after the return;
      * whitespace collapse no longer uses ``\\s{2,}``, which silently ate the
        very newlines inserted in front of the speaker tags (the engine's
        ^-anchored tag matching needs each tag at the start of a line);
      * hardcoded 250/16000 replaced with the module constants.
    """
    from core.tts_engine import TTSEngine, TTSConfig, TTSBackend

    # 1. Strip instruction tags ([PAUSE], [EMPHASIS], ...) so they are not
    #    spoken aloud, protecting [HOST]/[GUEST] via negative lookahead.
    clean = re.sub(r"\[(?!(?:HOST|GUEST)\]).*?\]", "", script, flags=re.IGNORECASE)
    # 2. Force each speaker tag onto its own line so the engine's
    #    re.match(r'^\[HOST\]') style detection can trigger voice switching.
    clean = re.sub(r"(\[(?:HOST|GUEST)\])", r"\n\1", clean, flags=re.IGNORECASE)
    # 3. Collapse runs of spaces/tabs but PRESERVE the newlines inserted above,
    #    then normalise newline runs (trimming surrounding spaces) to one.
    clean = re.sub(r"[ \t]{2,}", " ", clean)
    clean = re.sub(r"[ \t]*\n+[ \t]*", "\n", clean).strip()

    # 4. Dual-speaker only when speaker tags survived the cleanup.
    is_podcast = bool(re.search(r"\[(HOST|GUEST)\]", clean, re.IGNORECASE))

    engine = TTSEngine(TTSConfig(
        backend=TTSBackend.SPEECHT5,
        max_chunk_chars=AUDIO_MAX_CHUNK,   # keep in sync with module constants
        sample_rate=AUDIO_SAMPLE_RATE,
    ))
    if is_podcast:
        logger.info("βœ… SUCCESS: Routing to synthesize_podcast (Dual-Speaker Mode)")
        return engine.synthesize_podcast(clean)
    logger.info("ℹ️ INFO: Routing to synthesize (Solo Narration Mode)")
    return engine.synthesize(clean)
def _generate_audio_api(script: str, hf_token: str) -> bytes:
    """Fallback TTS path: walk AUDIO_MODEL_CHAIN on the HF Inference API.

    Each model synthesises the script chunk-by-chunk, inserting short
    silences between chunks.  The first model that yields audio with at
    least one good chunk (and not all chunks failed) wins.

    Raises:
        RuntimeError: when every model in the chain fails.
    """
    from huggingface_hub import InferenceClient

    failures: list[str] = []
    chunks = _split_audio_chunks(script, max_chars=400)  # HF API supports longer chunks
    for model_id in AUDIO_MODEL_CHAIN:
        short = model_id.split("/")[-1]
        logger.info("generate_audio API fallback: trying %s", model_id)
        pieces: list[np.ndarray] = []
        sample_rate = AUDIO_SAMPLE_RATE
        bad = 0
        for idx, chunk in enumerate(chunks):
            chunk = chunk.strip()
            if not chunk:
                continue
            try:
                client = InferenceClient(model=model_id, token=hf_token)
                result = client.text_to_speech(chunk)
                if isinstance(result, (bytes, bytearray)):
                    raw = bytes(result)
                else:
                    raw = result.read()
                if len(raw) < 100:  # tiny payloads are error bodies, not audio
                    raise RuntimeError(f"Response too small ({len(raw)} bytes)")
                audio_np, sample_rate = _audio_bytes_to_numpy(raw)
                pieces.append(audio_np)
                pieces.append(np.zeros(int(sample_rate * AUDIO_SILENCE_SHORT), dtype=np.float32))
                logger.info(" [%s] chunk %d/%d ok", short, idx + 1, len(chunks))
            except Exception as exc:
                logger.warning(" [%s] chunk %d/%d failed: %s", short, idx + 1, len(chunks), exc)
                bad += 1
        if pieces and bad < len(chunks):
            logger.info("generate_audio: success via API model %s", short)
            return _numpy_to_wav_bytes(np.concatenate(pieces), sample_rate)
        failures.append(f"{short}: {bad}/{len(chunks)} chunks failed")
    raise RuntimeError(
        "All HF API TTS models failed. " + " | ".join(failures) + "\n"
        "Check: HF token validity, internet connectivity, or HF PRO quota."
    )
def generate_audio(script: str, hf_token: str) -> bytes:
    """Convert a narration script to speech (WAV bytes).

    Strategy:
        1. Local TTSEngine (SpeechT5) — best quality, works offline, and the
           only path supporting [HOST]/[GUEST] dual-speaker routing.
        2. HF Inference API chain when the local engine fails (solo voice only).

    [PAUSE]/[EMPHASIS] cue tags are stripped.  BUGFIX: the previous version
    stripped [HOST]/[GUEST] too, which made _generate_audio_local's podcast
    routing unreachable from here — speaker tags are now preserved for the
    local path and stripped only for the API fallback, which cannot switch
    voices.

    Raises:
        ValueError: if the script is empty after tag cleaning.
        RuntimeError: carrying both underlying errors when both paths fail.
    """
    # Remove cue tags but keep speaker tags for dual-speaker routing.
    clean = re.sub(r"\[(?!(?:HOST|GUEST)\]).*?\]", " ", script, flags=re.IGNORECASE)
    clean = re.sub(r"[ \t]{2,}", " ", clean).strip()
    if not clean:
        raise ValueError("Audio script is empty after cleaning cue tags.")
    logger.info("generate_audio: %d chars β€” trying local TTSEngine first…", len(clean))
    # ── Path 1: local SpeechT5 engine ────────────────────────────────────────
    local_exc: Exception | None = None  # bound outside `except` — the handler
    # unbinds its target on exit, so capture it for the Path-2 error report.
    try:
        result = _generate_audio_local(clean)
        logger.info("generate_audio: local TTSEngine succeeded")
        return result
    except Exception as _exc:
        local_exc = _exc
        logger.warning(
            "generate_audio: local TTSEngine failed (%s) β€” falling back to HF API…",
            local_exc,
        )
    # ── Path 2: HF API chain (solo only — strip ALL remaining tags) ──────────
    solo = re.sub(r"\s{2,}", " ", re.sub(r"\[.*?\]", " ", clean)).strip()
    try:
        result = _generate_audio_api(solo, hf_token)
        logger.info("generate_audio: HF API fallback succeeded")
        return result
    except RuntimeError as api_exc:
        raise RuntimeError(
            f"Audio generation failed on both paths.\n"
            f"Local TTSEngine: {local_exc}\n"
            f"HF API chain: {api_exc}"
        ) from api_exc
# ══════════════════════════════════════════════════════════════════════════════
# 4e. Video Generation
# ══════════════════════════════════════════════════════════════════════════════
# Candidate text→video models, tried in order by generate_video().
VIDEO_MODEL_CHAIN = [
    "BestWishYsh/Helios-Base",
    "Wan-AI/Wan2.2-T2V-A14B-Diffusers",
    "damo-vilab/text-to-video-ms-1.7b",
]
def generate_video(
    prompt: str, hf_token: str, num_frames: int = 16,
    timeout: int = 180, status_cb=None,
) -> bytes | None:
    """Generate a short video for *prompt* via the HF Inference API.

    Walks VIDEO_MODEL_CHAIN with up to 3 attempts per model, handling
    model-loading (503) waits, auth failures (401/403, aborts entirely) and
    missing models (404, skips to the next model).  Returns raw video bytes
    or None on total failure.

    NOTE(review): `num_frames` is accepted but never forwarded to the API —
    presumably meant for `text_to_video`; confirm provider support before
    wiring it through.

    `status_cb`, when given, receives human-readable progress strings.
    """
    from huggingface_hub import InferenceClient

    def _status(msg: str):
        # Log and mirror progress to the optional UI callback (best-effort).
        logger.info(msg)
        if status_cb:
            try: status_cb(msg)
            except Exception: pass
    enriched = f"{prompt.strip()}, green campus, sustainability, smooth cinematic motion, vibrant colors"
    for model_id in VIDEO_MODEL_CHAIN:
        short = model_id.split("/")[1]
        _status(f"Trying {short}…")
        for attempt in range(1, 4):
            try:
                _status(f"Attempt {attempt}/3 β€” sending to {short} (timeout {timeout}s)…")
                client = InferenceClient(model=model_id, token=hf_token, timeout=timeout)
                result = client.text_to_video(enriched)
                # Normalise the result to raw bytes, whatever shape it arrives in.
                if isinstance(result, (bytes, bytearray)):
                    raw = bytes(result)
                elif hasattr(result, "read"):
                    raw = result.read()
                else:
                    # Last resort: treat it as an iterable of frames, encode to MP4.
                    try:
                        import imageio, tempfile, os as _os
                        frames = [np.array(f) for f in result]
                        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
                        writer = imageio.get_writer(tmp.name, fps=8)
                        for f in frames: writer.append_data(f)
                        writer.close()
                        raw = open(tmp.name, "rb").read()
                        _os.unlink(tmp.name)
                    except Exception as conv_exc:
                        logger.warning("Could not convert result type %s: %s", type(result), conv_exc)
                        raw = b""
                if len(raw) > 500:  # tiny payloads are error bodies, not video
                    _status(f"βœ… Got {len(raw):,} bytes from {short}")
                    return raw
                logger.warning("%s returned only %d bytes β€” skipping", short, len(raw))
                break  # give up on this model, move to the next one
            except Exception as exc:
                err_str = str(exc)
                if "503" in err_str or "loading" in err_str.lower():
                    wait = 40
                    _status(f"⏳ {short} is loading, waiting {wait}s…")
                    time.sleep(wait)  # then retry the same model
                elif "401" in err_str or "403" in err_str:
                    # Auth errors will fail for every model — abort the whole run.
                    _status("❌ Invalid HF token β€” please check your token in the sidebar.")
                    return None
                elif "404" in err_str:
                    _status(f"⚠️ {short} not found on router β€” trying next model…")
                    break
                else:
                    logger.warning("%s attempt %d failed: %s", short, attempt, exc)
                    if attempt < 3: time.sleep(5)
                    else: break
        time.sleep(2)  # brief pause between models
    return None
# ══════════════════════════════════════════════════════════════════════════════
# 5. ESGConsultant β€” orchestration class
# ══════════════════════════════════════════════════════════════════════════════
class ESGConsultant:
    """Multi-modal ESG intelligence platform (orchestration facade).

    Wires the FAISS retrieval store to the task-specific generation helpers:
    query() → strategic text report (Qwen2.5-7B + fallback chain),
    creative_text() → creative brief/copy (Phi-3.5-mini + fallback chain),
    create_image() → PNG bytes (FLUX.1-schnell),
    create_audio() → WAV bytes (local TTSEngine, HF API fallback),
    create_podcast_audio() → dual-speaker WAV bytes (local TTSEngine only).
    """

    def __init__(self, hf_token: str, index_dir: Path = FAISS_INDEX_DIR):
        self.hf_token = hf_token
        self.store = FAISSStore(index_dir=index_dir, hf_token=hf_token)
        self.store.load()

    def update_token(self, hf_token: str) -> None:
        """Swap in a new HF token for both the consultant and its store."""
        self.hf_token = hf_token
        self.store.hf_token = hf_token

    def index_documents(self, text: str, reset: bool = False) -> int:
        """Chunk *text* and add it to the vector store; returns the chunk count."""
        if reset:
            self.store.clear()
        pieces = chunk_text(text)
        if not pieces:
            return 0
        self.store.add_texts(pieces)
        return len(pieces)

    def query(self, question: str, top_k: int = TOP_K, max_tokens: int = 1024, temperature: float = 0.4) -> dict:
        """RAG question-answering; returns {answer, sources, chunks_used}."""
        if not self.store.is_ready:
            return {"answer": "⚠️ No documents indexed. Upload files on the **πŸ“€ Data Ingestion** page.", "sources": [], "chunks_used": 0}
        hits = self.store.search(question, top_k=top_k)
        report = call_strategy_llm(question, hits, self.hf_token, max_tokens, temperature)
        return {"answer": report, "sources": hits, "chunks_used": len(hits)}

    def creative_text(self, brief: str, mode: str = "poster", top_k: int = TOP_K, max_tokens: int = 700) -> dict:
        """Context-grounded creative generation for the given *mode*."""
        hits = self.store.search(brief, top_k=top_k) if self.store.is_ready else []
        copy_text = call_creative_llm(brief, hits, self.hf_token, mode, max_tokens)
        return {"answer": copy_text, "sources": hits, "chunks_used": len(hits)}

    def create_image(self, prompt: str, width: int = 1024, height: int = 1024) -> bytes | None:
        """Render a poster image via FLUX; PNG bytes or None on failure."""
        return generate_image(prompt, self.hf_token, width, height)

    def create_audio(self, script: str) -> bytes:
        """Single-speaker narration: local TTSEngine first, HF API fallback.

        [PAUSE]/[EMPHASIS] cue tags are stripped automatically.  Raises
        RuntimeError with the real reason if all paths fail — catch it in the
        UI to display an actionable message.
        """
        return generate_audio(script, self.hf_token)

    def create_podcast_audio(self, script: str) -> bytes:
        """Dual-speaker audio from a [HOST]/[GUEST]-tagged script.

        Local SpeechT5 only — the HF API offers no dual-speaker synthesis, so
        there is no remote fallback here.  Expected format:

            [HOST] Welcome to the SPJIMR ESG podcast...
            [GUEST] Thank you for having me...

        Raises:
            ValueError: when the script is blank.
            RuntimeError: when the local engine fails (use create_audio() for
                a single-speaker fallback via the HF API instead).
        """
        script_clean = re.sub(r"\s{2,}", " ", script).strip()
        if not script_clean:
            raise ValueError("Podcast script is empty.")
        try:
            from core.tts_engine import TTSEngine, TTSConfig, TTSBackend
            engine = TTSEngine(TTSConfig(
                backend=TTSBackend.SPEECHT5,
                max_chunk_chars=AUDIO_MAX_CHUNK,
                sample_rate=AUDIO_SAMPLE_RATE,
            ))
            return engine.synthesize_podcast(script_clean)
        except Exception as exc:
            raise RuntimeError(
                f"Podcast audio generation failed: {exc}\n"
                "Tip: use create_audio() for single-speaker fallback via HF API."
            ) from exc

    def _condense_for_video(self, long_brief: str) -> str:
        """Boil a long director's brief down to one short scene sentence."""
        system = (
            "You are a video prompt engineer. "
            "Given a long director's brief, extract ONE concise sentence (max 150 characters) "
            "that describes the most visually striking scene. "
            "Output ONLY that sentence β€” no explanation, no punctuation beyond the sentence itself."
        )
        user = f"Director's brief:\n{long_brief[:1000]}\n\nOne-sentence video prompt:"
        try:
            condensed = _chat_completion(
                messages=[{"role": "system", "content": system}, {"role": "user", "content": user}],
                hf_token=self.hf_token, model_key="strategy", max_tokens=80, temperature=0.3,
            )
            return condensed.strip().strip('"\'\n').split("\n")[0][:200]
        except Exception as exc:
            logger.warning("Condense LLM failed (%s) β€” using fallback truncation", exc)
            # Heuristic fallback: first reasonably-sized sentence, else a hard cut.
            for sentence in long_brief.replace("\n", " ").split("."):
                candidate = sentence.strip()
                if 20 < len(candidate) < 150:
                    return candidate
            return long_brief[:140].strip()

    def create_video(self, prompt: str, num_frames: int = 16, status_cb=None) -> bytes | None:
        """Text→video via the model chain; video bytes or None on failure."""
        return generate_video(prompt, self.hf_token, num_frames=num_frames, status_cb=status_cb)

    @property
    def is_ready(self) -> bool:
        """True when the store has at least one indexed vector."""
        return self.store.is_ready

    @property
    def vector_count(self) -> int:
        """Number of vectors currently in the store."""
        return self.store.vector_count

    def reset_index(self) -> None:
        """Wipe the vector store (memory and disk)."""
        self.store.clear()