Spaces:
Sleeping
Sleeping
# Standard library
import json
import logging
import re
from typing import List, Dict

# Project-local LLM wrapper; exposes .generate(prompt, max_tokens=...) (see usage below)
from app.core.model_loader import llm_engine

# Module-level logger, stdlib convention
logger = logging.getLogger(__name__)
class AIReviewerService:
    """Sends batches of source files to an LLM for security/quality review and
    normalizes the model's JSON output into one result dict per file."""

    def __init__(self):
        # Stateless service; nothing to initialize.
        pass

    def review_batch_code(self, files: list) -> list:
        """Review file objects (each exposing ``.fileName`` and ``.content``).

        Files are processed in batches; every input file yields exactly one
        result dict with the keys ``fileName``, ``vulnerabilities``,
        ``improvement_suggestions`` and ``metrics`` — even when the LLM call
        fails or the model skips a file in its response.
        """
        results = []
        # Process in batches of 5 to stay within Gemini Free Tier limits (15 RPM)
        batch_size = 5
        for i in range(0, len(files), batch_size):
            batch = files[i : i + batch_size]
            file_names = []
            sections = []
            for f in batch:
                # Minify code to save tokens; limit to 6k chars per file to
                # stay within the context window safely.
                raw_content = f.content or ""
                minified_content = self._minify_code(raw_content[:6000], f.fileName)
                sections.append(f"\n--- FILE: {f.fileName} ---\n{minified_content}\n")
                file_names.append(f.fileName)
            # join instead of repeated += (avoids quadratic string building)
            combined_code = "".join(sections)
            # Prompt explicitly requests suggestions and improvements so every
            # result object comes back fully populated.
            prompt = f"""
Analyze the following {len(batch)} source code files for security vulnerabilities and code quality.
{combined_code}
Task:
1. Detect severe security/logic issues (Vulnerabilities).
2. If a vulnerability is found, provide a concise 'suggestion' on how to fix it.
3. If NO vulnerabilities are found in a file, provide a list of 'improvement_suggestions' (clean code, performance, or architecture tips).
4. Provide metrics for complexity and maintainability (scale 1-10).
Output a JSON array (exactly 1 object per file):
[
{{
"fileName": "exact/path/from/header",
"vulnerabilities": [
{{
"type": "SQLi/Logic/etc",
"line": 10,
"description": "Short explanation",
"suggestion": "Specific code fix"
}}
],
"improvement_suggestions": ["Tip 1", "Tip 2"],
"metrics": {{"complexity": 3, "maintainability": 8}}
}}
]
"""
            try:
                # 8k output tokens for the batch analysis
                response_text = llm_engine.generate(prompt, max_tokens=8192)
                batch_results = self._parse_json(response_text)
                # Map results by fileName for easy lookup; ignore malformed entries
                processed_map = {item.get('fileName'): item for item in batch_results if isinstance(item, dict)}
                for fn in file_names:
                    if fn in processed_map:
                        results.append(self._normalize_result(processed_map[fn]))
                    else:
                        # Fallback if the AI skipped a file in its response
                        results.append({
                            "fileName": fn,
                            "vulnerabilities": [],
                            "improvement_suggestions": ["No immediate improvements identified."],
                            "metrics": {"complexity": 1, "maintainability": 10}
                        })
            except Exception as e:
                # One failed LLM call must not lose the whole batch: emit a
                # zeroed placeholder for every file in it.
                logger.error(f"Batch processing error: {e}")
                for fn in file_names:
                    results.append({
                        "fileName": fn,
                        "vulnerabilities": [],
                        "improvement_suggestions": [],
                        "metrics": {"complexity": 0, "maintainability": 0}
                    })
        return results

    def _normalize_result(self, res: dict) -> dict:
        """Fill in any keys the model omitted (prevents Android serialization
        errors downstream) and guarantee well-formed vulnerability entries."""
        res.setdefault("improvement_suggestions", [])
        res.setdefault("metrics", {"complexity": 5, "maintainability": 5})
        # Coerce a missing or non-list 'vulnerabilities' value to a list, and
        # drop malformed (non-dict) entries — previously a string entry here
        # raised TypeError on the membership check.
        if not isinstance(res.get("vulnerabilities"), list):
            res["vulnerabilities"] = []
        vulns = [v for v in res["vulnerabilities"] if isinstance(v, dict)]
        # Ensure every vulnerability has a suggestion field
        for vuln in vulns:
            vuln.setdefault("suggestion", "Review the implementation logic for improved safety.")
        res["vulnerabilities"] = vulns
        return res

    def _minify_code(self, code: str, filename: str) -> str:
        """
        Removes comments, empty lines, and logging calls to optimize token usage.

        Heuristic, not a real parser: it can mis-strip '#'/'//' sequences that
        occur inside string literals — acceptable, since the output is only
        prompt input for the LLM, never executed.
        """
        lines = code.split('\n')
        cleaned_lines = []
        is_python = filename.endswith('.py')
        is_js_style = filename.endswith(('.js', '.ts', '.jsx', '.tsx', '.java', '.kt', '.c', '.cpp'))
        for line in lines:
            stripped = line.strip()
            if not stripped:
                continue
            # Skip full-line comments
            if is_python and stripped.startswith('#'):
                continue
            if is_js_style and stripped.startswith(('//', '/*', '*')):
                continue
            # Skip common logging statements (substring match, intentionally loose)
            if any(log in stripped for log in ('console.log', 'print(', 'logger.', 'Log.d', 'Log.e')):
                continue
            # Strip inline comments
            if is_python and ' #' in line:
                line = line.split(' #', 1)[0]
            if is_js_style and ' //' in line:
                line = line.split(' //', 1)[0]
            if not line.strip():
                continue
            cleaned_lines.append(line.rstrip())
        return '\n'.join(cleaned_lines)

    def _parse_json(self, text: str) -> list:
        """
        Cleans and parses the LLM response, always returning a list of results.

        Trims any chatter before the first '[' and after the last ']', removes
        markdown code fences, and wraps a bare top-level object in a list so
        the caller can always iterate. Returns [] for empty/None/unparseable
        input (previously a None response raised AttributeError and a bare
        object response leaked a dict to the caller).
        """
        text = (text or "").strip()
        if not text:
            return []
        # Clean surrounding chatter / markdown formatting if present
        text = re.sub(r"^[^[]*\[", "[", text)
        text = re.sub(r"\][^]]*$", "]", text)
        text = text.replace("```json", "").replace("```", "").strip()
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError as e:
            logger.warning(f"JSON Decode Error: {e}")
            return []
        if isinstance(parsed, list):
            return parsed
        if isinstance(parsed, dict):
            # Model returned a single object instead of an array
            return [parsed]
        return []
# Instantiate the service once at module import so importers share this instance
service = AIReviewerService()