File size: 6,718 Bytes
02117ee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
"""
CodingAgent — Autonomous code generation, editing, refactoring (Devin/Genspark style)
"""
import json
import os
import re
from typing import Dict, List
import structlog
from .base_agent import BaseAgent

log = structlog.get_logger()

CODING_SYSTEM = """You are an elite autonomous software engineer — like Devin combined with Genspark.
You write production-quality code that is:
- Clean, readable, well-structured
- Properly typed (TypeScript/Python type hints)
- Error-handled and resilient
- Documented with clear comments
- Following best practices for the language/framework

When generating code:
1. Think about the full architecture first
2. Write complete, runnable code (not snippets)
3. Include proper imports
4. Add error handling
5. Include brief usage examples in comments

Support: Python, TypeScript, JavaScript, Go, Rust, SQL, Shell, YAML, JSON
"""


class CodingAgent(BaseAgent):
    def __init__(self, ws_manager=None, ai_router=None):
        super().__init__("CodingAgent", ws_manager, ai_router)

    async def run(self, task: str, context: Dict = {}, **kwargs) -> str:
        session_id = kwargs.get("session_id", "")
        task_id = kwargs.get("task_id", "")

        await self.emit(task_id, "agent_start", {
            "agent": "CodingAgent",
            "task": task[:80],
        }, session_id)

        # Build context-aware messages
        prev_results = context.get("previous_results", [])
        project_ctx = context.get("project_context", "")
        plan = context.get("plan", "")

        system_content = CODING_SYSTEM
        if project_ctx:
            system_content += f"\n\nProject Context:\n{project_ctx[:1000]}"

        user_content = f"Task: {task}"
        if plan:
            user_content += f"\n\nExecution Plan:\n{plan[:500]}"
        if prev_results:
            user_content += f"\n\nPrevious results:\n" + "\n".join(str(r)[:200] for r in prev_results[-3:])

        messages = [
            {"role": "system", "content": system_content},
            {"role": "user", "content": user_content},
        ]

        await self.emit(task_id, "tool_called", {
            "agent": "CodingAgent",
            "tool": "code_generation",
            "step": task[:60],
        }, session_id)

        result = await self.llm(
            messages,
            task_id=task_id,
            session_id=session_id,
            temperature=0.2,
            max_tokens=8192,
        )

        # Extract code blocks for display
        code_blocks = self._extract_code_blocks(result)
        await self.emit(task_id, "code_generated", {
            "agent": "CodingAgent",
            "code_blocks": len(code_blocks),
            "total_lines": sum(len(b.split("\n")) for b in code_blocks),
            "languages": list(set(self._detect_language(b) for b in code_blocks)),
        }, session_id)

        return result

    async def generate_file(
        self,
        filename: str,
        description: str,
        task_id: str = "",
        session_id: str = "",
        context: Dict = {},
    ) -> str:
        """Generate a complete file with proper structure."""
        messages = [
            {"role": "system", "content": CODING_SYSTEM},
            {"role": "user", "content": (
                f"Generate a complete, production-ready file.\n"
                f"Filename: {filename}\n"
                f"Description: {description}\n"
                f"Context: {json.dumps(context)[:500]}\n\n"
                f"Return ONLY the file content, no explanation."
            )},
        ]
        content = await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.1, max_tokens=8192)

        # Strip markdown code fences if present
        content = self._strip_code_fences(content)

        # Write to workspace
        workspace = os.environ.get("WORKSPACE_DIR", "/tmp/god_workspace")
        filepath = os.path.join(workspace, filename)
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, "w") as f:
            f.write(content)

        await self.emit(task_id, "file_written", {
            "filename": filename,
            "size": len(content),
            "lines": len(content.split("\n")),
        }, session_id)

        return content

    async def refactor(
        self,
        code: str,
        instructions: str,
        task_id: str = "",
        session_id: str = "",
    ) -> str:
        """Refactor existing code based on instructions."""
        messages = [
            {"role": "system", "content": CODING_SYSTEM},
            {"role": "user", "content": (
                f"Refactor this code based on these instructions:\n"
                f"Instructions: {instructions}\n\n"
                f"Original code:\n```\n{code}\n```\n\n"
                f"Return ONLY the refactored code."
            )},
        ]
        return await self.llm(messages, task_id=task_id, session_id=session_id, temperature=0.1, max_tokens=8192)

    async def scan_repository(self, repo_path: str) -> Dict:
        """Scan repository and build project intelligence graph."""
        import subprocess
        try:
            result = subprocess.run(
                ["find", repo_path, "-type", "f", "-name", "*.py", "-o",
                 "-name", "*.ts", "-o", "-name", "*.js", "-o", "-name", "*.go"],
                capture_output=True, text=True, timeout=10
            )
            files = result.stdout.strip().split("\n")[:50]

            # Read key files
            key_files = {}
            for f in ["package.json", "requirements.txt", "tsconfig.json", "pyproject.toml", "go.mod"]:
                path = os.path.join(repo_path, f)
                if os.path.exists(path):
                    with open(path) as fp:
                        key_files[f] = fp.read()[:1000]

            return {
                "files": files,
                "key_configs": key_files,
                "total_files": len(files),
            }
        except Exception as e:
            return {"error": str(e), "files": [], "key_configs": {}}

    def _extract_code_blocks(self, text: str) -> List[str]:
        pattern = r"```[\w]*\n(.*?)```"
        return re.findall(pattern, text, re.DOTALL)

    def _strip_code_fences(self, text: str) -> str:
        text = re.sub(r"^```[\w]*\n", "", text.strip())
        text = re.sub(r"\n```$", "", text)
        return text

    def _detect_language(self, code: str) -> str:
        if "def " in code and "import " in code:
            return "python"
        if "function " in code or "const " in code or "interface " in code:
            return "typescript"
        if "package main" in code:
            return "go"
        return "unknown"