Keeby-smilyai commited on
Commit
44a54f8
·
verified ·
1 Parent(s): 7c1ca6d

Create backend.py

Browse files
Files changed (1) hide show
  1. backend.py +372 -0
backend.py ADDED
@@ -0,0 +1,372 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sqlite3
2
+ import os
3
+ import json
4
+ import uuid
5
+ import zipfile
6
+ import tempfile
7
+ import subprocess
8
+ import re
9
+ from huggingface_hub import whoami
10
+
11
+ DB_PATH = "code_agents.db"
12
+ PROJECT_ROOT = "./projects"
13
+ os.makedirs(PROJECT_ROOT, exist_ok=True)
14
+
15
+ def init_db():
16
+ conn = sqlite3.connect(DB_PATH)
17
+ cursor = conn.cursor()
18
+ cursor.executescript("""
19
+ CREATE TABLE IF NOT EXISTS users (
20
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
21
+ hf_token TEXT UNIQUE NOT NULL,
22
+ username TEXT,
23
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP
24
+ );
25
+ CREATE TABLE IF NOT EXISTS projects (
26
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
27
+ user_id INTEGER NOT NULL,
28
+ title TEXT,
29
+ description TEXT,
30
+ zip_path TEXT,
31
+ created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
32
+ FOREIGN KEY (user_id) REFERENCES users(id)
33
+ );
34
+ """)
35
+ conn.commit()
36
+ conn.close()
37
+
38
+ init_db()
39
+
40
+ def get_user_by_token(hf_token):
41
+ conn = sqlite3.connect(DB_PATH)
42
+ cursor = conn.cursor()
43
+ cursor.execute("SELECT id, username FROM users WHERE hf_token = ?", (hf_token,))
44
+ row = cursor.fetchone()
45
+ conn.close()
46
+ return row if row else None
47
+
48
+ def create_user(hf_token, username=None):
49
+ conn = sqlite3.connect(DB_PATH)
50
+ cursor = conn.cursor()
51
+ cursor.execute("INSERT INTO users (hf_token, username) VALUES (?, ?)", (hf_token, username))
52
+ user_id = cursor.lastrowid
53
+ conn.commit()
54
+ conn.close()
55
+ return user_id
56
+
57
+ def create_project(user_id, title, description, zip_path):
58
+ conn = sqlite3.connect(DB_PATH)
59
+ cursor = conn.cursor()
60
+ cursor.execute("""
61
+ INSERT INTO projects (user_id, title, description, zip_path)
62
+ VALUES (?, ?, ?, ?)
63
+ """, (user_id, title, description, zip_path))
64
+ project_id = cursor.lastrowid
65
+ conn.commit()
66
+ conn.close()
67
+ return project_id
68
+
69
+ def get_user_projects(user_id):
70
+ conn = sqlite3.connect(DB_PATH)
71
+ cursor = conn.cursor()
72
+ cursor.execute("""
73
+ SELECT id, title, description, zip_path, created_at
74
+ FROM projects WHERE user_id = ? ORDER BY created_at DESC
75
+ """, (user_id,))
76
+ projects = cursor.fetchall()
77
+ conn.close()
78
+ return projects
79
+
80
+ def verify_hf_token(token):
81
+ try:
82
+ user_info = whoami(token=token)
83
+ username = user_info.get("name", "Anonymous")
84
+ user = get_user_by_token(token)
85
+ if not user:
86
+ user_id = create_user(token, username)
87
+ return user_id, f"Welcome to Code Agents, {username}! Your AI team is ready."
88
+ else:
89
+ return user[0], f"Welcome back, {user[1]}! Your projects are waiting."
90
+ except Exception as e:
91
+ return None, f"Invalid token. Please try again. ({str(e)})"
92
+
93
+ def get_user_project_dir(user_id):
94
+ user_dir = os.path.join(PROJECT_ROOT, str(user_id))
95
+ os.makedirs(user_dir, exist_ok=True)
96
+ return user_dir
97
+
98
+ def clear_user_project_dir(user_id):
99
+ user_dir = get_user_project_dir(user_id)
100
+ for f in os.listdir(user_dir):
101
+ path = os.path.join(user_dir, f)
102
+ if os.path.isdir(path):
103
+ import shutil
104
+ shutil.rmtree(path)
105
+ else:
106
+ os.remove(path)
107
+
108
+ TOOLS = {}
109
+
110
+ def create_file(user_id, path: str, content: str):
111
+ user_dir = get_user_project_dir(user_id)
112
+ full_path = os.path.join(user_dir, path.lstrip("/"))
113
+ os.makedirs(os.path.dirname(full_path), exist_ok=True)
114
+ with open(full_path, 'w') as f:
115
+ f.write(content)
116
+ return f"Created: {path}"
117
+
118
+ def read_file(user_id, path: str):
119
+ user_dir = get_user_project_dir(user_id)
120
+ full_path = os.path.join(user_dir, path.lstrip("/"))
121
+ if not os.path.exists(full_path):
122
+ return f"File not found: {path}"
123
+ with open(full_path, 'r') as f:
124
+ return f.read()
125
+
126
+ def list_files(user_id):
127
+ user_dir = get_user_project_dir(user_id)
128
+ files = []
129
+ for root, _, filenames in os.walk(user_dir):
130
+ for f in filenames:
131
+ files.append(os.path.relpath(os.path.join(root, f), user_dir))
132
+ return files
133
+
134
+ def execute_code(user_id, code: str):
135
+ try:
136
+ user_dir = get_user_project_dir(user_id)
137
+ with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, dir=user_dir) as f:
138
+ f.write(code)
139
+ temp_path = f.name
140
+ result = subprocess.run(["python3", temp_path], capture_output=True, text=True, timeout=10)
141
+ os.unlink(temp_path)
142
+ if result.returncode == 0:
143
+ return result.stdout
144
+ else:
145
+ return f"Error: {result.stderr}"
146
+ except Exception as e:
147
+ return f"Timeout/Error: {str(e)}"
148
+
149
+ def run_tests(user_id):
150
+ user_dir = get_user_project_dir(user_id)
151
+ test_dir = os.path.join(user_dir, "tests")
152
+ if not os.path.exists(test_dir):
153
+ return "No test directory found."
154
+ result = subprocess.run(["pytest", test_dir], capture_output=True, text=True, cwd=user_dir)
155
+ return result.stdout
156
+
157
+ def lint_code(user_id):
158
+ user_dir = get_user_project_dir(user_id)
159
+ result = subprocess.run(["flake8", user_dir], capture_output=True, text=True)
160
+ return result.stdout if result.returncode != 0 else "No linting errors."
161
+
162
+ def scan_vulns(user_id):
163
+ reqs = os.path.join(get_user_project_dir(user_id), "requirements.txt")
164
+ if not os.path.exists(reqs):
165
+ return "No requirements.txt found."
166
+ result = subprocess.run(["pip-audit", "-r", reqs], capture_output=True, text=True)
167
+ return result.stdout if result.returncode != 0 else "No known vulnerabilities found."
168
+
169
+ def search_hf_docs(query: str):
170
+ return f"Searching Hugging Face docs for: '{query}' -> Found: https://huggingface.co/docs/transformers/main/en/quicktour"
171
+
172
+ def write_readme(user_id, project_name: str, description: str):
173
+ user_dir = get_user_project_dir(user_id)
174
+ readme_content = f"""Project Name: {project_name}
175
+
176
+ Description:
177
+ {description}
178
+
179
+ How to Run:
180
+ 1. Install dependencies: pip install -r requirements.txt
181
+ 2. Run the main file: python cli.py
182
+
183
+ This project was built by Code Agents — an AI team on Hugging Face.
184
+ """
185
+ create_file(user_id, "README.md", readme_content)
186
+ return "README.md generated."
187
+
188
+ def generate_dockerfile(user_id):
189
+ user_dir = get_user_project_dir(user_id)
190
+ dockerfile = """FROM python:3.10-slim
191
+ COPY . /app
192
+ WORKDIR /app
193
+ RUN pip install --no-cache-dir -r requirements.txt
194
+ CMD ["python", "cli.py"]
195
+ """
196
+ create_file(user_id, "Dockerfile", dockerfile)
197
+ return "Dockerfile generated."
198
+
199
+ def zip_project(user_id, project_name: str):
200
+ user_dir = get_user_project_dir(user_id)
201
+ zip_path = os.path.join(user_dir, f"{project_name}.zip")
202
+ with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
203
+ for root, _, files in os.walk(user_dir):
204
+ for f in files:
205
+ if f.endswith(".zip"): continue
206
+ full_path = os.path.join(root, f)
207
+ arcname = os.path.relpath(full_path, user_dir)
208
+ zf.write(full_path, arcname)
209
+ return zip_path
210
+
211
+ def fix_error(error_msg: str, code: str):
212
+ if "ModuleNotFoundError" in error_msg and "yt_dlp" in error_msg:
213
+ return "Add 'yt-dlp' to requirements.txt"
214
+ elif "SyntaxError" in error_msg:
215
+ return "Fix indentation or missing colon"
216
+ return "Unable to auto-fix. Please review manually."
217
+
218
+ TOOLS = {
219
+ "create_file": {"func": create_file, "desc": "Create a new file at specified path with content"},
220
+ "read_file": {"func": read_file, "desc": "Read content of a file"},
221
+ "list_files": {"func": list_files, "desc": "List all files in the project directory"},
222
+ "execute_code": {"func": execute_code, "desc": "Execute Python code and return stdout/stderr"},
223
+ "run_tests": {"func": run_tests, "desc": "Run pytest on test files in /tests directory"},
224
+ "lint_code": {"func": lint_code, "desc": "Run flake8 to check code style and errors"},
225
+ "scan_vulns": {"func": scan_vulns, "desc": "Scan dependencies for security vulnerabilities"},
226
+ "search_hf_docs": {"func": search_hf_docs, "desc": "Search Hugging Face documentation for model usage"},
227
+ "write_readme": {"func": write_readme, "desc": "Generate a README.md for the project"},
228
+ "generate_dockerfile": {"func": generate_dockerfile, "desc": "Generate a Dockerfile for containerization"},
229
+ "zip_project": {"func": zip_project, "desc": "Compress entire project into a downloadable ZIP"},
230
+ "fix_error": {"func": fix_error, "desc": "Given an error message and code, suggest a fix"}
231
+ }
232
+
233
+ AGENT_PROMPTS = {
234
+ "architect": """
235
+ You are an expert software architect. Your job is to plan the project structure.
236
+ Given a user request, decide:
237
+ - File/folder structure
238
+ - Required libraries
239
+ - Tech stack (e.g., Flask, CLI, etc.)
240
+
241
+ Respond only in JSON format:
242
+ {
243
+ "plan": "Brief plan",
244
+ "files": [
245
+ {"path": "cli.py", "content": "// initial code"},
246
+ {"path": "requirements.txt", "content": "yt-dlp\\nflask"}
247
+ ]
248
+ }
249
+ """,
250
+ "coder": """
251
+ You are a senior Python developer. Write clean, efficient, well-commented code.
252
+ Use the tools available: create_file, read_file, execute_code, search_hf_docs.
253
+ Never guess — use tools to verify.
254
+ When done, write the file using create_file().
255
+ """,
256
+ "tester": """
257
+ You are a QA engineer. Write unit tests for all functions.
258
+ Use pytest. Create test files in /tests/.
259
+ Use execute_code() to run tests and verify output.
260
+ If tests fail, ask coder to fix.
261
+ """,
262
+ "reviewer": """
263
+ You are a security and code review expert.
264
+ Check for:
265
+ - Security flaws (e.g., eval(), shell injection)
266
+ - Poor practices
267
+ - Missing error handling
268
+ - Unused imports
269
+ Use lint_code(), scan_vulns(), read_file()
270
+ Suggest fixes using fix_error(). Update files with create_file().
271
+ """,
272
+ "deployer": """
273
+ You are a DevOps engineer.
274
+ Generate:
275
+ - README.md with usage instructions
276
+ - Dockerfile
277
+ - ZIP archive of project
278
+ Use: write_readme(), generate_dockerfile(), zip_project()
279
+ Return final ZIP path when done.
280
+ """
281
+ }
282
+
283
+ class Agent:
284
+ def __init__(self, name: str, prompt: str, tools: dict, user_id: int):
285
+ self.name = name
286
+ self.prompt = prompt
287
+ self.tools = tools
288
+ self.user_id = user_id
289
+ self.memory = []
290
+
291
+ def think(self, task: str) -> str:
292
+ return f"[{self.name}] {task} (simulated)"
293
+
294
+ def act(self, action: str) -> str:
295
+ try:
296
+ if "(" not in action or ")" not in action:
297
+ return "Invalid action format"
298
+ func_name = action.split("(")[0]
299
+ args_str = action[len(func_name)+1:-1]
300
+ args = {}
301
+ if args_str:
302
+ for pair in args_str.split(","):
303
+ k, v = pair.split("=")
304
+ args[k.strip()] = v.strip().strip('"\'')
305
+ if func_name in self.tools:
306
+ bound_func = lambda **kwargs: self.tools[func_name]["func"](self.user_id, **kwargs)
307
+ return bound_func(**args)
308
+ return f"Unknown tool: {func_name}"
309
+ except Exception as e:
310
+ return f"Error executing {action}: {str(e)}"
311
+
312
+ def run(self, task: str) -> str:
313
+ response = self.think(task)
314
+ self.memory.append(f"{self.name}: {response}")
315
+
316
+ actions = re.findall(r'(\w+\(.*?\))', response)
317
+ for action in actions:
318
+ result = self.act(action)
319
+ self.memory.append(f"{self.name} → {action} → {result}")
320
+
321
+ return "\n".join(self.memory[-5:])
322
+
323
+ def run_code_agents(user_id: int, project_description: str):
324
+ clear_user_project_dir(user_id)
325
+ user_dir = get_user_project_dir(user_id)
326
+
327
+ agents = {
328
+ "architect": Agent("Architect", AGENT_PROMPTS["architect"], TOOLS, user_id),
329
+ "coder": Agent("Coder", AGENT_PROMPTS["coder"], TOOLS, user_id),
330
+ "tester": Agent("Tester", AGENT_PROMPTS["tester"], TOOLS, user_id),
331
+ "reviewer": Agent("Reviewer", AGENT_PROMPTS["reviewer"], TOOLS, user_id),
332
+ "deployer": Agent("Deployer", AGENT_PROMPTS["deployer"], TOOLS, user_id),
333
+ }
334
+
335
+ timeline = []
336
+ logs = ""
337
+
338
+ logs += "Architect planning...\n"
339
+ plan = agents["architect"].run(f"Plan a project: {project_description}")
340
+ timeline.append({"agent": "architect", "step": "plan", "output": plan})
341
+ logs += plan + "\n\n"
342
+
343
+ logs += "Coder implementing...\n"
344
+ coder_task = f"Implement the project based on this plan:\n{plan}"
345
+ coder_result = agents["coder"].run(coder_task)
346
+ timeline.append({"agent": "coder", "step": "implement", "output": coder_result})
347
+ logs += coder_result + "\n\n"
348
+
349
+ logs += "Tester adding tests...\n"
350
+ test_task = "Write tests for the implemented code. Save to /tests/test_main.py"
351
+ test_result = agents["tester"].run(test_task)
352
+ timeline.append({"agent": "tester", "step": "test", "output": test_result})
353
+ logs += test_result + "\n\n"
354
+
355
+ logs += "Reviewer checking code...\n"
356
+ review_task = "Review all files. Fix any bugs or security issues."
357
+ review_result = agents["reviewer"].run(review_task)
358
+ timeline.append({"agent": "reviewer", "step": "review", "output": review_result})
359
+ logs += review_result + "\n\n"
360
+
361
+ logs += "Deployer packaging...\n"
362
+ deploy_task = f"Package the project named '{project_description[:20]}...' into a ZIP with README and Dockerfile."
363
+ deploy_result = agents["deployer"].run(deploy_task)
364
+ timeline.append({"agent": "deployer", "step": "deploy", "output": deploy_result})
365
+
366
+ zip_files = [f for f in os.listdir(user_dir) if f.endswith(".zip")]
367
+ zip_path = os.path.join(user_dir, zip_files[0]) if zip_files else None
368
+
369
+ if zip_path:
370
+ project_id = create_project(user_id, project_description[:50], project_description, zip_path)
371
+
372
+ return logs, zip_path, timeline