Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| from dataclasses import dataclass, asdict | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import gradio as gr | |
| # ======================= | |
| # .env + проверка OLLAMA_API_KEY (как в примере) | |
| # ======================= | |
| try: | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| except Exception: | |
| pass | |
| OLLAMA_API_KEY_ENV = "OLLAMA_API_KEY" | |
| OLLAMA_API_KEY = os.environ.get(OLLAMA_API_KEY_ENV) | |
| if not OLLAMA_API_KEY: | |
| raise RuntimeError( | |
| f"Переменная окружения {OLLAMA_API_KEY_ENV} не задана.\n" | |
| f"В Hugging Face Spaces её нужно добавить в Settings → Variables/Secrets." | |
| ) | |
| # ======================= | |
| # Ollama Cloud -> smolagents.Model adapter (упрощённо, но совместимо по идее с примером) | |
| # ======================= | |
| from ollama import Client | |
| class OllamaCloudModel: | |
| """ | |
| Мини-адаптер Ollama Cloud для генерации текста (не поток). | |
| """ | |
| def __init__( | |
| self, | |
| model_id: str = "gpt-oss:120b", | |
| host: str = "https://ollama.com", | |
| api_key_env: str = "OLLAMA_API_KEY", | |
| ): | |
| api_key = os.environ.get(api_key_env) | |
| if not api_key: | |
| raise ValueError(f"Не найден {api_key_env} в окружении") | |
| self.model_id = model_id | |
| self.client = Client(host=host, headers={"Authorization": f"Bearer {api_key}"}) | |
| def _to_text(self, content: Any) -> str: | |
| if content is None: | |
| return "" | |
| if isinstance(content, str): | |
| return content | |
| if isinstance(content, list): | |
| parts = [] | |
| for p in content: | |
| if isinstance(p, dict): | |
| parts.append(p.get("text", "")) | |
| else: | |
| parts.append(str(p)) | |
| return "".join(parts) | |
| return str(content) | |
| def generate(self, messages: List[Dict[str, str]], stop_sequences: Optional[List[str]] = None) -> str: | |
| resp = self.client.chat(model=self.model_id, messages=messages, stream=False) | |
| text = self._to_text((resp.get("message") or {}).get("content", "")) | |
| if stop_sequences: | |
| for s in stop_sequences: | |
| if not s: | |
| continue | |
| idx = text.find(s) | |
| if idx != -1: | |
| text = text[:idx] | |
| break | |
| return text | |
| # ======================= | |
| # Модели данных | |
| # ======================= | |
| class NormalizedRequirement: | |
| req_id: str | |
| source_text: str | |
| normalized_text: str | |
| classification: str # functional / non-functional / constraint / assumption / other | |
| tags: List[str] | |
| class QualityMetrics: | |
| invert_score: float # 0..1 | |
| smart_score: float # 0..1 | |
| gqm_score: float # 0..1 | |
| verifiability: float # 0..1 | |
| specificity: float # 0..1 | |
| ambiguity: float # 0..1 (1 = очень неоднозначно) | |
| overall: float # 0..1 | |
| class Defect: | |
| code: str | |
| severity: str # low/medium/high | |
| message: str | |
| spans: List[str] | |
| class QualityReport: | |
| req_id: str | |
| metrics: QualityMetrics | |
| defects: List[Defect] | |
| class RefactorResult: | |
| req_id: str | |
| improved_text: str | |
| fixed_defects: List[str] | |
| explanation: str | |
| # ======================= | |
| # AnalysisAgent: сегментация + нормализация + базовая классификация | |
| # ======================= | |
| class AnalysisAgent: | |
| REQ_LINE_RE = re.compile(r"^\s*(?:REQ[-_ ]?\d+|R\d+|\d+[.)]|-|\*)\s+(.*)\s*$", re.IGNORECASE) | |
| def segment(self, srs: str) -> List[str]: | |
| srs = (srs or "").strip() | |
| if not srs: | |
| return [] | |
| # 1) если требования уже построчно/списком — берём строки | |
| lines = [ln.strip() for ln in srs.splitlines() if ln.strip()] | |
| # 2) склеиваем многострочные пункты: если строка не похожа на начало нового пункта — считаем продолжением | |
| chunks: List[str] = [] | |
| buf: List[str] = [] | |
| for ln in lines: | |
| is_new = bool(self.REQ_LINE_RE.match(ln)) or ln.lower().startswith(("shall", "must", "should", "system", "приложение", "система")) | |
| if is_new and buf: | |
| chunks.append(" ".join(buf).strip()) | |
| buf = [] | |
| buf.append(ln) | |
| if buf: | |
| chunks.append(" ".join(buf).strip()) | |
| # fallback: если получилось слишком мало — режем по точке с запятой/точкам с большими буквами | |
| if len(chunks) <= 1 and len(srs) > 300: | |
| parts = re.split(r"(?<=[.;])\s+(?=[A-ZА-ЯЁ])", srs) | |
| parts = [p.strip() for p in parts if p.strip()] | |
| if len(parts) > len(chunks): | |
| chunks = parts | |
| return chunks | |
| def classify(self, text: str) -> Tuple[str, List[str]]: | |
| t = text.lower() | |
| tags: List[str] = [] | |
| # очень грубая эвристика | |
| if any(k in t for k in ["ms", "сек", "seconds", "performance", "latency", "rps", "нагруз", "время отклика", "доступност", "sla"]): | |
| tags.append("performance") | |
| if any(k in t for k in ["security", "шифр", "oauth", "jwt", "роль", "доступ", "авторизац", "аутентификац", "pII".lower()]): | |
| tags.append("security") | |
| if any(k in t for k in ["log", "лог", "audit", "трасс", "наблюдаем", "метрик", "monitor", "алерт"]): | |
| tags.append("observability") | |
| if any(k in t for k in ["ui", "ux", "интерфейс", "экран", "форма", "кнопк"]): | |
| tags.append("ui") | |
| # classification | |
| if any(k in t for k in ["должен", "must", "shall", "обязан"]): | |
| # функциональные чаще про действия/кейсы | |
| if any(k in t for k in ["созда", "удаля", "обнов", "получ", "отправ", "импорт", "экспорт", "рассчит", "показ", "поиск", "фильтр"]): | |
| return "functional", tags | |
| # нефункциональные — про качество/ограничения | |
| if any(k in t for k in ["не менее", "не более", "ms", "сек", "доступност", "безопасн", "масштаб", "производит", "надежн", "совместим"]): | |
| return "non-functional", tags | |
| return "other", tags | |
| if any(k in t for k in ["предполагается", "assume", "assumption"]): | |
| return "assumption", tags | |
| if any(k in t for k in ["огранич", "constraint", "только", "запрещ", "не допускается"]): | |
| return "constraint", tags | |
| return "other", tags | |
| def normalize(self, text: str) -> str: | |
| # убираем маркеры типа "1) " "REQ-1 " "-" "*" | |
| m = self.REQ_LINE_RE.match(text) | |
| if m: | |
| text = m.group(1).strip() | |
| text = re.sub(r"\s+", " ", text).strip() | |
| # нормализация "Система должна" в единую форму, но без фанатизма | |
| text = re.sub(r"^(система|приложение)\s+", "Система ", text, flags=re.IGNORECASE) | |
| return text | |
| def run(self, srs: str) -> List[NormalizedRequirement]: | |
| segments = self.segment(srs) | |
| out: List[NormalizedRequirement] = [] | |
| for i, seg in enumerate(segments, start=1): | |
| norm = self.normalize(seg) | |
| cls, tags = self.classify(norm) | |
| out.append( | |
| NormalizedRequirement( | |
| req_id=f"REQ-{i:03d}", | |
| source_text=seg, | |
| normalized_text=norm, | |
| classification=cls, | |
| tags=tags, | |
| ) | |
| ) | |
| return out | |
| # ======================= | |
| # QualityAgent: rule-based + (упрощённые) INVERT / SMART / GQM | |
| # ======================= | |
| class QualityAgent: | |
| AMBIGUOUS_WORDS = [ | |
| "быстро", "удобно", "понятно", "желательно", "примерно", "как можно", "оптимально", | |
| "в разумные сроки", "и т.п.", "и т.д.", "достаточно", "при необходимости", | |
| "some", "several", "as needed", "etc", "quickly", "user-friendly", | |
| ] | |
| WEAK_MODALS = ["может", "could", "may", "желательно", "should"] | |
| STRONG_MODALS = ["должен", "must", "shall", "обязан"] | |
| UNVERIFIABLE = ["удобно", "интуитивно", "красиво", "понятно", "friendly", "intuitive"] | |
| NUMBER_RE = re.compile(r"(\d+(?:[.,]\d+)?)\s*(ms|сек|s|seconds|%|rps|rpm|gb|mb|kb|мин|min|hours|час|дней|day|days)\b", re.IGNORECASE) | |
| def _find_spans(self, text: str, needles: List[str]) -> List[str]: | |
| found: List[str] = [] | |
| t = text.lower() | |
| for w in needles: | |
| if w.lower() in t: | |
| found.append(w) | |
| return found | |
| def _defect(self, code: str, severity: str, message: str, spans: List[str]) -> Defect: | |
| return Defect(code=code, severity=severity, message=message, spans=spans) | |
| def evaluate(self, req: NormalizedRequirement) -> QualityReport: | |
| text = req.normalized_text.strip() | |
| t = text.lower() | |
| defects: List[Defect] = [] | |
| amb = self._find_spans(text, self.AMBIGUOUS_WORDS) | |
| if amb: | |
| defects.append(self._defect( | |
| code="AMBIGUOUS_WORDING", | |
| severity="high" if len(amb) >= 2 else "medium", | |
| message="Найдены неоднозначные/расплывчатые слова. Замените на измеримые критерии.", | |
| spans=amb, | |
| )) | |
| weak = self._find_spans(text, self.WEAK_MODALS) | |
| if weak: | |
| defects.append(self._defect( | |
| code="WEAK_MODAL", | |
| severity="medium", | |
| message="Используются слабые модальные слова (может/should). Требование может стать необязательным.", | |
| spans=weak, | |
| )) | |
| has_strong = any(m in t for m in self.STRONG_MODALS) | |
| if not has_strong and req.classification in ("functional", "non-functional", "constraint"): | |
| defects.append(self._defect( | |
| code="MISSING_NORMATIVE_MODAL", | |
| severity="medium", | |
| message="Нет явной нормативности (должен/must/shall). Уточните обязательность.", | |
| spans=[], | |
| )) | |
| # Проверяем измеримость (числа+единицы) для NFR/качества | |
| has_numbers = bool(self.NUMBER_RE.search(text)) | |
| if req.classification == "non-functional" and not has_numbers: | |
| defects.append(self._defect( | |
| code="MISSING_MEASURABLE_CRITERIA", | |
| severity="high", | |
| message="Для нефункционального требования не задан измеримый критерий (число + единица).", | |
| spans=[], | |
| )) | |
| unver = self._find_spans(text, self.UNVERIFIABLE) | |
| if unver: | |
| defects.append(self._defect( | |
| code="POOR_VERIFIABILITY", | |
| severity="high", | |
| message="Формулировка плохо проверяема (субъективные критерии).", | |
| spans=unver, | |
| )) | |
| # Простейшая проверка на “одна мысль — одно требование” | |
| if len(re.split(r"\b(и|and|или|or)\b", t)) > 7: | |
| defects.append(self._defect( | |
| code="MULTIPLE_REQUIREMENTS_IN_ONE", | |
| severity="medium", | |
| message="Похоже, в одном предложении смешано несколько требований. Разбейте на отдельные пункты.", | |
| spans=[], | |
| )) | |
| # --- Метрики (упрощённые) --- | |
| # ambiguity: 0..1 (1 = много неоднозначностей) | |
| ambiguity = min(1.0, 0.2 * len(amb) + (0.15 if weak else 0.0)) | |
| # specificity: штрафуем за отсутствие чисел в NFR и за расплывчатость | |
| specificity = 1.0 | |
| if req.classification == "non-functional" and not has_numbers: | |
| specificity -= 0.5 | |
| specificity -= 0.15 * len(amb) | |
| specificity = max(0.0, min(1.0, specificity)) | |
| # verifiability: штраф за субъективность и отсутствие критериев | |
| verifiability = 1.0 | |
| if unver: | |
| verifiability -= 0.6 | |
| if req.classification == "non-functional" and not has_numbers: | |
| verifiability -= 0.4 | |
| verifiability = max(0.0, min(1.0, verifiability)) | |
| # SMART (S,M,A,R,T) — приближение через специфичность + измеримость + проверяемость | |
| smart_score = max(0.0, min(1.0, (specificity * 0.45 + verifiability * 0.55))) | |
| # INVERT — приближение: отсутствие неоднозначности + “однозначная проверка” | |
| invert_score = max(0.0, min(1.0, (1.0 - ambiguity) * 0.55 + verifiability * 0.45)) | |
| # GQM — если есть (Goal/Question/Metric) хотя бы частично: наличие метрик/критериев и “что проверять” | |
| gqm_score = 0.3 | |
| if has_numbers: | |
| gqm_score += 0.4 | |
| if has_strong: | |
| gqm_score += 0.2 | |
| if not amb: | |
| gqm_score += 0.1 | |
| gqm_score = max(0.0, min(1.0, gqm_score)) | |
| overall = max(0.0, min(1.0, (invert_score + smart_score + gqm_score) / 3.0)) | |
| metrics = QualityMetrics( | |
| invert_score=round(invert_score, 3), | |
| smart_score=round(smart_score, 3), | |
| gqm_score=round(gqm_score, 3), | |
| verifiability=round(verifiability, 3), | |
| specificity=round(specificity, 3), | |
| ambiguity=round(ambiguity, 3), | |
| overall=round(overall, 3), | |
| ) | |
| return QualityReport(req_id=req.req_id, metrics=metrics, defects=defects) | |
| def run(self, req: NormalizedRequirement) -> QualityReport: | |
| return self.evaluate(req) | |
| # ======================= | |
| # RefactorAgent: LLM-рефакторинг требования по дефектам + целевым критериям | |
| # ======================= | |
| class RefactorAgent: | |
| def __init__(self, model: OllamaCloudModel): | |
| self.model = model | |
| def run(self, req: NormalizedRequirement, report: QualityReport) -> RefactorResult: | |
| defects = report.defects or [] | |
| defects_md = "\n".join( | |
| [f"- [{d.severity}] {d.code}: {d.message}" + (f" (спаны: {', '.join(d.spans)})" if d.spans else "") | |
| for d in defects] | |
| ) or "- (дефектов не найдено)" | |
| target_criteria = """ | |
| Целевые критерии улучшения: | |
| - SMART: конкретно, измеримо, достижимо, релевантно, ограничено по времени/условиям где уместно | |
| - INVERT (упрощённо): однозначность, проверяемость, атомарность, отсутствие расплывчатых слов | |
| - GQM: чтобы было понятно, как измерять/проверять (метрика/критерий приемки) | |
| """ | |
| prompt = f""" | |
| Ты — эксперт по требованиям (SRS). Улучши формулировку требования. | |
| Исходное требование (req_id={req.req_id}): | |
| {req.normalized_text} | |
| Классификация: {req.classification} | |
| Теги: {", ".join(req.tags) if req.tags else "-"} | |
| Выявленные дефекты: | |
| {defects_md} | |
| {target_criteria} | |
| Задача: | |
| 1) Верни улучшенную формулировку (одной строкой или несколькими, но без воды). | |
| 2) Затем коротко перечисли, какие дефекты исправлены. | |
| 3) Если требование нефункциональное и нет метрик — добавь измеримый критерий (число + единица), | |
| но НЕ выдумывай домен: используй нейтральные критерии (например, время отклика, точность, SLA) и пометь как "пример". | |
| 4) Не добавляй лишних требований, только уточняй существующее. | |
| Формат ответа (СТРОГО): | |
| IMPROVED: | |
| <текст> | |
| FIXED_DEFECTS: | |
| - <код1> | |
| - <код2> | |
| EXPLANATION: | |
| <1-3 предложения> | |
| """.strip() | |
| out = self.model.generate([{"role": "user", "content": prompt}]) | |
| improved, fixed, expl = self._parse_refactor(out, fallback=req.normalized_text, defects=defects) | |
| return RefactorResult( | |
| req_id=req.req_id, | |
| improved_text=improved, | |
| fixed_defects=fixed, | |
| explanation=expl, | |
| ) | |
| def _parse_refactor(self, text: str, fallback: str, defects: List[Defect]) -> Tuple[str, List[str], str]: | |
| improved = fallback | |
| fixed_codes = [d.code for d in defects] | |
| explanation = "Уточнена формулировка с учётом найденных дефектов." | |
| # грубый парсер по секциям | |
| m1 = re.search(r"IMPROVED:\s*(.+?)\n\s*FIXED_DEFECTS:", text, flags=re.S | re.I) | |
| if m1: | |
| improved = m1.group(1).strip() | |
| m2 = re.search(r"FIXED_DEFECTS:\s*(.+?)\n\s*EXPLANATION:", text, flags=re.S | re.I) | |
| if m2: | |
| block = m2.group(1).strip() | |
| codes = [] | |
| for ln in block.splitlines(): | |
| ln = ln.strip() | |
| if ln.startswith("-"): | |
| codes.append(ln[1:].strip()) | |
| if codes: | |
| fixed_codes = codes | |
| m3 = re.search(r"EXPLANATION:\s*(.+)\s*$", text, flags=re.S | re.I) | |
| if m3: | |
| explanation = m3.group(1).strip() | |
| return improved, fixed_codes, explanation | |
| # ======================= | |
| # PlantUML: простая валидация/минимальные правки + (опционально) LLM-подсказки | |
| # ======================= | |
| class PlantUMLAgent: | |
| START = "@startuml" | |
| END = "@enduml" | |
| def extract_blocks(self, text: str) -> List[str]: | |
| if not text: | |
| return [] | |
| blocks = [] | |
| pattern = re.compile(r"@startuml[\s\S]*?@enduml", re.IGNORECASE) | |
| for m in pattern.finditer(text): | |
| blocks.append(m.group(0).strip()) | |
| return blocks | |
| def basic_fix(self, block: str) -> Tuple[str, List[str]]: | |
| changes: List[str] = [] | |
| b = block.strip() | |
| if self.START not in b.lower(): | |
| b = f"@startuml\n{b}" | |
| changes.append("Добавлен @startuml в начале блока.") | |
| if self.END not in b.lower(): | |
| b = f"{b}\n@enduml" | |
| changes.append("Добавлен @enduml в конце блока.") | |
| # нормализуем переносы и табы | |
| b2 = b.replace("\t", " ") | |
| if b2 != b: | |
| b = b2 | |
| changes.append("Заменены табы на пробелы.") | |
| # типовые опечатки | |
| b2 = re.sub(r"@end\s*uml", "@enduml", b, flags=re.IGNORECASE) | |
| if b2 != b: | |
| b = b2 | |
| changes.append("Исправлено '@end uml' → '@enduml'.") | |
| return b.strip(), changes | |
| def run(self, plantuml_text: str) -> Tuple[str, List[str]]: | |
| blocks = self.extract_blocks(plantuml_text) | |
| if not blocks and (plantuml_text or "").strip(): | |
| # если пользователь вставил диаграмму без start/end — считаем это одним блоком | |
| blocks = [(plantuml_text or "").strip()] | |
| fixed_blocks: List[str] = [] | |
| changes: List[str] = [] | |
| for i, bl in enumerate(blocks, start=1): | |
| fixed, ch = self.basic_fix(bl) | |
| fixed_blocks.append(fixed) | |
| for c in ch: | |
| changes.append(f"Диаграмма #{i}: {c}") | |
| return ("\n\n".join(fixed_blocks)).strip(), changes | |
| # ======================= | |
| # Orchestrator: связывает агентов и собирает единый ответ | |
| # ======================= | |
| class Orchestrator: | |
| def __init__(self, model_id: str): | |
| self.model = OllamaCloudModel(model_id=model_id.strip() or "gpt-oss:120b") | |
| self.analysis = AnalysisAgent() | |
| self.quality = QualityAgent() | |
| self.refactor = RefactorAgent(self.model) | |
| self.plantuml = PlantUMLAgent() | |
| def run(self, srs: str, plantuml_optional: str) -> Dict[str, Any]: | |
| changes: List[str] = [] | |
| tips: List[str] = [] | |
| # 1) PlantUML (опционально) | |
| fixed_diagrams = "" | |
| diagram_changes: List[str] = [] | |
| if (plantuml_optional or "").strip(): | |
| fixed_diagrams, diagram_changes = self.plantuml.run(plantuml_optional) | |
| changes.extend(diagram_changes) | |
| # 2) Analysis | |
| normalized = self.analysis.run(srs) | |
| if not normalized: | |
| return { | |
| "final_md": "❗️Не нашёл требований в тексте SRS. Вставь хотя бы 1-2 пункта требований.", | |
| "normalized_json": "[]", | |
| "quality_json": "[]", | |
| "refactor_json": "[]", | |
| } | |
| # 3) Quality + Refactor | |
| quality_reports: List[QualityReport] = [] | |
| refactors: List[RefactorResult] = [] | |
| for req in normalized: | |
| qr = self.quality.run(req) | |
| quality_reports.append(qr) | |
| # если дефектов нет — можно не трогать; но обычно полезно унифицировать стиль | |
| rr = self.refactor.run(req, qr) if (qr.defects or []) else RefactorResult( | |
| req_id=req.req_id, | |
| improved_text=req.normalized_text, | |
| fixed_defects=[], | |
| explanation="Дефектов не выявлено, формулировка оставлена без изменений.", | |
| ) | |
| refactors.append(rr) | |
| # изменения по требованию | |
| if rr.improved_text.strip() != req.normalized_text.strip(): | |
| changes.append(f"{req.req_id}: улучшена формулировка требования.") | |
| if qr.defects: | |
| tips.append(f"{req.req_id}: проверь критерии приемки и измеримость (особенно если это NFR).") | |
| # 4) Сборка пользовательского ответа | |
| final_md = self._render_final(normalized, quality_reports, refactors, fixed_diagrams, changes, tips) | |
| return { | |
| "final_md": final_md, | |
| "normalized_json": json.dumps([asdict(x) for x in normalized], ensure_ascii=False, indent=2), | |
| "quality_json": json.dumps([self._asdict_quality(x) for x in quality_reports], ensure_ascii=False, indent=2), | |
| "refactor_json": json.dumps([asdict(x) for x in refactors], ensure_ascii=False, indent=2), | |
| } | |
| def _asdict_quality(self, qr: QualityReport) -> Dict[str, Any]: | |
| return { | |
| "req_id": qr.req_id, | |
| "metrics": asdict(qr.metrics), | |
| "defects": [asdict(d) for d in (qr.defects or [])], | |
| } | |
| def _render_final( | |
| self, | |
| normalized: List[NormalizedRequirement], | |
| quality: List[QualityReport], | |
| refactors: List[RefactorResult], | |
| fixed_diagrams: str, | |
| changes: List[str], | |
| tips: List[str], | |
| ) -> str: | |
| q_by_id = {q.req_id: q for q in quality} | |
| r_by_id = {r.req_id: r for r in refactors} | |
| md = [] | |
| md.append("# ✅ Исправленные требования и рекомендации\n") | |
| md.append("## 1) Итоговые требования (улучшенные формулировки)\n") | |
| for req in normalized: | |
| rr = r_by_id[req.req_id] | |
| qr = q_by_id[req.req_id] | |
| md.append(f"### {req.req_id} — {req.classification}\n") | |
| md.append(f"**Было:** {req.normalized_text}\n") | |
| md.append(f"**Стало:** {rr.improved_text}\n") | |
| md.append("**Метрики качества (0..1):**") | |
| md.append( | |
| f"- overall: {qr.metrics.overall} | SMART: {qr.metrics.smart_score} | INVERT: {qr.metrics.invert_score} | GQM: {qr.metrics.gqm_score}" | |
| ) | |
| if qr.defects: | |
| md.append("**Дефекты:**") | |
| for d in qr.defects: | |
| spans = f" (спаны: {', '.join(d.spans)})" if d.spans else "" | |
| md.append(f"- [{d.severity}] {d.code}: {d.message}{spans}") | |
| md.append("**Что исправлено:**") | |
| if rr.fixed_defects: | |
| for c in rr.fixed_defects: | |
| md.append(f"- {c}") | |
| else: | |
| md.append("- (ничего)") | |
| md.append(f"**Пояснение:** {rr.explanation}\n") | |
| if fixed_diagrams.strip(): | |
| md.append("## 2) Диаграммы (PlantUML) с исправлениями\n") | |
| md.append("```plantuml") | |
| md.append(fixed_diagrams.strip()) | |
| md.append("```\n") | |
| md.append("## 3) Список изменений\n") | |
| if changes: | |
| for c in changes: | |
| md.append(f"- {c}") | |
| else: | |
| md.append("- (изменений не зафиксировано)") | |
| md.append("\n## 4) Советы\n") | |
| uniq_tips = [] | |
| for t in tips: | |
| if t not in uniq_tips: | |
| uniq_tips.append(t) | |
| if uniq_tips: | |
| for t in uniq_tips[:20]: | |
| md.append(f"- {t}") | |
| else: | |
| md.append("- Требования выглядят неплохо. Если это финальная версия, добавь тестируемые критерии приемки там, где это важно.") | |
| md.append("\n---\n") | |
| md.append("_Примечание: метрики INVERT/SMART/GQM здесь реализованы в упрощённом виде (практическая эвристика), " | |
| "а улучшение текста делает LLM-агент по списку дефектов._") | |
| return "\n".join(md) | |
| # ======================= | |
| # Gradio UI | |
| # ======================= | |
| def gradio_run( | |
| srs_text: str, | |
| plantuml_text: str, | |
| model_id: str, | |
| ): | |
| orch = Orchestrator(model_id=model_id) | |
| res = orch.run(srs_text or "", plantuml_text or "") | |
| return res["final_md"], res["normalized_json"], res["quality_json"], res["refactor_json"] | |
| with gr.Blocks() as demo: | |
| gr.Markdown( | |
| """ | |
| # 🧩 Multi-Agent SRS Refiner (Orchestrator + Analysis + Quality + Refactor) | |
| Вставь SRS (требования) и (опционально) PlantUML — получишь: | |
| - улучшенные формулировки требований | |
| - метрики качества и дефекты | |
| - (опционально) исправленные PlantUML блоки | |
| - список изменений и советы | |
| """ | |
| ) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| srs_input = gr.Textbox( | |
| label="SRS / Требования (текст)", | |
| placeholder="Вставь требования списком (по строкам) или цельным текстом.", | |
| lines=14, | |
| ) | |
| plantuml_input = gr.Textbox( | |
| label="PlantUML (опционально)", | |
| placeholder="@startuml ... @enduml (можно несколько блоков)", | |
| lines=10, | |
| ) | |
| with gr.Column(scale=1): | |
| model_id_input = gr.Textbox( | |
| label="Ollama Cloud модель", | |
| value="gpt-oss:120b", | |
| placeholder="Например: gpt-oss:120b", | |
| ) | |
| run_btn = gr.Button("Запустить оркестратора") | |
| out_md = gr.Markdown(label="Результат") | |
| with gr.Accordion("Debug: Нормализованные требования (JSON)", open=False): | |
| out_norm = gr.Code(language="json") | |
| with gr.Accordion("Debug: Оценка качества (JSON)", open=False): | |
| out_quality = gr.Code(language="json") | |
| with gr.Accordion("Debug: Рефакторинг (JSON)", open=False): | |
| out_ref = gr.Code(language="json") | |
| run_btn.click( | |
| fn=gradio_run, | |
| inputs=[srs_input, plantuml_input, model_id_input], | |
| outputs=[out_md, out_norm, out_quality, out_ref], | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |