MultiAgent / app.py
AlsuGibadullina's picture
Create app.py
8d57631 verified
import os
import re
import json
from dataclasses import dataclass, asdict
from typing import Any, Dict, List, Optional, Tuple
import gradio as gr
# =======================
# .env + проверка OLLAMA_API_KEY (как в примере)
# =======================
try:
from dotenv import load_dotenv
load_dotenv()
except Exception:
pass
OLLAMA_API_KEY_ENV = "OLLAMA_API_KEY"
OLLAMA_API_KEY = os.environ.get(OLLAMA_API_KEY_ENV)
if not OLLAMA_API_KEY:
raise RuntimeError(
f"Переменная окружения {OLLAMA_API_KEY_ENV} не задана.\n"
f"В Hugging Face Spaces её нужно добавить в Settings → Variables/Secrets."
)
# =======================
# Ollama Cloud -> smolagents.Model adapter (упрощённо, но совместимо по идее с примером)
# =======================
from ollama import Client
class OllamaCloudModel:
"""
Мини-адаптер Ollama Cloud для генерации текста (не поток).
"""
def __init__(
self,
model_id: str = "gpt-oss:120b",
host: str = "https://ollama.com",
api_key_env: str = "OLLAMA_API_KEY",
):
api_key = os.environ.get(api_key_env)
if not api_key:
raise ValueError(f"Не найден {api_key_env} в окружении")
self.model_id = model_id
self.client = Client(host=host, headers={"Authorization": f"Bearer {api_key}"})
def _to_text(self, content: Any) -> str:
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = []
for p in content:
if isinstance(p, dict):
parts.append(p.get("text", ""))
else:
parts.append(str(p))
return "".join(parts)
return str(content)
def generate(self, messages: List[Dict[str, str]], stop_sequences: Optional[List[str]] = None) -> str:
resp = self.client.chat(model=self.model_id, messages=messages, stream=False)
text = self._to_text((resp.get("message") or {}).get("content", ""))
if stop_sequences:
for s in stop_sequences:
if not s:
continue
idx = text.find(s)
if idx != -1:
text = text[:idx]
break
return text
# =======================
# Модели данных
# =======================
@dataclass
class NormalizedRequirement:
req_id: str
source_text: str
normalized_text: str
classification: str # functional / non-functional / constraint / assumption / other
tags: List[str]
@dataclass
class QualityMetrics:
invert_score: float # 0..1
smart_score: float # 0..1
gqm_score: float # 0..1
verifiability: float # 0..1
specificity: float # 0..1
ambiguity: float # 0..1 (1 = очень неоднозначно)
overall: float # 0..1
@dataclass
class Defect:
code: str
severity: str # low/medium/high
message: str
spans: List[str]
@dataclass
class QualityReport:
req_id: str
metrics: QualityMetrics
defects: List[Defect]
@dataclass
class RefactorResult:
req_id: str
improved_text: str
fixed_defects: List[str]
explanation: str
# =======================
# AnalysisAgent: сегментация + нормализация + базовая классификация
# =======================
class AnalysisAgent:
REQ_LINE_RE = re.compile(r"^\s*(?:REQ[-_ ]?\d+|R\d+|\d+[.)]|-|\*)\s+(.*)\s*$", re.IGNORECASE)
def segment(self, srs: str) -> List[str]:
srs = (srs or "").strip()
if not srs:
return []
# 1) если требования уже построчно/списком — берём строки
lines = [ln.strip() for ln in srs.splitlines() if ln.strip()]
# 2) склеиваем многострочные пункты: если строка не похожа на начало нового пункта — считаем продолжением
chunks: List[str] = []
buf: List[str] = []
for ln in lines:
is_new = bool(self.REQ_LINE_RE.match(ln)) or ln.lower().startswith(("shall", "must", "should", "system", "приложение", "система"))
if is_new and buf:
chunks.append(" ".join(buf).strip())
buf = []
buf.append(ln)
if buf:
chunks.append(" ".join(buf).strip())
# fallback: если получилось слишком мало — режем по точке с запятой/точкам с большими буквами
if len(chunks) <= 1 and len(srs) > 300:
parts = re.split(r"(?<=[.;])\s+(?=[A-ZА-ЯЁ])", srs)
parts = [p.strip() for p in parts if p.strip()]
if len(parts) > len(chunks):
chunks = parts
return chunks
def classify(self, text: str) -> Tuple[str, List[str]]:
t = text.lower()
tags: List[str] = []
# очень грубая эвристика
if any(k in t for k in ["ms", "сек", "seconds", "performance", "latency", "rps", "нагруз", "время отклика", "доступност", "sla"]):
tags.append("performance")
if any(k in t for k in ["security", "шифр", "oauth", "jwt", "роль", "доступ", "авторизац", "аутентификац", "pII".lower()]):
tags.append("security")
if any(k in t for k in ["log", "лог", "audit", "трасс", "наблюдаем", "метрик", "monitor", "алерт"]):
tags.append("observability")
if any(k in t for k in ["ui", "ux", "интерфейс", "экран", "форма", "кнопк"]):
tags.append("ui")
# classification
if any(k in t for k in ["должен", "must", "shall", "обязан"]):
# функциональные чаще про действия/кейсы
if any(k in t for k in ["созда", "удаля", "обнов", "получ", "отправ", "импорт", "экспорт", "рассчит", "показ", "поиск", "фильтр"]):
return "functional", tags
# нефункциональные — про качество/ограничения
if any(k in t for k in ["не менее", "не более", "ms", "сек", "доступност", "безопасн", "масштаб", "производит", "надежн", "совместим"]):
return "non-functional", tags
return "other", tags
if any(k in t for k in ["предполагается", "assume", "assumption"]):
return "assumption", tags
if any(k in t for k in ["огранич", "constraint", "только", "запрещ", "не допускается"]):
return "constraint", tags
return "other", tags
def normalize(self, text: str) -> str:
# убираем маркеры типа "1) " "REQ-1 " "-" "*"
m = self.REQ_LINE_RE.match(text)
if m:
text = m.group(1).strip()
text = re.sub(r"\s+", " ", text).strip()
# нормализация "Система должна" в единую форму, но без фанатизма
text = re.sub(r"^(система|приложение)\s+", "Система ", text, flags=re.IGNORECASE)
return text
def run(self, srs: str) -> List[NormalizedRequirement]:
segments = self.segment(srs)
out: List[NormalizedRequirement] = []
for i, seg in enumerate(segments, start=1):
norm = self.normalize(seg)
cls, tags = self.classify(norm)
out.append(
NormalizedRequirement(
req_id=f"REQ-{i:03d}",
source_text=seg,
normalized_text=norm,
classification=cls,
tags=tags,
)
)
return out
# =======================
# QualityAgent: rule-based + (упрощённые) INVERT / SMART / GQM
# =======================
class QualityAgent:
AMBIGUOUS_WORDS = [
"быстро", "удобно", "понятно", "желательно", "примерно", "как можно", "оптимально",
"в разумные сроки", "и т.п.", "и т.д.", "достаточно", "при необходимости",
"some", "several", "as needed", "etc", "quickly", "user-friendly",
]
WEAK_MODALS = ["может", "could", "may", "желательно", "should"]
STRONG_MODALS = ["должен", "must", "shall", "обязан"]
UNVERIFIABLE = ["удобно", "интуитивно", "красиво", "понятно", "friendly", "intuitive"]
NUMBER_RE = re.compile(r"(\d+(?:[.,]\d+)?)\s*(ms|сек|s|seconds|%|rps|rpm|gb|mb|kb|мин|min|hours|час|дней|day|days)\b", re.IGNORECASE)
def _find_spans(self, text: str, needles: List[str]) -> List[str]:
found: List[str] = []
t = text.lower()
for w in needles:
if w.lower() in t:
found.append(w)
return found
def _defect(self, code: str, severity: str, message: str, spans: List[str]) -> Defect:
return Defect(code=code, severity=severity, message=message, spans=spans)
def evaluate(self, req: NormalizedRequirement) -> QualityReport:
text = req.normalized_text.strip()
t = text.lower()
defects: List[Defect] = []
amb = self._find_spans(text, self.AMBIGUOUS_WORDS)
if amb:
defects.append(self._defect(
code="AMBIGUOUS_WORDING",
severity="high" if len(amb) >= 2 else "medium",
message="Найдены неоднозначные/расплывчатые слова. Замените на измеримые критерии.",
spans=amb,
))
weak = self._find_spans(text, self.WEAK_MODALS)
if weak:
defects.append(self._defect(
code="WEAK_MODAL",
severity="medium",
message="Используются слабые модальные слова (может/should). Требование может стать необязательным.",
spans=weak,
))
has_strong = any(m in t for m in self.STRONG_MODALS)
if not has_strong and req.classification in ("functional", "non-functional", "constraint"):
defects.append(self._defect(
code="MISSING_NORMATIVE_MODAL",
severity="medium",
message="Нет явной нормативности (должен/must/shall). Уточните обязательность.",
spans=[],
))
# Проверяем измеримость (числа+единицы) для NFR/качества
has_numbers = bool(self.NUMBER_RE.search(text))
if req.classification == "non-functional" and not has_numbers:
defects.append(self._defect(
code="MISSING_MEASURABLE_CRITERIA",
severity="high",
message="Для нефункционального требования не задан измеримый критерий (число + единица).",
spans=[],
))
unver = self._find_spans(text, self.UNVERIFIABLE)
if unver:
defects.append(self._defect(
code="POOR_VERIFIABILITY",
severity="high",
message="Формулировка плохо проверяема (субъективные критерии).",
spans=unver,
))
# Простейшая проверка на “одна мысль — одно требование”
if len(re.split(r"\b(и|and|или|or)\b", t)) > 7:
defects.append(self._defect(
code="MULTIPLE_REQUIREMENTS_IN_ONE",
severity="medium",
message="Похоже, в одном предложении смешано несколько требований. Разбейте на отдельные пункты.",
spans=[],
))
# --- Метрики (упрощённые) ---
# ambiguity: 0..1 (1 = много неоднозначностей)
ambiguity = min(1.0, 0.2 * len(amb) + (0.15 if weak else 0.0))
# specificity: штрафуем за отсутствие чисел в NFR и за расплывчатость
specificity = 1.0
if req.classification == "non-functional" and not has_numbers:
specificity -= 0.5
specificity -= 0.15 * len(amb)
specificity = max(0.0, min(1.0, specificity))
# verifiability: штраф за субъективность и отсутствие критериев
verifiability = 1.0
if unver:
verifiability -= 0.6
if req.classification == "non-functional" and not has_numbers:
verifiability -= 0.4
verifiability = max(0.0, min(1.0, verifiability))
# SMART (S,M,A,R,T) — приближение через специфичность + измеримость + проверяемость
smart_score = max(0.0, min(1.0, (specificity * 0.45 + verifiability * 0.55)))
# INVERT — приближение: отсутствие неоднозначности + “однозначная проверка”
invert_score = max(0.0, min(1.0, (1.0 - ambiguity) * 0.55 + verifiability * 0.45))
# GQM — если есть (Goal/Question/Metric) хотя бы частично: наличие метрик/критериев и “что проверять”
gqm_score = 0.3
if has_numbers:
gqm_score += 0.4
if has_strong:
gqm_score += 0.2
if not amb:
gqm_score += 0.1
gqm_score = max(0.0, min(1.0, gqm_score))
overall = max(0.0, min(1.0, (invert_score + smart_score + gqm_score) / 3.0))
metrics = QualityMetrics(
invert_score=round(invert_score, 3),
smart_score=round(smart_score, 3),
gqm_score=round(gqm_score, 3),
verifiability=round(verifiability, 3),
specificity=round(specificity, 3),
ambiguity=round(ambiguity, 3),
overall=round(overall, 3),
)
return QualityReport(req_id=req.req_id, metrics=metrics, defects=defects)
def run(self, req: NormalizedRequirement) -> QualityReport:
return self.evaluate(req)
# =======================
# RefactorAgent: LLM-рефакторинг требования по дефектам + целевым критериям
# =======================
class RefactorAgent:
def __init__(self, model: OllamaCloudModel):
self.model = model
def run(self, req: NormalizedRequirement, report: QualityReport) -> RefactorResult:
defects = report.defects or []
defects_md = "\n".join(
[f"- [{d.severity}] {d.code}: {d.message}" + (f" (спаны: {', '.join(d.spans)})" if d.spans else "")
for d in defects]
) or "- (дефектов не найдено)"
target_criteria = """
Целевые критерии улучшения:
- SMART: конкретно, измеримо, достижимо, релевантно, ограничено по времени/условиям где уместно
- INVERT (упрощённо): однозначность, проверяемость, атомарность, отсутствие расплывчатых слов
- GQM: чтобы было понятно, как измерять/проверять (метрика/критерий приемки)
"""
prompt = f"""
Ты — эксперт по требованиям (SRS). Улучши формулировку требования.
Исходное требование (req_id={req.req_id}):
{req.normalized_text}
Классификация: {req.classification}
Теги: {", ".join(req.tags) if req.tags else "-"}
Выявленные дефекты:
{defects_md}
{target_criteria}
Задача:
1) Верни улучшенную формулировку (одной строкой или несколькими, но без воды).
2) Затем коротко перечисли, какие дефекты исправлены.
3) Если требование нефункциональное и нет метрик — добавь измеримый критерий (число + единица),
но НЕ выдумывай домен: используй нейтральные критерии (например, время отклика, точность, SLA) и пометь как "пример".
4) Не добавляй лишних требований, только уточняй существующее.
Формат ответа (СТРОГО):
IMPROVED:
<текст>
FIXED_DEFECTS:
- <код1>
- <код2>
EXPLANATION:
<1-3 предложения>
""".strip()
out = self.model.generate([{"role": "user", "content": prompt}])
improved, fixed, expl = self._parse_refactor(out, fallback=req.normalized_text, defects=defects)
return RefactorResult(
req_id=req.req_id,
improved_text=improved,
fixed_defects=fixed,
explanation=expl,
)
def _parse_refactor(self, text: str, fallback: str, defects: List[Defect]) -> Tuple[str, List[str], str]:
improved = fallback
fixed_codes = [d.code for d in defects]
explanation = "Уточнена формулировка с учётом найденных дефектов."
# грубый парсер по секциям
m1 = re.search(r"IMPROVED:\s*(.+?)\n\s*FIXED_DEFECTS:", text, flags=re.S | re.I)
if m1:
improved = m1.group(1).strip()
m2 = re.search(r"FIXED_DEFECTS:\s*(.+?)\n\s*EXPLANATION:", text, flags=re.S | re.I)
if m2:
block = m2.group(1).strip()
codes = []
for ln in block.splitlines():
ln = ln.strip()
if ln.startswith("-"):
codes.append(ln[1:].strip())
if codes:
fixed_codes = codes
m3 = re.search(r"EXPLANATION:\s*(.+)\s*$", text, flags=re.S | re.I)
if m3:
explanation = m3.group(1).strip()
return improved, fixed_codes, explanation
# =======================
# PlantUML: простая валидация/минимальные правки + (опционально) LLM-подсказки
# =======================
class PlantUMLAgent:
START = "@startuml"
END = "@enduml"
def extract_blocks(self, text: str) -> List[str]:
if not text:
return []
blocks = []
pattern = re.compile(r"@startuml[\s\S]*?@enduml", re.IGNORECASE)
for m in pattern.finditer(text):
blocks.append(m.group(0).strip())
return blocks
def basic_fix(self, block: str) -> Tuple[str, List[str]]:
changes: List[str] = []
b = block.strip()
if self.START not in b.lower():
b = f"@startuml\n{b}"
changes.append("Добавлен @startuml в начале блока.")
if self.END not in b.lower():
b = f"{b}\n@enduml"
changes.append("Добавлен @enduml в конце блока.")
# нормализуем переносы и табы
b2 = b.replace("\t", " ")
if b2 != b:
b = b2
changes.append("Заменены табы на пробелы.")
# типовые опечатки
b2 = re.sub(r"@end\s*uml", "@enduml", b, flags=re.IGNORECASE)
if b2 != b:
b = b2
changes.append("Исправлено '@end uml' → '@enduml'.")
return b.strip(), changes
def run(self, plantuml_text: str) -> Tuple[str, List[str]]:
blocks = self.extract_blocks(plantuml_text)
if not blocks and (plantuml_text or "").strip():
# если пользователь вставил диаграмму без start/end — считаем это одним блоком
blocks = [(plantuml_text or "").strip()]
fixed_blocks: List[str] = []
changes: List[str] = []
for i, bl in enumerate(blocks, start=1):
fixed, ch = self.basic_fix(bl)
fixed_blocks.append(fixed)
for c in ch:
changes.append(f"Диаграмма #{i}: {c}")
return ("\n\n".join(fixed_blocks)).strip(), changes
# =======================
# Orchestrator: связывает агентов и собирает единый ответ
# =======================
class Orchestrator:
def __init__(self, model_id: str):
self.model = OllamaCloudModel(model_id=model_id.strip() or "gpt-oss:120b")
self.analysis = AnalysisAgent()
self.quality = QualityAgent()
self.refactor = RefactorAgent(self.model)
self.plantuml = PlantUMLAgent()
def run(self, srs: str, plantuml_optional: str) -> Dict[str, Any]:
changes: List[str] = []
tips: List[str] = []
# 1) PlantUML (опционально)
fixed_diagrams = ""
diagram_changes: List[str] = []
if (plantuml_optional or "").strip():
fixed_diagrams, diagram_changes = self.plantuml.run(plantuml_optional)
changes.extend(diagram_changes)
# 2) Analysis
normalized = self.analysis.run(srs)
if not normalized:
return {
"final_md": "❗️Не нашёл требований в тексте SRS. Вставь хотя бы 1-2 пункта требований.",
"normalized_json": "[]",
"quality_json": "[]",
"refactor_json": "[]",
}
# 3) Quality + Refactor
quality_reports: List[QualityReport] = []
refactors: List[RefactorResult] = []
for req in normalized:
qr = self.quality.run(req)
quality_reports.append(qr)
# если дефектов нет — можно не трогать; но обычно полезно унифицировать стиль
rr = self.refactor.run(req, qr) if (qr.defects or []) else RefactorResult(
req_id=req.req_id,
improved_text=req.normalized_text,
fixed_defects=[],
explanation="Дефектов не выявлено, формулировка оставлена без изменений.",
)
refactors.append(rr)
# изменения по требованию
if rr.improved_text.strip() != req.normalized_text.strip():
changes.append(f"{req.req_id}: улучшена формулировка требования.")
if qr.defects:
tips.append(f"{req.req_id}: проверь критерии приемки и измеримость (особенно если это NFR).")
# 4) Сборка пользовательского ответа
final_md = self._render_final(normalized, quality_reports, refactors, fixed_diagrams, changes, tips)
return {
"final_md": final_md,
"normalized_json": json.dumps([asdict(x) for x in normalized], ensure_ascii=False, indent=2),
"quality_json": json.dumps([self._asdict_quality(x) for x in quality_reports], ensure_ascii=False, indent=2),
"refactor_json": json.dumps([asdict(x) for x in refactors], ensure_ascii=False, indent=2),
}
def _asdict_quality(self, qr: QualityReport) -> Dict[str, Any]:
return {
"req_id": qr.req_id,
"metrics": asdict(qr.metrics),
"defects": [asdict(d) for d in (qr.defects or [])],
}
def _render_final(
self,
normalized: List[NormalizedRequirement],
quality: List[QualityReport],
refactors: List[RefactorResult],
fixed_diagrams: str,
changes: List[str],
tips: List[str],
) -> str:
q_by_id = {q.req_id: q for q in quality}
r_by_id = {r.req_id: r for r in refactors}
md = []
md.append("# ✅ Исправленные требования и рекомендации\n")
md.append("## 1) Итоговые требования (улучшенные формулировки)\n")
for req in normalized:
rr = r_by_id[req.req_id]
qr = q_by_id[req.req_id]
md.append(f"### {req.req_id}{req.classification}\n")
md.append(f"**Было:** {req.normalized_text}\n")
md.append(f"**Стало:** {rr.improved_text}\n")
md.append("**Метрики качества (0..1):**")
md.append(
f"- overall: {qr.metrics.overall} | SMART: {qr.metrics.smart_score} | INVERT: {qr.metrics.invert_score} | GQM: {qr.metrics.gqm_score}"
)
if qr.defects:
md.append("**Дефекты:**")
for d in qr.defects:
spans = f" (спаны: {', '.join(d.spans)})" if d.spans else ""
md.append(f"- [{d.severity}] {d.code}: {d.message}{spans}")
md.append("**Что исправлено:**")
if rr.fixed_defects:
for c in rr.fixed_defects:
md.append(f"- {c}")
else:
md.append("- (ничего)")
md.append(f"**Пояснение:** {rr.explanation}\n")
if fixed_diagrams.strip():
md.append("## 2) Диаграммы (PlantUML) с исправлениями\n")
md.append("```plantuml")
md.append(fixed_diagrams.strip())
md.append("```\n")
md.append("## 3) Список изменений\n")
if changes:
for c in changes:
md.append(f"- {c}")
else:
md.append("- (изменений не зафиксировано)")
md.append("\n## 4) Советы\n")
uniq_tips = []
for t in tips:
if t not in uniq_tips:
uniq_tips.append(t)
if uniq_tips:
for t in uniq_tips[:20]:
md.append(f"- {t}")
else:
md.append("- Требования выглядят неплохо. Если это финальная версия, добавь тестируемые критерии приемки там, где это важно.")
md.append("\n---\n")
md.append("_Примечание: метрики INVERT/SMART/GQM здесь реализованы в упрощённом виде (практическая эвристика), "
"а улучшение текста делает LLM-агент по списку дефектов._")
return "\n".join(md)
# =======================
# Gradio UI
# =======================
def gradio_run(
srs_text: str,
plantuml_text: str,
model_id: str,
):
orch = Orchestrator(model_id=model_id)
res = orch.run(srs_text or "", plantuml_text or "")
return res["final_md"], res["normalized_json"], res["quality_json"], res["refactor_json"]
with gr.Blocks() as demo:
gr.Markdown(
"""
# 🧩 Multi-Agent SRS Refiner (Orchestrator + Analysis + Quality + Refactor)
Вставь SRS (требования) и (опционально) PlantUML — получишь:
- улучшенные формулировки требований
- метрики качества и дефекты
- (опционально) исправленные PlantUML блоки
- список изменений и советы
"""
)
with gr.Row():
with gr.Column(scale=2):
srs_input = gr.Textbox(
label="SRS / Требования (текст)",
placeholder="Вставь требования списком (по строкам) или цельным текстом.",
lines=14,
)
plantuml_input = gr.Textbox(
label="PlantUML (опционально)",
placeholder="@startuml ... @enduml (можно несколько блоков)",
lines=10,
)
with gr.Column(scale=1):
model_id_input = gr.Textbox(
label="Ollama Cloud модель",
value="gpt-oss:120b",
placeholder="Например: gpt-oss:120b",
)
run_btn = gr.Button("Запустить оркестратора")
out_md = gr.Markdown(label="Результат")
with gr.Accordion("Debug: Нормализованные требования (JSON)", open=False):
out_norm = gr.Code(language="json")
with gr.Accordion("Debug: Оценка качества (JSON)", open=False):
out_quality = gr.Code(language="json")
with gr.Accordion("Debug: Рефакторинг (JSON)", open=False):
out_ref = gr.Code(language="json")
run_btn.click(
fn=gradio_run,
inputs=[srs_input, plantuml_input, model_id_input],
outputs=[out_md, out_norm, out_quality, out_ref],
)
if __name__ == "__main__":
demo.launch()