File size: 3,343 Bytes
16b7df8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from __future__ import annotations

import asyncio
import json
import logging
from typing import Union

from langchain_core.messages import HumanMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI

from graph_store.config import settings
from graph_store.models import ExtractionResult

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# System prompt sent ahead of every chunk by _extract_one. It fixes the entity
# labels, the allowed relationship types, and the JSON shape the model is asked
# to return. The parsed JSON is passed straight to ExtractionResult(**data), so
# the "Schema" section below presumably mirrors that model's fields — verify
# against graph_store.models before editing either side.
_SYSTEM_PROMPT = """You are an entity and relationship extractor for a technical knowledge graph.

Given a chunk of technical documentation, extract entities and relationships.

Entity types to extract:
- Service: microservices, APIs, backend servers, internal platforms
- Library: packages, dependencies, SDKs, frameworks — include version if mentioned
- Incident: incident IDs, outage references, post-mortem subjects
- Team: team names, squad names, group names

Relationship types to extract (use exact rel_type strings):
- (Chunk)-[MENTIONS]->(Service)
- (Chunk)-[REFERENCES]->(Library)
- (Service)-[DEPENDS_ON]->(Library)
- (Service)-[OWNED_BY]->(Team)
- (Incident)-[CAUSED_BY]->(Service)
- (Incident)-[OWNED_BY]->(Team)

Rules:
- Only extract entities EXPLICITLY named in the text. Never infer or hallucinate.
- For libraries: look for import statements, package names, version numbers, deprecation notices.
- Return empty lists if nothing is found — this is correct and expected.
- from_name and to_name must match the name of an entity you extracted above.

Return ONLY valid JSON. No preamble. No markdown code fences.

Schema:
{
  "entities": [
    {"label": "Service|Library|Incident|Team", "name": "...", "version": null, "properties": {}}
  ],
  "relationships": [
    {"from_label": "...", "from_name": "...", "rel_type": "...", "to_label": "...", "to_name": "..."}
  ]
}"""

# Default-constructed ExtractionResult, i.e. the "nothing extracted" value.
# NOTE(review): this is one shared module-level instance; treat it as
# read-only. Presumably its fields are mutable containers — confirm in
# graph_store.models.
_EMPTY_RESULT = ExtractionResult()


def _make_llm() -> ChatGoogleGenerativeAI:
    """Build the chat client used for graph extraction.

    The model name and API key are read from ``settings``; the sampling
    temperature is pinned to ``0.0``.

    Returns:
        A newly constructed ``ChatGoogleGenerativeAI`` instance.
    """
    client_options = {
        "model": settings.graph_extraction_model,
        "google_api_key": settings.google_api_key,
        "temperature": 0.0,
    }
    return ChatGoogleGenerativeAI(**client_options)


def _content_to_text(content: object) -> str:
    """Flatten a chat-message ``content`` value into one plain string.

    LangChain message content is documented as either a string or a list of
    parts (strings, or dicts that may carry a ``"text"`` entry). Parts without
    text are skipped.

    Raises:
        TypeError: If ``content`` is neither a string nor a list.
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        pieces = []
        for part in content:
            if isinstance(part, str):
                pieces.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                pieces.append(part["text"])
        return "".join(pieces)
    raise TypeError(f"unsupported message content type: {type(content).__name__}")


def _strip_code_fence(raw: str) -> str:
    """Remove a surrounding Markdown code fence (and language tag) if present."""
    body = raw.strip()
    if not body.startswith("```"):
        return body

    body = body[3:]
    if body.endswith("```"):
        body = body[:-3]

    # The remainder of the opening fence line is normally a language tag such
    # as "json". Drop it only when it is not already the start of the JSON
    # payload, so a single-line "```{...}```" reply keeps its content.
    first_line, newline, rest = body.partition("\n")
    if newline and not first_line.strip().startswith(("{", "[")):
        body = rest
    return body.strip()


async def _extract_one(llm: ChatGoogleGenerativeAI, text: str) -> ExtractionResult:
    """Run entity/relationship extraction on a single chunk of text.

    Sends the system prompt plus the chunk to ``llm``, strips an optional
    Markdown code fence from the reply, parses it as JSON and builds an
    ``ExtractionResult`` from the decoded mapping.

    Args:
        llm: Chat client used for the request.
        text: The chunk to analyse.

    Returns:
        The parsed extraction, or a fresh empty ``ExtractionResult`` when the
        request, the JSON decoding or the model construction fails.
    """
    messages = [SystemMessage(content=_SYSTEM_PROMPT), HumanMessage(content=f"Text:\n{text}")]
    # Deliberately best-effort: one bad chunk must not abort a whole batch, so
    # every failure in the request/parse pipeline is logged and mapped to an
    # empty result.
    try:
        response = await llm.ainvoke(messages)
        raw = _strip_code_fence(_content_to_text(response.content))
        data = json.loads(raw)
        return ExtractionResult(**data)
    except Exception:
        logger.exception("extractor: failed to parse Gemini response for chunk — returning empty")
        # Return a new instance per failure rather than a shared module-level
        # object, so a caller mutating one result cannot affect another.
        return ExtractionResult()


async def extract_batch(texts: list[str]) -> list[ExtractionResult]:
    """Extract entities and relationships from many text chunks.

    The inputs are processed in consecutive windows of
    ``settings.graph_extraction_batch_size`` items. Requests inside a window
    run concurrently through ``asyncio.gather``; the next window starts only
    after the current one has completed.

    Args:
        texts: Chunk texts to analyse.

    Returns:
        One ``ExtractionResult`` per input text, in the same order as
        ``texts``. An empty input returns ``[]`` without creating a client.

    Raises:
        ValueError: If the configured batch size is smaller than 1.
    """
    if not texts:
        return []

    batch_size = settings.graph_extraction_batch_size
    # range() rejects a zero step with an unhelpful message, and a negative
    # step would make the loop run zero times and silently drop every input.
    if batch_size < 1:
        raise ValueError(
            f"graph_extraction_batch_size must be >= 1, got {batch_size!r}"
        )

    llm = _make_llm()
    results: list[ExtractionResult] = []

    for start in range(0, len(texts), batch_size):
        batch = texts[start : start + batch_size]
        # _extract_one handles its own failures, so anything that propagates
        # from gather is unexpected and should surface to the caller.
        batch_results = await asyncio.gather(
            *[_extract_one(llm, text) for text in batch],
            return_exceptions=False,
        )
        results.extend(batch_results)
        logger.debug(
            "extractor: processed batch %d-%d of %d",
            start,
            start + len(batch),
            len(texts),
        )

    return results