Chat7-CodeX-Backend / response_formatter.py
hashan-7's picture
Update code
cb1a883 verified
import re
from typing import List, Optional
from schemas import CodeTaskType, CodeXResponse, ResponseMeta, RetrievedEvidence
def clean_text(text: Optional[str]) -> str:
if not text:
return ""
text = str(text).strip()
text = re.sub(r"\r\n", "\n", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def extract_section(text: str, label: str) -> Optional[str]:
pattern = rf"{label}:\s*(.*?)(?=\n[A-Z][A-Za-z ]*:\s|\Z)"
match = re.search(pattern, text, flags=re.DOTALL)
if match:
value = clean_text(match.group(1))
return value if value else None
return None
def extract_code_block(text: str) -> Optional[str]:
fenced = re.findall(r"```(?:\w+)?\n(.*?)```", text, flags=re.DOTALL)
if fenced:
return clean_text(fenced[0])
code_section = extract_section(text, "Code")
if code_section:
return code_section
return None
def extract_explanation(text: str, task_type: CodeTaskType) -> Optional[str]:
if task_type == CodeTaskType.FIX:
root_cause = extract_section(text, "Root Cause")
explanation = extract_section(text, "Explanation")
return explanation or root_cause
if task_type == CodeTaskType.REVIEW:
review = extract_section(text, "Review")
suggestions = extract_section(text, "Suggestions")
if review and suggestions:
return f"{review}\n\nSuggestions:\n{suggestions}"
return review or suggestions
if task_type in {
CodeTaskType.REFACTOR,
CodeTaskType.EXPLAIN,
CodeTaskType.GENERATE,
}:
return extract_section(text, "Explanation")
return (
extract_section(text, "Explanation")
or extract_section(text, "Root Cause")
or extract_section(text, "Review")
or extract_section(text, "Suggestions")
)
def strip_structured_sections(text: str) -> str:
text = re.sub(r"```(?:\w+)?\n.*?```", "", text, flags=re.DOTALL)
text = re.sub(r"Code:\s*.*", "", text, flags=re.DOTALL)
text = re.sub(r"Explanation:\s*.*", "", text, flags=re.DOTALL)
text = re.sub(r"Root Cause:\s*.*", "", text, flags=re.DOTALL)
text = re.sub(r"Review:\s*.*", "", text, flags=re.DOTALL)
text = re.sub(r"Suggestions:\s*.*", "", text, flags=re.DOTALL)
return clean_text(text)
def normalize_summary_text(text: str) -> str:
text = clean_text(text)
if not text:
return ""
text = re.sub(r"^\d+\.\s*$", "", text).strip()
text = re.sub(r"^\d+\.\s+", "", text)
text = re.sub(r"^[-*]\s+", "", text)
text = re.sub(r"^\(\d+\)\s+", "", text)
return clean_text(text)
def first_sentence(text: str) -> str:
text = normalize_summary_text(text)
if not text:
return ""
sentences = re.split(r"(?<=[.!?])\s+", text)
for sentence in sentences:
cleaned = normalize_summary_text(sentence)
if cleaned and cleaned not in {"1.", "2.", "3.", "4.", "5."}:
return cleaned
return text
def first_meaningful_line(text: str) -> str:
text = clean_text(text)
if not text:
return ""
for line in text.splitlines():
cleaned = normalize_summary_text(line)
if not cleaned:
continue
if cleaned.lower().startswith("suggestions:"):
continue
if cleaned not in {"1.", "2.", "3.", "4.", "5."}:
return cleaned
return ""
def build_review_summary(review_text: str) -> str:
first_line = first_meaningful_line(review_text)
if first_line:
return first_sentence(first_line)
return "Code review completed."
def extract_original_callable_name(raw_text: str) -> Optional[str]:
patterns = [
r"\bdef\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(",
r"\bclass\s+([A-Za-z_][A-Za-z0-9_]*)\s*[:(]",
r"\bfunction\s+([A-Za-z_][A-Za-z0-9_]*)\s*\(",
]
for pattern in patterns:
match = re.search(pattern, raw_text or "")
if match:
return match.group(1)
return None
def replace_callable_name(code_output: str, old_name: str, new_name: str) -> str:
if not code_output or not old_name or not new_name or old_name == new_name:
return code_output
patterns = [
(rf"(\bdef\s+){re.escape(old_name)}(\s*\()", rf"\1{new_name}\2"),
(rf"(\bclass\s+){re.escape(old_name)}(\b)", rf"\1{new_name}\2"),
(rf"(\bfunction\s+){re.escape(old_name)}(\s*\()", rf"\1{new_name}\2"),
]
updated = code_output
for pattern, replacement in patterns:
updated = re.sub(pattern, replacement, updated, count=1)
return updated
def align_refactor_names_with_input(raw_text: str, code_output: Optional[str]) -> Optional[str]:
if not code_output:
return None
original_name = extract_original_callable_name(raw_text)
new_name = extract_original_callable_name(code_output)
if original_name and new_name and original_name != new_name:
return replace_callable_name(code_output, new_name, original_name)
return code_output
def should_include_code_output(
task_type: CodeTaskType,
code_output: Optional[str],
explanation: Optional[str],
) -> Optional[str]:
if not code_output:
return None
if task_type == CodeTaskType.EXPLAIN:
return None
if task_type == CodeTaskType.REVIEW:
if explanation:
lowered = explanation.lower()
review_only_signals = [
"consider",
"suggest",
"add a check",
"type hints",
"docstring",
"could improve",
"readability",
"maintainability",
"edge case",
]
if any(signal in lowered for signal in review_only_signals):
return None
return code_output
def build_main_answer(
task_type: CodeTaskType,
raw_text: str,
explanation: Optional[str],
code_output: Optional[str],
) -> str:
cleaned = clean_text(raw_text)
if task_type == CodeTaskType.GENERATE:
if explanation:
return first_sentence(explanation)
if code_output:
return "Code generated successfully."
return "Generation completed."
if task_type == CodeTaskType.EXPLAIN:
if explanation:
return first_sentence(explanation)
stripped = strip_structured_sections(cleaned)
if stripped:
return first_sentence(stripped)
return "Code explanation generated."
if task_type == CodeTaskType.FIX:
root_cause = extract_section(cleaned, "Root Cause")
if root_cause:
return first_sentence(root_cause)
if explanation:
return first_sentence(explanation)
if code_output:
return "Code fix generated successfully."
return "Fix completed."
if task_type == CodeTaskType.REVIEW:
review = extract_section(cleaned, "Review")
if review:
return build_review_summary(review)
if explanation:
return build_review_summary(explanation)
stripped = strip_structured_sections(cleaned)
if stripped:
return first_sentence(stripped)
return "Code review completed."
if task_type == CodeTaskType.REFACTOR:
if explanation:
return first_sentence(explanation)
if code_output:
return "Code refactored successfully."
return "Refactor completed."
stripped = strip_structured_sections(cleaned)
if stripped:
return first_sentence(stripped)
if explanation:
return first_sentence(explanation)
if code_output:
return "Request processed successfully."
return "Request processed successfully."
def build_response(
task_type: CodeTaskType,
model_output: str,
model_used: str,
used_fallback: bool,
retrieval_used: bool = False,
source_count: int = 0,
processing_time_ms: Optional[int] = None,
original_code: Optional[str] = None,
sources: Optional[List[RetrievedEvidence]] = None,
) -> CodeXResponse:
cleaned_output = clean_text(model_output)
raw_code_output = extract_code_block(cleaned_output)
explanation = extract_explanation(cleaned_output, task_type)
if task_type == CodeTaskType.REFACTOR and original_code:
raw_code_output = align_refactor_names_with_input(original_code, raw_code_output)
code_output = should_include_code_output(task_type, raw_code_output, explanation)
answer = build_main_answer(task_type, cleaned_output, explanation, code_output)
warnings = []
if not cleaned_output:
warnings.append("Model returned an empty response.")
if task_type == CodeTaskType.REFACTOR and original_code and raw_code_output and code_output:
original_name = extract_original_callable_name(original_code)
final_name = extract_original_callable_name(code_output)
if original_name and final_name and original_name != final_name:
warnings.append("Refactor output may have changed original naming unexpectedly.")
if task_type == CodeTaskType.FIX and not code_output and cleaned_output:
warnings.append("Fix response did not include code output.")
return CodeXResponse(
task_type=task_type,
answer=answer,
code_output=code_output,
explanation=explanation,
warnings=warnings,
sources=sources or [],
needs_clarification=False,
meta=ResponseMeta(
used_model=model_used,
fallback_used=used_fallback,
retrieval_used=retrieval_used,
source_count=source_count,
processing_time_ms=processing_time_ms,
),
)
def build_error_response(
task_type: CodeTaskType,
error_message: str,
processing_time_ms: Optional[int] = None,
) -> CodeXResponse:
cleaned_error = clean_text(error_message)
return CodeXResponse(
task_type=task_type,
answer="Request processing failed.",
code_output=None,
explanation=None,
warnings=[cleaned_error] if cleaned_error else ["Unknown error occurred."],
sources=[],
needs_clarification=False,
meta=ResponseMeta(
used_model="none",
fallback_used=False,
retrieval_used=False,
source_count=0,
processing_time_ms=processing_time_ms,
),
)