ayjays132's picture
Upload 101 files
d95ddd7 verified
import gc
import json
import os
import re
import time
from pathlib import Path
from typing import Any
import torch
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig, logging as transformers_logging
try:
from .cache import GuidanceCache
from .context import collect_project_context
from .prompts import GUIDANCE_SYSTEM, build_guidance_prompt
from .settings import GuidanceSettings
from .tools import ToolSignals, run_tools
except ImportError:
from guidance_sidecar.cache import GuidanceCache
from guidance_sidecar.context import collect_project_context
from guidance_sidecar.prompts import GUIDANCE_SYSTEM, build_guidance_prompt
from guidance_sidecar.settings import GuidanceSettings
from guidance_sidecar.tools import ToolSignals, run_tools
def configure_runtime(settings: GuidanceSettings) -> None:
    """Point Hugging Face caches and temp dirs at the configured locations and quiet logging.

    The temp directory is created if missing. Every Hugging Face cache variable
    is set to a sub-directory of the resolved model cache root, named after the
    lower-cased variable. The standard temp variables are all set to the
    resolved temp directory. Tokenizer parallelism, transformers verbosity and
    hub progress bars are turned down via environment variables and the
    transformers logging API.

    Args:
        settings: Provides ``model_cache_dir``, ``tmp_dir`` and a ``resolve``
            method (used here as if it returns a ``pathlib.Path``).
    """
    model_cache = settings.resolve(settings.model_cache_dir)
    scratch_dir = settings.resolve(settings.tmp_dir)
    scratch_dir.mkdir(parents=True, exist_ok=True)

    hf_cache_vars = ("HF_HOME", "HF_HUB_CACHE", "TRANSFORMERS_CACHE", "HF_DATASETS_CACHE", "HF_XET_CACHE")
    overrides = {name: str(model_cache / name.lower()) for name in hf_cache_vars}
    # Cover the Windows (TMP/TEMP) and POSIX (TMPDIR) spellings alike.
    overrides.update(dict.fromkeys(("TMP", "TEMP", "TMPDIR"), str(scratch_dir)))
    overrides.update(
        {
            "TOKENIZERS_PARALLELISM": "false",
            "TRANSFORMERS_VERBOSITY": "error",
            "HF_HUB_DISABLE_PROGRESS_BARS": "1",
        }
    )
    os.environ.update(overrides)

    transformers_logging.set_verbosity_error()
    # Not every transformers release exposes this helper, hence the feature check.
    if hasattr(transformers_logging, "disable_progress_bar"):
        transformers_logging.disable_progress_bar()
def preferred_torch_dtype() -> torch.dtype:
    """Return the dtype to load model weights in on the current machine.

    Returns:
        ``torch.float32`` when CUDA is unavailable; otherwise ``torch.bfloat16``
        if the CUDA device reports bf16 support, else ``torch.float16``.
    """
    if not torch.cuda.is_available():
        return torch.float32
    if torch.cuda.is_bf16_supported():
        return torch.bfloat16
    return torch.float16
def extract_json(text: str) -> dict[str, Any]:
    """Extract a JSON object from free-form model output.

    The text is narrowed in two steps: the contents of the first Markdown code
    fence (if any) are taken, then everything outside the outermost ``{...}``
    pair is dropped. If strict parsing fails, a best-effort repair inserts
    commas missing between a value and a following ``"key":`` on the next line
    and removes trailing commas before a closing bracket, then parses again.

    Args:
        text: Raw text that is expected to contain one JSON object.

    Returns:
        The parsed JSON object.

    Raises:
        ValueError: If no valid JSON can be recovered
            (``json.JSONDecodeError``, a ``ValueError`` subclass), or if the
            parsed payload is valid JSON but not an object.
    """
    text = text.strip()
    fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE)
    if fenced:
        text = fenced.group(1).strip()
    start = text.find("{")
    end = text.rfind("}")
    if start >= 0 and end > start:
        text = text[start : end + 1]
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Insert the comma a model often forgets between one member and the
        # next key on a new line. Values may end in a bracket, quote, digit,
        # or one of the JSON literals.
        repaired = re.sub(
            r'(\]|\}|"|\d|true|false|null)\s*\n\s*("[-_a-zA-Z0-9]+"\s*:)',
            r"\1,\n \2",
            text,
        )
        # Drop trailing commas, which strict JSON rejects.
        repaired = re.sub(r",\s*([}\]])", r"\1", repaired)
        parsed = json.loads(repaired)
    if not isinstance(parsed, dict):
        # A bare number, string or array parses fine but violates the declared
        # return type and would break callers that use dict methods.
        raise ValueError(f"Expected a JSON object, got {type(parsed).__name__}")
    return parsed
def normalize_guidance(result: dict[str, Any]) -> dict[str, Any]:
    """Coerce a parsed guidance payload into the expected schema, in place.

    * ``route`` becomes the first allowed route name found as a substring of
      the lower-cased value, or ``"plan"`` if none matches.
    * ``confidence`` becomes a float clamped to ``[0.0, 1.0]``. Values that
      cannot be converted, and NaN, become ``0.0``.
    * ``useful_context``, ``plan``, ``patch_rules``, ``risks`` and ``tests``
      become lists of non-blank strings. A single string is wrapped in a list,
      any other non-list value is replaced by an empty list, and ``None``
      items are dropped.
    * ``next_action`` becomes a stripped string, with a default sentence when
      it is missing, ``None`` or blank.

    Args:
        result: The parsed guidance dict. It is mutated.

    Returns:
        The same ``result`` object, normalized.
    """
    allowed_routes = ("plan", "patch", "critique", "test", "fix")
    route = str(result.get("route", "plan")).lower()
    result["route"] = next((candidate for candidate in allowed_routes if candidate in route), "plan")

    try:
        confidence = float(result.get("confidence", 0.0))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: an integer too large to be represented as a float.
        confidence = 0.0
    if confidence != confidence:
        # NaN is the only float unequal to itself. Without this guard,
        # min(1.0, nan) evaluates to 1.0 and NaN would report full confidence.
        confidence = 0.0
    result["confidence"] = max(0.0, min(1.0, confidence))

    for key in ("useful_context", "plan", "patch_rules", "risks", "tests"):
        value = result.get(key, [])
        if isinstance(value, str):
            value = [value]
        elif not isinstance(value, list):
            value = []
        # JSON null items would otherwise be stringified to the text "None".
        result[key] = [str(item) for item in value if item is not None and str(item).strip()]

    next_action = result.get("next_action")
    next_action_text = "" if next_action is None else str(next_action).strip()
    result["next_action"] = next_action_text or "Continue with the planned coding step."
    return result
def enrich_with_tools(result: dict[str, Any], signals: ToolSignals) -> dict[str, Any]:
    """Fold tool-derived signals into a guidance result, in place.

    The tool route hint replaces the route only when the result's route is
    ``"plan"`` and the hint is something else. Tool risk flags, suggested tests
    and patch rules are appended to the matching result lists, skipping values
    already present there, and each list is capped at eight entries. A
    ``tool_signals`` summary is attached.

    Args:
        result: Guidance dict. Must contain ``route``. It is mutated.
        signals: Tool output exposing ``route_hint``, ``risk_flags``,
            ``suggested_tests``, ``patch_rules``, ``context_stats`` and
            ``calculations``.

    Returns:
        The same ``result`` object.
    """
    if result["route"] == "plan":
        if signals.route_hint != "plan":
            result["route"] = signals.route_hint

    additions = {
        "risks": signals.risk_flags,
        "tests": signals.suggested_tests,
        "patch_rules": signals.patch_rules,
    }
    for field, extras in additions.items():
        current = result.get(field, [])
        unseen = [entry for entry in extras if entry not in current]
        result[field] = (current + unseen)[:8]

    result["tool_signals"] = dict(
        route_hint=signals.route_hint,
        context_stats=signals.context_stats,
        calculations=signals.calculations,
    )
    return result
class GuidanceEngine:
    """Lazily loaded causal-LM wrapper that produces coding guidance or code.

    The model and tokenizer are loaded on first use and may be released again
    after a call, depending on ``settings.keep_loaded_seconds``. Guidance
    results are stored in and served from a ``GuidanceCache``.
    """

    def __init__(
        self,
        settings: GuidanceSettings | None = None,
        model_name: str | None = None,
        adapter_dir: str | None = None,
        keep_loaded_seconds: int | None = None,
    ):
        """Build the engine from base settings plus optional overrides.

        Args:
            settings: Base settings; a default ``GuidanceSettings()`` is used
                when omitted.
            model_name: Overrides ``settings.model_name`` when truthy.
            adapter_dir: Overrides ``settings.adapter_dir`` when not ``None``
                (an empty string therefore disables the adapter).
            keep_loaded_seconds: Overrides ``settings.keep_loaded_seconds``
                when not ``None``.
        """
        base = settings or GuidanceSettings()
        self.settings = GuidanceSettings(
            model_name=model_name or base.model_name,
            adapter_dir=adapter_dir if adapter_dir is not None else base.adapter_dir,
            cache_dir=base.cache_dir,
            model_cache_dir=base.model_cache_dir,
            tmp_dir=base.tmp_dir,
            max_context_chars=base.max_context_chars,
            max_input_tokens=base.max_input_tokens,
            max_new_tokens=base.max_new_tokens,
            keep_loaded_seconds=keep_loaded_seconds if keep_loaded_seconds is not None else base.keep_loaded_seconds,
            quantization=base.quantization,
            require_cuda=base.require_cuda,
            require_bf16=base.require_bf16,
        )
        # Must run before any model load: _load() reads HF_HOME from the environment.
        configure_runtime(self.settings)
        self.cache = GuidanceCache(self.settings.resolve(self.settings.cache_dir))
        self.model = None
        self.tokenizer = None
        self.loaded_at = 0.0

    @classmethod
    def from_bundle(cls, bundle_dir: str | Path) -> "GuidanceEngine":
        """Create an engine from the settings stored in ``bundle_dir``."""
        return cls(GuidanceSettings.from_bundle(bundle_dir))

    def _quantization_config(self) -> BitsAndBytesConfig | None:
        """Translate ``settings.quantization`` into a bitsandbytes config.

        Returns:
            ``None`` for ``"none"``, or an NF4 double-quantized 4-bit config
            for ``"4bit"``.

        Raises:
            ValueError: For any other quantization setting.
        """
        if self.settings.quantization == "none":
            return None
        if self.settings.quantization != "4bit":
            raise ValueError(f"Unsupported quantization: {self.settings.quantization}")
        return BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            # Use the same dtype _load() passes as torch_dtype, so devices
            # without bf16 support compute in fp16 rather than unsupported bf16.
            bnb_4bit_compute_dtype=preferred_torch_dtype(),
            bnb_4bit_use_double_quant=True,
        )

    def _load(self) -> None:
        """Load tokenizer, model and optional adapter if not already loaded.

        Raises:
            RuntimeError: If CUDA or bf16 is required by the settings but
                unavailable.
        """
        if self.model is not None:
            return
        if self.settings.require_cuda and not torch.cuda.is_available():
            raise RuntimeError("CUDA is required. Set GUIDANCE_REQUIRE_CUDA=0 for CPU-only non-fast mode.")
        if self.settings.require_bf16 and torch.cuda.is_available() and not torch.cuda.is_bf16_supported():
            raise RuntimeError("bf16 was requested, but this CUDA device does not support bf16.")
        cache_dir = os.environ["HF_HOME"]
        dtype = preferred_torch_dtype()
        # NOTE(review): trust_remote_code=True runs Python shipped with the
        # model repository; model_name must only point at trusted sources.
        tokenizer = AutoTokenizer.from_pretrained(
            self.settings.model_name,
            trust_remote_code=True,
            cache_dir=cache_dir,
        )
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        model = AutoModelForCausalLM.from_pretrained(
            self.settings.model_name,
            quantization_config=self._quantization_config(),
            device_map="auto",
            torch_dtype=dtype,
            trust_remote_code=True,
            cache_dir=cache_dir,
            low_cpu_mem_usage=True,
        )
        if self.settings.adapter_dir:
            model = PeftModel.from_pretrained(model, self.settings.adapter_dir)
        model.eval()
        # Publish only after every step succeeded. Otherwise a failed adapter
        # load would leave the bare base model cached on self and later calls
        # would silently run without the adapter.
        self.tokenizer = tokenizer
        self.model = model
        self.loaded_at = time.time()

    def unload(self) -> None:
        """Drop the model and tokenizer references and release cached CUDA memory."""
        self.model = None
        self.tokenizer = None
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def _maybe_unload(self) -> None:
        """Unload according to ``settings.keep_loaded_seconds``.

        A negative value keeps the model loaded; zero unloads after every
        call; a positive value unloads once more than that many seconds have
        passed since the model was loaded (measured from load time, not from
        last use).
        """
        keep_seconds = self.settings.keep_loaded_seconds
        if keep_seconds < 0:
            return
        if keep_seconds == 0 or time.time() - self.loaded_at > keep_seconds:
            self.unload()

    def _generate(self, messages: list[dict[str, str]], max_new_tokens: int) -> str:
        """Run greedy generation for a chat and return only the newly generated text.

        Expects the model and tokenizer to be loaded already.
        """
        prompt = self.tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=self.settings.max_input_tokens,
        ).to(self.model.device)
        with torch.no_grad():
            output = self.model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=False,
                pad_token_id=self.tokenizer.eos_token_id,
                # Sampling knobs are explicitly cleared alongside do_sample=False.
                temperature=None,
                top_p=None,
                top_k=None,
            )
        # The output sequence starts with the prompt tokens; slice them off.
        prompt_length = inputs["input_ids"].shape[1]
        return self.tokenizer.decode(output[0][prompt_length:], skip_special_tokens=True)

    @staticmethod
    def _strip_code_fences(raw: str) -> str:
        """Return the contents of the first fenced block in ``raw``, else ``raw`` without a dangling opening fence."""
        match = re.search(r"```(?:\w+)?\s*(.*?)```", raw, flags=re.DOTALL)
        if match:
            return match.group(1).strip()
        # Output cut off by the token limit can open a fence and never close it.
        opener = re.match(r"```(?:\w+)?[ \t]*\n", raw)
        if opener:
            return raw[opener.end():].strip()
        return raw

    def advise(self, task: str, context: str = "", use_cache: bool = True) -> dict[str, Any]:
        """Produce structured guidance for ``task``.

        Tool signals are computed first and form part of the cache key. On a
        cache miss the model is prompted, its output is parsed as JSON (with a
        low-confidence fallback payload carrying ``raw_output`` when parsing
        fails), normalized, enriched with the tool signals, and cached.

        Args:
            task: The coding task to advise on.
            context: Optional supporting context text.
            use_cache: When true, return a cached result if one exists. New
                results are written to the cache regardless.

        Returns:
            The guidance dict, including ``cache_hit`` and, for fresh results,
            ``model``.
        """
        signals = run_tools(task, context)
        payload = {
            "model": self.settings.model_name,
            "adapter": self.settings.adapter_dir,
            "task": task,
            "context": context,
            "max_new_tokens": self.settings.max_new_tokens,
            "tool_signals": signals.__dict__,
        }
        key = self.cache.key_for(payload)
        if use_cache:
            cached = self.cache.get(key)
            if cached:
                cached["cache_hit"] = True
                return cached
        self._load()
        messages = [
            {"role": "system", "content": GUIDANCE_SYSTEM},
            {"role": "user", "content": build_guidance_prompt(task, context, signals.as_prompt_text())},
        ]
        raw = self._generate(messages, self.settings.max_new_tokens)
        try:
            result = extract_json(raw)
            if not isinstance(result, dict):
                # Valid JSON that is not an object cannot be normalized.
                raise ValueError("Guidance payload is not a JSON object.")
        except Exception:
            # Deliberate best-effort boundary: any parse failure degrades to a
            # fallback payload that keeps the raw text for inspection.
            result = {
                "route": "plan",
                "confidence": 0.2,
                "useful_context": [],
                "plan": [],
                "patch_rules": ["Model returned invalid JSON; inspect raw_output."],
                "risks": ["Guidance parser failed."],
                "tests": [],
                "next_action": "Retry with less context.",
                "raw_output": raw,
            }
        result = enrich_with_tools(normalize_guidance(result), signals)
        result["cache_hit"] = False
        result["model"] = self.settings.model_name
        self.cache.put(key, result)
        self._maybe_unload()
        return result

    def generate_code(self, task: str, context: str = "", language: str = "python") -> str:
        """Generate code directly with the bundled coder sidecar.

        This is used by the outer multimodal model when a prompt explicitly asks
        for code-only output. The sidecar remains internal, but the returned text
        is final code rather than guidance JSON.

        Args:
            task: Description of the code to write.
            context: Optional supporting context; a placeholder sentence is
                sent when empty.
            language: Target language named in the prompt.

        Returns:
            The generated code with any Markdown code fence removed. Results
            are not cached.
        """
        self._load()
        messages = [
            {
                "role": "system",
                "content": (
                    "You are the internal coding sidecar for a larger model. "
                    "Return only source code. No prose, no markdown fences, no JSON. "
                    "Implement the requested behavior directly and minimally. "
                    "Do not add extra validation, restrictions, printing, examples, or wrapper code unless requested. "
                    "For text parsing tasks, handle values embedded inside surrounding text and preserve signs."
                ),
            },
            {
                "role": "user",
                "content": (
                    f"Language: {language}\n"
                    f"Task:\n{task}\n\n"
                    f"Relevant context:\n{context or 'No additional context.'}\n\n"
                    "Return a complete minimal implementation only."
                ),
            },
        ]
        # Code generation gets a floor of 192 new tokens.
        raw = self._generate(messages, max(self.settings.max_new_tokens, 192)).strip()
        code = self._strip_code_fences(raw)
        self._maybe_unload()
        return code

    def advise_project(self, task: str, root: str | Path = ".", use_cache: bool = True) -> dict[str, Any]:
        """Collect context from the project at ``root`` and delegate to ``advise``."""
        context = collect_project_context(Path(root).resolve(), self.settings.max_context_chars)
        return self.advise(task, context=context, use_cache=use_cache)