import gc
import json
import math
import os
import re
import time
from pathlib import Path
from typing import Any

import torch
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, logging as transformers_logging

try:
    from .cache import GuidanceCache
    from .context import collect_project_context
    from .prompts import GUIDANCE_SYSTEM, build_guidance_prompt
    from .settings import GuidanceSettings
    from .tools import ToolSignals, run_tools
except ImportError:
    from guidance_sidecar.cache import GuidanceCache
    from guidance_sidecar.context import collect_project_context
    from guidance_sidecar.prompts import GUIDANCE_SYSTEM, build_guidance_prompt
    from guidance_sidecar.settings import GuidanceSettings
    from guidance_sidecar.tools import ToolSignals, run_tools


def configure_runtime(settings: GuidanceSettings) -> None:
    """Point Hugging Face caches and temp dirs at the configured locations.

    Each Hugging Face cache variable is set to a sub-directory of the resolved
    model cache dir named after the lower-cased variable; the temp variables
    are set to the resolved temp dir (which is created if missing).
    Tokenizer parallelism, progress bars and non-error transformers logging
    are switched off.
    """
    cache_root = settings.resolve(settings.model_cache_dir)
    tmp_root = settings.resolve(settings.tmp_dir)
    tmp_root.mkdir(parents=True, exist_ok=True)

    for key in ("HF_HOME", "HF_HUB_CACHE", "TRANSFORMERS_CACHE", "HF_DATASETS_CACHE", "HF_XET_CACHE"):
        os.environ[key] = str(cache_root / key.lower())
    for key in ("TMP", "TEMP", "TMPDIR"):
        os.environ[key] = str(tmp_root)

    os.environ["TOKENIZERS_PARALLELISM"] = "false"
    os.environ["TRANSFORMERS_VERBOSITY"] = "error"
    os.environ["HF_HUB_DISABLE_PROGRESS_BARS"] = "1"

    transformers_logging.set_verbosity_error()
    # Not every transformers version exposes this helper.
    if hasattr(transformers_logging, "disable_progress_bar"):
        transformers_logging.disable_progress_bar()


def preferred_torch_dtype() -> torch.dtype:
    """Return bf16 on CUDA when supported, fp16 on other CUDA devices, else fp32."""
    if torch.cuda.is_available():
        return torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16
    return torch.float32


def extract_json(text: str) -> dict[str, Any]:
    """Parse a JSON object out of model output.

    The content of the first fenced code block is used when one is present,
    and the text is then narrowed to the span between the first ``{`` and the
    last ``}``. If strict parsing fails, one repair pass inserts commas that
    are missing before a quoted key on a new line and drops trailing commas
    before a closing bracket.

    Raises:
        json.JSONDecodeError: if the text is not valid JSON even after repair.
        ValueError: if the parsed JSON is not an object.
    """
    text = text.strip()
    fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE)
    if fenced:
        text = fenced.group(1).strip()

    start = text.find("{")
    end = text.rfind("}")
    if start >= 0 and end > start:
        text = text[start : end + 1]

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        repaired = re.sub(r'(\]|\}|"|\d)\s*\n\s*("[-_a-zA-Z0-9]+"\s*:)', r"\1,\n \2", text)
        repaired = re.sub(r",\s*([}\]])", r"\1", repaired)
        parsed = json.loads(repaired)

    # Callers use the result as a mapping; reject lists and scalars here rather
    # than letting them fail later with an AttributeError.
    if not isinstance(parsed, dict):
        raise ValueError(f"Expected a JSON object, got {type(parsed).__name__}")
    return parsed


def normalize_guidance(result: dict[str, Any]) -> dict[str, Any]:
    """Coerce a raw guidance dict into the expected shape, in place.

    - ``route`` becomes the first allowed route whose name occurs in the
      lower-cased value, or ``"plan"``.
    - ``confidence`` becomes a float clamped to ``[0.0, 1.0]``; values that
      cannot be converted, or that are NaN, become ``0.0``.
    - Each list field becomes a list of non-blank strings; a bare string is
      wrapped in a list, other non-list values become ``[]``, and ``None``
      items are dropped.
    - ``next_action`` becomes a stripped string, with a default when empty or
      missing.

    Returns the same (mutated) dict.
    """
    allowed_routes = ("plan", "patch", "critique", "test", "fix")
    route = str(result.get("route", "plan")).lower()
    result["route"] = next((candidate for candidate in allowed_routes if candidate in route), "plan")

    try:
        confidence = float(result.get("confidence", 0.0))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: JSON integers too large to fit in a float.
        confidence = 0.0
    if math.isnan(confidence):
        # min(1.0, nan) would return 1.0, turning "unknown" into full confidence.
        confidence = 0.0
    result["confidence"] = max(0.0, min(1.0, confidence))

    for key in ("useful_context", "plan", "patch_rules", "risks", "tests"):
        value = result.get(key, [])
        if isinstance(value, str):
            value = [value]
        elif not isinstance(value, list):
            value = []
        # Skip JSON nulls so they do not show up as the literal string "None".
        result[key] = [str(item) for item in value if item is not None and str(item).strip()]

    next_action = result.get("next_action")
    next_action_text = "" if next_action is None else str(next_action).strip()
    result["next_action"] = next_action_text or "Continue with the planned coding step."
    return result


def enrich_with_tools(result: dict[str, Any], signals: ToolSignals) -> dict[str, Any]:
    """Merge tool signals into a normalized guidance dict, in place.

    The tool route hint replaces the route only when the route is ``"plan"``
    and the hint is not. Tool risks, tests and patch rules are appended without
    duplicates and each list is capped at 8 entries. The raw signals are
    attached under ``"tool_signals"``.
    """
    if result["route"] == "plan" and signals.route_hint != "plan":
        result["route"] = signals.route_hint

    for key, values in (
        ("risks", signals.risk_flags),
        ("tests", signals.suggested_tests),
        ("patch_rules", signals.patch_rules),
    ):
        existing = result.get(key, [])
        merged = existing + [value for value in values if value not in existing]
        result[key] = merged[:8]

    result["tool_signals"] = {
        "route_hint": signals.route_hint,
        "context_stats": signals.context_stats,
        "calculations": signals.calculations,
    }
    return result


class GuidanceEngine:
    """Lazily loaded causal LM (optionally with a PEFT adapter) that produces
    JSON guidance or raw code for a task."""

    def __init__(
        self,
        settings: GuidanceSettings | None = None,
        model_name: str | None = None,
        adapter_dir: str | None = None,
        keep_loaded_seconds: int | None = None,
    ):
        """Build the effective settings, configure the runtime and open the cache.

        ``model_name``, ``adapter_dir`` and ``keep_loaded_seconds`` override the
        corresponding fields of ``settings`` (or of a default
        ``GuidanceSettings()``) when given. The model itself is not loaded
        until first use.
        """
        base = settings or GuidanceSettings()
        self.settings = GuidanceSettings(
            model_name=model_name or base.model_name,
            adapter_dir=adapter_dir if adapter_dir is not None else base.adapter_dir,
            cache_dir=base.cache_dir,
            model_cache_dir=base.model_cache_dir,
            tmp_dir=base.tmp_dir,
            max_context_chars=base.max_context_chars,
            max_input_tokens=base.max_input_tokens,
            max_new_tokens=base.max_new_tokens,
            keep_loaded_seconds=keep_loaded_seconds if keep_loaded_seconds is not None else base.keep_loaded_seconds,
            quantization=base.quantization,
            require_cuda=base.require_cuda,
            require_bf16=base.require_bf16,
        )
        configure_runtime(self.settings)
        self.cache = GuidanceCache(self.settings.resolve(self.settings.cache_dir))
        self.model = None
        self.tokenizer = None
        self.loaded_at = 0.0

    @classmethod
    def from_bundle(cls, bundle_dir: str | Path) -> "GuidanceEngine":
        """Create an engine from the settings stored in ``bundle_dir``."""
        return cls(GuidanceSettings.from_bundle(bundle_dir))

    def _quantization_config(self) -> BitsAndBytesConfig | None:
        """Return the bitsandbytes config for the configured quantization.

        Returns ``None`` for ``"none"`` and an NF4 double-quantized 4-bit
        config for ``"4bit"``.

        Raises:
            ValueError: for any other quantization setting.
        """
        if self.settings.quantization == "none":
            return None
        if self.settings.quantization != "4bit":
            raise ValueError(f"Unsupported quantization: {self.settings.quantization}")
        return BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            # Use the same dtype the model is loaded with, so devices without
            # bf16 support compute in fp16 instead of a mismatched bf16.
            bnb_4bit_compute_dtype=preferred_torch_dtype(),
            bnb_4bit_use_double_quant=True,
        )

    def _load(self) -> None:
        """Load tokenizer, model and optional adapter if not already loaded.

        Raises:
            RuntimeError: if CUDA is required but unavailable, or bf16 is
                required but the CUDA device does not support it.
        """
        if self.model is not None:
            return
        if self.settings.require_cuda and not torch.cuda.is_available():
            raise RuntimeError("CUDA is required. Set GUIDANCE_REQUIRE_CUDA=0 for CPU-only non-fast mode.")
        if self.settings.require_bf16 and torch.cuda.is_available() and not torch.cuda.is_bf16_supported():
            raise RuntimeError("bf16 was requested, but this CUDA device does not support bf16.")

        cache_dir = os.environ["HF_HOME"]  # set by configure_runtime()
        dtype = preferred_torch_dtype()

        # NOTE(review): trust_remote_code=True executes code shipped with the
        # model repository; only use model names from trusted sources.
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.settings.model_name,
            trust_remote_code=True,
            cache_dir=cache_dir,
        )
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        self.model = AutoModelForCausalLM.from_pretrained(
            self.settings.model_name,
            quantization_config=self._quantization_config(),
            device_map="auto",
            torch_dtype=dtype,
            trust_remote_code=True,
            cache_dir=cache_dir,
            low_cpu_mem_usage=True,
        )
        if self.settings.adapter_dir:
            self.model = PeftModel.from_pretrained(self.model, self.settings.adapter_dir)
        self.model.eval()
        self.loaded_at = time.time()

    def unload(self) -> None:
        """Drop the model and tokenizer and release cached CUDA memory."""
        self.model = None
        self.tokenizer = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def _maybe_unload(self) -> None:
        """Apply the ``keep_loaded_seconds`` policy.

        Negative: never unload. Zero: unload immediately. Positive: unload once
        more than that many seconds have passed since the model was loaded.
        """
        keep = self.settings.keep_loaded_seconds
        if keep < 0:
            return
        if keep == 0 or time.time() - self.loaded_at > keep:
            self.unload()

    def _generate(self, messages: list[dict[str, str]], max_new_tokens: int) -> str:
        """Run greedy generation for chat ``messages`` and return the new text.

        The model must already be loaded. The input is truncated to
        ``max_input_tokens``, and only tokens generated after the prompt are
        decoded.
        """
        prompt = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=self.settings.max_input_tokens,
        ).to(self.model.device)
        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=False,
                pad_token_id=self.tokenizer.eos_token_id,
                # Sampling parameters are explicitly unset for greedy decoding.
                temperature=None,
                top_p=None,
                top_k=None,
            )
        prompt_length = inputs["input_ids"].shape[1]
        return self.tokenizer.decode(output[0][prompt_length:], skip_special_tokens=True)

    def advise(self, task: str, context: str = "", use_cache: bool = True) -> dict[str, Any]:
        """Return normalized guidance for ``task``, using the cache when allowed.

        Tool signals are computed first and are part of the cache key. On a
        cache hit the cached dict is returned with ``cache_hit`` set to True
        and the model is not touched. Otherwise the model is queried and its
        output parsed as JSON; unparseable output yields a low-confidence
        fallback that carries the raw text under ``raw_output``. The result is
        normalized, enriched with tool signals and stored in the cache.
        """
        signals = run_tools(task, context)
        payload = {
            "model": self.settings.model_name,
            "adapter": self.settings.adapter_dir,
            "task": task,
            "context": context,
            "max_new_tokens": self.settings.max_new_tokens,
            "tool_signals": signals.__dict__,
        }
        key = self.cache.key_for(payload)
        if use_cache:
            cached = self.cache.get(key)
            if cached:
                cached["cache_hit"] = True
                return cached

        self._load()
        try:
            messages = [
                {"role": "system", "content": GUIDANCE_SYSTEM},
                {"role": "user", "content": build_guidance_prompt(task, context, signals.as_prompt_text())},
            ]
            raw = self._generate(messages, self.settings.max_new_tokens)
            try:
                result = extract_json(raw)
            except (ValueError, RecursionError):
                # ValueError covers json.JSONDecodeError and non-object payloads.
                result = {
                    "route": "plan",
                    "confidence": 0.2,
                    "useful_context": [],
                    "plan": [],
                    "patch_rules": ["Model returned invalid JSON; inspect raw_output."],
                    "risks": ["Guidance parser failed."],
                    "tests": [],
                    "next_action": "Retry with less context.",
                    "raw_output": raw,
                }
            result = enrich_with_tools(normalize_guidance(result), signals)
            result["cache_hit"] = False
            result["model"] = self.settings.model_name
            self.cache.put(key, result)
        finally:
            # Honour the unload policy even when generation or caching fails.
            self._maybe_unload()
        return result

    def generate_code(self, task: str, context: str = "", language: str = "python") -> str:
        """Generate code directly with the bundled coder sidecar.

        This is used by the outer multimodal model when a prompt explicitly asks
        for code-only output. The sidecar remains internal, but the returned text
        is final code rather than guidance JSON.

        If the output contains a fenced code block, the content of the first
        one is returned; otherwise the stripped raw output is returned. At
        least 192 new tokens are allowed regardless of ``max_new_tokens``.
        """
        self._load()
        try:
            messages = [
                {
                    "role": "system",
                    "content": (
                        "You are the internal coding sidecar for a larger model. "
                        "Return only source code. No prose, no markdown fences, no JSON. "
                        "Implement the requested behavior directly and minimally. "
                        "Do not add extra validation, restrictions, printing, examples, or wrapper code unless requested. "
                        "For text parsing tasks, handle values embedded inside surrounding text and preserve signs."
                    ),
                },
                {
                    "role": "user",
                    "content": (
                        f"Language: {language}\n"
                        f"Task:\n{task}\n\n"
                        f"Relevant context:\n{context or 'No additional context.'}\n\n"
                        "Return a complete minimal implementation only."
                    ),
                },
            ]
            raw = self._generate(messages, max(self.settings.max_new_tokens, 192)).strip()
        finally:
            # Honour the unload policy even when generation fails.
            self._maybe_unload()

        match = re.search(r"```(?:\w+)?\s*(.*?)```", raw, flags=re.DOTALL)
        return match.group(1).strip() if match else raw

    def advise_project(self, task: str, root: str | Path = ".", use_cache: bool = True) -> dict[str, Any]:
        """Collect project context under ``root`` and return guidance for ``task``."""
        context = collect_project_context(Path(root).resolve(), self.settings.max_context_chars)
        return self.advise(task, context=context, use_cache=use_cache)