""" |
|
|
qa.py — GPT-4o (SAP Gen AI Hub) + ReRank Retrieval (Stable Strict, English Only) |
|
|
-------------------------------------------------- |
|
|
✅ Semantic retrieval (FAISS + cosine re-rank + neighbor fill) |
|
|
✅ Bullet-aware similarity boost for procedural chunks |
|
|
✅ Embedding caching (per PDF + chunk config aware) |
|
|
✅ Smart factual mode (fast) |
|
|
✅ Deep reasoning mode (ChatGPT-like) |
|
|
✅ genai_generate() helper for suggestions |
|
|
✅ Token-safe truncation (prevents 128k overflow) |
|
|
""" |

import os
import re
import json
import pickle
import hashlib

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from gen_ai_hub.proxy.core.proxy_clients import get_proxy_client
from gen_ai_hub.proxy.langchain.openai import ChatOpenAI

print("✅ qa.py (GPT-4o via Gen AI Hub + Bullet-Aware Retrieval + Cache) loaded from:", __file__)


# ------------------------------------------------------------------
# Embedding cache directory (per-PDF pickles; falls back to /tmp)
# ------------------------------------------------------------------
CACHE_EMB_DIR = os.path.join(os.path.dirname(__file__), "embed_cache")
os.makedirs(CACHE_EMB_DIR, exist_ok=True)

# Probe writability once at import time; read-only deployments fall back to /tmp.
try:
    test_file = os.path.join(CACHE_EMB_DIR, "test_write.tmp")
    with open(test_file, "w") as f:
        f.write("ok")
    os.remove(test_file)
    print(f"✅ Cache directory ready and writable: {CACHE_EMB_DIR}")
except Exception as e:
    print(f"⚠️ Cache directory not writable ({CACHE_EMB_DIR}): {e}")
    CACHE_EMB_DIR = "/tmp/embed_cache"
    os.makedirs(CACHE_EMB_DIR, exist_ok=True)
    print(f"🔄 Fallback to temporary cache: {CACHE_EMB_DIR}")


# ------------------------------------------------------------------
# Hugging Face caches: keep model downloads in a writable tmp dir
# ------------------------------------------------------------------
CACHE_DIR = "/tmp/hf_cache"
os.makedirs(CACHE_DIR, exist_ok=True)
os.environ.update({
    "HF_HOME": CACHE_DIR,
    "TRANSFORMERS_CACHE": CACHE_DIR,
    "HF_DATASETS_CACHE": CACHE_DIR,
    "HF_MODULES_CACHE": CACHE_DIR,
})


# ------------------------------------------------------------------
# Embedding model (e5-small-v2 preferred; MiniLM as fallback)
# ------------------------------------------------------------------
try:
    _query_model = SentenceTransformer("intfloat/e5-small-v2", cache_folder=CACHE_DIR)
    print("✅ Loaded embedding model: intfloat/e5-small-v2 (English mode)")
except Exception as e:
    print(f"⚠️ Embedding load failed ({e}), attempting fallback...")
    try:
        _query_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", cache_folder=CACHE_DIR)
        print("🔁 Fallback: all-MiniLM-L6-v2 loaded successfully.")
    except Exception as e2:
        raise RuntimeError(f"❌ Could not load any embedding model: {e2}") from e2


# ------------------------------------------------------------------
# GPT-4o via SAP Gen AI Hub (lazy, cached client)
# ------------------------------------------------------------------
CRED_PATH = os.path.join(os.path.dirname(__file__), "GEN AI HUB PROXY.json")
_chat_llm = None


def get_chat_llm(model_name: str = "gpt-4o", temperature: float = 0.3, max_tokens: int = 1500):
    """Lazily initialize and cache the Gen AI Hub chat client.

    Note: the first successful call wins; later calls return the cached
    client and ignore differing model_name/temperature/max_tokens.
    """
    global _chat_llm
    if _chat_llm is not None:
        return _chat_llm

    try:
        # Export AI Core credentials from the service key if it is present;
        # otherwise assume the AICORE_* variables are already set in the env.
        if os.path.exists(CRED_PATH):
            with open(CRED_PATH, "r") as key_file:
                svcKey = json.load(key_file)
            os.environ.update({
                "AICORE_AUTH_URL": svcKey.get("url", ""),
                "AICORE_CLIENT_ID": svcKey.get("clientid", ""),
                "AICORE_CLIENT_SECRET": svcKey.get("clientsecret", ""),
                "AICORE_BASE_URL": svcKey.get("serviceurls", {}).get("AI_API_URL", ""),
            })

        proxy_client = get_proxy_client("gen-ai-hub")
        _chat_llm = ChatOpenAI(
            proxy_model_name=model_name,
            proxy_client=proxy_client,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        print(f"✅ GPT-4o (via Gen AI Hub) initialized lazily for model: {model_name}")
        return _chat_llm

    except Exception as e:
        print(f"⚠️ Gen AI Hub lazy init failed: {e}")
        _chat_llm = None
        raise
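
# Illustrative service-key shape only: the field names match what
# get_chat_llm() reads above, but every value is a placeholder,
# not a real credential or endpoint.
#
#   {
#     "url": "https://<auth-host>/oauth/token",
#     "clientid": "<client-id>",
#     "clientsecret": "<client-secret>",
#     "serviceurls": {"AI_API_URL": "https://<ai-api-host>"}
#   }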


def embed_chunks(chunks, batch_size: int = 32):
    """Embed document chunks in batches with the 'passage:' prefix e5 expects."""
    if not chunks:
        return np.array([])

    all_embeddings = []
    for i in range(0, len(chunks), batch_size):
        batch = [f"passage: {c}" for c in chunks[i:i + batch_size]]
        batch_embs = _query_model.encode(
            batch,
            convert_to_numpy=True,
            normalize_embeddings=True,
            show_progress_bar=False,
        )
        all_embeddings.extend(batch_embs)
    print(f"⚡ Embedded {len(all_embeddings)} chunks in batches of {batch_size}")
    return np.array(all_embeddings)
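
# Minimal sketch of the asymmetric e5 convention used throughout this module
# (taken from the model's documented usage, not specific to this codebase):
# documents are embedded as "passage: ..." and questions as "query: ...",
# so both sides land in comparable regions of the embedding space.
#
#   doc_vec = _query_model.encode(["passage: Enable order confirmation."],
#                                 normalize_embeddings=True)
#   q_vec = _query_model.encode(["query: How do I enable order confirmation?"],
#                               normalize_embeddings=True)
#   score = float(doc_vec[0] @ q_vec[0])  # cosine == dot on normalized vectors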


def _hash_name(file_name: str, chunk_size: int, overlap: int, num_chunks: int):
    """Build a short cache key from the file name and chunking configuration."""
    combo = f"{file_name}_{chunk_size}_{overlap}_{num_chunks}"
    return hashlib.md5(combo.encode()).hexdigest()[:8]
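
# Illustrative: the key is the first 8 hex chars of an MD5 digest, so e.g.
#   _hash_name("guide.pdf", 1000, 100, 42)  ->  something like "3f2a9c1b"
# (file name hypothetical, digest made up). Any change to chunk_size,
# overlap, or num_chunks yields a different key, hence a fresh cache file.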


def _clean_old_caches(base_name: str, keep_latest: int = 5):
    """Keep only the newest cache files for a given document."""
    files = [
        (os.path.getmtime(os.path.join(CACHE_EMB_DIR, f)), f)
        for f in os.listdir(CACHE_EMB_DIR)
        if f.startswith(base_name)
    ]
    if len(files) > keep_latest:
        files.sort(reverse=True)  # newest first
        for _, old_file in files[keep_latest:]:
            try:
                os.remove(os.path.join(CACHE_EMB_DIR, old_file))
                print(f"🧹 Removed old cache: {old_file}")
            except Exception:
                pass


def cache_embeddings(file_name: str, chunks, embed_func, chunk_size: int = None, overlap: int = None):
    """Load embeddings from cache if present; otherwise compute and persist them."""
    # Normalize defaults up front so the cache file name never contains "None"
    # and stays consistent with the hashed key.
    chunk_size = chunk_size or 1000
    overlap = overlap or 100
    cache_key = _hash_name(file_name, chunk_size, overlap, len(chunks))
    cache_file = f"{os.path.basename(file_name)}_cs{chunk_size}_ov{overlap}_{cache_key}.pkl"
    cache_path = os.path.join(CACHE_EMB_DIR, cache_file)
    base_name = os.path.basename(file_name)

    if os.path.exists(cache_path):
        print(f"🧠 Loaded cached embeddings for {base_name} ({chunk_size}/{overlap})")
        with open(cache_path, "rb") as f:
            return pickle.load(f)

    print(f"💡 No cache found for {base_name} ({chunk_size}/{overlap}). Generating new embeddings...")
    embeddings = embed_func(chunks)
    with open(cache_path, "wb") as f:
        pickle.dump(embeddings, f)
    print(f"💾 Cached embeddings saved as {cache_file}")
    _clean_old_caches(base_name, keep_latest=5)
    return embeddings
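
# Illustrative usage (the file path and chunks are hypothetical):
#
#   chunks = ["First chunk of the PDF...", "Second chunk..."]
#   embs = cache_embeddings("docs/guide.pdf", chunks, embed_chunks,
#                           chunk_size=1000, overlap=100)
#
# The first call computes and pickles the embeddings; repeat calls with the
# same file name and chunking config are served from embed_cache/.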


# ------------------------------------------------------------------
# Prompt templates
# ------------------------------------------------------------------
STRICT_PROMPT = (
    "You are an enterprise documentation assistant.\n"
    "Use all relevant information from the CONTEXT below.\n"
    "When multiple causes, steps, or key points are discussed, present them as short, well-structured bullet points.\n"
    "When the answer focuses on a single concept, definition, or explanation, write it as a clear and compact paragraph.\n"
    "Keep the tone professional and concise. Do not invent facts outside the provided content.\n"
    "If nothing in the CONTEXT relates to the question, reply exactly:\n"
    "'I don't know based on the provided document.'\n\n"
    "Context:\n{context}\n\nQuestion: {query}\nAnswer:"
)


REASONING_PROMPT = (
    "You are an expert enterprise assistant capable of reasoning.\n"
    "Think step by step and synthesize information even if scattered across chunks.\n"
    "Base your answer primarily on the CONTEXT, but if multiple partial clues exist, combine them logically.\n"
    "If absolutely nothing in the document relates, say exactly:\n"
    "'I don't know based on the provided document.'\n\n"
    "Context:\n{context}\n\nQuestion: {query}\nLet's reason step-by-step:\nAnswer:"
)


# ------------------------------------------------------------------
# Retrieval
# ------------------------------------------------------------------
# Used below to rebuild the FAISS index on an embedding-dimension mismatch.
from vectorstore import build_faiss_index


def retrieve_chunks(query: str, index, chunks: list, top_k: int = 7,
                    min_similarity: float = 0.6, candidate_multiplier: int = 3,
                    embeddings: list = None):
    """FAISS candidate search + cosine re-rank with a bullet-aware boost.

    Returns the top_k chunks above min_similarity (falling back to the best
    ranked ones), expanded with their immediate neighbors for context.
    """
    if not index or not chunks:
        print("⚠️ No FAISS index or chunks provided — returning empty result.")
        return []

    try:
        q_emb = _query_model.encode(
            [f"query: {query.strip()}"],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )[0]

        # Guard against an index built with a different embedding model.
        if hasattr(index, "d") and q_emb.shape[0] != index.d:
            print(f"⚠️ FAISS dimension mismatch: index={index.d}, query={q_emb.shape[0]}")
            if embeddings:
                print("🔄 Rebuilding FAISS index...")
                index = build_faiss_index(embeddings)
            else:
                return []

        # Over-fetch candidates, then re-rank them with exact cosine similarity.
        num_candidates = max(top_k * candidate_multiplier, top_k + 2)
        _, indices = index.search(np.array([q_emb]).astype("float32"), num_candidates)
        candidate_indices = list(dict.fromkeys(int(i) for i in indices[0] if i >= 0))

        doc_embs = _query_model.encode(
            [f"passage: {chunks[i]}" for i in candidate_indices],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
        sims = cosine_similarity([q_emb], doc_embs)[0]

        # Small boost for bullet/numbered chunks, which tend to hold procedures.
        boosted_sims = []
        for idx, sim in zip(candidate_indices, sims):
            text = chunks[idx].strip()
            if re.match(r"^[-•\d]+[\.\s]", text):
                sim += 0.05
            boosted_sims.append((idx, sim))

        ranked = sorted(boosted_sims, key=lambda x: x[1], reverse=True)
        filtered = [idx for idx, sim in ranked if sim >= min_similarity][:top_k]
        if not filtered:
            print(f"⚠️ No chunks ≥ {min_similarity:.2f} — using top {top_k} ranked chunks instead.")
            filtered = [idx for idx, sim in ranked[:top_k]]

        # Neighbor fill: pull in adjacent chunks so procedures are not cut mid-step.
        neighbors = set()
        for idx in filtered:
            for n in (idx - 1, idx + 1):
                if 0 <= n < len(chunks):
                    neighbors.add(n)
        filtered = sorted(set(filtered) | neighbors)
        final_chunks = [chunks[i] for i in filtered]
        avg_sim = np.mean([s for _, s in ranked[:top_k]])
        print(f"✅ Retrieved {len(final_chunks)} chunks | avg_sim={avg_sim:.3f} | threshold={min_similarity:.2f}")
        return final_chunks

    except Exception as e:
        print(f"⚠️ Retrieval error: {repr(e)}")
        return []
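
# Illustrative tuning (values are examples, not recommendations): lowering
# min_similarity widens recall; raising candidate_multiplier re-ranks a
# larger FAISS candidate pool at the cost of extra encoding work.
#
#   hits = retrieve_chunks("query text", index, chunks,
#                          top_k=5, min_similarity=0.5, candidate_multiplier=4)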


def truncate_context(context_text: str, max_tokens: int = 100000, model: str = "gpt-4o") -> str:
    """
    Truncate context to stay safely within model limits (~128k tokens).
    """
    try:
        import tiktoken
        enc = tiktoken.encoding_for_model(model)
    except Exception:
        try:
            import tiktoken
            enc = tiktoken.get_encoding("cl100k_base")
        except Exception:
            # tiktoken unavailable: fall back to a rough ~4-chars-per-token cut.
            return context_text[: max_tokens * 4]

    tokens = enc.encode(context_text)
    if len(tokens) > max_tokens:
        truncated = enc.decode(tokens[:max_tokens])
        print(f"⚠️ Context truncated from {len(tokens):,} → {max_tokens:,} tokens.")
        return truncated
    return context_text
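
# Quick sanity check, assuming tiktoken is installed (some_context is a
# placeholder string, not a name defined in this module):
#
#   import tiktoken
#   enc = tiktoken.get_encoding("cl100k_base")
#   n = len(enc.encode(some_context))   # token count before truncation
#   safe = truncate_context(some_context, max_tokens=100000)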


def generate_answer(query: str, retrieved_chunks: list, reasoning_mode: bool = False):
    """
    Generates an English answer using GPT-4o (SAP Gen AI Hub proxy).
    Handles both strict and reasoning modes with smart fallback guidance.
    """
    if not retrieved_chunks:
        return "Sorry, I couldn’t find relevant information in the document."

    try:
        chat_llm_local = get_chat_llm()
    except Exception:
        return "⚠️ GPT-4o not initialized. Check credentials or rebuild the Space."

    # Join chunks, drop duplicate lines (overlapping chunks repeat text),
    # and cap the context well below the 128k window.
    context = "\n".join(chunk.strip() for chunk in retrieved_chunks)
    context = "\n".join(dict.fromkeys(context.splitlines()))
    context = truncate_context(context, 100000)

    prompt = (REASONING_PROMPT if reasoning_mode else STRICT_PROMPT).format(
        context=context, query=query
    )

    messages = [
        {"role": "system", "content": (
            "You are an expert enterprise documentation assistant. "
            "When reasoning_mode is off, stay strictly factual and concise. "
            "When reasoning_mode is on, combine insights across chunks logically. "
            "If the answer is not in the document, reply exactly: "
            "'I don't know based on the provided document.'"
        )},
        {"role": "user", "content": prompt},
    ]

    try:
        response = chat_llm_local.invoke(messages)
        output = response.content.strip()

        # Replace the bare refusal with mode-specific guidance for the user.
        if "I don't know based on the provided document" in output:
            if reasoning_mode:
                output = (
                    "I couldn’t infer enough from the context. "
                    "Try rephrasing your question for a clearer reasoning path."
                )
            else:
                output = (
                    "I couldn’t find a clear answer in this document. "
                    "You can try rephrasing the query or switch to Extended Mode "
                    "(Document + General) for a broader explanation."
                )

        return output

    except Exception as e:
        print(f"⚠️ GPT-4o generation failed: {e}")
        return "⚠️ Error: Could not generate an answer."
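
# Illustrative call (the question is made up; retrieved would come from
# retrieve_chunks above):
#
#   answer = generate_answer("How is the supplier email configured?",
#                            retrieved, reasoning_mode=True)
#
# reasoning_mode=False keeps answers strictly factual; True lets the model
# combine partial clues scattered across chunks.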


def genai_generate(prompt: str) -> str:
    """Free-form generation helper (used for suggestions, not document QA)."""
    try:
        chat_llm_local = get_chat_llm()
    except Exception:
        raise RuntimeError("⚠️ GPT-4o not initialized. Check credentials or rebuild the Space.")

    messages = [
        {"role": "system", "content": "You are a concise, intelligent text generator."},
        {"role": "user", "content": prompt.strip()},
    ]

    try:
        response = chat_llm_local.invoke(messages)
        return response.content.strip()
    except Exception as e:
        print(f"⚠️ genai_generate() failed: {e}")
        return "⚠️ Unable to generate response."
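
# Illustrative usage (prompt text is an example):
#
#   suggestion = genai_generate("Suggest three follow-up questions about PO automation.")
#
# Unlike generate_answer(), this helper sends the prompt as-is with no
# document context attached.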


if __name__ == "__main__":
    # Smoke test with a tiny in-memory corpus (build_faiss_index is already
    # imported at module level).
    dummy_chunks = [
        "- Step 1: Enable order confirmation capability.",
        "- Step 2: Configure supplier email.",
        "Setup instructions and configuration details.",
        "Prerequisites for automation are described here.",
    ]

    embeddings = embed_chunks(dummy_chunks)
    index = build_faiss_index(embeddings)

    query = "What are the prerequisites for commerce automation?"
    retrieved = retrieve_chunks(query, index, dummy_chunks)
    print("🔍 Retrieved:", retrieved)
    print("💬 Answer:", generate_answer(query, retrieved, reasoning_mode=False))