Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| from typing import Union | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from graph_store.config import settings | |
| from graph_store.models import ExtractionResult | |
| logger = logging.getLogger(__name__) | |
| _SYSTEM_PROMPT = """You are an entity and relationship extractor for a technical knowledge graph. | |
| Given a chunk of technical documentation, extract entities and relationships. | |
| Entity types to extract: | |
| - Service: microservices, APIs, backend servers, internal platforms | |
| - Library: packages, dependencies, SDKs, frameworks β include version if mentioned | |
| - Incident: incident IDs, outage references, post-mortem subjects | |
| - Team: team names, squad names, group names | |
| Relationship types to extract (use exact rel_type strings): | |
| - (Chunk)-[MENTIONS]->(Service) | |
| - (Chunk)-[REFERENCES]->(Library) | |
| - (Service)-[DEPENDS_ON]->(Library) | |
| - (Service)-[OWNED_BY]->(Team) | |
| - (Incident)-[CAUSED_BY]->(Service) | |
| - (Incident)-[OWNED_BY]->(Team) | |
| Rules: | |
| - Only extract entities EXPLICITLY named in the text. Never infer or hallucinate. | |
| - For libraries: look for import statements, package names, version numbers, deprecation notices. | |
| - Return empty lists if nothing is found β this is correct and expected. | |
| - from_name and to_name must match the name of an entity you extracted above. | |
| Return ONLY valid JSON. No preamble. No markdown code fences. | |
| Schema: | |
| { | |
| "entities": [ | |
| {"label": "Service|Library|Incident|Team", "name": "...", "version": null, "properties": {}} | |
| ], | |
| "relationships": [ | |
| {"from_label": "...", "from_name": "...", "rel_type": "...", "to_label": "...", "to_name": "..."} | |
| ] | |
| }""" | |
| _EMPTY_RESULT = ExtractionResult() | |
| def _make_llm() -> ChatGoogleGenerativeAI: | |
| return ChatGoogleGenerativeAI( | |
| model=settings.graph_extraction_model, | |
| google_api_key=settings.google_api_key, | |
| temperature=0.0, | |
| ) | |
| async def _extract_one(llm: ChatGoogleGenerativeAI, text: str) -> ExtractionResult: | |
| messages = [SystemMessage(content=_SYSTEM_PROMPT), HumanMessage(content=f"Text:\n{text}")] | |
| try: | |
| response = await llm.ainvoke(messages) | |
| raw = response.content.strip() | |
| if raw.startswith("```"): | |
| raw = raw.split("\n", 1)[-1] | |
| if raw.endswith("```"): | |
| raw = raw[: raw.rfind("```")] | |
| data = json.loads(raw) | |
| return ExtractionResult(**data) | |
| except Exception: | |
| logger.exception("extractor: failed to parse Gemini response for chunk β returning empty") | |
| return _EMPTY_RESULT | |
| async def extract_batch(texts: list[str]) -> list[ExtractionResult]: | |
| if not texts: | |
| return [] | |
| llm = _make_llm() | |
| batch_size = settings.graph_extraction_batch_size | |
| results: list[ExtractionResult] = [] | |
| for start in range(0, len(texts), batch_size): | |
| batch = texts[start : start + batch_size] | |
| batch_results = await asyncio.gather( | |
| *[_extract_one(llm, text) for text in batch], | |
| return_exceptions=False, | |
| ) | |
| results.extend(batch_results) | |
| logger.debug( | |
| "extractor: processed batch %d-%d of %d", | |
| start, | |
| start + len(batch), | |
| len(texts), | |
| ) | |
| return results | |