Spaces:
Sleeping
Sleeping
| """ | |
| Template pipeline that runs a simple RAG -> router -> agent flow and streams | |
| text back in the shape expected by Gradio_Events.submit. | |
| """ | |
| import os | |
| from dataclasses import dataclass | |
| from typing import Iterator, Iterable, List, Dict, Any | |
| from openai import OpenAI | |
| # ---- Streaming message shape expected by app.py ---- | |
| class DeltaMessage: | |
| content: str | None = None | |
| reasoning_content: str | None = None # leave None when you only stream text | |
@dataclass
class Choice:
    """Wrapper holding a single streamed ``DeltaMessage``.

    Built as ``Choice(message=...)`` by the pipeline, hence a dataclass.
    """

    message: DeltaMessage
@dataclass
class Output:
    """Container for the list of ``Choice`` objects in one streamed chunk.

    Built as ``Output(choices=[...])`` by the pipeline, hence a dataclass.
    """

    choices: list[Choice]
@dataclass
class Chunk:
    """Top-level streamed object; text lives at ``chunk.output.choices[0].message.content``.

    Built as ``Chunk(output=...)`` by the pipeline, hence a dataclass.
    """

    output: Output
| # ---- Example RAG / Router / Agent stubs ---- | |
class RAGModel:
    """Retrieve documents for a query and distil them into requirement text.

    ``retriever`` must provide ``search(query)`` and ``llm`` must provide
    ``summarize(query=..., docs=...)``; both are injected by the caller.
    """

    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm

    def extract_requirements(self, query: str) -> dict:
        """Return ``{"requirements": <summary>, "compliant": <bool>}`` for *query*.

        Placeholder logic: replace with your own synthesis and compliance
        assessment.
        """
        retrieved = self.retriever.search(query)
        summary = self.llm.summarize(query=query, docs=retrieved)
        return {
            "requirements": summary,
            "compliant": self._assess_compliance(summary),
        }

    def _assess_compliance(self, requirements: str) -> bool:
        """Keyword heuristic: compliant unless a non-compliance marker appears.

        Matching is a case-insensitive substring test, so e.g. "failed"
        counts as a hit for "fail".
        """
        lowered = requirements.lower()
        markers = ("gap", "missing", "non-compliant", "not compliant", "fail")
        return all(marker not in lowered for marker in markers)
class Router:
    """Choose a target pipeline ("jira" or "matrix") for the extracted requirements."""

    # Jira-intent keywords, matched as whole words (singular or plural). Plain
    # substring matching let words that merely contain a keyword, such as
    # "debug", "history" or "tissue", force the Jira route.
    _JIRA_KEYWORD_PATTERN = r"\b(?:jira|tickets?|issues?|bugs?|stor(?:y|ies)|epics?)\b"

    def route(self, *, compliant: bool, requirements: str, user_query: str = "") -> str:
        """
        Route based on explicit intent first, otherwise by compliance status.

        - If the user asks for a Jira ticket (whole words like "jira", "ticket",
          "issue", "bug", "story", "epic", or their plurals), route to Jira.
        - Else: non-compliant -> Jira; compliant -> matrix.

        Args:
            compliant: Compliance verdict produced by the RAG step.
            requirements: Extracted requirement text (not currently used for
                routing; kept for interface stability).
            user_query: Raw user request; ``None`` is treated as empty.

        Returns:
            ``"jira"`` or ``"matrix"``.
        """
        import re  # local import keeps this class self-contained

        intent = (user_query or "").lower()
        if re.search(self._JIRA_KEYWORD_PATTERN, intent):
            return "jira"
        return "matrix" if compliant else "jira"
| class JiraAgent: | |
| """Generates Jira ticket content using a Qwen model on OpenRouter and streams text.""" | |
| def __init__(self, | |
| model: str = "qwen/qwen3-vl-8b-instruct", #"qwen/qwen3-4b:free", | |
| api_key: str | None = None): | |
| resolved_key = api_key or os.getenv("OPENROUTER_API_KEY") \ | |
| or os.getenv("OPENAI_API_KEY") | |
| if not resolved_key: | |
| raise ValueError( | |
| "Missing OpenRouter API key: set OPENROUTER_API_KEY (preferred) " | |
| "or OPENAI_API_KEY in the environment, or pass api_key to JiraAgent" | |
| ) | |
| self.model = model | |
| self.client = OpenAI( | |
| base_url="https://openrouter.ai/api/v1", | |
| api_key=resolved_key, | |
| ) | |
| def stream(self, requirements: str) -> Iterable[str]: | |
| system_prompt = ( | |
| "You are a Jira assistant. Respond ONLY with valid JSON (no markdown) in this exact shape:\n" | |
| '{\n' | |
| ' "fields": {\n' | |
| ' "project": { "key": "SERVICEDESK" },\n' | |
| ' "summary": "User-submitted issue summary",\n' | |
| ' "description": "Detailed problem description",\n' | |
| ' "issuetype": { "name": "Incident" },\n' | |
| ' "reporter": { "name": "username@email.com" },\n' | |
| ' "priority": { "name": "High" },\n' | |
| ' "labels": ["mobile-app", "user-submitted"],\n' | |
| ' "customfield_10002": { "value": "Customer" }\n' | |
| ' }\n' | |
| '}\n' | |
| "Do not add, remove, or rename keys. Fill the values using the user's request. Keep it to valid JSON only.") | |
| user_prompt = ( | |
| "Create a Jira ticket for these requirements:\n" | |
| f"{requirements}") | |
| stream = self.client.chat.completions.create( | |
| model=self.model, | |
| messages=[{ | |
| "role": "system", | |
| "content": system_prompt | |
| }, { | |
| "role": "user", | |
| "content": user_prompt | |
| }], | |
| max_tokens=512, | |
| temperature=0.3, | |
| stream=True, | |
| ) | |
| for chunk in stream: | |
| delta = chunk.choices[0].delta | |
| if delta and delta.content: | |
| # Yield raw text increments so the frontend can stream. | |
| yield delta.content | |
| class ComplianceMatrixAgent: | |
| """Creates a compliance matrix CSV using a Qwen model on OpenRouter and streams CSV text.""" | |
| def __init__(self, | |
| model: str = "qwen/qwen3-vl-8b-instruct",#"qwen/qwen3-4b:free", | |
| api_key: str | None = None): | |
| resolved_key = api_key or os.getenv("OPENROUTER_API_KEY") \ | |
| or os.getenv("OPENAI_API_KEY") | |
| if not resolved_key: | |
| raise ValueError( | |
| "Missing OpenRouter API key: set OPENROUTER_API_KEY (preferred) " | |
| "or OPENAI_API_KEY in the environment, or pass api_key to ComplianceMatrixAgent" | |
| ) | |
| self.model = model | |
| self.client = OpenAI( | |
| base_url="https://openrouter.ai/api/v1", | |
| api_key=resolved_key, | |
| ) | |
| def stream(self, requirements: str) -> Iterable[str]: | |
| system_prompt = ( | |
| "You are a compliance analyst. Produce ONLY a markdown table (no code fences, no prose) " | |
| "with these exact headers:\n" | |
| "| Requirement ID | Requirement Text | Source (Spec/Std) | Verification Method | Evidence / Link | Status | Notes |\n" | |
| "Use realistic IDs (e.g., SYS-REQ-001…), short requirement text, cite a plausible source, " | |
| "choose a verification method (Test, Inspection, Analysis, or combo), provide a concise evidence/link label, " | |
| "set Status to one of: Compliant, Partially compliant, Non-compliant, or Pending, and keep Notes brief.") | |
| user_prompt = ( | |
| "Create a compliance matrix CSV for these requirements:\n" | |
| f"{requirements}") | |
| stream = self.client.chat.completions.create( | |
| model=self.model, | |
| messages=[{ | |
| "role": "system", | |
| "content": system_prompt | |
| }, { | |
| "role": "user", | |
| "content": user_prompt | |
| }], | |
| max_tokens=512, | |
| temperature=0.3, | |
| stream=True, | |
| ) | |
| for chunk in stream: | |
| delta = chunk.choices[0].delta | |
| if delta and delta.content: | |
| # Yield CSV text increments so the frontend can stream. | |
| yield delta.content | |
| # ---- Pipeline wrapper ---- | |
class RequirementsPipeline:
    """
    Wraps RAG -> router -> agent into a streaming interface compatible with
    Gradio_Events.submit.
    """

    def __init__(self, rag_model: RAGModel, router: Router,
                 jira_agent: JiraAgent, matrix_agent: ComplianceMatrixAgent):
        self.rag_model = rag_model
        self.router = router
        # Route name (as returned by Router.route) -> agent with stream(requirements=...).
        self.agents = {
            "jira": jira_agent,
            "matrix": matrix_agent,
        }

    @staticmethod
    def _content_to_text(content: Any) -> str:
        """Flatten a chat message's ``content`` into plain text.

        ``None`` becomes "". A string is returned as-is. A list/tuple is joined
        from its string items and the ``"text"`` value of dict items;
        presumably OpenAI/Gradio-style content parts, so confirm against
        app.py. Any other value is converted with ``str``.
        """
        if content is None:
            return ""
        if isinstance(content, str):
            return content
        if isinstance(content, dict):
            return str(content.get("text") or "")
        if isinstance(content, (list, tuple)):
            pieces = (RequirementsPipeline._content_to_text(part) for part in content)
            return "\n".join(piece for piece in pieces if piece)
        return str(content)

    def _extract_user_query(self, messages: List[Dict[str, Any]]) -> str:
        """Return the text of the most recent user message, or "" if there is none.

        Always returns a ``str``. Null or multi-part ``content`` previously
        leaked through and crashed later string handling (e.g. ``.lower()`` in
        the router).
        """
        for message in reversed(messages):
            if message.get("role") == "user":
                return self._content_to_text(message.get("content"))
        return ""

    @staticmethod
    def _make_chunk(token: str) -> Chunk:
        """Wrap one text token in the Chunk/Output/Choice/DeltaMessage shape."""
        return Chunk(
            output=Output(
                choices=[
                    Choice(
                        message=DeltaMessage(
                            content=token,
                            reasoning_content=None,
                        ))
                ]))

    def stream(self, *, messages: list[dict]) -> Iterator[Chunk]:
        """Run RAG -> route -> agent and stream tokens as Chunk objects.

        Raises:
            ValueError: If the router returns a route with no configured agent.
        """
        query = self._extract_user_query(messages)
        extraction = self.rag_model.extract_requirements(query)
        requirements = extraction["requirements"]
        compliant = extraction["compliant"]
        target = self.router.route(
            compliant=compliant,
            requirements=requirements,
            user_query=query,
        )
        agent = self.agents.get(target)
        if agent is None:
            raise ValueError(f"No agent configured for route '{target}'")
        # Console visibility for debugging which agent is used.
        print(f"[pipeline] routing to '{target}' (compliant={compliant})")
        # Each agent streams plain text; front end accumulates it.
        for token in agent.stream(requirements=requirements):
            yield self._make_chunk(token)

    def run(self, *, messages: list[dict]) -> str:
        """Non-streaming helper that collects the full text response."""
        return "".join(
            chunk.output.choices[0].message.content or ""
            for chunk in self.stream(messages=messages)
        )