| """Koda ģenerēšana un labošana ar Qwen3.""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import re |
| import zipfile |
| from pathlib import Path |
| from uuid import uuid4 |
|
|
| from fastapi import APIRouter, HTTPException |
| from pydantic import BaseModel, Field |
|
|
| from maris_core.text.generate import ( |
| DEFAULT_MAX_NEW_TOKENS, |
| call_generation_pipeline, |
| complete_with_hf_fallback, |
| get_pipeline, |
| ) |
|
|
| logger = logging.getLogger(__name__) |
| router = APIRouter() |
| WORKSPACE_ARTIFACT_ROOT = Path("/tmp/maris-ai/generated-projects") |
| DEFAULT_REPO_ROOT = Path(__file__).resolve().parents[3] |
| _MAX_REPO_CONTEXT_FILES = 6 |
| _MAX_REPO_CONTEXT_CHARS = 1800 |
| _FENCED_BLOCK_PATTERN = re.compile( |
| r"```(?P<label>[^\n`]*)\n(?P<body>.*?)```", |
| flags=re.DOTALL, |
| ) |
| _EDIT_INTENT_PATTERN = re.compile( |
| r"\b(edit|modify|update|refactor|fix|patch|rewrite|change|cleanup|labo|salabo|rediģē|pārstrādā|uzlabo)\b", |
| flags=re.IGNORECASE, |
| ) |
| _PATH_HINT_PATTERN = re.compile(r"(?P<path>(?:/|\./|\.\./)[^\s,;:()\[\]{}<>]+)") |
| _REPO_RELATIVE_HINT_PATTERN = re.compile( |
| r"(?P<path>(?:[A-Za-z0-9_.-]+/)+[A-Za-z0-9_.-]+\.[A-Za-z0-9_.-]+)" |
| ) |
| _STACK_KEYWORDS: dict[str, tuple[str, ...]] = { |
| "nextjs": ("next.js", "nextjs", " app router", "next app"), |
| "react": ("react", "vite", "tsx component"), |
| "rust": ("rust", "cargo", "actix", "axum"), |
| "python": ("python", "fastapi", "flask", "django", "pytest"), |
| "web": ("html", "css", "landing page", "calculator", "kalkulator", "web app", "web-app"), |
| } |
| _STACK_DISPLAY_NAMES = { |
| "nextjs": "Next.js (TypeScript)", |
| "react": "React (TypeScript)", |
| "rust": "Rust", |
| "python": "Python", |
| "web": "HTML/CSS/JavaScript", |
| } |
| _STACK_ENTRYPOINTS = { |
| "nextjs": "app/page.tsx", |
| "react": "src/App.tsx", |
| "rust": "src/main.rs", |
| "python": "src/main.py", |
| "web": "index.html", |
| } |
|
|
|
|
| class CodeRequest(BaseModel): |
| prompt: str |
| language: str = "Python" |
| context: str = "" |
| repo_path: str | None = None |
| fallback_model: str | None = None |
| max_new_tokens: int = Field(default=DEFAULT_MAX_NEW_TOKENS, ge=32) |
|
|
|
|
| class FixCodeRequest(BaseModel): |
| code: str |
| error_message: str = "" |
| language: str = "Python" |
|
|
|
|
| class ProjectFile(BaseModel): |
| path: str |
| content: str |
| absolute_path: str | None = None |
|
|
|
|
| class CodeResponse(BaseModel): |
| code: str |
| explanation: str |
| language: str |
| detected_stack: str |
| files: list[ProjectFile] = Field(default_factory=list) |
| workspace_artifact_dir: str | None = None |
| bundle_path: str | None = None |
| entrypoint: str | None = None |
| repo_path: str | None = None |
|
|
|
|
| class RepoContext(BaseModel): |
| repo_path: str |
| files: list[str] = Field(default_factory=list) |
|
|
|
|
| def _extract_code_block(text: str, language: str) -> tuple[str, str]: |
| """Izvelk koda bloku un paskaidrojumu no LLM atbildes.""" |
| del language |
| lines = text.split("\n") |
| code_lines: list[str] = [] |
| explanation_lines: list[str] = [] |
| in_code = False |
|
|
| for line in lines: |
| if line.strip().startswith("```"): |
| in_code = not in_code |
| continue |
| if in_code: |
| code_lines.append(line) |
| else: |
| explanation_lines.append(line) |
|
|
| code = "\n".join(code_lines).strip() |
| explanation = "\n".join(explanation_lines).strip() |
|
|
| if not code: |
| code = text.strip() |
| explanation = "" |
|
|
| return code, explanation |
|
|
|
|
| def _extract_structured_project_payload(text: str) -> dict[str, object] | None: |
| candidates: list[str] = [] |
| stripped = text.strip() |
| if stripped.startswith("{") and stripped.endswith("}"): |
| candidates.append(stripped) |
| for match in _FENCED_BLOCK_PATTERN.finditer(text): |
| label = match.group("label").strip().lower() |
| if "json" in label: |
| candidates.append(match.group("body").strip()) |
| for candidate in candidates: |
| try: |
| payload = json.loads(candidate) |
| except json.JSONDecodeError: |
| continue |
| if isinstance(payload, dict) and isinstance(payload.get("files"), list): |
| return payload |
| return None |
|
|
|
|
| def _sanitize_relative_path(path: str) -> str: |
| candidate = Path(path.strip().replace("\\", "/")) |
| cleaned_parts = [part for part in candidate.parts if part not in {"", ".", ".."}] |
| if not cleaned_parts: |
| return "main.txt" |
| return Path(*cleaned_parts).as_posix() |
|
|
|
|
| def _slugify_prompt(prompt: str) -> str: |
| tokens = re.findall(r"[a-z0-9]+", prompt.lower()) |
| return "-".join(tokens[:6]) or "generated-project" |
|
|
|
|
| def _looks_like_repo_edit_request(prompt: str) -> bool: |
| return bool(_EDIT_INTENT_PATTERN.search(prompt)) |
|
|
|
|
| def _extract_repo_path_from_prompt(prompt: str) -> Path | None: |
| for match in _PATH_HINT_PATTERN.finditer(prompt): |
| candidate = Path(match.group("path")).expanduser() |
| try: |
| resolved = candidate.resolve() |
| except OSError: |
| continue |
| if resolved.is_file(): |
| resolved = resolved.parent |
| if resolved.is_dir(): |
| return resolved |
| return None |
|
|
|
|
| def _resolve_repo_path(raw_repo_path: str | None, prompt: str) -> Path | None: |
| if raw_repo_path: |
| candidate = Path(raw_repo_path).expanduser() |
| try: |
| resolved = candidate.resolve() |
| except OSError: |
| resolved = candidate |
| if resolved.is_file(): |
| resolved = resolved.parent |
| if resolved.is_dir(): |
| return resolved |
|
|
| extracted = _extract_repo_path_from_prompt(prompt) |
| if extracted is not None: |
| return extracted |
|
|
| if _looks_like_repo_edit_request(prompt) and DEFAULT_REPO_ROOT.exists(): |
| return DEFAULT_REPO_ROOT |
| return None |
|
|
|
|
| def _read_json(path: Path) -> dict[str, object]: |
| try: |
| return json.loads(path.read_text(encoding="utf-8")) |
| except (OSError, json.JSONDecodeError): |
| return {} |
|
|
|
|
| def _detect_stack(prompt: str, requested_language: str, repo_path: Path | None) -> str: |
| if repo_path is not None: |
| package_json = _read_json(repo_path / "package.json") |
| dependencies = { |
| **( |
| {str(k): v for k, v in package_json.get("dependencies", {}).items()} |
| if isinstance(package_json.get("dependencies"), dict) |
| else {} |
| ), |
| **( |
| {str(k): v for k, v in package_json.get("devDependencies", {}).items()} |
| if isinstance(package_json.get("devDependencies"), dict) |
| else {} |
| ), |
| } |
| if ( |
| (repo_path / "next.config.js").exists() |
| or (repo_path / "next.config.mjs").exists() |
| or "next" in dependencies |
| ): |
| return "nextjs" |
| if ( |
| "react" in dependencies |
| or (repo_path / "src/App.tsx").exists() |
| or (repo_path / "src/main.tsx").exists() |
| ): |
| return "react" |
| if (repo_path / "Cargo.toml").exists() or (repo_path / "src/main.rs").exists(): |
| return "rust" |
| if ( |
| (repo_path / "pyproject.toml").exists() |
| or (repo_path / "requirements.txt").exists() |
| or (repo_path / "src/main.py").exists() |
| ): |
| return "python" |
|
|
| normalized_prompt = f" {prompt.lower()} " |
| for stack, keywords in _STACK_KEYWORDS.items(): |
| if any(keyword in normalized_prompt for keyword in keywords): |
| return stack |
|
|
| normalized_language = requested_language.strip().lower() |
| if "next" in normalized_language: |
| return "nextjs" |
| if "html/css/javascript" in normalized_language: |
| return "web" |
| if "react" in normalized_language or "typescript" in normalized_language: |
| return "react" |
| if "javascript" in normalized_language: |
| return "web" |
| if "rust" in normalized_language: |
| return "rust" |
| return "python" |
|
|
|
|
| def _display_language_for_stack(requested_language: str, detected_stack: str) -> str: |
| if requested_language.strip() and requested_language.strip().lower() != "python": |
| if detected_stack in {"nextjs", "react"}: |
| return _STACK_DISPLAY_NAMES[detected_stack] |
| if detected_stack == "rust" and "rust" in requested_language.strip().lower(): |
| return "Rust" |
| if detected_stack == "python" and "python" in requested_language.strip().lower(): |
| return "Python" |
| return _STACK_DISPLAY_NAMES.get(detected_stack, requested_language) |
|
|
|
|
| def _default_entrypoint_for_stack(detected_stack: str, repo_path: Path | None) -> str: |
| if repo_path is not None: |
| candidates = { |
| "nextjs": ["app/page.tsx", "pages/index.tsx", "src/app/page.tsx"], |
| "react": ["src/App.tsx", "src/main.tsx", "src/App.jsx"], |
| "rust": ["src/main.rs"], |
| "python": ["src/main.py", "main.py", "app/main.py"], |
| }.get(detected_stack, []) |
| for candidate in candidates: |
| if (repo_path / candidate).exists(): |
| return candidate |
| return _STACK_ENTRYPOINTS.get(detected_stack, "main.py") |
|
|
|
|
| def _stack_scaffold_templates(detected_stack: str) -> dict[str, str]: |
| if detected_stack == "nextjs": |
| package_json = { |
| "name": "maris-next-app", |
| "private": True, |
| "scripts": {"dev": "next dev", "build": "next build", "start": "next start"}, |
| "dependencies": {"next": "15.0.0", "react": "18.3.1", "react-dom": "18.3.1"}, |
| "devDependencies": { |
| "typescript": "5.6.3", |
| "@types/react": "18.3.3", |
| "@types/node": "22.7.4", |
| }, |
| } |
| return { |
| "package.json": json.dumps(package_json, ensure_ascii=False, indent=2) + "\n", |
| "tsconfig.json": json.dumps( |
| { |
| "compilerOptions": { |
| "target": "ES2022", |
| "lib": ["dom", "dom.iterable", "es2022"], |
| "allowJs": False, |
| "skipLibCheck": True, |
| "strict": True, |
| "noEmit": True, |
| "module": "esnext", |
| "moduleResolution": "bundler", |
| "resolveJsonModule": True, |
| "isolatedModules": True, |
| "jsx": "preserve", |
| "incremental": True, |
| }, |
| "include": ["next-env.d.ts", "**/*.ts", "**/*.tsx"], |
| "exclude": ["node_modules"], |
| }, |
| ensure_ascii=False, |
| indent=2, |
| ) |
| + "\n", |
| "next-env.d.ts": '/// <reference types="next" />\n/// <reference types="next/image-types/global" />\n', |
| "next.config.mjs": "/** @type {import('next').NextConfig} */\nconst nextConfig = {};\n\nexport default nextConfig;\n", |
| "app/layout.tsx": 'export default function RootLayout({ children }: { children: React.ReactNode }) {\n return (\n <html lang="en">\n <body>{children}</body>\n </html>\n );\n}\n', |
| "app/page.tsx": "export default function HomePage() {\n return <main>Maris Next.js app</main>;\n}\n", |
| } |
| if detected_stack == "react": |
| package_json = { |
| "name": "maris-react-app", |
| "private": True, |
| "type": "module", |
| "scripts": {"dev": "vite", "build": "vite build", "preview": "vite preview"}, |
| "dependencies": {"react": "18.3.1", "react-dom": "18.3.1"}, |
| "devDependencies": { |
| "typescript": "5.6.3", |
| "vite": "5.4.8", |
| "@vitejs/plugin-react": "4.3.1", |
| "@types/react": "18.3.3", |
| "@types/react-dom": "18.3.0", |
| }, |
| } |
| return { |
| "package.json": json.dumps(package_json, ensure_ascii=False, indent=2) + "\n", |
| "tsconfig.json": json.dumps( |
| { |
| "compilerOptions": { |
| "target": "ES2020", |
| "useDefineForClassFields": True, |
| "lib": ["DOM", "DOM.Iterable", "ES2020"], |
| "allowJs": False, |
| "skipLibCheck": True, |
| "esModuleInterop": True, |
| "allowSyntheticDefaultImports": True, |
| "strict": True, |
| "module": "ESNext", |
| "moduleResolution": "bundler", |
| "resolveJsonModule": True, |
| "isolatedModules": True, |
| "noEmit": True, |
| "jsx": "react-jsx", |
| }, |
| "include": ["src"], |
| }, |
| ensure_ascii=False, |
| indent=2, |
| ) |
| + "\n", |
| "vite.config.ts": "import { defineConfig } from 'vite';\nimport react from '@vitejs/plugin-react';\n\nexport default defineConfig({\n plugins: [react()],\n});\n", |
| "index.html": '<!doctype html>\n<html lang="en">\n <head>\n <meta charset="UTF-8" />\n <meta name="viewport" content="width=device-width, initial-scale=1.0" />\n <title>Maris React App</title>\n </head>\n <body>\n <div id="root"></div>\n <script type="module" src="/src/main.tsx"></script>\n </body>\n</html>\n', |
| "src/main.tsx": "import React from 'react';\nimport ReactDOM from 'react-dom/client';\nimport App from './App';\n\nReactDOM.createRoot(document.getElementById('root')!).render(\n <React.StrictMode>\n <App />\n </React.StrictMode>,\n);\n", |
| "src/App.tsx": "export default function App() {\n return <main>Maris React app</main>;\n}\n", |
| } |
| if detected_stack == "rust": |
| return { |
| "Cargo.toml": '[package]\nname = "maris-rust-app"\nversion = "0.1.0"\nedition = "2021"\n\n[dependencies]\n', |
| "src/main.rs": 'fn main() {\n println!("Hello from Maris Rust app");\n}\n', |
| } |
| if detected_stack == "web": |
| return { |
| "index.html": '<!doctype html>\n<html lang="en">\n <head>\n <meta charset="UTF-8" />\n <meta name="viewport" content="width=device-width, initial-scale=1.0" />\n <title>Maris Web App</title>\n </head>\n <body>\n <main>Maris web artifact</main>\n </body>\n</html>\n', |
| } |
| return { |
| "pyproject.toml": '[project]\nname = "maris-python-app"\nversion = "0.1.0"\ndescription = "Generated by Maris AI"\nrequires-python = ">=3.11"\n\n[project.scripts]\nmaris-app = "src.main:main"\n', |
| "src/main.py": "def main() -> None:\n print('Hello from Maris Python app')\n\n\nif __name__ == '__main__':\n main()\n", |
| } |
|
|
|
|
| def _normalize_files(files: list[ProjectFile]) -> list[ProjectFile]: |
| normalized: dict[str, ProjectFile] = {} |
| for file in files: |
| path = _sanitize_relative_path(file.path) |
| normalized[path] = ProjectFile( |
| path=path, content=file.content, absolute_path=file.absolute_path |
| ) |
| return list(normalized.values()) |
|
|
|
|
| def _ensure_stack_scaffold( |
| files: list[ProjectFile], |
| *, |
| detected_stack: str, |
| entrypoint: str | None, |
| repo_path: Path | None, |
| ) -> tuple[list[ProjectFile], str]: |
| normalized_files = _normalize_files(files) |
| resolved_entrypoint = _sanitize_relative_path( |
| entrypoint or _default_entrypoint_for_stack(detected_stack, repo_path) |
| ) |
|
|
| if repo_path is not None: |
| if len(normalized_files) == 1: |
| only_file = normalized_files[0] |
| if only_file.path != resolved_entrypoint: |
| normalized_files[0] = ProjectFile( |
| path=resolved_entrypoint, content=only_file.content |
| ) |
| return normalized_files, resolved_entrypoint |
|
|
| templates = _stack_scaffold_templates(detected_stack) |
| file_map = {file.path: file for file in normalized_files} |
| fallback_paths = { |
| "main.py", |
| "src/main.py", |
| "src/main.rs", |
| "src/App.tsx", |
| "app/page.tsx", |
| "index.html", |
| } |
| if len(file_map) == 1: |
| only_path, only_file = next(iter(file_map.items())) |
| if only_path in fallback_paths and only_path != resolved_entrypoint: |
| file_map.pop(only_path) |
| file_map[resolved_entrypoint] = ProjectFile( |
| path=resolved_entrypoint, content=only_file.content |
| ) |
|
|
| for path, content in templates.items(): |
| if path not in file_map: |
| file_map[path] = ProjectFile(path=path, content=content) |
|
|
| ordered_paths = list(templates.keys()) + [path for path in file_map if path not in templates] |
| ordered_files = [file_map[path] for path in ordered_paths] |
| return ordered_files, resolved_entrypoint |
|
|
|
|
| def _extract_project_files( |
| text: str, |
| *, |
| language: str, |
| fallback_entrypoint: str = "main.py", |
| ) -> tuple[list[ProjectFile], str | None, str]: |
| payload = _extract_structured_project_payload(text) |
| if payload is not None: |
| files_payload = payload.get("files") |
| files: list[ProjectFile] = [] |
| if isinstance(files_payload, list): |
| for item in files_payload: |
| if not isinstance(item, dict): |
| continue |
| raw_path = str(item.get("path", "")).strip() |
| raw_content = str(item.get("content", "")) |
| if not raw_path or not raw_content.strip(): |
| continue |
| files.append( |
| ProjectFile( |
| path=_sanitize_relative_path(raw_path), |
| content=raw_content, |
| ) |
| ) |
| entrypoint = payload.get("entrypoint") or payload.get("primary_file") |
| explanation = str(payload.get("explanation") or payload.get("summary") or "").strip() |
| return ( |
| files, |
| ( |
| _sanitize_relative_path(str(entrypoint)) |
| if isinstance(entrypoint, str) and entrypoint |
| else None |
| ), |
| explanation, |
| ) |
|
|
| code, explanation = _extract_code_block(text, language) |
| if not code.strip(): |
| return [], None, explanation |
| return [ProjectFile(path=fallback_entrypoint, content=code)], fallback_entrypoint, explanation |
|
|
|
|
| def _select_primary_code(files: list[ProjectFile], entrypoint: str | None) -> str: |
| if entrypoint: |
| for file in files: |
| if file.path == entrypoint: |
| return file.content |
| return files[0].content if files else "" |
|
|
|
|
| def _extract_repo_relative_hints(prompt: str, repo_path: Path) -> list[str]: |
| hinted_paths: list[str] = [] |
| repo_root = repo_path.resolve() |
| for pattern in (_PATH_HINT_PATTERN, _REPO_RELATIVE_HINT_PATTERN): |
| for match in pattern.finditer(prompt): |
| raw_path = match.group("path").strip() |
| candidate = repo_path / raw_path if not raw_path.startswith("/") else Path(raw_path) |
| try: |
| resolved = candidate.resolve() |
| except OSError: |
| continue |
| try: |
| relative_path = resolved.relative_to(repo_root).as_posix() |
| except ValueError: |
| continue |
| if resolved.is_file() and relative_path not in hinted_paths: |
| hinted_paths.append(relative_path) |
| return hinted_paths |
|
|
|
|
| def _repo_context_candidates(repo_path: Path, detected_stack: str, prompt: str = "") -> list[str]: |
| candidates = _extract_repo_relative_hints(prompt, repo_path) + ["README.md"] |
| candidates.extend( |
| { |
| "nextjs": [ |
| "package.json", |
| "tsconfig.json", |
| "next.config.mjs", |
| "app/layout.tsx", |
| "app/page.tsx", |
| "pages/index.tsx", |
| ], |
| "react": [ |
| "package.json", |
| "tsconfig.json", |
| "vite.config.ts", |
| "index.html", |
| "src/main.tsx", |
| "src/App.tsx", |
| ], |
| "rust": ["Cargo.toml", "src/main.rs", "src/lib.rs"], |
| "python": [ |
| "pyproject.toml", |
| "requirements.txt", |
| "src/main.py", |
| "main.py", |
| "app/main.py", |
| ], |
| "web": ["index.html"], |
| }.get(detected_stack, []) |
| ) |
| existing: list[str] = [] |
| for candidate in candidates: |
| if (repo_path / candidate).exists() and candidate not in existing: |
| existing.append(candidate) |
| if len(existing) >= _MAX_REPO_CONTEXT_FILES: |
| break |
| return existing |
|
|
|
|
| def _build_repo_context( |
| repo_path: Path | None, detected_stack: str, prompt: str = "" |
| ) -> RepoContext | None: |
| if repo_path is None or not repo_path.exists(): |
| return None |
| files = _repo_context_candidates(repo_path, detected_stack, prompt) |
| if not files: |
| return RepoContext(repo_path=str(repo_path), files=[]) |
| excerpts: list[str] = [] |
| for relative_path in files: |
| try: |
| content = (repo_path / relative_path).read_text(encoding="utf-8") |
| except OSError: |
| continue |
| excerpts.append(f"[FILE {relative_path}]\n{content[:_MAX_REPO_CONTEXT_CHARS].strip()}") |
| return ( |
| RepoContext(repo_path=str(repo_path), files=files) |
| if not excerpts |
| else RepoContext( |
| repo_path=str(repo_path), |
| files=["\n\n".join(excerpts)], |
| ) |
| ) |
|
|
|
|
| def _build_system_prompt( |
| *, |
| language: str, |
| detected_stack: str, |
| repo_context: RepoContext | None, |
| ) -> str: |
| stack_label = _STACK_DISPLAY_NAMES.get(detected_stack, language) |
| prompt = ( |
| f"Tu esi Maris AI — eksperts programmētājs. Uzraksti {language} kodu ar pareizu {stack_label} projekta struktūru. " |
| "Dod production-ready risinājumu ar drošiem noklusējumiem, ievades validāciju un saprātīgu kļūdu apstrādi, ja tas attiecas uz uzdevumu. " |
| "Nepiedāvā pseidokodu, ja vien lietotājs to tieši neprasa. " |
| "Atbildi ar īsu paskaidrojumu, kurā nosauc edge cases un kā risinājumu pārbaudīt. " |
| "Ja pieprasījums ir par lietotni, UI vai workspace artefaktiem, atdod pilnu izpildāmu artefaktu. " |
| "Kad risinājums satur vienu vai vairākus failus, atbildi ar vienu ```json``` bloku formā " |
| '{"explanation":"...","entrypoint":"...","files":[{"path":"...","content":"..."}]} ' |
| "un pārliecinies, ka files satur gala failus ar reālu saturu bez TODO placeholderiem. " |
| f"Primāri mērķē uz {stack_label} stacku un izmanto šim stackam idiomātiskus failu nosaukumus." |
| ) |
| if repo_context is not None: |
| prompt += ( |
| " Šī ir repo-aware rediģēšana esošam projektam: izmanto precīzus repo-relatīvos failu ceļus, " |
| "rediģē tikai nepieciešamos failus, saglabā esošo projekta struktūru un, ja prompt piesauc konkrētus failus, balsti risinājumu tieši uz tiem." |
| ) |
| return prompt |
|
|
|
|
| def _materialize_workspace_artifacts( |
| files: list[ProjectFile], |
| *, |
| prompt: str, |
| ) -> str | None: |
| if not files: |
| return None |
| artifact_dir = WORKSPACE_ARTIFACT_ROOT / f"{_slugify_prompt(prompt)}-{uuid4().hex[:8]}" |
| artifact_dir.mkdir(parents=True, exist_ok=True) |
| artifact_root = artifact_dir.resolve() |
| for file in files: |
| relative = Path(_sanitize_relative_path(file.path)) |
| target = (artifact_root / relative).resolve() |
| if not str(target).startswith(str(artifact_root)): |
| raise HTTPException(status_code=400, detail="Nederīgs ģenerētā faila ceļš.") |
| target.parent.mkdir(parents=True, exist_ok=True) |
| target.write_text(file.content, encoding="utf-8") |
| file.absolute_path = str(target) |
| return str(artifact_root) |
|
|
|
|
| def _create_bundle_zip(workspace_artifact_dir: str | None) -> str | None: |
| if not workspace_artifact_dir: |
| return None |
| artifact_root = Path(workspace_artifact_dir) |
| if not artifact_root.exists(): |
| return None |
| bundle_path = artifact_root.with_suffix(".zip") |
| with zipfile.ZipFile(bundle_path, mode="w", compression=zipfile.ZIP_DEFLATED) as bundle: |
| for path in sorted(artifact_root.rglob("*")): |
| if path.is_file(): |
| bundle.write(path, arcname=path.relative_to(artifact_root)) |
| return str(bundle_path) |
|
|
|
|
| @router.post("/generate", response_model=CodeResponse) |
| async def generate_code(req: CodeRequest) -> CodeResponse: |
| """Ģenerē kodu pēc apraksta.""" |
| from maris_core.utils.hf_integration import HFIntegration |
|
|
| hf = HFIntegration() |
| repo_path = _resolve_repo_path(req.repo_path, req.prompt) |
| detected_stack = _detect_stack(req.prompt, req.language, repo_path) |
| resolved_language = _display_language_for_stack(req.language, detected_stack) |
| repo_context = _build_repo_context(repo_path, detected_stack, req.prompt) |
| fallback_entrypoint = _default_entrypoint_for_stack(detected_stack, repo_path) |
| system_prompt = _build_system_prompt( |
| language=resolved_language, |
| detected_stack=detected_stack, |
| repo_context=repo_context, |
| ) |
|
|
| messages = [ |
| {"role": "system", "content": system_prompt}, |
| {"role": "user", "content": req.prompt}, |
| ] |
|
|
| if repo_context is not None: |
| repo_context_content = ( |
| f"Repo sakne: {repo_context.repo_path}\n" |
| f"Esošie svarīgie faili: {', '.join(_repo_context_candidates(repo_path or DEFAULT_REPO_ROOT, detected_stack, req.prompt)) or 'nav'}" |
| ) |
| if repo_context.files: |
| repo_context_content += f"\n\n{repo_context.files[0]}" |
| messages.insert(1, {"role": "user", "content": repo_context_content}) |
|
|
| if req.context: |
| messages.insert(1, {"role": "user", "content": f"Konteksts:\n{req.context}"}) |
|
|
| pipe = get_pipeline() |
| raw_response: str | None = None |
| if pipe is not None: |
| try: |
| out = call_generation_pipeline( |
| pipe, |
| messages, |
| max_new_tokens=req.max_new_tokens, |
| temperature=0.2, |
| ) |
| raw_response = out[0]["generated_text"][-1]["content"] |
| except Exception as exc: |
| logger.error("Koda ģenerēšanas kļūda: %s", exc) |
|
|
| if raw_response is None: |
| fallback_result = complete_with_hf_fallback( |
| messages, |
| fallback_model=req.fallback_model, |
| max_new_tokens=req.max_new_tokens, |
| temperature=0.2, |
| ) |
| if fallback_result is not None: |
| _, raw_response = fallback_result |
| else: |
| raise HTTPException( |
| status_code=503, |
| detail="Maris AI koda ģenerēšana šobrīd nav pieejama.", |
| ) |
|
|
| files, entrypoint, structured_explanation = _extract_project_files( |
| raw_response, |
| language=resolved_language, |
| fallback_entrypoint=fallback_entrypoint, |
| ) |
| files, entrypoint = _ensure_stack_scaffold( |
| files, |
| detected_stack=detected_stack, |
| entrypoint=entrypoint, |
| repo_path=repo_path, |
| ) |
| code, explanation = _extract_code_block(raw_response, resolved_language) |
| if files: |
| code = _select_primary_code(files, entrypoint) |
| if structured_explanation: |
| explanation = structured_explanation |
| workspace_artifact_dir = _materialize_workspace_artifacts(files, prompt=req.prompt) |
| bundle_path = _create_bundle_zip(workspace_artifact_dir) |
| detected_stack_label = _STACK_DISPLAY_NAMES.get(detected_stack, detected_stack) |
| await hf.save_generation( |
| "code", |
| req.prompt, |
| { |
| "language": resolved_language, |
| "detected_stack": detected_stack_label, |
| "repo_path": str(repo_path) if repo_path is not None else None, |
| "fallback_model": req.fallback_model, |
| }, |
| ) |
|
|
| return CodeResponse( |
| code=code, |
| explanation=explanation, |
| language=resolved_language, |
| detected_stack=detected_stack_label, |
| files=files, |
| workspace_artifact_dir=workspace_artifact_dir, |
| bundle_path=bundle_path, |
| entrypoint=entrypoint, |
| repo_path=str(repo_path) if repo_path is not None else None, |
| ) |
|
|
|
|
| @router.post("/fix", response_model=CodeResponse) |
| async def fix_code(req: FixCodeRequest) -> CodeResponse: |
| """Labo kļūdainu kodu.""" |
| prompt = f"Labo šo {req.language} kodu:\n```\n{req.code}\n```" + ( |
| f"\nKļūda: {req.error_message}" if req.error_message else "" |
| ) |
| return await generate_code(CodeRequest(prompt=prompt, language=req.language)) |
|
|