Parsa2025AI commited on
Commit
d2a44e3
Β·
verified Β·
1 Parent(s): cbba49b

analyzer model

Browse files
Files changed (1) hide show
  1. app/models/analyzer.py +72 -101
app/models/analyzer.py CHANGED
@@ -1,124 +1,94 @@
1
- import logging
 
 
 
 
2
  import os
 
 
3
  from typing import Optional, Dict, Any
4
 
5
- import torch
6
- from app.config import settings
7
  from app.models.patterns import analyze_with_patterns
8
 
9
  logger = logging.getLogger(__name__)
10
 
11
- _model = None
12
- _tokenizer = None
13
- _model_loaded: bool = False
 
 
14
 
15
 
16
- def _load_model() -> bool:
17
- """Lazy-load the fine-tuned model (called once on first inference)."""
18
- global _model, _tokenizer, _model_loaded
 
 
 
 
 
19
 
20
- if _model_loaded:
21
- return True
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
 
23
  try:
24
- from transformers import AutoTokenizer, AutoModelForCausalLM
25
- from peft import PeftModel, PeftConfig
26
-
27
- # Prefer HF Hub model over local path
28
- model_source = settings.HF_MODEL_ID or settings.MODEL_PATH
29
-
30
- if not settings.HF_MODEL_ID and not os.path.exists(settings.MODEL_PATH):
31
- logger.warning("No model found – falling back to pattern analysis only.")
32
- return False
33
-
34
- logger.info(f"Loading model from: {model_source}")
35
-
36
- hf_kwargs = {}
37
- if settings.HF_TOKEN:
38
- hf_kwargs["token"] = settings.HF_TOKEN
39
-
40
- peft_config = PeftConfig.from_pretrained(model_source, **hf_kwargs)
41
- base_name = peft_config.base_model_name_or_path
42
-
43
- base_model = AutoModelForCausalLM.from_pretrained(
44
- base_name,
45
- torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
46
- device_map="auto" if torch.cuda.is_available() else None,
47
- low_cpu_mem_usage=True,
48
- **hf_kwargs,
49
- )
50
 
51
- _model = PeftModel.from_pretrained(base_model, model_source, **hf_kwargs)
52
- _tokenizer = AutoTokenizer.from_pretrained(model_source, **hf_kwargs)
 
 
53
 
54
- if _tokenizer.pad_token is None:
55
- _tokenizer.pad_token = _tokenizer.eos_token
 
56
 
57
- _model.eval()
58
- _model_loaded = True
59
- logger.info("βœ… Model loaded successfully.")
60
- return True
61
 
62
- except Exception as exc:
63
- logger.error(f"Model load failed: {exc}")
64
- return False
65
-
66
-
67
- def _llm_analyze(solidity_code: str) -> Optional[str]:
68
- """Run LLM inference. Returns raw text or None on failure."""
69
- if not _load_model():
70
  return None
71
 
72
- try:
73
- prompt = (
74
- "<|system|>\nYou are an expert Solidity security auditor. "
75
- "Analyze smart contracts for vulnerabilities and provide concise security reports.\n"
76
- "<|endoftext|>\n\n"
77
- "<|user|>\nAnalyze this Solidity contract for security vulnerabilities:\n\n"
78
- f"```solidity\n{solidity_code[:settings.MAX_INPUT_LENGTH]}\n```\n<|endoftext|>\n\n"
79
- "<|assistant|>\n"
80
- )
81
-
82
- inputs = _tokenizer(
83
- prompt,
84
- return_tensors="pt",
85
- truncation=True,
86
- max_length=512,
87
- )
88
- device = next(_model.parameters()).device
89
- inputs = {k: v.to(device) for k, v in inputs.items()}
90
-
91
- with torch.no_grad():
92
- outputs = _model.generate(
93
- **inputs,
94
- max_new_tokens=settings.MAX_NEW_TOKENS,
95
- temperature=settings.TEMPERATURE,
96
- do_sample=True,
97
- pad_token_id=_tokenizer.pad_token_id,
98
- use_cache=False,
99
- )
100
-
101
- generated = outputs[0][len(inputs["input_ids"][0]):]
102
- return _tokenizer.decode(generated, skip_special_tokens=True).strip()
103
-
104
  except Exception as exc:
105
- logger.error(f"LLM inference error: {exc}")
106
  return None
107
 
108
 
109
  def analyze_contract(solidity_code: str) -> Dict[str, Any]:
110
- """
111
- Main entry point.
112
- 1. Always run pattern analysis (fast, reliable).
113
- 2. If model is available, also run LLM analysis.
114
- 3. Return combined result.
115
- """
116
  pattern_result = analyze_with_patterns(solidity_code)
117
-
118
- llm_text: Optional[str] = None
119
- if not settings.USE_PATTERN_FALLBACK or settings.HF_MODEL_ID or os.path.exists(settings.MODEL_PATH):
120
- llm_text = _llm_analyze(solidity_code)
121
-
122
  return {
123
  **pattern_result,
124
  "llm_analysis": llm_text,
@@ -128,7 +98,8 @@ def analyze_contract(solidity_code: str) -> Dict[str, Any]:
128
 
129
  def model_status() -> Dict[str, Any]:
130
  return {
131
- "model_loaded": _model_loaded,
132
- "model_source": settings.HF_MODEL_ID or settings.MODEL_PATH,
133
- "device": "cuda" if torch.cuda.is_available() else "cpu",
134
- }
 
 
1
+ """
2
+ analyzer.py – Smart Contract Auditor
3
+ LLM inference via HF Inference API (no local model loading β†’ no OOM).
4
+ Pattern analysis always runs as baseline.
5
+ """
6
  import os
7
+ import logging
8
+ import requests
9
  from typing import Optional, Dict, Any
10
 
 
 
11
  from app.models.patterns import analyze_with_patterns
12
 
13
  logger = logging.getLogger(__name__)
14
 
15
+ # ── Config from environment ──────────────────────────────────────────────────
16
+ HF_MODEL_ID = os.getenv("HF_MODEL_ID", "")
17
+ HF_TOKEN = os.getenv("HF_TOKEN", "")
18
+ MAX_TOKENS = int(os.getenv("MAX_NEW_TOKENS", "300"))
19
+ TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))
20
 
21
 
22
+ def _call_hf_inference_api(solidity_code: str) -> Optional[str]:
23
+ """
24
+ Call HuggingFace Serverless Inference API.
25
+ Free tier: ~30k tokens/month, no GPU needed in the Space.
26
+ """
27
+ if not HF_MODEL_ID:
28
+ logger.info("HF_MODEL_ID not set – skipping LLM analysis.")
29
+ return None
30
 
31
+ api_url = f"https://api-inference.huggingface.co/models/{HF_MODEL_ID}"
32
+
33
+ headers = {"Content-Type": "application/json"}
34
+ if HF_TOKEN:
35
+ headers["Authorization"] = f"Bearer {HF_TOKEN}"
36
+
37
+ prompt = (
38
+ "<|system|>\n"
39
+ "You are an expert Solidity security auditor. "
40
+ "Analyze smart contracts for vulnerabilities and provide concise security reports.\n"
41
+ "<|endoftext|>\n\n"
42
+ "<|user|>\n"
43
+ f"Analyze this Solidity contract for security vulnerabilities:\n\n"
44
+ f"```solidity\n{solidity_code[:1500]}\n```\n"
45
+ "<|endoftext|>\n\n"
46
+ "<|assistant|>\n"
47
+ )
48
+
49
+ payload = {
50
+ "inputs": prompt,
51
+ "parameters": {
52
+ "max_new_tokens": MAX_TOKENS,
53
+ "temperature": TEMPERATURE,
54
+ "return_full_text": False,
55
+ "do_sample": True,
56
+ },
57
+ "options": {
58
+ "wait_for_model": True,
59
+ "use_cache": False,
60
+ },
61
+ }
62
 
63
  try:
64
+ resp = requests.post(api_url, headers=headers, json=payload, timeout=60)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
65
 
66
+ if resp.status_code == 503:
67
+ logger.warning("Model is loading on HF side, retrying once…")
68
+ import time; time.sleep(20)
69
+ resp = requests.post(api_url, headers=headers, json=payload, timeout=60)
70
 
71
+ if resp.status_code != 200:
72
+ logger.error(f"HF API error {resp.status_code}: {resp.text[:200]}")
73
+ return None
74
 
75
+ data = resp.json()
76
+ if isinstance(data, list) and data:
77
+ return data[0].get("generated_text", "").strip()
 
78
 
 
 
 
 
 
 
 
 
79
  return None
80
 
81
+ except requests.exceptions.Timeout:
82
+ logger.error("HF Inference API timed out.")
83
+ return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
84
  except Exception as exc:
85
+ logger.error(f"HF Inference API call failed: {exc}")
86
  return None
87
 
88
 
89
  def analyze_contract(solidity_code: str) -> Dict[str, Any]:
 
 
 
 
 
 
90
  pattern_result = analyze_with_patterns(solidity_code)
91
+ llm_text = _call_hf_inference_api(solidity_code)
 
 
 
 
92
  return {
93
  **pattern_result,
94
  "llm_analysis": llm_text,
 
98
 
99
  def model_status() -> Dict[str, Any]:
100
  return {
101
+ "model_loaded": False,
102
+ "inference_mode": "HF Inference API",
103
+ "model_id": HF_MODEL_ID or "not configured",
104
+ "hf_token_set": bool(HF_TOKEN),
105
+ }