import json
import logging
import re
from typing import List, Dict

from app.core.model_loader import llm_engine

logger = logging.getLogger(__name__)


class AIReviewerService:
    """Review batches of source files with the shared LLM engine.

    Files are minified, concatenated into one prompt per batch and sent to
    ``llm_engine.generate``. The JSON answer is normalised so that every
    input file yields exactly one result dict with the keys ``fileName``,
    ``vulnerabilities``, ``improvement_suggestions`` and ``metrics``.
    """

    # Process in batches of 5 to stay within Gemini Free Tier limits (15 RPM).
    BATCH_SIZE = 5
    # Limit to 6k chars per file to stay within the context window safely.
    MAX_CHARS_PER_FILE = 6000
    # 8k output tokens for the batch analysis.
    MAX_OUTPUT_TOKENS = 8192

    _DEFAULT_SUGGESTION = "Review the implementation logic for improved safety."
    _JS_STYLE_EXTENSIONS = ('.js', '.ts', '.jsx', '.tsx', '.java', '.kt', '.c', '.cpp')

    # Logging calls whose lines are dropped. The look-behind keeps identifiers
    # that merely END in one of these names (e.g. ``register_blueprint(``)
    # from being mistaken for a log statement.
    _LOG_CALL_RE = re.compile(r"(?<!\w)(?:console\.log|print\(|logger\.|Log\.d|Log\.e)")

    # Markdown code fences wrapping the whole LLM answer.
    _OPENING_FENCE_RE = re.compile(r"^```[A-Za-z]*\s*")
    _CLOSING_FENCE_RE = re.compile(r"\s*```$")

    def __init__(self):
        """Create the service; no per-instance state is required."""

    def review_batch_code(self, files: list) -> list:
        """Analyse ``files`` and return one result dict per file, in order.

        Args:
            files: Objects exposing ``fileName`` and ``content`` attributes
                (``content`` may be ``None`` or empty).

        Returns:
            A list with exactly one dict per input file. If the LLM call or
            the response handling fails for a batch, every file of that batch
            gets a placeholder with empty findings and zeroed metrics.
        """
        results = []
        for start in range(0, len(files), self.BATCH_SIZE):
            batch = files[start:start + self.BATCH_SIZE]
            file_names = [f.fileName for f in batch]
            prompt = self._build_prompt(batch)

            try:
                response_text = llm_engine.generate(prompt, max_tokens=self.MAX_OUTPUT_TOKENS)
                batch_results = self._parse_json(response_text)

                # Map results by fileName for easy lookup.
                processed_map = {
                    item.get('fileName'): item
                    for item in batch_results
                    if isinstance(item, dict)
                }
                # Build the batch output separately and commit it only once
                # it is complete; a failure half-way through must not leave
                # partial entries behind that the fallback would duplicate.
                batch_output = [
                    self._normalize_result(processed_map[fn])
                    if fn in processed_map
                    else self._skipped_result(fn)
                    for fn in file_names
                ]
            except Exception as e:
                # Best-effort boundary: one failing batch must not abort the
                # remaining batches.
                logger.exception("Batch processing error: %s", e)
                batch_output = [self._failed_result(fn) for fn in file_names]

            results.extend(batch_output)

        return results

    def _build_prompt(self, batch: list) -> str:
        """Return the review prompt for one batch of file objects."""
        sections = []
        for f in batch:
            # Minify code to save tokens.
            raw_content = f.content or ""
            minified_content = self._minify_code(
                raw_content[:self.MAX_CHARS_PER_FILE], f.fileName
            )
            sections.append(f"\n--- FILE: {f.fileName} ---\n{minified_content}\n")
        combined_code = "".join(sections)

        # The prompt explicitly requests suggestions and improvements.
        return f"""
        Analyze the following {len(batch)} source code files for security vulnerabilities and code quality.

        {combined_code}

        Task:
        1. Detect severe security/logic issues (Vulnerabilities).
        2. If a vulnerability is found, provide a concise 'suggestion' on how to fix it.
        3. If NO vulnerabilities are found in a file, provide a list of 'improvement_suggestions' (clean code, performance, or architecture tips).
        4. Provide metrics for complexity and maintainability (scale 1-10).

        Output a JSON array (exactly 1 object per file):
        [
          {{
            "fileName": "exact/path/from/header",
            "vulnerabilities": [
              {{
                "type": "SQLi/Logic/etc",
                "line": 10,
                "description": "Short explanation",
                "suggestion": "Specific code fix"
              }}
            ],
            "improvement_suggestions": ["Tip 1", "Tip 2"],
            "metrics": {{"complexity": 3, "maintainability": 8}}
          }}
        ]
        """

    def _normalize_result(self, res: Dict) -> Dict:
        """Ensure ``res`` carries every required key with the expected type.

        Missing keys and keys with a wrong type (for example a JSON ``null``)
        are replaced by defaults to prevent Android serialization errors.
        Vulnerability entries that are not objects are dropped with a warning,
        and every remaining entry is guaranteed a ``suggestion`` field.
        """
        vulnerabilities = res.get("vulnerabilities")
        if not isinstance(vulnerabilities, list):
            vulnerabilities = []

        kept = []
        for vuln in vulnerabilities:
            if not isinstance(vuln, dict):
                logger.warning(
                    "Dropping malformed vulnerability entry for %s: %r",
                    res.get("fileName"),
                    vuln,
                )
                continue
            vuln.setdefault("suggestion", self._DEFAULT_SUGGESTION)
            kept.append(vuln)
        res["vulnerabilities"] = kept

        if not isinstance(res.get("improvement_suggestions"), list):
            res["improvement_suggestions"] = []
        if not isinstance(res.get("metrics"), dict):
            res["metrics"] = {"complexity": 5, "maintainability": 5}
        return res

    @staticmethod
    def _skipped_result(file_name: str) -> Dict:
        """Fallback entry for a file the AI skipped in its response."""
        return {
            "fileName": file_name,
            "vulnerabilities": [],
            "improvement_suggestions": ["No immediate improvements identified."],
            "metrics": {"complexity": 1, "maintainability": 10},
        }

    @staticmethod
    def _failed_result(file_name: str) -> Dict:
        """Placeholder entry for a file whose batch could not be processed."""
        return {
            "fileName": file_name,
            "vulnerabilities": [],
            "improvement_suggestions": [],
            "metrics": {"complexity": 0, "maintainability": 0},
        }

    def _minify_code(self, code: str, filename: str) -> str:
        """Remove comments, empty lines and log statements to save tokens.

        Comment syntax is chosen from the extension of ``filename``: ``#`` for
        ``.py`` files, ``//`` and ``/* ... */`` for the JS-style extensions.
        Lines containing a common logging call are dropped entirely.

        Note: removing lines shifts line numbers, so any ``line`` value the
        model reports refers to the minified text, not to the original file.

        Args:
            code: Source text of one file.
            filename: Name of the file, used only for its extension.

        Returns:
            The remaining lines, right-stripped and joined with newlines.
        """
        is_python = filename.endswith('.py')
        is_js_style = filename.endswith(self._JS_STYLE_EXTENSIONS)

        cleaned_lines = []
        in_block_comment = False
        for line in code.split('\n'):
            stripped = line.strip()
            if not stripped:
                continue

            if is_js_style:
                # Track /* ... */ state instead of guessing from a leading
                # '*': that heuristic would drop pointer writes such as
                # "*p = 5;" and keep comment text that has no leading star.
                if in_block_comment:
                    if '*/' in stripped:
                        in_block_comment = False
                    continue
                if stripped.startswith('/*'):
                    in_block_comment = '*/' not in stripped[2:]
                    continue
                if stripped.startswith('//'):
                    continue

            if is_python and stripped.startswith('#'):
                continue

            # Skip common logging.
            if self._LOG_CALL_RE.search(stripped):
                continue

            # Strip inline comments.
            if is_python:
                line = self._strip_inline_comment(line, '#')
            elif is_js_style:
                line = self._strip_inline_comment(line, '//')

            if not line.strip():
                continue
            cleaned_lines.append(line.rstrip())

        return '\n'.join(cleaned_lines)

    @staticmethod
    def _strip_inline_comment(line: str, marker: str) -> str:
        """Cut ``line`` at the first ``' ' + marker`` outside a string literal.

        Quote state is tracked within the single line only (multi-line strings
        are not followed), which is enough to keep literals such as
        ``" #fff"`` or ``"a // b"`` intact.
        """
        quote = None
        i = 0
        while i < len(line):
            ch = line[i]
            if quote is not None:
                if ch == '\\':
                    # Skip the escaped character so \" does not end the string.
                    i += 2
                    continue
                if ch == quote:
                    quote = None
            elif ch in ('"', "'", '`'):
                quote = ch
            elif ch == ' ' and line.startswith(marker, i + 1):
                return line[:i]
            i += 1
        return line

    def _parse_json(self, text: str) -> List:
        """Clean and parse the LLM response into a list of result objects.

        The response is tried as-is (after removing an enclosing Markdown code
        fence), then as the span between the outermost ``[``/``]`` or
        ``{``/``}``, whichever opens first, so surrounding prose is tolerated.
        A single JSON object is wrapped in a list.

        Returns:
            The parsed list, or ``[]`` when nothing usable could be parsed.
        """
        text = text.strip()
        if not text:
            return []

        # Remove only the enclosing fence; fences inside JSON string values
        # (e.g. code in a suggestion) must stay untouched.
        text = self._OPENING_FENCE_RE.sub("", text, count=1)
        text = self._CLOSING_FENCE_RE.sub("", text, count=1).strip()
        if not text:
            return []

        spans = []
        for opener, closer in (("[", "]"), ("{", "}")):
            begin = text.find(opener)
            end = text.rfind(closer)
            if begin != -1 and end > begin:
                spans.append((begin, text[begin:end + 1]))
        # Try the bracket that opens first: it is the outermost structure.
        spans.sort()

        candidates = [text]
        candidates.extend(span for _, span in spans if span != text)

        last_error = None
        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError as e:
                last_error = e
                continue
            if isinstance(parsed, list):
                return parsed
            if isinstance(parsed, dict):
                return [parsed]

        if last_error is not None:
            logger.warning("JSON Decode Error: %s", last_error)
        return []


# Instantiate the service
service = AIReviewerService()