"""Phase 30 backend patch: better answer quality.

Run from the project root. The script performs, in order:

0. Remove UTF-8 byte-order marks from every ``*.py`` file under ``app/``.
1. Write ``app/generation/answer_quality_enhancer.py``.
2. Wire the enhancer into ``app/generation/answer_service.py`` (if present).
3. Update the default answer instruction / limits in
   ``app/deployment/hf_status.py`` (if present).
"""

import ast
import codecs
from pathlib import Path
from typing import List, Tuple

ENHANCER_PATH = Path("app/generation/answer_quality_enhancer.py")
ANSWER_SERVICE_PATH = Path("app/generation/answer_service.py")
HF_STATUS_PATH = Path("app/deployment/hf_status.py")

# Exact import line added to answer_service.py; also used as the idempotency marker.
ENHANCER_IMPORT = (
    "from app.generation.answer_quality_enhancer import safe_enhance_answer_for_response"
)

# Expression substituted for the raw answer variable in the response dict.
ENHANCER_CALL = "safe_enhance_answer_for_response(locals())"

# Response-dict fragments that are rewritten to go through the enhancer.
ANSWER_REPLACEMENTS = {
    '"answer": answer,': '"answer": safe_enhance_answer_for_response(locals()),',
    "'answer': answer,": "'answer': safe_enhance_answer_for_response(locals()),",
    '"answer": final_answer,': '"answer": safe_enhance_answer_for_response(locals()),',
    "'answer': final_answer,": "'answer': safe_enhance_answer_for_response(locals()),",
    '"answer": generated_answer,': '"answer": safe_enhance_answer_for_response(locals()),',
    "'answer': generated_answer,": "'answer': safe_enhance_answer_for_response(locals()),",
}

# Source of the generated module. It is a raw string, so sequences such as
# "\n" below are written to disk as backslash-n and only become newlines when
# the generated module itself is executed.
#
# NOTE(review): on the safe_enhance_answer_for_response() path,
# build_detailed_evidence_answer() is only reached when the answer is already
# "too short", so its own is_answer_too_short() check is always true there and
# the model's original short answer is replaced by the generic sentence
# whenever sources exist. Left unchanged here; confirm this is intended.
ENHANCER_SOURCE = r'''
from typing import Any, Dict, List


SHORT_ANSWER_WORD_LIMIT = 70


def to_dict(obj: Any) -> Dict[str, Any]:
    if obj is None:
        return {}

    if isinstance(obj, dict):
        return obj

    if hasattr(obj, "model_dump"):
        try:
            return obj.model_dump()
        except Exception:
            pass

    if hasattr(obj, "dict"):
        try:
            return obj.dict()
        except Exception:
            pass

    if hasattr(obj, "__dict__"):
        try:
            return dict(obj.__dict__)
        except Exception:
            pass

    return {}


def value_from(data: Dict[str, Any], keys: List[str], default: str = "") -> str:
    for key in keys:
        value = data.get(key)
        if value not in [None, ""]:
            return str(value)

    metadata = data.get("metadata")
    if isinstance(metadata, dict):
        for key in keys:
            value = metadata.get(key)
            if value not in [None, ""]:
                return str(value)

    return default


def text_from_source(source: Dict[str, Any]) -> str:
    return value_from(
        source,
        [
            "text",
            "content",
            "chunk_text",
            "page_text",
            "cleaned_text",
            "raw_text",
            "text_preview",
            "preview",
            "chunk_preview",
            "body"
        ],
        ""
    )


def normalize_sources(raw_sources: Any, raw_citations: Any = None) -> List[Dict[str, Any]]:
    sources = []

    if isinstance(raw_sources, list):
        for item in raw_sources:
            sources.append(to_dict(item))

    if isinstance(raw_citations, list):
        for item in raw_citations:
            sources.append(to_dict(item))

    cleaned = []
    seen = set()

    for index, source in enumerate(sources):
        if not source:
            continue

        source_id = value_from(
            source,
            ["source_id", "citation_id", "id"],
            f"S{index + 1}"
        )

        chunk_id = value_from(
            source,
            ["chunk_id", "source_chunk_id", "chunk", "chunk_index", "id"],
            source_id
        )

        text = text_from_source(source)

        document_name = value_from(
            source,
            ["document_name", "source_file_name", "file_name", "filename", "document_title"],
            "Selected document"
        )

        page = value_from(
            source,
            ["page_number", "page", "page_no", "page_index"],
            "Not available"
        )

        key = f"{source_id}|{chunk_id}|{page}"
        if key in seen:
            continue

        seen.add(key)

        cleaned.append({
            "source_id": source_id,
            "chunk_id": chunk_id,
            "document_name": document_name,
            "page": page,
            "text": text,
            "raw": source
        })

    return cleaned[:6]


def is_answer_too_short(answer: str) -> bool:
    if not answer:
        return True

    word_count = len(answer.split())

    if word_count < SHORT_ANSWER_WORD_LIMIT:
        return True

    weak_phrases = [
        "i could not find",
        "not enough information",
        "maternity leave",
        "rag is retrieval-augmented generation",
        "the answer is"
    ]

    lower = answer.lower().strip()

    for phrase in weak_phrases:
        if lower == phrase or lower.startswith(phrase) and word_count < 90:
            return True

    return False


def source_label(index: int, source: Dict[str, Any]) -> str:
    sid = source.get("source_id") or f"S{index + 1}"
    if str(sid).upper().startswith("S"):
        return str(sid)
    return f"S{index + 1}"


def make_key_points_from_sources(query: str, sources: List[Dict[str, Any]]) -> List[str]:
    points = []

    for index, source in enumerate(sources[:4]):
        text = source.get("text", "").strip()
        label = source_label(index, source)

        if not text:
            continue

        cleaned = " ".join(text.split())

        if len(cleaned) > 290:
            cleaned = cleaned[:290].rsplit(" ", 1)[0] + "..."

        points.append(f"- {cleaned} [{label}]")

    return points


def build_detailed_evidence_answer(
    query: str,
    original_answer: str,
    sources: List[Dict[str, Any]]
) -> str:
    if not sources:
        return original_answer or "I could not find enough grounded evidence in the indexed document to answer this clearly."

    direct_answer = (original_answer or "").strip()

    if not direct_answer or is_answer_too_short(direct_answer):
        direct_answer = (
            "Based on the retrieved document evidence, the answer is connected to the points below. "
            "The indexed sources provide supporting context, but the final interpretation should be verified from the cited source chunks."
        )

    key_points = make_key_points_from_sources(query=query, sources=sources)

    evidence_lines = []
    for index, source in enumerate(sources[:5]):
        label = source_label(index, source)
        document_name = source.get("document_name", "Selected document")
        page = source.get("page", "Not available")
        chunk_id = source.get("chunk_id", label)

        evidence_lines.append(
            f"- [{label}] Document: {document_name}; Page: {page}; Chunk: {chunk_id}"
        )

    answer_parts = []

    answer_parts.append("Direct answer")
    answer_parts.append(direct_answer)

    if key_points:
        answer_parts.append("\nKey evidence from the document")
        answer_parts.extend(key_points)

    answer_parts.append("\nSources used")
    answer_parts.extend(evidence_lines)

    answer_parts.append(
        "\nNote\nThis answer is grounded in the retrieved chunks above. "
        "If a page number is unavailable, it means the parser did not expose page metadata for that source."
    )

    return "\n".join(answer_parts)


def safe_enhance_answer_for_response(local_vars: Dict[str, Any]) -> str:
    """
    Designed to be called from answer_service response dict using locals().
    It avoids crashing the /ask endpoint even if variable names differ.
    """
    try:
        answer = (
            local_vars.get("answer")
            or local_vars.get("final_answer")
            or local_vars.get("generated_answer")
            or local_vars.get("response_text")
            or ""
        )

        query = local_vars.get("query") or ""

        request_obj = local_vars.get("request")
        if not query and request_obj is not None:
            query = getattr(request_obj, "query", "")

        sources = (
            local_vars.get("sourced_results")
            or local_vars.get("cleaned_results")
            or local_vars.get("retrieved_results")
            or local_vars.get("results")
            or []
        )

        citations = local_vars.get("citations") or []

        normalized_sources = normalize_sources(sources, citations)

        if is_answer_too_short(answer):
            return build_detailed_evidence_answer(
                query=str(query),
                original_answer=str(answer),
                sources=normalized_sources
            )

        # If answer is okay but has no citation marker, add source summary.
        if normalized_sources and "[S" not in str(answer):
            source_refs = []
            for index, source in enumerate(normalized_sources[:3]):
                label = source_label(index, source)
                page = source.get("page", "Not available")
                source_refs.append(f"[{label}: page {page}]")

            return str(answer).strip() + "\n\nSources: " + ", ".join(source_refs)

        return str(answer)

    except Exception:
        return str(
            local_vars.get("answer")
            or local_vars.get("final_answer")
            or local_vars.get("generated_answer")
            or local_vars.get("response_text")
            or ""
        )
'''


def strip_bom_from_tree(root: Path) -> int:
    """Remove UTF-8 BOMs from every ``*.py`` file below *root*.

    Works on raw bytes and rewrites only files that actually contain a BOM,
    so untouched files keep their line endings and modification time, and a
    stray non-UTF-8 file cannot abort the whole patch run.

    Returns the number of files rewritten.
    """
    rewritten = 0
    for path in root.rglob("*.py"):
        raw = path.read_bytes()
        # In valid UTF-8 the bytes EF BB BF can only be the character U+FEFF,
        # so a byte-level replace equals removing every "\ufeff" from the text.
        if codecs.BOM_UTF8 not in raw:
            continue
        path.write_bytes(raw.replace(codecs.BOM_UTF8, b""))
        rewritten += 1
    return rewritten


def write_enhancer_module(path: Path) -> None:
    """Write the generated answer-quality enhancer module to *path*."""
    path.write_text(ENHANCER_SOURCE, encoding="utf-8")


def insert_enhancer_import(text: str) -> str:
    """Return *text* with the enhancer import added, if it is not already there.

    The import goes after the module docstring and any ``from __future__``
    imports. Future statements must stay at the top of a module, so blindly
    prepending the import would turn the patched file into a SyntaxError.
    If *text* cannot be parsed, the import is simply prepended.
    """
    if ENHANCER_IMPORT in text:
        return text

    try:
        module = ast.parse(text)
    except (SyntaxError, ValueError):
        return ENHANCER_IMPORT + "\n" + text

    insert_after = 0  # number of leading source lines that must stay on top
    for position, node in enumerate(module.body):
        is_docstring = (
            position == 0
            and isinstance(node, ast.Expr)
            and isinstance(node.value, ast.Constant)
            and isinstance(node.value.value, str)
        )
        is_future_import = (
            isinstance(node, ast.ImportFrom) and node.module == "__future__"
        )
        if not (is_docstring or is_future_import):
            break
        insert_after = node.end_lineno

    # Split on "\n" only (not splitlines) so line numbers agree with ast.
    lines = text.split("\n")
    lines.insert(insert_after, ENHANCER_IMPORT)
    return "\n".join(lines)


def wire_answer_call(text: str) -> Tuple[str, List[str]]:
    """Route known ``"answer": <variable>,`` fragments through the enhancer.

    Returns the patched text and the list of patterns that were replaced,
    in the order they were applied.
    """
    replaced = []
    for old, new in ANSWER_REPLACEMENTS.items():
        if old in text:
            text = text.replace(old, new)
            replaced.append(old)
    return text, replaced


def patch_answer_service(path: Path) -> None:
    """Add the enhancer import and call to answer_service.py, if it exists."""
    if not path.exists():
        print("WARNING: answer_service.py not found. Created enhancer only.")
        return

    text = path.read_text(encoding="utf-8-sig")
    text = text.replace("\ufeff", "")

    if ENHANCER_IMPORT not in text:
        text = insert_enhancer_import(text)
        print("Added answer enhancer import.")

    already_wired = ENHANCER_CALL in text
    text, replaced = wire_answer_call(text)
    for old in replaced:
        print(f"Replaced {old}")

    if not replaced:
        if already_wired:
            # Re-running the patch is a no-op, not a failure.
            print("answer_service.py is already wired to the answer enhancer.")
        else:
            print(
                "WARNING: Could not find answer return pattern. "
                "Enhancer file created but answer_service not wired automatically."
            )

    path.write_text(text, encoding="utf-8")


def patch_ui_defaults(path: Path) -> None:
    """Make the UI's default answer style more detailed, if the file exists."""
    if not path.exists():
        return

    ui = path.read_text(encoding="utf-8-sig")
    ui = ui.replace("\ufeff", "")

    ui = ui.replace(
        "Answer in a detailed but readable format. Start with a direct answer, then explain important points with evidence. Include citations after key claims.",
        "Answer in a detailed, useful, and source-grounded format. Use this structure: Direct answer, Key points, Evidence from sources, and Limitations. Mention citations after important claims."
    )
    ui = ui.replace("top_k: 7,", "top_k: 8,")
    ui = ui.replace("graph_entity_limit: 10,", "graph_entity_limit: 12,")

    path.write_text(ui, encoding="utf-8")
    print("Updated UI answer instruction defaults.")


def main() -> None:
    """Apply all patch steps in order; must be run from the project root."""
    # Clean BOM
    strip_bom_from_tree(Path("app"))
    print("BOM cleanup completed.")

    # =====================================================
    # 1. Add answer quality enhancer
    # =====================================================
    write_enhancer_module(ENHANCER_PATH)

    # =====================================================
    # 2. Patch answer_service.py safely
    # =====================================================
    patch_answer_service(ANSWER_SERVICE_PATH)

    # =====================================================
    # 3. Make UI default style more detailed
    # =====================================================
    patch_ui_defaults(HF_STATUS_PATH)

    print("Phase 30 better answer quality backend patch complete.")


if __name__ == "__main__":
    main()