| import re |
| from typing import List, Optional |
|
|
| from schemas import CodeTaskType, CodeXResponse, ResponseMeta, RetrievedEvidence |
|
|
|
|
def clean_text(text: Optional[str]) -> str:
    """Return ``text`` as a trimmed string with normalized line breaks.

    Falsy input (``None``, the empty string, ...) yields the empty string.
    Anything else is converted with ``str()``, stripped of surrounding
    whitespace, has CRLF line endings turned into LF, and has every run of
    three or more newlines collapsed to exactly two.

    Args:
        text: Value to normalize; may be ``None``.

    Returns:
        The normalized text, possibly empty.
    """
    if text:
        trimmed = str(text).strip()
        unix_newlines = re.sub(r"\r\n", "\n", trimmed)
        # Keep at most one blank line between paragraphs.
        squeezed = re.sub(r"\n{3,}", "\n\n", unix_newlines)
        return squeezed.strip()
    return ""
|
|
|
|
def extract_section(text: str, label: str) -> Optional[str]:
    """Return the body of the first ``<label>:`` section found in ``text``.

    The body starts right after ``<label>:`` (leading whitespace skipped) and
    runs until the next line that looks like another header -- an uppercase
    letter followed by letters/spaces, a colon and whitespace -- or until the
    end of ``text``.

    Args:
        text: Text to search.
        label: Header name without the trailing colon. It is matched
            literally; regex metacharacters in it carry no special meaning.

    Returns:
        The section body passed through ``clean_text``, or ``None`` when the
        header is absent or its body is empty.
    """
    # Escape the label: interpolating it raw let a label such as "C.de" match
    # unrelated headers and made one such as "Code(" raise re.error.
    pattern = rf"{re.escape(label)}:\s*(.*?)(?=\n[A-Z][A-Za-z ]*:\s|\Z)"
    match = re.search(pattern, text, flags=re.DOTALL)
    if match is None:
        return None
    return clean_text(match.group(1)) or None
|
|
|
|
def extract_code_block(text: str) -> Optional[str]:
    """Return the code carried by ``text``, if any.

    The first fenced block (three backticks, an optional language tag, a
    newline, the code, three backticks) wins. When no fence is found, the
    body of a ``Code:`` section is used instead.

    Args:
        text: Text to search.

    Returns:
        The cleaned contents of the first fenced block, otherwise the
        ``Code:`` section body, otherwise ``None``.
    """
    # The language tag may contain "+", "#", "." or "-" in addition to word
    # characters, so that fences tagged c++, c#, objective-c, ... are
    # recognised as well (a plain \w+ tag rejected them).
    fence_pattern = r"```(?:[\w+#.-]+)?\n(.*?)```"
    first_fence = re.search(fence_pattern, text, flags=re.DOTALL)
    if first_fence:
        return clean_text(first_fence.group(1))

    # No fence: fall back to an explicitly labelled section.
    return extract_section(text, "Code") or None
|
|
|
|
def extract_explanation(text: str, task_type: CodeTaskType) -> Optional[str]:
    """Pick the explanatory section of ``text`` appropriate for ``task_type``.

    * ``FIX``: the ``Explanation`` section, else ``Root Cause``.
    * ``REVIEW``: ``Review`` and ``Suggestions`` joined under a
      ``Suggestions:`` heading when both exist, else whichever is present.
    * ``REFACTOR`` / ``EXPLAIN`` / ``GENERATE``: the ``Explanation`` section.
    * Any other task type: the first section found among ``Explanation``,
      ``Root Cause``, ``Review`` and ``Suggestions``.

    Args:
        text: Text containing labelled sections.
        task_type: Task the text was produced for.

    Returns:
        The selected section text, or ``None`` when nothing applicable exists.
    """

    def section(label: str) -> Optional[str]:
        return extract_section(text, label)

    if task_type == CodeTaskType.FIX:
        return section("Explanation") or section("Root Cause")

    if task_type == CodeTaskType.REVIEW:
        review_part = section("Review")
        suggestion_part = section("Suggestions")
        if not (review_part and suggestion_part):
            return review_part or suggestion_part
        return f"{review_part}\n\nSuggestions:\n{suggestion_part}"

    explanation_only_tasks = {
        CodeTaskType.REFACTOR,
        CodeTaskType.EXPLAIN,
        CodeTaskType.GENERATE,
    }
    if task_type in explanation_only_tasks:
        return section("Explanation")

    # Unrecognised task type: take whichever known section shows up first
    # in this fixed preference order.
    for label in ("Explanation", "Root Cause", "Review", "Suggestions"):
        found = section(label)
        if found:
            return found
    return None
|
|
|
|
def strip_structured_sections(text: str) -> str:
    """Return the free-form part of ``text`` that precedes any labelled section.

    Fenced code blocks are removed first. Then, for each known label
    (``Code``, ``Explanation``, ``Root Cause``, ``Review``, ``Suggestions``),
    everything from the first ``<label>:`` to the end of the text is dropped:
    the patterns run with ``re.DOTALL``, so ``.*`` spans all remaining lines.

    Args:
        text: Text to strip.

    Returns:
        What is left, passed through ``clean_text``.
    """
    removal_patterns = (
        r"```(?:\w+)?\n.*?```",
        r"Code:\s*.*",
        r"Explanation:\s*.*",
        r"Root Cause:\s*.*",
        r"Review:\s*.*",
        r"Suggestions:\s*.*",
    )
    remaining = text
    for removal_pattern in removal_patterns:
        remaining = re.sub(removal_pattern, "", remaining, flags=re.DOTALL)
    return clean_text(remaining)
|
|
|
|
def normalize_summary_text(text: str) -> str:
    """Clean ``text`` and remove a list marker from its very beginning.

    The patterns are anchored with ``^`` and no ``re.MULTILINE`` flag is used,
    so only the start of the whole string is affected. Handled markers are a
    number with a dot (``1.``), a dash or asterisk bullet, and a number in
    parentheses (``(1)``). Text consisting solely of a number marker becomes
    the empty string.

    Args:
        text: Text to normalize.

    Returns:
        The cleaned text without its leading marker, possibly empty.
    """
    cleaned = clean_text(text)
    if cleaned == "":
        return ""

    # A lone "3." carries no content at all.
    cleaned = re.sub(r"^\d+\.\s*$", "", cleaned).strip()

    # Applied in sequence, so stacked markers such as "1. - item" all go.
    for leading_marker in (r"^\d+\.\s+", r"^[-*]\s+", r"^\(\d+\)\s+"):
        cleaned = re.sub(leading_marker, "", cleaned)

    return clean_text(cleaned)
|
|
|
|
def first_sentence(text: str) -> str:
    """Return the first usable sentence of ``text``.

    The text is normalized, then split on whitespace that follows ``.``,
    ``!`` or ``?``. Each piece is normalized again; the first piece that is
    non-empty and is not a bare list number (``1.`` .. ``5.``) is returned.

    Args:
        text: Text to summarize.

    Returns:
        The first usable sentence; the whole normalized text when no piece
        qualifies; the empty string when the normalized text is empty.
    """
    normalized = normalize_summary_text(text)
    if not normalized:
        return ""

    bare_markers = {"1.", "2.", "3.", "4.", "5."}
    pieces = re.split(r"(?<=[.!?])\s+", normalized)
    candidates = (normalize_summary_text(piece) for piece in pieces)
    return next(
        (
            candidate
            for candidate in candidates
            if candidate and candidate not in bare_markers
        ),
        normalized,
    )
|
|
|
|
def first_meaningful_line(text: str) -> str:
    """Return the first line of ``text`` that carries real content.

    Every line is normalized with ``normalize_summary_text``. Lines that
    normalize to nothing, that start with ``suggestions:`` (case-insensitive)
    or that are a bare list number (``1.`` .. ``5.``) are skipped.

    Args:
        text: Multi-line text to scan.

    Returns:
        The first qualifying normalized line, or the empty string.
    """
    body = clean_text(text)
    if not body:
        return ""

    bare_markers = {"1.", "2.", "3.", "4.", "5."}
    for raw_line in body.splitlines():
        candidate = normalize_summary_text(raw_line)
        is_content = (
            bool(candidate)
            and not candidate.lower().startswith("suggestions:")
            and candidate not in bare_markers
        )
        if is_content:
            return candidate
    return ""
|
|
|
|
def build_review_summary(review_text: str) -> str:
    """Condense ``review_text`` into a one-sentence summary.

    Args:
        review_text: Review text to condense.

    Returns:
        The first sentence of the first meaningful line, or the fixed message
        ``"Code review completed."`` when no meaningful line exists.
    """
    headline = first_meaningful_line(review_text)
    return first_sentence(headline) if headline else "Code review completed."
|
|
|
|
def extract_original_callable_name(raw_text: str) -> Optional[str]:
    """Return the name of the first function or class defined in ``raw_text``.

    Three definition forms are recognised: ``def name(``, ``class name:`` /
    ``class name(`` and ``function name(``. The definition that appears
    earliest in the text wins, whatever its kind.

    Args:
        raw_text: Source text to inspect; ``None`` or empty is allowed.

    Returns:
        The defined name, or ``None`` when no definition is found.
    """
    definition_patterns = (
        r"\bdef\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(",
        r"\bclass\s+([A-Za-z_][A-Za-z0-9_]*)\s*[:(]",
        r"\bfunction\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(",
    )
    source = raw_text or ""
    found = [
        match
        for match in (re.search(pattern, source) for pattern in definition_patterns)
        if match is not None
    ]
    if not found:
        return None

    # Choose by position, not by pattern order: trying "def" before "class"
    # made a class report its first method (e.g. "__init__") as its name.
    earliest = min(found, key=lambda match: match.start())
    return earliest.group(1)
|
|
|
|
def replace_callable_name(code_output: str, old_name: str, new_name: str) -> str:
    """Rename the callable ``old_name`` to ``new_name`` inside ``code_output``.

    Nothing happens unless ``code_output`` actually defines ``old_name`` via
    ``def old_name(``, ``class old_name`` or ``function old_name(``. When it
    does, the definition and every other bare-identifier occurrence of
    ``old_name`` (recursive calls, call sites, instantiations) are renamed so
    the code stays self-consistent. Occurrences that are part of a longer
    identifier or that follow a ``.`` (attribute access) are left untouched.

    Args:
        code_output: Code to rewrite.
        old_name: Name currently used by the definition.
        new_name: Name to use instead.

    Returns:
        The rewritten code, or ``code_output`` unchanged when any argument is
        empty, the names are equal, or no definition of ``old_name`` exists.
    """
    if not code_output or not old_name or not new_name or old_name == new_name:
        return code_output

    escaped = re.escape(old_name)
    definition = rf"\b(?:def|function)\s+{escaped}\s*\(|\bclass\s+{escaped}\b"
    if re.search(definition, code_output) is None:
        return code_output

    # Renaming only the definition line would leave recursive calls and other
    # references pointing at a name that no longer exists.
    reference = rf"(?<![\w.]){escaped}(?!\w)"
    # A callable replacement inserts new_name verbatim instead of treating it
    # as a regex replacement template.
    return re.sub(reference, lambda _match: new_name, code_output)
|
|
|
|
def align_refactor_names_with_input(raw_text: str, code_output: Optional[str]) -> Optional[str]:
    """Make ``code_output`` use the callable name found in ``raw_text``.

    A name is extracted from each text with
    ``extract_original_callable_name``. When both exist and differ, the name
    in ``code_output`` is replaced by the one from ``raw_text`` through
    ``replace_callable_name``.

    Args:
        raw_text: Original code.
        code_output: Produced code; may be ``None`` or empty.

    Returns:
        ``None`` for empty ``code_output``; otherwise the (possibly renamed)
        code.
    """
    if code_output:
        wanted_name = extract_original_callable_name(raw_text)
        produced_name = extract_original_callable_name(code_output)
        both_known = bool(wanted_name) and bool(produced_name)
        if both_known and wanted_name != produced_name:
            return replace_callable_name(code_output, produced_name, wanted_name)
        return code_output
    return None
|
|
|
|
def should_include_code_output(
    task_type: CodeTaskType,
    code_output: Optional[str],
    explanation: Optional[str],
) -> Optional[str]:
    """Decide whether ``code_output`` is kept for this task type.

    Code is dropped (``None`` is returned) when it is empty, when the task is
    ``EXPLAIN``, or when the task is ``REVIEW`` and the lower-cased
    explanation contains any of a fixed list of advisory phrases.

    Args:
        task_type: Task the output was produced for.
        code_output: Extracted code, if any.
        explanation: Extracted explanation, if any.

    Returns:
        ``code_output`` unchanged when it should be kept, otherwise ``None``.
    """
    if not code_output or task_type == CodeTaskType.EXPLAIN:
        return None

    if task_type == CodeTaskType.REVIEW and explanation:
        advisory_phrases = (
            "consider",
            "suggest",
            "add a check",
            "type hints",
            "docstring",
            "could improve",
            "readability",
            "maintainability",
            "edge case",
        )
        haystack = explanation.lower()
        for phrase in advisory_phrases:
            if phrase in haystack:
                return None

    return code_output
|
|
|
|
def build_main_answer(
    task_type: CodeTaskType,
    raw_text: str,
    explanation: Optional[str],
    code_output: Optional[str],
) -> str:
    """Build the short headline answer for a response.

    Per task type, the first available source is used:

    * ``GENERATE`` / ``REFACTOR``: first sentence of the explanation, else a
      fixed message that depends on whether code is present.
    * ``EXPLAIN``: first sentence of the explanation, else of the free text
      left after stripping structured sections, else a fixed message.
    * ``FIX``: first sentence of the ``Root Cause`` section, else of the
      explanation, else a fixed message that depends on whether code is
      present.
    * ``REVIEW``: summary of the ``Review`` section, else of the explanation,
      else first sentence of the free text, else a fixed message.
    * Anything else: first sentence of the free text, else of the
      explanation, else ``"Request processed successfully."``.

    Args:
        task_type: Task the output was produced for.
        raw_text: Full text to derive the answer from.
        explanation: Extracted explanation, if any.
        code_output: Code that will be returned to the caller, if any.

    Returns:
        The headline answer string.
    """
    body = clean_text(raw_text)

    def explained_or(with_code: str, without_code: str) -> str:
        # Prefer the explanation; otherwise a canned message chosen by
        # whether any code is being returned.
        if explanation:
            return first_sentence(explanation)
        return with_code if code_output else without_code

    def free_text_or(default: str) -> str:
        # First sentence of the text outside structured sections, if any.
        leftover = strip_structured_sections(body)
        return first_sentence(leftover) if leftover else default

    if task_type == CodeTaskType.GENERATE:
        return explained_or("Code generated successfully.", "Generation completed.")

    if task_type == CodeTaskType.EXPLAIN:
        if explanation:
            return first_sentence(explanation)
        return free_text_or("Code explanation generated.")

    if task_type == CodeTaskType.FIX:
        root_cause = extract_section(body, "Root Cause")
        if root_cause:
            return first_sentence(root_cause)
        return explained_or("Code fix generated successfully.", "Fix completed.")

    if task_type == CodeTaskType.REVIEW:
        review = extract_section(body, "Review")
        if review:
            return build_review_summary(review)
        if explanation:
            return build_review_summary(explanation)
        return free_text_or("Code review completed.")

    if task_type == CodeTaskType.REFACTOR:
        return explained_or("Code refactored successfully.", "Refactor completed.")

    # Unrecognised task type: free text first, then the explanation.
    leftover = strip_structured_sections(body)
    if leftover:
        return first_sentence(leftover)
    if explanation:
        return first_sentence(explanation)
    return "Request processed successfully."
|
|
|
|
def build_response(
    task_type: CodeTaskType,
    model_output: str,
    model_used: str,
    used_fallback: bool,
    retrieval_used: bool = False,
    source_count: int = 0,
    processing_time_ms: Optional[int] = None,
    original_code: Optional[str] = None,
    sources: Optional[List[RetrievedEvidence]] = None,
) -> CodeXResponse:
    """Assemble a ``CodeXResponse`` from raw model output.

    The output is cleaned, then code and explanation are extracted from it.
    For ``REFACTOR`` tasks with ``original_code`` the extracted code is
    name-aligned with the original. The code is filtered through
    ``should_include_code_output`` and a headline answer is built.

    Warnings are added when the cleaned output is empty, when a refactor
    still ends up with a different callable name than the original, and when
    a non-empty ``FIX`` output carries no code.

    Args:
        task_type: Task the output was produced for.
        model_output: Raw text returned by the model.
        model_used: Stored as ``meta.used_model``.
        used_fallback: Stored as ``meta.fallback_used``.
        retrieval_used: Stored as ``meta.retrieval_used``.
        source_count: Stored as ``meta.source_count``.
        processing_time_ms: Stored as ``meta.processing_time_ms``.
        original_code: Code the task was applied to, if any.
        sources: Evidence items to attach; ``None`` becomes an empty list.

    Returns:
        The populated response with ``needs_clarification`` set to ``False``.
    """
    output_text = clean_text(model_output)
    is_refactor = task_type == CodeTaskType.REFACTOR

    extracted_code = extract_code_block(output_text)
    explanation = extract_explanation(output_text, task_type)

    if is_refactor and original_code:
        extracted_code = align_refactor_names_with_input(original_code, extracted_code)

    code_output = should_include_code_output(task_type, extracted_code, explanation)
    answer = build_main_answer(task_type, output_text, explanation, code_output)

    warnings = []
    if not output_text:
        warnings.append("Model returned an empty response.")

    if is_refactor and original_code and extracted_code and code_output:
        # Re-check after alignment: a remaining mismatch means the rename
        # did not take effect.
        name_before = extract_original_callable_name(original_code)
        name_after = extract_original_callable_name(code_output)
        if name_before and name_after and name_before != name_after:
            warnings.append("Refactor output may have changed original naming unexpectedly.")

    if task_type == CodeTaskType.FIX and not code_output and output_text:
        warnings.append("Fix response did not include code output.")

    meta = ResponseMeta(
        used_model=model_used,
        fallback_used=used_fallback,
        retrieval_used=retrieval_used,
        source_count=source_count,
        processing_time_ms=processing_time_ms,
    )
    return CodeXResponse(
        task_type=task_type,
        answer=answer,
        code_output=code_output,
        explanation=explanation,
        warnings=warnings,
        sources=sources or [],
        needs_clarification=False,
        meta=meta,
    )
|
|
|
|
def build_error_response(
    task_type: CodeTaskType,
    error_message: str,
    processing_time_ms: Optional[int] = None,
) -> CodeXResponse:
    """Build the ``CodeXResponse`` returned when processing failed.

    The answer is the fixed text ``"Request processing failed."``. The single
    warning is the cleaned ``error_message``, or ``"Unknown error occurred."``
    when that is empty. No code, explanation or sources are attached, and the
    metadata records model ``"none"`` with fallback and retrieval unused.

    Args:
        task_type: Task that failed.
        error_message: Error description.
        processing_time_ms: Stored as ``meta.processing_time_ms``.

    Returns:
        The populated error response.
    """
    warning_text = clean_text(error_message) or "Unknown error occurred."
    failure_meta = ResponseMeta(
        used_model="none",
        fallback_used=False,
        retrieval_used=False,
        source_count=0,
        processing_time_ms=processing_time_ms,
    )
    return CodeXResponse(
        task_type=task_type,
        answer="Request processing failed.",
        code_output=None,
        explanation=None,
        warnings=[warning_text],
        sources=[],
        needs_clarification=False,
        meta=failure_meta,
    )