# GodSpeed / graph_store / extractor.py
# AdithyaVardan's picture
# Fix startup imports and add graph_store module
# 16b7df8
from __future__ import annotations
import asyncio
import json
import logging
from typing import Union
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from graph_store.config import settings
from graph_store.models import ExtractionResult
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# System prompt sent with every extraction request. It defines the entity
# labels, the allowed relationship types, and the exact JSON schema the model
# must return; the reply is parsed with json.loads, so the prompt forbids any
# preamble or Markdown fencing.
_SYSTEM_PROMPT = """You are an entity and relationship extractor for a technical knowledge graph.
Given a chunk of technical documentation, extract entities and relationships.
Entity types to extract:
- Service: microservices, APIs, backend servers, internal platforms
- Library: packages, dependencies, SDKs, frameworks — include version if mentioned
- Incident: incident IDs, outage references, post-mortem subjects
- Team: team names, squad names, group names
Relationship types to extract (use exact rel_type strings):
- (Chunk)-[MENTIONS]->(Service)
- (Chunk)-[REFERENCES]->(Library)
- (Service)-[DEPENDS_ON]->(Library)
- (Service)-[OWNED_BY]->(Team)
- (Incident)-[CAUSED_BY]->(Service)
- (Incident)-[OWNED_BY]->(Team)
Rules:
- Only extract entities EXPLICITLY named in the text. Never infer or hallucinate.
- For libraries: look for import statements, package names, version numbers, deprecation notices.
- Return empty lists if nothing is found — this is correct and expected.
- from_name and to_name must match the name of an entity you extracted above.
Return ONLY valid JSON. No preamble. No markdown code fences.
Schema:
{
"entities": [
{"label": "Service|Library|Incident|Team", "name": "...", "version": null, "properties": {}}
],
"relationships": [
{"from_label": "...", "from_name": "...", "rel_type": "...", "to_label": "...", "to_name": "..."}
]
}"""
# Default-constructed ExtractionResult, created once at import time.
# NOTE(review): this is a single shared instance — treat it as read-only.
_EMPTY_RESULT = ExtractionResult()
def _make_llm() -> ChatGoogleGenerativeAI:
    """Build the Gemini chat client used for graph extraction.

    The model name and API key are read from ``settings``; the sampling
    temperature is pinned to 0.0.
    """
    client_kwargs = {
        "model": settings.graph_extraction_model,
        "google_api_key": settings.google_api_key,
        "temperature": 0.0,
    }
    return ChatGoogleGenerativeAI(**client_kwargs)
def _response_text(content: object) -> str:
    """Flatten a chat-message ``content`` payload into one string.

    ``content`` may be a plain string or a list whose items are strings or
    dicts carrying a ``"text"`` entry; any other item is ignored.
    """
    if isinstance(content, str):
        return content
    parts: list[str] = []
    for item in content or ():
        if isinstance(item, str):
            parts.append(item)
        elif isinstance(item, dict) and isinstance(item.get("text"), str):
            parts.append(item["text"])
    return "".join(parts)


def _strip_code_fence(raw: str) -> str:
    """Remove a surrounding Markdown code fence from *raw*, if present."""
    raw = raw.strip()
    if raw.startswith("```"):
        # Drop the whole opening fence line (covers a language tag such as
        # ```json); with no newline at all, drop just the three backticks.
        _, newline, remainder = raw.partition("\n")
        raw = remainder if newline else raw[3:]
    if raw.endswith("```"):
        raw = raw[:-3]
    return raw.strip()


async def _extract_one(llm: ChatGoogleGenerativeAI, text: str) -> ExtractionResult:
    """Extract entities and relationships from one text chunk.

    Sends the system prompt plus *text* to *llm*, strips any Markdown code
    fence from the reply, parses it as JSON and builds an
    ``ExtractionResult`` from the decoded object.

    Args:
        llm: Chat model exposing an async ``ainvoke(messages)`` method.
        text: Documentation chunk to analyse.

    Returns:
        The parsed result, or a new empty ``ExtractionResult`` when the model
        call, the JSON decoding or the model construction fails.
    """
    messages = [SystemMessage(content=_SYSTEM_PROMPT), HumanMessage(content=f"Text:\n{text}")]
    try:
        response = await llm.ainvoke(messages)
        raw = _strip_code_fence(_response_text(response.content))
        return ExtractionResult(**json.loads(raw))
    except Exception:
        # Deliberately best-effort: one bad chunk must not abort the batch.
        logger.exception("extractor: failed to parse Gemini response for chunk — returning empty")
        # A fresh instance per failure, so callers that mutate a result
        # cannot affect the results of other chunks.
        return ExtractionResult()
async def extract_batch(texts: list[str]) -> list[ExtractionResult]:
    """Run extraction over *texts*, a bounded number of chunks at a time.

    The texts are processed in consecutive slices of
    ``settings.graph_extraction_batch_size``; the chunks inside a slice are
    awaited together with ``asyncio.gather`` and the slices run one after
    another.

    Args:
        texts: Documentation chunks to analyse.

    Returns:
        One ``ExtractionResult`` per input text, in input order. An empty
        input yields an empty list without creating a client.
    """
    if not texts:
        return []

    llm = _make_llm()

    batch_size = settings.graph_extraction_batch_size
    if batch_size < 1:
        # A zero step makes range() raise and a negative step yields no
        # slices at all, which would silently drop every text.
        logger.warning(
            "extractor: invalid graph_extraction_batch_size %r, falling back to 1",
            batch_size,
        )
        batch_size = 1

    results: list[ExtractionResult] = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start : start + batch_size]
        # gather() returns results in the order the awaitables were passed.
        results.extend(await asyncio.gather(*(_extract_one(llm, chunk) for chunk in batch)))
        logger.debug(
            "extractor: processed batch %d-%d of %d",
            start,
            start + len(batch),
            len(texts),
        )
    return results