Spaces:
Sleeping
Sleeping
File size: 3,343 Bytes
16b7df8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | from __future__ import annotations
import asyncio
import json
import logging
from typing import Union
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from graph_store.config import settings
from graph_store.models import ExtractionResult
logger = logging.getLogger(__name__)
_SYSTEM_PROMPT = """You are an entity and relationship extractor for a technical knowledge graph.
Given a chunk of technical documentation, extract entities and relationships.
Entity types to extract:
- Service: microservices, APIs, backend servers, internal platforms
- Library: packages, dependencies, SDKs, frameworks — include version if mentioned
- Incident: incident IDs, outage references, post-mortem subjects
- Team: team names, squad names, group names
Relationship types to extract (use exact rel_type strings):
- (Chunk)-[MENTIONS]->(Service)
- (Chunk)-[REFERENCES]->(Library)
- (Service)-[DEPENDS_ON]->(Library)
- (Service)-[OWNED_BY]->(Team)
- (Incident)-[CAUSED_BY]->(Service)
- (Incident)-[OWNED_BY]->(Team)
Rules:
- Only extract entities EXPLICITLY named in the text. Never infer or hallucinate.
- For libraries: look for import statements, package names, version numbers, deprecation notices.
- Return empty lists if nothing is found — this is correct and expected.
- from_name and to_name must match the name of an entity you extracted above.
Return ONLY valid JSON. No preamble. No markdown code fences.
Schema:
{
"entities": [
{"label": "Service|Library|Incident|Team", "name": "...", "version": null, "properties": {}}
],
"relationships": [
{"from_label": "...", "from_name": "...", "rel_type": "...", "to_label": "...", "to_name": "..."}
]
}"""
_EMPTY_RESULT = ExtractionResult()
def _make_llm() -> ChatGoogleGenerativeAI:
return ChatGoogleGenerativeAI(
model=settings.graph_extraction_model,
google_api_key=settings.google_api_key,
temperature=0.0,
)
async def _extract_one(llm: ChatGoogleGenerativeAI, text: str) -> ExtractionResult:
messages = [SystemMessage(content=_SYSTEM_PROMPT), HumanMessage(content=f"Text:\n{text}")]
try:
response = await llm.ainvoke(messages)
raw = response.content.strip()
if raw.startswith("```"):
raw = raw.split("\n", 1)[-1]
if raw.endswith("```"):
raw = raw[: raw.rfind("```")]
data = json.loads(raw)
return ExtractionResult(**data)
except Exception:
logger.exception("extractor: failed to parse Gemini response for chunk — returning empty")
return _EMPTY_RESULT
async def extract_batch(texts: list[str]) -> list[ExtractionResult]:
if not texts:
return []
llm = _make_llm()
batch_size = settings.graph_extraction_batch_size
results: list[ExtractionResult] = []
for start in range(0, len(texts), batch_size):
batch = texts[start : start + batch_size]
batch_results = await asyncio.gather(
*[_extract_one(llm, text) for text in batch],
return_exceptions=False,
)
results.extend(batch_results)
logger.debug(
"extractor: processed batch %d-%d of %d",
start,
start + len(batch),
len(texts),
)
return results
|