import re
import textwrap
from typing import List, Optional

from schemas import CodeTaskType, CodeXResponse, ResponseMeta, RetrievedEvidence

# Markdown fenced block. The info string after the opening fence may hold any
# characters except a newline or a backtick, so tags such as "c++", "c#" or
# "objective-c" (and a stray trailing space) are accepted, not only \w+ tags.
_FENCED_BLOCK_RE = re.compile(r"```[^\n`]*\n(.*?)```", re.DOTALL)

# Section labels that mark the structured part of a model response.
_SECTION_LABELS = ("Code", "Explanation", "Root Cause", "Review", "Suggestions")

# Bare list markers that must never be returned as a summary on their own.
_BARE_LIST_MARKERS = frozenset({"1.", "2.", "3.", "4.", "5."})


def clean_text(text: Optional[str]) -> str:
    """Return *text* stripped, with CRLF normalised and blank runs collapsed.

    Args:
        text: Any value; falsy values (``None``, ``""``) yield ``""``.

    Returns:
        The text with surrounding whitespace removed, ``\\r\\n`` replaced by
        ``\\n`` and three or more consecutive newlines reduced to two.
    """
    if not text:
        return ""
    text = str(text).strip()
    text = re.sub(r"\r\n", "\n", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def extract_section(text: str, label: str) -> Optional[str]:
    """Return the body of the ``<label>:`` section of *text*, if present.

    The section runs from the first ``<label>:`` up to the next line that
    looks like a header (a capitalised run of letters/spaces followed by a
    colon and whitespace) or to the end of the text.

    Args:
        text: Structured model output.
        label: Section name without the trailing colon; matched literally.

    Returns:
        The cleaned section body, or ``None`` when the label is missing or
        its body is empty.
    """
    # re.escape keeps the label literal even if it contains regex
    # metacharacters.
    pattern = rf"{re.escape(label)}:\s*(.*?)(?=\n[A-Z][A-Za-z ]*:\s|\Z)"
    match = re.search(pattern, text, flags=re.DOTALL)
    if match is None:
        return None
    value = clean_text(match.group(1))
    return value or None


def _clean_code(code: str) -> str:
    """Normalise a fenced code body without damaging its indentation."""
    code = code.replace("\r\n", "\n")
    # A plain strip() would remove the indentation of the first line only and
    # leave the remaining lines indented, breaking relative indentation.
    # Removing the common margin keeps the snippet's structure intact.
    code = textwrap.dedent(code)
    code = re.sub(r"\n{3,}", "\n\n", code)
    return code.strip("\n").rstrip()


def extract_code_block(text: str) -> Optional[str]:
    """Return the code carried by a model response.

    The first non-empty Markdown fenced block wins; when there is none, the
    body of a ``Code:`` section is used instead.

    Args:
        text: Model output to search.

    Returns:
        The extracted code, or ``None`` when no code could be found.
    """
    for fenced_body in _FENCED_BLOCK_RE.findall(text):
        code = _clean_code(fenced_body)
        if code:
            return code
    return extract_section(text, "Code")


def extract_explanation(text: str, task_type: CodeTaskType) -> Optional[str]:
    """Return the explanatory text appropriate for *task_type*.

    Args:
        text: Structured model output.
        task_type: Task the output was produced for.

    Returns:
        * FIX: the ``Explanation`` section, else ``Root Cause``.
        * REVIEW: ``Review`` and ``Suggestions`` combined when both exist,
          otherwise whichever one is present.
        * REFACTOR / EXPLAIN / GENERATE: the ``Explanation`` section.
        * Anything else: the first of ``Explanation``, ``Root Cause``,
          ``Review``, ``Suggestions`` that is present.
        ``None`` when nothing matches.
    """
    if task_type == CodeTaskType.FIX:
        root_cause = extract_section(text, "Root Cause")
        explanation = extract_section(text, "Explanation")
        return explanation or root_cause

    if task_type == CodeTaskType.REVIEW:
        review = extract_section(text, "Review")
        suggestions = extract_section(text, "Suggestions")
        if review and suggestions:
            return f"{review}\n\nSuggestions:\n{suggestions}"
        return review or suggestions

    if task_type in {
        CodeTaskType.REFACTOR,
        CodeTaskType.EXPLAIN,
        CodeTaskType.GENERATE,
    }:
        return extract_section(text, "Explanation")

    return (
        extract_section(text, "Explanation")
        or extract_section(text, "Root Cause")
        or extract_section(text, "Review")
        or extract_section(text, "Suggestions")
    )


def strip_structured_sections(text: str) -> str:
    """Return the free-form text that precedes any structured content.

    Fenced code blocks are removed first. Then, for every known section
    label, everything from that label to the end of the text is dropped
    (the patterns use DOTALL, so ``.*`` runs to the end of the string).

    Args:
        text: Model output.

    Returns:
        The cleaned remaining text, possibly empty.
    """
    text = _FENCED_BLOCK_RE.sub("", text)
    for label in _SECTION_LABELS:
        text = re.sub(rf"{label}:\s*.*", "", text, flags=re.DOTALL)
    return clean_text(text)


def normalize_summary_text(text: str) -> str:
    """Strip a leading list marker (``1.``, ``-``, ``*``, ``(1)``) from *text*.

    Args:
        text: Candidate summary text.

    Returns:
        The cleaned text without its leading marker; ``""`` for empty input
        or for input that consists of a bare ``N.`` marker only.
    """
    text = clean_text(text)
    if not text:
        return ""
    # A lone "N." carries no content at all.
    text = re.sub(r"^\d+\.\s*$", "", text).strip()
    text = re.sub(r"^\d+\.\s+", "", text)
    text = re.sub(r"^[-*]\s+", "", text)
    text = re.sub(r"^\(\d+\)\s+", "", text)
    return clean_text(text)


def first_sentence(text: str) -> str:
    """Return the first usable sentence of *text*.

    Sentences are split after ``.``, ``!`` or ``?`` followed by whitespace.
    Fragments that normalise to nothing or to a bare list marker are skipped.

    Args:
        text: Text to summarise.

    Returns:
        The first usable sentence, the whole normalised text when no
        fragment qualifies, or ``""`` for empty input.
    """
    text = normalize_summary_text(text)
    if not text:
        return ""
    for sentence in re.split(r"(?<=[.!?])\s+", text):
        cleaned = normalize_summary_text(sentence)
        if cleaned and cleaned not in _BARE_LIST_MARKERS:
            return cleaned
    return text


def first_meaningful_line(text: str) -> str:
    """Return the first line of *text* that carries real content.

    Empty lines, lines starting with ``suggestions:`` (case-insensitive) and
    bare list markers are skipped.

    Args:
        text: Multi-line text.

    Returns:
        The normalised line, or ``""`` when no line qualifies.
    """
    text = clean_text(text)
    if not text:
        return ""
    for line in text.splitlines():
        cleaned = normalize_summary_text(line)
        if not cleaned:
            continue
        if cleaned.lower().startswith("suggestions:"):
            continue
        if cleaned not in _BARE_LIST_MARKERS:
            return cleaned
    return ""


def build_review_summary(review_text: str) -> str:
    """Return a one-sentence summary of a review.

    Args:
        review_text: Review body.

    Returns:
        The first sentence of the first meaningful line, or the fixed
        message ``"Code review completed."`` when there is no such line.
    """
    first_line = first_meaningful_line(review_text)
    if first_line:
        return first_sentence(first_line)
    return "Code review completed."
# Definition forms tried, in this order, when looking for a callable's name.
_DEFINITION_NAME_PATTERNS = (
    r"\bdef\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(",
    r"\bclass\s+([A-Za-z_][A-Za-z0-9_]*)\s*[:(]",
    r"\bfunction\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(",
)

# Phrases in a review explanation that mark it as advice-only.
_REVIEW_ONLY_SIGNALS = (
    "consider",
    "suggest",
    "add a check",
    "type hints",
    "docstring",
    "could improve",
    "readability",
    "maintainability",
    "edge case",
)


def extract_original_callable_name(raw_text: str) -> Optional[str]:
    """Return the name of the first definition found in *raw_text*.

    The patterns are tried in a fixed order (``def``, then ``class``, then
    ``function``), each against the whole text, so a ``def`` anywhere in the
    text takes precedence over a ``class`` that appears earlier.

    Args:
        raw_text: Source code; ``None`` or empty text is accepted.

    Returns:
        The captured identifier, or ``None`` when no pattern matches.
    """
    for pattern in _DEFINITION_NAME_PATTERNS:
        match = re.search(pattern, raw_text or "")
        if match:
            return match.group(1)
    return None


def replace_callable_name(code_output: str, old_name: str, new_name: str) -> str:
    """Rename the definition of *old_name* in *code_output* to *new_name*.

    Only the definition header is rewritten (at most one ``def``, one
    ``class`` and one ``function`` header); other references to the name are
    left untouched.

    Args:
        code_output: Code to rewrite.
        old_name: Name currently used in the definition.
        new_name: Name to put in its place.

    Returns:
        The rewritten code, or *code_output* unchanged when any argument is
        empty or both names are equal.
    """
    if not code_output or not old_name or not new_name or old_name == new_name:
        return code_output

    escaped = re.escape(old_name)
    header_patterns = (
        rf"(\bdef\s+){escaped}(\s*\()",
        rf"(\bclass\s+){escaped}(\b)",
        rf"(\bfunction\s+){escaped}(\s*\()",
    )

    def _rename(match):
        # A function replacement inserts new_name literally; a template
        # string would misread names starting with a digit (group "\12...")
        # or containing backslashes.
        return f"{match.group(1)}{new_name}{match.group(2)}"

    updated = code_output
    for pattern in header_patterns:
        updated = re.sub(pattern, _rename, updated, count=1)
    return updated


def _defines_callable(code: str, name: str) -> bool:
    """Return True when *code* contains a def/class/function header for *name*."""
    pattern = rf"\b(?:def|class|function)\s+{re.escape(name)}\b"
    return re.search(pattern, code) is not None


def align_refactor_names_with_input(raw_text: str, code_output: Optional[str]) -> Optional[str]:
    """Restore the original callable name in refactored code when it was renamed.

    Args:
        raw_text: The code that was submitted for refactoring.
        code_output: The refactored code, if any.

    Returns:
        ``None`` when *code_output* is empty. Otherwise the refactored code,
        with its first definition renamed back to the original name only when
        the original name is no longer defined anywhere in the output.
    """
    if not code_output:
        return None

    original_name = extract_original_callable_name(raw_text)
    new_name = extract_original_callable_name(code_output)
    if not original_name or not new_name or original_name == new_name:
        return code_output

    # The first definition in the output may be a newly added helper or a
    # method of the original class. If the original name is still defined,
    # renaming would create a duplicate or mangle an unrelated definition.
    if _defines_callable(code_output, original_name):
        return code_output

    return replace_callable_name(code_output, new_name, original_name)


def should_include_code_output(
    task_type: CodeTaskType,
    code_output: Optional[str],
    explanation: Optional[str],
) -> Optional[str]:
    """Decide whether extracted code belongs in the response.

    Args:
        task_type: Task the output was produced for.
        code_output: Extracted code, if any.
        explanation: Extracted explanation, if any.

    Returns:
        ``None`` when there is no code, when the task is EXPLAIN, or when the
        task is REVIEW and the explanation contains one of the advice-only
        phrases; otherwise *code_output* unchanged.
    """
    if not code_output:
        return None

    if task_type == CodeTaskType.EXPLAIN:
        return None

    if task_type == CodeTaskType.REVIEW and explanation:
        lowered = explanation.lower()
        if any(signal in lowered for signal in _REVIEW_ONLY_SIGNALS):
            return None

    return code_output


def build_main_answer(
    task_type: CodeTaskType,
    raw_text: str,
    explanation: Optional[str],
    code_output: Optional[str],
) -> str:
    """Build the short headline answer for a response.

    For each task type the first available source is summarised to one
    sentence; when none is available a fixed status message is returned.

    Args:
        task_type: Task the output was produced for.
        raw_text: Full model output.
        explanation: Extracted explanation, if any.
        code_output: Code that will be returned, if any.

    Returns:
        A one-sentence summary or a fixed status message.
    """
    cleaned = clean_text(raw_text)

    if task_type == CodeTaskType.GENERATE:
        if explanation:
            return first_sentence(explanation)
        if code_output:
            return "Code generated successfully."
        return "Generation completed."

    if task_type == CodeTaskType.EXPLAIN:
        if explanation:
            return first_sentence(explanation)
        stripped = strip_structured_sections(cleaned)
        if stripped:
            return first_sentence(stripped)
        return "Code explanation generated."

    if task_type == CodeTaskType.FIX:
        # The root cause is preferred over the general explanation here.
        root_cause = extract_section(cleaned, "Root Cause")
        if root_cause:
            return first_sentence(root_cause)
        if explanation:
            return first_sentence(explanation)
        if code_output:
            return "Code fix generated successfully."
        return "Fix completed."

    if task_type == CodeTaskType.REVIEW:
        review = extract_section(cleaned, "Review")
        if review:
            return build_review_summary(review)
        if explanation:
            return build_review_summary(explanation)
        stripped = strip_structured_sections(cleaned)
        if stripped:
            return first_sentence(stripped)
        return "Code review completed."

    if task_type == CodeTaskType.REFACTOR:
        if explanation:
            return first_sentence(explanation)
        if code_output:
            return "Code refactored successfully."
        return "Refactor completed."

    # Any other task type: prefer the free-form text, then the explanation.
    stripped = strip_structured_sections(cleaned)
    if stripped:
        return first_sentence(stripped)
    if explanation:
        return first_sentence(explanation)
    return "Request processed successfully."
def _refactor_dropped_original_name(original_code: str, code_output: str) -> bool:
    """Return True when *code_output* no longer defines the original callable."""
    original_name = extract_original_callable_name(original_code)
    if not original_name or not extract_original_callable_name(code_output):
        return False
    # Look for the original name anywhere in the output instead of comparing
    # only the first definition: a helper or class placed first does not mean
    # the original callable was renamed.
    pattern = rf"\b(?:def|class|function)\s+{re.escape(original_name)}\b"
    return re.search(pattern, code_output) is None


def build_response(
    task_type: CodeTaskType,
    model_output: str,
    model_used: str,
    used_fallback: bool,
    retrieval_used: bool = False,
    source_count: int = 0,
    processing_time_ms: Optional[int] = None,
    original_code: Optional[str] = None,
    sources: Optional[List[RetrievedEvidence]] = None,
) -> CodeXResponse:
    """Turn raw model output into a structured ``CodeXResponse``.

    Args:
        task_type: Task the output was produced for.
        model_output: Raw text returned by the model.
        model_used: Stored as ``meta.used_model``.
        used_fallback: Stored as ``meta.fallback_used``.
        retrieval_used: Stored as ``meta.retrieval_used``.
        source_count: Stored as ``meta.source_count``.
        processing_time_ms: Stored as ``meta.processing_time_ms``.
        original_code: Code submitted by the caller; for REFACTOR tasks it is
            used to align the name of the refactored callable.
        sources: Retrieved evidence; ``None`` becomes an empty list.

    Returns:
        A response carrying the headline answer, optional code and
        explanation, and any warnings collected along the way.
    """
    cleaned_output = clean_text(model_output)
    raw_code_output = extract_code_block(cleaned_output)
    explanation = extract_explanation(cleaned_output, task_type)

    is_refactor_with_input = task_type == CodeTaskType.REFACTOR and bool(original_code)
    if is_refactor_with_input:
        raw_code_output = align_refactor_names_with_input(original_code, raw_code_output)

    code_output = should_include_code_output(task_type, raw_code_output, explanation)
    answer = build_main_answer(task_type, cleaned_output, explanation, code_output)

    warnings: List[str] = []
    if not cleaned_output:
        warnings.append("Model returned an empty response.")

    if (
        is_refactor_with_input
        and raw_code_output
        and code_output
        and _refactor_dropped_original_name(original_code, code_output)
    ):
        warnings.append("Refactor output may have changed original naming unexpectedly.")

    if task_type == CodeTaskType.FIX and not code_output and cleaned_output:
        warnings.append("Fix response did not include code output.")

    return CodeXResponse(
        task_type=task_type,
        answer=answer,
        code_output=code_output,
        explanation=explanation,
        warnings=warnings,
        sources=sources or [],
        needs_clarification=False,
        meta=ResponseMeta(
            used_model=model_used,
            fallback_used=used_fallback,
            retrieval_used=retrieval_used,
            source_count=source_count,
            processing_time_ms=processing_time_ms,
        ),
    )


def build_error_response(
    task_type: CodeTaskType,
    error_message: str,
    processing_time_ms: Optional[int] = None,
) -> CodeXResponse:
    """Build the response returned when request processing failed.

    Args:
        task_type: Task that was being processed.
        error_message: Error description; reported as the single warning.
        processing_time_ms: Stored as ``meta.processing_time_ms``.

    Returns:
        A response with a fixed failure answer, no code or explanation, and
        one warning: the cleaned error message, or
        ``"Unknown error occurred."`` when the message is empty.
    """
    cleaned_error = clean_text(error_message)
    warnings = [cleaned_error] if cleaned_error else ["Unknown error occurred."]
    return CodeXResponse(
        task_type=task_type,
        answer="Request processing failed.",
        code_output=None,
        explanation=None,
        warnings=warnings,
        sources=[],
        needs_clarification=False,
        meta=ResponseMeta(
            used_model="none",
            fallback_used=False,
            retrieval_used=False,
            source_count=0,
            processing_time_ms=processing_time_ms,
        ),
    )