import os import re import json from dataclasses import dataclass, asdict from typing import Any, Dict, List, Optional, Tuple import gradio as gr # ======================= # .env + проверка OLLAMA_API_KEY (как в примере) # ======================= try: from dotenv import load_dotenv load_dotenv() except Exception: pass OLLAMA_API_KEY_ENV = "OLLAMA_API_KEY" OLLAMA_API_KEY = os.environ.get(OLLAMA_API_KEY_ENV) if not OLLAMA_API_KEY: raise RuntimeError( f"Переменная окружения {OLLAMA_API_KEY_ENV} не задана.\n" f"В Hugging Face Spaces её нужно добавить в Settings → Variables/Secrets." ) # ======================= # Ollama Cloud -> smolagents.Model adapter (упрощённо, но совместимо по идее с примером) # ======================= from ollama import Client class OllamaCloudModel: """ Мини-адаптер Ollama Cloud для генерации текста (не поток). """ def __init__( self, model_id: str = "gpt-oss:120b", host: str = "https://ollama.com", api_key_env: str = "OLLAMA_API_KEY", ): api_key = os.environ.get(api_key_env) if not api_key: raise ValueError(f"Не найден {api_key_env} в окружении") self.model_id = model_id self.client = Client(host=host, headers={"Authorization": f"Bearer {api_key}"}) def _to_text(self, content: Any) -> str: if content is None: return "" if isinstance(content, str): return content if isinstance(content, list): parts = [] for p in content: if isinstance(p, dict): parts.append(p.get("text", "")) else: parts.append(str(p)) return "".join(parts) return str(content) def generate(self, messages: List[Dict[str, str]], stop_sequences: Optional[List[str]] = None) -> str: resp = self.client.chat(model=self.model_id, messages=messages, stream=False) text = self._to_text((resp.get("message") or {}).get("content", "")) if stop_sequences: for s in stop_sequences: if not s: continue idx = text.find(s) if idx != -1: text = text[:idx] break return text # ======================= # Модели данных # ======================= @dataclass class NormalizedRequirement: req_id: str source_text: str normalized_text: str classification: str # functional / non-functional / constraint / assumption / other tags: List[str] @dataclass class QualityMetrics: invert_score: float # 0..1 smart_score: float # 0..1 gqm_score: float # 0..1 verifiability: float # 0..1 specificity: float # 0..1 ambiguity: float # 0..1 (1 = очень неоднозначно) overall: float # 0..1 @dataclass class Defect: code: str severity: str # low/medium/high message: str spans: List[str] @dataclass class QualityReport: req_id: str metrics: QualityMetrics defects: List[Defect] @dataclass class RefactorResult: req_id: str improved_text: str fixed_defects: List[str] explanation: str # ======================= # AnalysisAgent: сегментация + нормализация + базовая классификация # ======================= class AnalysisAgent: REQ_LINE_RE = re.compile(r"^\s*(?:REQ[-_ ]?\d+|R\d+|\d+[.)]|-|\*)\s+(.*)\s*$", re.IGNORECASE) def segment(self, srs: str) -> List[str]: srs = (srs or "").strip() if not srs: return [] # 1) если требования уже построчно/списком — берём строки lines = [ln.strip() for ln in srs.splitlines() if ln.strip()] # 2) склеиваем многострочные пункты: если строка не похожа на начало нового пункта — считаем продолжением chunks: List[str] = [] buf: List[str] = [] for ln in lines: is_new = bool(self.REQ_LINE_RE.match(ln)) or ln.lower().startswith(("shall", "must", "should", "system", "приложение", "система")) if is_new and buf: chunks.append(" ".join(buf).strip()) buf = [] buf.append(ln) if buf: chunks.append(" ".join(buf).strip()) # fallback: если получилось слишком мало — режем по точке с запятой/точкам с большими буквами if len(chunks) <= 1 and len(srs) > 300: parts = re.split(r"(?<=[.;])\s+(?=[A-ZА-ЯЁ])", srs) parts = [p.strip() for p in parts if p.strip()] if len(parts) > len(chunks): chunks = parts return chunks def classify(self, text: str) -> Tuple[str, List[str]]: t = text.lower() tags: List[str] = [] # очень грубая эвристика if any(k in t for k in ["ms", "сек", "seconds", "performance", "latency", "rps", "нагруз", "время отклика", "доступност", "sla"]): tags.append("performance") if any(k in t for k in ["security", "шифр", "oauth", "jwt", "роль", "доступ", "авторизац", "аутентификац", "pII".lower()]): tags.append("security") if any(k in t for k in ["log", "лог", "audit", "трасс", "наблюдаем", "метрик", "monitor", "алерт"]): tags.append("observability") if any(k in t for k in ["ui", "ux", "интерфейс", "экран", "форма", "кнопк"]): tags.append("ui") # classification if any(k in t for k in ["должен", "must", "shall", "обязан"]): # функциональные чаще про действия/кейсы if any(k in t for k in ["созда", "удаля", "обнов", "получ", "отправ", "импорт", "экспорт", "рассчит", "показ", "поиск", "фильтр"]): return "functional", tags # нефункциональные — про качество/ограничения if any(k in t for k in ["не менее", "не более", "ms", "сек", "доступност", "безопасн", "масштаб", "производит", "надежн", "совместим"]): return "non-functional", tags return "other", tags if any(k in t for k in ["предполагается", "assume", "assumption"]): return "assumption", tags if any(k in t for k in ["огранич", "constraint", "только", "запрещ", "не допускается"]): return "constraint", tags return "other", tags def normalize(self, text: str) -> str: # убираем маркеры типа "1) " "REQ-1 " "-" "*" m = self.REQ_LINE_RE.match(text) if m: text = m.group(1).strip() text = re.sub(r"\s+", " ", text).strip() # нормализация "Система должна" в единую форму, но без фанатизма text = re.sub(r"^(система|приложение)\s+", "Система ", text, flags=re.IGNORECASE) return text def run(self, srs: str) -> List[NormalizedRequirement]: segments = self.segment(srs) out: List[NormalizedRequirement] = [] for i, seg in enumerate(segments, start=1): norm = self.normalize(seg) cls, tags = self.classify(norm) out.append( NormalizedRequirement( req_id=f"REQ-{i:03d}", source_text=seg, normalized_text=norm, classification=cls, tags=tags, ) ) return out # ======================= # QualityAgent: rule-based + (упрощённые) INVERT / SMART / GQM # ======================= class QualityAgent: AMBIGUOUS_WORDS = [ "быстро", "удобно", "понятно", "желательно", "примерно", "как можно", "оптимально", "в разумные сроки", "и т.п.", "и т.д.", "достаточно", "при необходимости", "some", "several", "as needed", "etc", "quickly", "user-friendly", ] WEAK_MODALS = ["может", "could", "may", "желательно", "should"] STRONG_MODALS = ["должен", "must", "shall", "обязан"] UNVERIFIABLE = ["удобно", "интуитивно", "красиво", "понятно", "friendly", "intuitive"] NUMBER_RE = re.compile(r"(\d+(?:[.,]\d+)?)\s*(ms|сек|s|seconds|%|rps|rpm|gb|mb|kb|мин|min|hours|час|дней|day|days)\b", re.IGNORECASE) def _find_spans(self, text: str, needles: List[str]) -> List[str]: found: List[str] = [] t = text.lower() for w in needles: if w.lower() in t: found.append(w) return found def _defect(self, code: str, severity: str, message: str, spans: List[str]) -> Defect: return Defect(code=code, severity=severity, message=message, spans=spans) def evaluate(self, req: NormalizedRequirement) -> QualityReport: text = req.normalized_text.strip() t = text.lower() defects: List[Defect] = [] amb = self._find_spans(text, self.AMBIGUOUS_WORDS) if amb: defects.append(self._defect( code="AMBIGUOUS_WORDING", severity="high" if len(amb) >= 2 else "medium", message="Найдены неоднозначные/расплывчатые слова. Замените на измеримые критерии.", spans=amb, )) weak = self._find_spans(text, self.WEAK_MODALS) if weak: defects.append(self._defect( code="WEAK_MODAL", severity="medium", message="Используются слабые модальные слова (может/should). Требование может стать необязательным.", spans=weak, )) has_strong = any(m in t for m in self.STRONG_MODALS) if not has_strong and req.classification in ("functional", "non-functional", "constraint"): defects.append(self._defect( code="MISSING_NORMATIVE_MODAL", severity="medium", message="Нет явной нормативности (должен/must/shall). Уточните обязательность.", spans=[], )) # Проверяем измеримость (числа+единицы) для NFR/качества has_numbers = bool(self.NUMBER_RE.search(text)) if req.classification == "non-functional" and not has_numbers: defects.append(self._defect( code="MISSING_MEASURABLE_CRITERIA", severity="high", message="Для нефункционального требования не задан измеримый критерий (число + единица).", spans=[], )) unver = self._find_spans(text, self.UNVERIFIABLE) if unver: defects.append(self._defect( code="POOR_VERIFIABILITY", severity="high", message="Формулировка плохо проверяема (субъективные критерии).", spans=unver, )) # Простейшая проверка на “одна мысль — одно требование” if len(re.split(r"\b(и|and|или|or)\b", t)) > 7: defects.append(self._defect( code="MULTIPLE_REQUIREMENTS_IN_ONE", severity="medium", message="Похоже, в одном предложении смешано несколько требований. Разбейте на отдельные пункты.", spans=[], )) # --- Метрики (упрощённые) --- # ambiguity: 0..1 (1 = много неоднозначностей) ambiguity = min(1.0, 0.2 * len(amb) + (0.15 if weak else 0.0)) # specificity: штрафуем за отсутствие чисел в NFR и за расплывчатость specificity = 1.0 if req.classification == "non-functional" and not has_numbers: specificity -= 0.5 specificity -= 0.15 * len(amb) specificity = max(0.0, min(1.0, specificity)) # verifiability: штраф за субъективность и отсутствие критериев verifiability = 1.0 if unver: verifiability -= 0.6 if req.classification == "non-functional" and not has_numbers: verifiability -= 0.4 verifiability = max(0.0, min(1.0, verifiability)) # SMART (S,M,A,R,T) — приближение через специфичность + измеримость + проверяемость smart_score = max(0.0, min(1.0, (specificity * 0.45 + verifiability * 0.55))) # INVERT — приближение: отсутствие неоднозначности + “однозначная проверка” invert_score = max(0.0, min(1.0, (1.0 - ambiguity) * 0.55 + verifiability * 0.45)) # GQM — если есть (Goal/Question/Metric) хотя бы частично: наличие метрик/критериев и “что проверять” gqm_score = 0.3 if has_numbers: gqm_score += 0.4 if has_strong: gqm_score += 0.2 if not amb: gqm_score += 0.1 gqm_score = max(0.0, min(1.0, gqm_score)) overall = max(0.0, min(1.0, (invert_score + smart_score + gqm_score) / 3.0)) metrics = QualityMetrics( invert_score=round(invert_score, 3), smart_score=round(smart_score, 3), gqm_score=round(gqm_score, 3), verifiability=round(verifiability, 3), specificity=round(specificity, 3), ambiguity=round(ambiguity, 3), overall=round(overall, 3), ) return QualityReport(req_id=req.req_id, metrics=metrics, defects=defects) def run(self, req: NormalizedRequirement) -> QualityReport: return self.evaluate(req) # ======================= # RefactorAgent: LLM-рефакторинг требования по дефектам + целевым критериям # ======================= class RefactorAgent: def __init__(self, model: OllamaCloudModel): self.model = model def run(self, req: NormalizedRequirement, report: QualityReport) -> RefactorResult: defects = report.defects or [] defects_md = "\n".join( [f"- [{d.severity}] {d.code}: {d.message}" + (f" (спаны: {', '.join(d.spans)})" if d.spans else "") for d in defects] ) or "- (дефектов не найдено)" target_criteria = """ Целевые критерии улучшения: - SMART: конкретно, измеримо, достижимо, релевантно, ограничено по времени/условиям где уместно - INVERT (упрощённо): однозначность, проверяемость, атомарность, отсутствие расплывчатых слов - GQM: чтобы было понятно, как измерять/проверять (метрика/критерий приемки) """ prompt = f""" Ты — эксперт по требованиям (SRS). Улучши формулировку требования. Исходное требование (req_id={req.req_id}): {req.normalized_text} Классификация: {req.classification} Теги: {", ".join(req.tags) if req.tags else "-"} Выявленные дефекты: {defects_md} {target_criteria} Задача: 1) Верни улучшенную формулировку (одной строкой или несколькими, но без воды). 2) Затем коротко перечисли, какие дефекты исправлены. 3) Если требование нефункциональное и нет метрик — добавь измеримый критерий (число + единица), но НЕ выдумывай домен: используй нейтральные критерии (например, время отклика, точность, SLA) и пометь как "пример". 4) Не добавляй лишних требований, только уточняй существующее. Формат ответа (СТРОГО): IMPROVED: <текст> FIXED_DEFECTS: - <код1> - <код2> EXPLANATION: <1-3 предложения> """.strip() out = self.model.generate([{"role": "user", "content": prompt}]) improved, fixed, expl = self._parse_refactor(out, fallback=req.normalized_text, defects=defects) return RefactorResult( req_id=req.req_id, improved_text=improved, fixed_defects=fixed, explanation=expl, ) def _parse_refactor(self, text: str, fallback: str, defects: List[Defect]) -> Tuple[str, List[str], str]: improved = fallback fixed_codes = [d.code for d in defects] explanation = "Уточнена формулировка с учётом найденных дефектов." # грубый парсер по секциям m1 = re.search(r"IMPROVED:\s*(.+?)\n\s*FIXED_DEFECTS:", text, flags=re.S | re.I) if m1: improved = m1.group(1).strip() m2 = re.search(r"FIXED_DEFECTS:\s*(.+?)\n\s*EXPLANATION:", text, flags=re.S | re.I) if m2: block = m2.group(1).strip() codes = [] for ln in block.splitlines(): ln = ln.strip() if ln.startswith("-"): codes.append(ln[1:].strip()) if codes: fixed_codes = codes m3 = re.search(r"EXPLANATION:\s*(.+)\s*$", text, flags=re.S | re.I) if m3: explanation = m3.group(1).strip() return improved, fixed_codes, explanation # ======================= # PlantUML: простая валидация/минимальные правки + (опционально) LLM-подсказки # ======================= class PlantUMLAgent: START = "@startuml" END = "@enduml" def extract_blocks(self, text: str) -> List[str]: if not text: return [] blocks = [] pattern = re.compile(r"@startuml[\s\S]*?@enduml", re.IGNORECASE) for m in pattern.finditer(text): blocks.append(m.group(0).strip()) return blocks def basic_fix(self, block: str) -> Tuple[str, List[str]]: changes: List[str] = [] b = block.strip() if self.START not in b.lower(): b = f"@startuml\n{b}" changes.append("Добавлен @startuml в начале блока.") if self.END not in b.lower(): b = f"{b}\n@enduml" changes.append("Добавлен @enduml в конце блока.") # нормализуем переносы и табы b2 = b.replace("\t", " ") if b2 != b: b = b2 changes.append("Заменены табы на пробелы.") # типовые опечатки b2 = re.sub(r"@end\s*uml", "@enduml", b, flags=re.IGNORECASE) if b2 != b: b = b2 changes.append("Исправлено '@end uml' → '@enduml'.") return b.strip(), changes def run(self, plantuml_text: str) -> Tuple[str, List[str]]: blocks = self.extract_blocks(plantuml_text) if not blocks and (plantuml_text or "").strip(): # если пользователь вставил диаграмму без start/end — считаем это одним блоком blocks = [(plantuml_text or "").strip()] fixed_blocks: List[str] = [] changes: List[str] = [] for i, bl in enumerate(blocks, start=1): fixed, ch = self.basic_fix(bl) fixed_blocks.append(fixed) for c in ch: changes.append(f"Диаграмма #{i}: {c}") return ("\n\n".join(fixed_blocks)).strip(), changes # ======================= # Orchestrator: связывает агентов и собирает единый ответ # ======================= class Orchestrator: def __init__(self, model_id: str): self.model = OllamaCloudModel(model_id=model_id.strip() or "gpt-oss:120b") self.analysis = AnalysisAgent() self.quality = QualityAgent() self.refactor = RefactorAgent(self.model) self.plantuml = PlantUMLAgent() def run(self, srs: str, plantuml_optional: str) -> Dict[str, Any]: changes: List[str] = [] tips: List[str] = [] # 1) PlantUML (опционально) fixed_diagrams = "" diagram_changes: List[str] = [] if (plantuml_optional or "").strip(): fixed_diagrams, diagram_changes = self.plantuml.run(plantuml_optional) changes.extend(diagram_changes) # 2) Analysis normalized = self.analysis.run(srs) if not normalized: return { "final_md": "❗️Не нашёл требований в тексте SRS. Вставь хотя бы 1-2 пункта требований.", "normalized_json": "[]", "quality_json": "[]", "refactor_json": "[]", } # 3) Quality + Refactor quality_reports: List[QualityReport] = [] refactors: List[RefactorResult] = [] for req in normalized: qr = self.quality.run(req) quality_reports.append(qr) # если дефектов нет — можно не трогать; но обычно полезно унифицировать стиль rr = self.refactor.run(req, qr) if (qr.defects or []) else RefactorResult( req_id=req.req_id, improved_text=req.normalized_text, fixed_defects=[], explanation="Дефектов не выявлено, формулировка оставлена без изменений.", ) refactors.append(rr) # изменения по требованию if rr.improved_text.strip() != req.normalized_text.strip(): changes.append(f"{req.req_id}: улучшена формулировка требования.") if qr.defects: tips.append(f"{req.req_id}: проверь критерии приемки и измеримость (особенно если это NFR).") # 4) Сборка пользовательского ответа final_md = self._render_final(normalized, quality_reports, refactors, fixed_diagrams, changes, tips) return { "final_md": final_md, "normalized_json": json.dumps([asdict(x) for x in normalized], ensure_ascii=False, indent=2), "quality_json": json.dumps([self._asdict_quality(x) for x in quality_reports], ensure_ascii=False, indent=2), "refactor_json": json.dumps([asdict(x) for x in refactors], ensure_ascii=False, indent=2), } def _asdict_quality(self, qr: QualityReport) -> Dict[str, Any]: return { "req_id": qr.req_id, "metrics": asdict(qr.metrics), "defects": [asdict(d) for d in (qr.defects or [])], } def _render_final( self, normalized: List[NormalizedRequirement], quality: List[QualityReport], refactors: List[RefactorResult], fixed_diagrams: str, changes: List[str], tips: List[str], ) -> str: q_by_id = {q.req_id: q for q in quality} r_by_id = {r.req_id: r for r in refactors} md = [] md.append("# ✅ Исправленные требования и рекомендации\n") md.append("## 1) Итоговые требования (улучшенные формулировки)\n") for req in normalized: rr = r_by_id[req.req_id] qr = q_by_id[req.req_id] md.append(f"### {req.req_id} — {req.classification}\n") md.append(f"**Было:** {req.normalized_text}\n") md.append(f"**Стало:** {rr.improved_text}\n") md.append("**Метрики качества (0..1):**") md.append( f"- overall: {qr.metrics.overall} | SMART: {qr.metrics.smart_score} | INVERT: {qr.metrics.invert_score} | GQM: {qr.metrics.gqm_score}" ) if qr.defects: md.append("**Дефекты:**") for d in qr.defects: spans = f" (спаны: {', '.join(d.spans)})" if d.spans else "" md.append(f"- [{d.severity}] {d.code}: {d.message}{spans}") md.append("**Что исправлено:**") if rr.fixed_defects: for c in rr.fixed_defects: md.append(f"- {c}") else: md.append("- (ничего)") md.append(f"**Пояснение:** {rr.explanation}\n") if fixed_diagrams.strip(): md.append("## 2) Диаграммы (PlantUML) с исправлениями\n") md.append("```plantuml") md.append(fixed_diagrams.strip()) md.append("```\n") md.append("## 3) Список изменений\n") if changes: for c in changes: md.append(f"- {c}") else: md.append("- (изменений не зафиксировано)") md.append("\n## 4) Советы\n") uniq_tips = [] for t in tips: if t not in uniq_tips: uniq_tips.append(t) if uniq_tips: for t in uniq_tips[:20]: md.append(f"- {t}") else: md.append("- Требования выглядят неплохо. Если это финальная версия, добавь тестируемые критерии приемки там, где это важно.") md.append("\n---\n") md.append("_Примечание: метрики INVERT/SMART/GQM здесь реализованы в упрощённом виде (практическая эвристика), " "а улучшение текста делает LLM-агент по списку дефектов._") return "\n".join(md) # ======================= # Gradio UI # ======================= def gradio_run( srs_text: str, plantuml_text: str, model_id: str, ): orch = Orchestrator(model_id=model_id) res = orch.run(srs_text or "", plantuml_text or "") return res["final_md"], res["normalized_json"], res["quality_json"], res["refactor_json"] with gr.Blocks() as demo: gr.Markdown( """ # 🧩 Multi-Agent SRS Refiner (Orchestrator + Analysis + Quality + Refactor) Вставь SRS (требования) и (опционально) PlantUML — получишь: - улучшенные формулировки требований - метрики качества и дефекты - (опционально) исправленные PlantUML блоки - список изменений и советы """ ) with gr.Row(): with gr.Column(scale=2): srs_input = gr.Textbox( label="SRS / Требования (текст)", placeholder="Вставь требования списком (по строкам) или цельным текстом.", lines=14, ) plantuml_input = gr.Textbox( label="PlantUML (опционально)", placeholder="@startuml ... @enduml (можно несколько блоков)", lines=10, ) with gr.Column(scale=1): model_id_input = gr.Textbox( label="Ollama Cloud модель", value="gpt-oss:120b", placeholder="Например: gpt-oss:120b", ) run_btn = gr.Button("Запустить оркестратора") out_md = gr.Markdown(label="Результат") with gr.Accordion("Debug: Нормализованные требования (JSON)", open=False): out_norm = gr.Code(language="json") with gr.Accordion("Debug: Оценка качества (JSON)", open=False): out_quality = gr.Code(language="json") with gr.Accordion("Debug: Рефакторинг (JSON)", open=False): out_ref = gr.Code(language="json") run_btn.click( fn=gradio_run, inputs=[srs_input, plantuml_input, model_id_input], outputs=[out_md, out_norm, out_quality, out_ref], ) if __name__ == "__main__": demo.launch()