AdithyaVardan committed
Commit 159f5a5 · 1 Parent(s): 0493349

Add agent and ingestion modules for Enterprise Knowledge Copilot


- agent/: LangGraph orchestration with Gemini, Qdrant hybrid retrieval,
BGE-M3 embeddings, BGE reranker, GLiNER PII masking, FastAPI SSE endpoint
- ingestion/: document ingestion pipeline with Confluence, GitHub, PDF, and
Jira sources, semantic chunker, Celery background jobs, Supabase + Qdrant
storage, and nightly CAG snapshot job
- main.py: FastAPI entry point mounting both routers
- supabase/schema.sql: full schema with RLS team isolation
- requirements.txt, .env.example, .gitignore

.env.example ADDED
@@ -0,0 +1,36 @@
+# Google AI Studio
+GOOGLE_API_KEY=
+
+# Qdrant (defaults to localhost:6333)
+QDRANT_HOST=localhost
+QDRANT_PORT=6333
+QDRANT_COLLECTION=knowledge_base
+
+# Supabase
+SUPABASE_URL=https://your-project.supabase.co
+SUPABASE_KEY=your-anon-or-service-role-key
+
+# Redis (Celery broker + backend)
+REDIS_URL=redis://localhost:6379/0
+
+# Confluence
+CONFLUENCE_BASE_URL=https://your-org.atlassian.net
+CONFLUENCE_EMAIL=you@your-org.com
+CONFLUENCE_TOKEN=
+
+# GitHub
+GITHUB_TOKEN=
+GITHUB_PATH_FILTER=docs/
+GITHUB_BRANCH=main
+
+# Jira
+JIRA_BASE_URL=https://your-org.atlassian.net
+JIRA_API_TOKEN=
+JIRA_PROJECT_KEY=
+
+# Model overrides (optional - defaults shown)
+PLANNER_MODEL=gemini-2.5-pro
+SYNTHESISER_MODEL=gemini-2.5-pro
+SUMMARISER_MODEL=gemini-2.5-flash
+GUARDRAIL_MODEL=gemini-2.5-flash
+CAG_MODEL=gemini-2.5-pro
.gitignore ADDED
@@ -0,0 +1,39 @@
+# Python
+__pycache__/
+*.py[cod]
+*.pyo
+*.pyd
+.Python
+*.egg-info/
+dist/
+build/
+*.egg
+
+# Virtual environments
+.venv/
+venv/
+env/
+
+# Environment variables
+.env
+
+# Data / model artifacts
+data/
+*.pkl
+*.pt
+*.bin
+
+# IDE
+.vscode/
+.idea/
+
+# OS
+.DS_Store
+Thumbs.db
+
+# Logs
+*.log
+
+# Celery
+celerybeat-schedule
+celerybeat.pid
agent/README.md ADDED
@@ -0,0 +1,142 @@
+# Enterprise Knowledge Copilot - Agent Module
+
+LangGraph-based multi-agent RAG system with Gemini, Qdrant, BGE-M3, and streaming SSE.
+
+## Architecture
+
+```
+POST /agent/query
+        │
+        ▼
+  planner_node  (Gemini 2.5 Pro)
+        │  ExecutionPlan
+        ▼
+  ┌─────┴──────────┐
+  │                │  (parallel)
+doc_search    ticket_lookup
+  │    └──────────┘
+  │  live_docs (conditional)
+  └──────────────┘
+        │
+  synthesiser_node  (Gemini 2.5 Pro, streaming)
+        │
+  guardrail_node  (Gemini 2.5 Flash)
+        │
+  done / escalate
+```
+
+### Two-level orchestration
+
+1. **Planner** (Level 1): Gemini analyses the query and returns a structured `ExecutionPlan`: which agents to run and which can be parallelised.
+2. **LangGraph** (Level 2): Executes the plan, running independent nodes concurrently via `asyncio`.
+
+### Parallelism rules
+
+- `doc_search` and `ticket_lookup` always run in parallel when both are needed.
+- `live_docs` runs after `doc_search` only if confidence is low OR the query names an external library.
+- Each agent node calls exactly one tool. No agent calls two tools.
+
+### Confidence gating
+
+After BGE reranker scoring:
+- `≥ 0.6` → `high`
+- `0.4–0.6` → `medium`
+- `< 0.4` → `low`
+
+The synthesiser adjusts its tone and the guardrail applies stricter escalation at low confidence.
+
+## Setup
+
+```bash
+# 1. Install dependencies
+pip install -r requirements.txt
+
+# 2. Copy env file and fill in keys
+cp .env.example .env
+# Set at minimum: GOOGLE_API_KEY
+
+# 3. Start Qdrant locally
+docker run -p 6333:6333 qdrant/qdrant
+
+# 4. Run the API
+uvicorn main:app --reload
+```
+
+Your `main.py` should include:
+
+```python
+from fastapi import FastAPI
+from agent.api import router
+
+app = FastAPI()
+app.include_router(router)
+```
+
+## Environment variables
+
+| Variable | Required | Description |
+|---|---|---|
+| `GOOGLE_API_KEY` | ✅ | Google AI Studio key |
+| `QDRANT_HOST` | optional | Default: `localhost` |
+| `QDRANT_PORT` | optional | Default: `6333` |
+| `JIRA_BASE_URL` | optional | Enables ticket_lookup |
+| `JIRA_API_TOKEN` | optional | Enables ticket_lookup |
+| `FIRECRAWL_API_KEY` | optional | Enables live_docs |
+| `TAVILY_API_KEY` | optional | Enables live_docs |
+
+## BM25 index
+
+`doc_search` expects a BM25 index at `data/bm25_index.pkl` as a pickle with:
+
+```python
+{
+    "index": BM25Okapi(...),
+    "corpus": ["doc text 1", "doc text 2", ...],
+    "doc_ids": ["chunk_id_1", "chunk_id_2", ...]
+}
+```
+
+If the file is missing, BM25 is silently skipped and only Qdrant vectors are used.
+
+## Qdrant collection schema
+
+Collection name: `knowledge_base`
+
+```
+dense vector:  name="dense", size=1024
+sparse vector: name="sparse"
+payload:       chunk_id, text, source, source_type, team_id
+```
+
+Data is filtered by `team_id` on every query, so teams see only their own documents.
+
+## Adding a new agent
+
+1. Add a new tool in `tools/my_tool.py` with `async def run_my_tool(query, team_id) -> list[RetrievedChunk]`.
+2. Add `"my_tool"` to the `Literal` in `models.py → AgentTask.agent`.
+3. Add a node function in `graph.py`:
+
+```python
+async def my_tool_node(state: KnowledgeGraphState) -> dict:
+    await _push_event(queue, "agent_started", {"agent": "my_tool"})
+    chunks = await run_my_tool(task_input, state.query_input.team_id)
+    ...
+```
+
+4. Register the node and wire its edges in `build_graph()`.
+5. Update `PLANNER_SYSTEM_PROMPT` in `prompts.py` to describe when to use the new agent.
+
+## SSE event stream
+
+Events emitted in order:
+
+| Event | Payload |
+|---|---|
+| `plan_ready` | `{tasks, reasoning}` |
+| `agent_started` | `{agent}` |
+| `agent_done` | `{agent, chunks, confidence}` |
+| `synthesis_started` | `{}` |
+| `answer_chunk` | `{chunk}` (one per token) |
+| `guardrail_result` | `{score, escalate}` |
+| `done` | `{}` |
+| `error` | `{message}` |
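Note on the README's confidence gating: the banding can be sketched as a small pure function. This is an illustration only. `confidence_label` is a hypothetical name; the real logic lives in `compute_retrieval_confidence` in `agent/tools/doc_search.py`, which is not shown in this part of the diff. The thresholds are the README values (they match `reranker_high_threshold` and `reranker_medium_threshold` in `agent/config.py`); using the best reranker score as the input is an assumption.

```python
from typing import Literal, Sequence

HIGH_THRESHOLD = 0.6    # README: >= 0.6 -> high
MEDIUM_THRESHOLD = 0.4  # README: 0.4-0.6 -> medium


def confidence_label(reranker_scores: Sequence[float]) -> Literal["high", "medium", "low"]:
    """Map reranker scores to a confidence band using the best score (assumed)."""
    if not reranker_scores:
        return "low"  # nothing retrieved, so nothing to be confident about
    best = max(reranker_scores)
    if best >= HIGH_THRESHOLD:
        return "high"
    if best >= MEDIUM_THRESHOLD:
        return "medium"
    return "low"
```

Keeping the boundary checks as `>=` makes a score of exactly 0.6 land in `high` and exactly 0.4 in `medium`, which is how the README table reads.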
agent/__init__.py ADDED
@@ -0,0 +1,6 @@
+"""Enterprise Knowledge Copilot agent package."""
+
+from agent.api import router
+from agent.config import settings
+
+__all__ = ["router", "settings"]
agent/agents/__init__.py ADDED
@@ -0,0 +1 @@
+"""Agent implementations package."""
agent/agents/_gemini.py ADDED
@@ -0,0 +1,88 @@
+"""Shared Gemini call helper with exponential-backoff retry."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+from typing import Any
+
+from langchain_google_genai import ChatGoogleGenerativeAI
+from langchain_core.messages import HumanMessage, SystemMessage
+
+from agent.config import settings
+
+logger = logging.getLogger(__name__)
+
+
+async def call_gemini_text(
+    model_name: str,
+    system_prompt: str,
+    user_message: str,
+) -> str:
+    llm = ChatGoogleGenerativeAI(
+        model=model_name,
+        google_api_key=settings.google_api_key,
+        temperature=0.0,
+    )
+    messages = [SystemMessage(content=system_prompt), HumanMessage(content=user_message)]
+
+    for attempt in range(settings.gemini_max_retries):
+        try:
+            response = await llm.ainvoke(messages)
+            return response.content
+        except Exception as exc:
+            if attempt == settings.gemini_max_retries - 1:
+                logger.error("Gemini call failed after %d retries: %s", settings.gemini_max_retries, exc)
+                raise
+            delay = settings.gemini_retry_base_delay * (2 ** attempt)
+            logger.warning("Gemini call attempt %d failed (%s); retrying in %.1fs", attempt + 1, exc, delay)
+            await asyncio.sleep(delay)
+
+    raise RuntimeError("Unreachable")
+
+
+async def call_gemini_json(
+    model_name: str,
+    system_prompt: str,
+    user_message: str,
+) -> dict[str, Any]:
+    raw = await call_gemini_text(model_name, system_prompt, user_message)
+    # Strip markdown code fences if model ignores the instruction
+    cleaned = raw.strip()
+    if cleaned.startswith("```"):
+        cleaned = cleaned.split("\n", 1)[-1]
+    if cleaned.endswith("```"):
+        cleaned = cleaned[: cleaned.rfind("```")]
+    try:
+        return json.loads(cleaned)
+    except json.JSONDecodeError as exc:
+        logger.error("Failed to parse Gemini JSON response: %s\nRaw: %s", exc, raw[:500])
+        raise
+
+
+async def stream_gemini_text(
+    model_name: str,
+    system_prompt: str,
+    user_message: str,
+):
+    llm = ChatGoogleGenerativeAI(
+        model=model_name,
+        google_api_key=settings.google_api_key,
+        temperature=0.0,
+        streaming=True,
+    )
+    messages = [SystemMessage(content=system_prompt), HumanMessage(content=user_message)]
+
+    for attempt in range(settings.gemini_max_retries):
+        try:
+            async for chunk in llm.astream(messages):
+                yield chunk.content
+            return
+        except Exception as exc:
+            if attempt == settings.gemini_max_retries - 1:
+                logger.error("Gemini stream failed after %d retries: %s", settings.gemini_max_retries, exc)
+                raise
+            delay = settings.gemini_retry_base_delay * (2 ** attempt)
+            logger.warning("Gemini stream attempt %d failed (%s); retrying in %.1fs", attempt + 1, exc, delay)
+            await asyncio.sleep(delay)
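Note on `stream_gemini_text`: the retry loop restarts the stream from the beginning, so if a failure happens after some chunks were already yielded, the caller would receive those early chunks a second time. A minimal sketch of one way to avoid this, retrying only while nothing has been emitted yet. `stream_with_retry` and `make_stream` are hypothetical names and not part of the commit; the backoff formula mirrors the one in the file.

```python
import asyncio
from typing import AsyncIterator, Callable


async def stream_with_retry(
    make_stream: Callable[[], AsyncIterator[str]],
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> AsyncIterator[str]:
    """Yield chunks from make_stream(), retrying with exponential backoff
    only when the failure happens before the first chunk is yielded."""
    for attempt in range(max_retries):
        emitted = False
        try:
            async for chunk in make_stream():
                emitted = True
                yield chunk
            return
        except Exception:
            # Re-raise if output was already sent (a retry would duplicate it)
            # or if this was the last allowed attempt.
            if emitted or attempt == max_retries - 1:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))
```

The trade-off is that a mid-stream failure surfaces as an error instead of being retried; the SSE layer already has an `error` event for that case.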
agent/agents/guardrail.py ADDED
@@ -0,0 +1,42 @@
+"""Guardrail agent - checks answer groundedness, returns confidence score."""
+
+from __future__ import annotations
+
+import logging
+from typing import Optional
+
+from agent.agents._gemini import call_gemini_json
+from agent.config import settings
+from agent.models import RetrievedChunk
+from agent.prompts import GUARDRAIL_SYSTEM_PROMPT, build_guardrail_prompt
+
+logger = logging.getLogger(__name__)
+
+
+async def run_guardrail(
+    answer: str,
+    chunks: list[RetrievedChunk],
+) -> tuple[float, bool]:
+    """Returns (score, escalate) - score in [0,1], escalate True if score < 0.5."""
+    if not answer.strip():
+        logger.warning("guardrail: empty answer received, escalating")
+        return 0.0, True
+
+    chunks_text = "\n\n".join(
+        f"[{c.source}] {c.text}" for c in chunks
+    )
+    prompt = build_guardrail_prompt(answer, chunks_text)
+
+    try:
+        data = await call_gemini_json(
+            model_name=settings.guardrail_model,
+            system_prompt=GUARDRAIL_SYSTEM_PROMPT,
+            user_message=prompt,
+        )
+        score = float(data.get("score", 0.0))
+        escalate = bool(data.get("escalate", score < 0.5))
+        logger.info("guardrail: score=%.3f escalate=%s reasoning=%r", score, escalate, data.get("reasoning"))
+        return score, escalate
+    except Exception:
+        logger.exception("guardrail: evaluation failed - escalating by default")
+        return 0.0, True
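Note on `run_guardrail`: `bool(data.get("escalate", ...))` would treat the string `"false"` as `True`, and the score is not clamped to the documented `[0, 1]` range. A hedged sketch of a stricter parser follows. `parse_guardrail` and `ESCALATE_BELOW` are hypothetical names, and the lenient handling of string booleans is an assumption about what the model might return, not something the commit specifies.

```python
from typing import Any

ESCALATE_BELOW = 0.5  # threshold stated in the run_guardrail docstring


def parse_guardrail(data: dict[str, Any]) -> tuple[float, bool]:
    """Return (score, escalate) from a decoded guardrail response."""
    try:
        score = float(data.get("score", 0.0))
    except (TypeError, ValueError):
        score = 0.0  # an unparseable score is treated as ungrounded
    score = min(1.0, max(0.0, score))  # clamp into [0, 1]

    raw = data.get("escalate")
    if isinstance(raw, bool):
        escalate = raw
    elif isinstance(raw, str) and raw.strip().lower() in {"true", "false"}:
        escalate = raw.strip().lower() == "true"
    else:
        escalate = score < ESCALATE_BELOW  # fall back to the score rule
    return score, escalate
```

With this shape, a missing or malformed response degrades to `(0.0, True)`, the same fail-closed default the existing `except` branch uses.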
agent/agents/planner.py ADDED
@@ -0,0 +1,31 @@
+"""Planner agent - analyses query, returns ExecutionPlan via Gemini Pro."""
+
+from __future__ import annotations
+
+import logging
+
+from agent.agents._gemini import call_gemini_json
+from agent.config import settings
+from agent.models import AgentTask, ExecutionPlan, QueryInput
+from agent.prompts import PLANNER_SYSTEM_PROMPT
+
+logger = logging.getLogger(__name__)
+
+
+async def run_planner(query_input: QueryInput) -> ExecutionPlan:
+    logger.info("planner: generating execution plan for query=%r", query_input.query)
+
+    data = await call_gemini_json(
+        model_name=settings.planner_model,
+        system_prompt=PLANNER_SYSTEM_PROMPT,
+        user_message=f"Query: {query_input.query}",
+    )
+
+    tasks = [AgentTask(**t) for t in data["tasks"]]
+    plan = ExecutionPlan(tasks=tasks, reasoning=data.get("reasoning", ""))
+    logger.info(
+        "planner: plan has %d tasks - %s",
+        len(tasks),
+        [t.agent for t in tasks],
+    )
+    return plan
agent/agents/synthesiser.py ADDED
@@ -0,0 +1,62 @@
+"""Synthesiser agent - merges agent results and streams a cited answer."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from typing import AsyncGenerator
+
+from agent.agents._gemini import stream_gemini_text
+from agent.config import settings
+from agent.models import AgentResult, RetrievedChunk
+from agent.prompts import SYNTHESISER_SYSTEM_PROMPT, build_synthesiser_prompt
+
+logger = logging.getLogger(__name__)
+
+
+def _overall_confidence(agent_results: dict[str, AgentResult]) -> str:
+    levels = [r.retrieval_confidence for r in agent_results.values() if r.chunks]
+    if not levels:
+        return "low"
+    if "high" in levels:
+        return "high"
+    if "medium" in levels:
+        return "medium"
+    return "low"
+
+
+def _collect_all_chunks(agent_results: dict[str, AgentResult]) -> list[RetrievedChunk]:
+    seen_ids: set[str] = set()
+    chunks: list[RetrievedChunk] = []
+    for result in agent_results.values():
+        for chunk in result.chunks:
+            if chunk.chunk_id not in seen_ids:
+                seen_ids.add(chunk.chunk_id)
+                chunks.append(chunk)
+    return chunks
+
+
+async def stream_synthesis(
+    query: str,
+    agent_results: dict[str, AgentResult],
+) -> AsyncGenerator[str, None]:
+    confidence = _overall_confidence(agent_results)
+    all_chunks = _collect_all_chunks(agent_results)
+
+    chunks_text = "\n\n".join(
+        f"[{c.source}] {c.text}" for c in all_chunks
+    )
+    prompt = build_synthesiser_prompt(query, confidence, chunks_text)
+
+    logger.info(
+        "synthesiser: streaming answer - confidence=%s, chunks=%d",
+        confidence,
+        len(all_chunks),
+    )
+
+    async for token in stream_gemini_text(
+        model_name=settings.synthesiser_model,
+        system_prompt=SYNTHESISER_SYSTEM_PROMPT,
+        user_message=prompt,
+    ):
+        yield token
agent/api.py ADDED
@@ -0,0 +1,80 @@
+"""FastAPI router with SSE streaming endpoint for the knowledge copilot."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+from typing import AsyncGenerator
+
+from fastapi import APIRouter
+from fastapi.responses import StreamingResponse
+
+from agent.graph import graph
+from agent.models import KnowledgeGraphState, QueryInput
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/agent", tags=["agent"])
+
+
+async def _event_generator(
+    query_input: QueryInput,
+    queue: asyncio.Queue,
+) -> AsyncGenerator[str, None]:
+    _SENTINEL = object()
+
+    async def run_graph() -> None:
+        initial_state = KnowledgeGraphState(
+            query_input=query_input,
+            sse_queue=queue,
+        )
+        try:
+            await graph.ainvoke(initial_state)
+        except Exception as exc:
+            logger.exception("Graph execution error for session=%s", query_input.session_id)
+            await queue.put({"event": "error", "data": {"message": str(exc)}})
+        finally:
+            await queue.put(_SENTINEL)
+
+    task = asyncio.create_task(run_graph())
+
+    try:
+        while True:
+            item = await queue.get()
+            if item is _SENTINEL:
+                break
+
+            event_name = item.get("event", "message")
+            data_str = json.dumps(item.get("data", {}))
+            yield f"event: {event_name}\ndata: {data_str}\n\n"
+
+        yield "event: done\ndata: {}\n\n"
+
+    except asyncio.CancelledError:
+        logger.info("SSE stream cancelled for session=%s", query_input.session_id)
+        task.cancel()
+        raise
+    finally:
+        if not task.done():
+            task.cancel()
+
+
+@router.post("/query")
+async def query_endpoint(query_input: QueryInput) -> StreamingResponse:
+    queue: asyncio.Queue = asyncio.Queue()
+    logger.info(
+        "query_endpoint: session=%s team=%s query=%r",
+        query_input.session_id,
+        query_input.team_id,
+        query_input.query,
+    )
+
+    return StreamingResponse(
+        _event_generator(query_input, queue),
+        media_type="text/event-stream",
+        headers={
+            "Cache-Control": "no-cache",
+            "X-Accel-Buffering": "no",
+        },
+    )
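Note on consuming the stream from `agent/api.py`: each event is framed as an `event: <name>` line, a `data: <json>` line, and a blank line. A minimal sketch of decoding that framing on the client side is below. `parse_sse` is a hypothetical helper, and it handles only the two fields this router emits, not the full SSE format (no multi-line `data`, `id`, `retry`, or comment lines).

```python
import json
from typing import Any, Iterator


def parse_sse(stream_text: str) -> Iterator[tuple[str, Any]]:
    """Yield (event_name, decoded_data) for each frame in stream_text."""
    for frame in stream_text.split("\n\n"):
        event_name = "message"  # SSE default when no event field is present
        data_str = ""
        for line in frame.splitlines():
            if line.startswith("event:"):
                event_name = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_str = line[len("data:"):].strip()
        if data_str:  # skip the empty tail left after the final blank line
            yield event_name, json.loads(data_str)
```

As the generator is written, `done` is still sent after an `error` event, so a client probably wants to treat `error` as terminal on its own rather than waiting for a different closing signal.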
agent/config.py ADDED
@@ -0,0 +1,50 @@
+from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+class Settings(BaseSettings):
+    model_config = SettingsConfigDict(
+        env_file=".env",
+        env_file_encoding="utf-8",
+        case_sensitive=False,
+        extra="ignore",
+    )
+
+    google_api_key: str = ""
+
+    planner_model: str = "gemini-2.5-pro"
+    synthesiser_model: str = "gemini-2.5-pro"
+    summariser_model: str = "gemini-2.5-flash"
+    guardrail_model: str = "gemini-2.5-flash"
+
+    qdrant_host: str = "localhost"
+    qdrant_port: int = 6333
+    qdrant_collection: str = "knowledge_base"
+    qdrant_dense_vector_name: str = "dense"
+    qdrant_sparse_vector_name: str = "sparse"
+    qdrant_dense_size: int = 1024
+
+    bge_embedding_model: str = "BAAI/bge-m3"
+    bge_reranker_model: str = "BAAI/bge-reranker-v2-m3"
+
+    bm25_index_path: str = "data/bm25_index.pkl"
+
+    gliner_model: str = "urchade/gliner_mediumv2.1"
+
+    rrf_top_k: int = 50
+    final_top_k: int = 5
+    reranker_high_threshold: float = 0.6
+    reranker_medium_threshold: float = 0.4
+    live_docs_confidence_threshold: float = 0.5
+
+    gemini_max_retries: int = 3
+    gemini_retry_base_delay: float = 1.0
+
+    jira_base_url: str = ""
+    jira_api_token: str = ""
+    jira_project_key: str = ""
+
+    firecrawl_api_key: str = ""
+    tavily_api_key: str = ""
+
+
+settings = Settings()
agent/graph.py ADDED
@@ -0,0 +1,251 @@
+"""LangGraph graph definition - nodes, edges, and parallel execution."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from typing import Any
+
+from langgraph.graph import END, StateGraph
+
+from agent.agents.guardrail import run_guardrail
+from agent.agents.planner import run_planner
+from agent.agents.synthesiser import stream_synthesis
+from agent.models import AgentResult, KnowledgeGraphState, RetrievedChunk
+from agent.tools.doc_search import compute_retrieval_confidence, run_doc_search
+from agent.tools.live_docs import run_live_docs
+from agent.tools.ticket_lookup import run_ticket_lookup
+
+logger = logging.getLogger(__name__)
+
+
+async def _push_event(queue: asyncio.Queue, event: str, data: Any) -> None:
+    if queue is not None:
+        await queue.put({"event": event, "data": data})
+
+
+async def planner_node(state: KnowledgeGraphState) -> dict:
+    queue = state.sse_queue
+    plan = await run_planner(state.query_input)
+    await _push_event(
+        queue,
+        "plan_ready",
+        {
+            "tasks": [t.model_dump() for t in plan.tasks],
+            "reasoning": plan.reasoning,
+        },
+    )
+    return {"execution_plan": plan}
+
+
+async def doc_search_node(state: KnowledgeGraphState) -> dict:
+    queue = state.sse_queue
+    await _push_event(queue, "agent_started", {"agent": "doc_search"})
+
+    task_input = _find_task_input(state, "doc_search") or state.query_input.query
+    chunks: list[RetrievedChunk] = []
+    error: str | None = None
+
+    try:
+        chunks = await run_doc_search(task_input, state.query_input.team_id)
+    except Exception as exc:
+        logger.exception("doc_search_node error")
+        error = str(exc)
+
+    confidence = compute_retrieval_confidence(chunks)
+    result = AgentResult(
+        agent="doc_search",
+        chunks=chunks,
+        retrieval_confidence=confidence,
+        error=error,
+    )
+    await _push_event(
+        queue,
+        "agent_done",
+        {"agent": "doc_search", "chunks": len(chunks), "confidence": confidence},
+    )
+    return {"agent_results": {**state.agent_results, "doc_search": result}}
+
+
+async def ticket_lookup_node(state: KnowledgeGraphState) -> dict:
+    queue = state.sse_queue
+    await _push_event(queue, "agent_started", {"agent": "ticket_lookup"})
+
+    task_input = _find_task_input(state, "ticket_lookup") or state.query_input.query
+    chunks: list[RetrievedChunk] = []
+    error: str | None = None
+
+    try:
+        chunks = await run_ticket_lookup(task_input, state.query_input.team_id)
+    except Exception as exc:
+        logger.exception("ticket_lookup_node error")
+        error = str(exc)
+
+    confidence = compute_retrieval_confidence(chunks)
+    result = AgentResult(
+        agent="ticket_lookup",
+        chunks=chunks,
+        retrieval_confidence=confidence,
+        error=error,
+    )
+    await _push_event(
+        queue,
+        "agent_done",
+        {"agent": "ticket_lookup", "chunks": len(chunks), "confidence": confidence},
+    )
+    return {"agent_results": {**state.agent_results, "ticket_lookup": result}}
+
+
+async def live_docs_node(state: KnowledgeGraphState) -> dict:
+    queue = state.sse_queue
+    await _push_event(queue, "agent_started", {"agent": "live_docs"})
+
+    task_input = _find_task_input(state, "live_docs") or state.query_input.query
+    chunks: list[RetrievedChunk] = []
+    error: str | None = None
+
+    try:
+        chunks = await run_live_docs(task_input, state.query_input.team_id)
+    except Exception as exc:
+        logger.exception("live_docs_node error")
+        error = str(exc)
+
+    confidence = compute_retrieval_confidence(chunks)
+    result = AgentResult(
+        agent="live_docs",
+        chunks=chunks,
+        retrieval_confidence=confidence,
+        error=error,
+    )
+    await _push_event(
+        queue,
+        "agent_done",
+        {"agent": "live_docs", "chunks": len(chunks), "confidence": confidence},
+    )
+    return {"agent_results": {**state.agent_results, "live_docs": result}}
+
+
+async def synthesiser_node(state: KnowledgeGraphState) -> dict:
+    queue = state.sse_queue
+    await _push_event(queue, "synthesis_started", {})
+
+    full_answer_parts: list[str] = []
+    async for token in stream_synthesis(state.query_input.query, state.agent_results):
+        full_answer_parts.append(token)
+        await _push_event(queue, "answer_chunk", {"chunk": token})
+
+    final_answer = "".join(full_answer_parts)
+
+    all_chunks: list[RetrievedChunk] = []
+    seen: set[str] = set()
+    for result in state.agent_results.values():
+        for chunk in result.chunks:
+            if chunk.chunk_id not in seen:
+                seen.add(chunk.chunk_id)
+                all_chunks.append(chunk)
+
+    return {"final_answer": final_answer, "citations": all_chunks}
+
+
+async def join_node(state: KnowledgeGraphState) -> dict:
+    """Fan-in synchronisation point - waits for all retrieval nodes, then hands off to synthesiser."""
+    await _push_event(state.sse_queue, "agent_started", {"agent": "synthesiser"})
+    return {}
+
+
+async def guardrail_node(state: KnowledgeGraphState) -> dict:
+    queue = state.sse_queue
+    score, escalate = await run_guardrail(
+        state.final_answer or "",
+        state.citations,
+    )
+    await _push_event(
+        queue,
+        "guardrail_result",
+        {"score": score, "escalate": escalate},
+    )
+    return {
+        "guardrail_passed": not escalate,
+        "guardrail_score": score,
+        "escalate": escalate,
+    }
+
+
+def _find_task_input(state: KnowledgeGraphState, agent: str) -> str | None:
+    if state.execution_plan is None:
+        return None
+    for task in state.execution_plan.tasks:
+        if task.agent == agent:
+            return task.input
+    return None
+
+
+def _plan_includes(state: KnowledgeGraphState, agent: str) -> bool:
+    if state.execution_plan is None:
+        return False
+    return any(t.agent == agent for t in state.execution_plan.tasks)
+
+
+def _route_after_planner(state: KnowledgeGraphState) -> list[str]:
+    if state.execution_plan is None:
+        return ["synthesiser_node"]
+
+    plan = state.execution_plan
+    immediate: list[str] = []
+    for task in plan.tasks:
+        if not task.depends_on:
+            immediate.append(f"{task.agent}_node")
+
+    # If nothing is immediate (shouldn't happen), fall back to synthesiser
+    return immediate or ["synthesiser_node"]
+
+
+def _route_after_guardrail(state: KnowledgeGraphState) -> str:
+    return "escalate" if state.escalate else END
+
+
+def build_graph() -> Any:
+    builder = StateGraph(KnowledgeGraphState)
+
+    builder.add_node("planner_node", planner_node)
+    builder.add_node("doc_search_node", doc_search_node)
+    builder.add_node("ticket_lookup_node", ticket_lookup_node)
+    builder.add_node("live_docs_node", live_docs_node)
+    builder.add_node("join_node", join_node)
+    builder.add_node("synthesiser_node", synthesiser_node)
+    builder.add_node("guardrail_node", guardrail_node)
+
+    builder.set_entry_point("planner_node")
+
+    builder.add_conditional_edges(
+        "planner_node",
+        _route_after_planner,
+        {
+            "doc_search_node": "doc_search_node",
+            "ticket_lookup_node": "ticket_lookup_node",
+            "live_docs_node": "live_docs_node",
+            "summariser_node": "synthesiser_node",
+            "synthesiser_node": "synthesiser_node",
+        },
+    )
+
+    # Retrieval nodes all converge on join_node - LangGraph waits for every
+    # incoming edge to fire before executing join_node (fan-in).
+    builder.add_edge("doc_search_node", "join_node")
+    builder.add_edge("ticket_lookup_node", "join_node")
+    builder.add_edge("live_docs_node", "join_node")
+
+    builder.add_edge("join_node", "synthesiser_node")
+
+    builder.add_edge("synthesiser_node", "guardrail_node")
+
+    builder.add_conditional_edges(
+        "guardrail_node",
+        _route_after_guardrail,
+        {END: END, "escalate": END},
+    )
+
+    return builder.compile()
+
+
+graph = build_graph()
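Note on routing in `agent/graph.py`: `_route_after_planner` dispatches only tasks whose `depends_on` is empty, and no edge in `build_graph()` starts a dependent task later. As far as this diff shows, a `live_docs` task planned with `depends_on: ["doc_search"]` would therefore never run. One way to reason about the intended order is to group the plan into dependency waves. The sketch below is a plain-Python illustration under that assumption; `Task` and `execution_waves` are hypothetical names, and wiring the waves into LangGraph is left out.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    """Stand-in for AgentTask with only the fields the routing needs."""
    agent: str
    depends_on: list[str] = field(default_factory=list)


def execution_waves(tasks: list[Task]) -> list[list[str]]:
    """Group agents into waves; each wave depends only on earlier waves."""
    remaining = {t.agent: set(t.depends_on) for t in tasks}
    done: set[str] = set()
    waves: list[list[str]] = []
    while remaining:
        # An agent is ready once all of its dependencies have completed.
        ready = [a for a, deps in remaining.items() if deps <= done]
        if not ready:
            raise ValueError(f"unsatisfiable dependencies: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for agent in ready:
            del remaining[agent]
    return waves
```

Agents in the same wave are the ones that can run in parallel; the `ValueError` guards against a plan with cyclic or unknown dependencies, which an LLM-generated plan could plausibly contain.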
agent/models.py ADDED
@@ -0,0 +1,58 @@
+from __future__ import annotations
+
+import asyncio
+from typing import Any, Literal, Optional
+
+from pydantic import BaseModel, Field
+
+
+class QueryInput(BaseModel):
+    query: str
+    team_id: str
+    session_id: str
+
+
+class AgentTask(BaseModel):
+    agent: Literal["doc_search", "ticket_lookup", "live_docs", "summariser"]
+    input: str
+    depends_on: list[str] = Field(default_factory=list)
+
+
+class ExecutionPlan(BaseModel):
+    tasks: list[AgentTask]
+    reasoning: str
+
+
+class RetrievedChunk(BaseModel):
+    chunk_id: str
+    text: str
+    source: str
+    source_type: str
+    score: float
+    reranker_score: Optional[float] = None
+
+
+class AgentResult(BaseModel):
+    agent: str
+    chunks: list[RetrievedChunk] = Field(default_factory=list)
+    retrieval_confidence: Literal["high", "medium", "low"] = "low"
+    error: Optional[str] = None
+
+
+class KnowledgeGraphState(BaseModel):
+    model_config = {"arbitrary_types_allowed": True}
+
+    query_input: QueryInput
+    execution_plan: Optional[ExecutionPlan] = None
+    agent_results: dict[str, AgentResult] = Field(default_factory=dict)
+    final_answer: Optional[str] = None
+    citations: list[RetrievedChunk] = Field(default_factory=list)
+    guardrail_passed: Optional[bool] = None
+    guardrail_score: Optional[float] = None
+    escalate: bool = False
+    sse_queue: Any = None  # asyncio.Queue for streaming events
+
+
+class SSEEvent(BaseModel):
+    event: str
+    data: Any
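Note on `KnowledgeGraphState.agent_results`: `doc_search_node` and `ticket_lookup_node` can run in the same step and both return an `agent_results` update built from the state they started with. LangGraph's documented pattern for combining concurrent writes to one state key is a reducer attached through `typing.Annotated`; without one, parallel writes to the same key are likely to be rejected or to overwrite each other, depending on the version. A minimal sketch of such a reducer follows. `merge_results` and the `AgentResults` alias are hypothetical and not part of the commit.

```python
from typing import Annotated, Any


def merge_results(current: dict[str, Any], update: dict[str, Any]) -> dict[str, Any]:
    """Reducer: combine two agent_results dicts, newer keys overriding older ones."""
    merged = dict(current)  # copy so the existing state is not mutated
    merged.update(update)
    return merged


# Assumed wiring (not in the commit): annotate the state field so the graph
# calls merge_results whenever a node returns an agent_results update.
AgentResults = Annotated[dict[str, Any], merge_results]
```

With a reducer in place, each retrieval node could return only its own entry, for example `{"agent_results": {"doc_search": result}}`, instead of spreading `state.agent_results` into the update.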
agent/prompts.py ADDED
@@ -0,0 +1,107 @@
+ PLANNER_SYSTEM_PROMPT = """You are a planning agent for an Enterprise Knowledge Copilot.
+
+ Given a user query, decide which retrieval agents are needed and in what order.
+
+ Available agents:
+ - doc_search: Searches the internal knowledge base (Qdrant vector DB). Use for product docs, runbooks, internal wikis, architecture docs.
+ - ticket_lookup: Searches Jira tickets. Use when query mentions bugs, issues, tickets, sprints, or task tracking.
+ - live_docs: Fetches live web content via Firecrawl/Tavily. Use ONLY when the query mentions a specific external library, framework, or third-party tool where internal docs are insufficient.
+ - summariser: Summarises a large set of retrieved chunks. Use ONLY when more than 10 chunks are expected.
+
+ Rules:
+ 1. doc_search and ticket_lookup should run in parallel when both are needed; set depends_on: [] for both.
+ 2. live_docs only runs if you expect doc_search confidence to be low OR the query names a specific external library/framework. Set depends_on: ["doc_search"] to run after.
+ 3. summariser only runs after doc_search. Set depends_on: ["doc_search"].
+ 4. Do NOT include agents that are not needed for this query.
+ 5. Rephrase the input for each agent to be focused and specific to what that agent can retrieve.
+
+ Return ONLY valid JSON matching this exact schema. No preamble. No markdown code fences. No explanation outside the JSON.
+
+ Schema:
+ {
+   "tasks": [
+     {
+       "agent": "<agent_name>",
+       "input": "<focused query for this agent>",
+       "depends_on": []
+     }
+   ],
+   "reasoning": "<one sentence explaining your agent selection>"
+ }"""
+
+
+ SYNTHESISER_SYSTEM_PROMPT = """You are a synthesiser agent for an Enterprise Knowledge Copilot.
+
+ Your job: given a user query and retrieved knowledge chunks from multiple agents, produce a clear, accurate, cited answer.
+
+ Rules:
+ 1. Every factual claim MUST be followed by an inline citation in the format [source_name].
+ 2. Do NOT make any claim that is not directly supported by the retrieved chunks.
+ 3. If retrieval_confidence is "low", explicitly state at the top: "Note: retrieved knowledge has low confidence. This answer may be incomplete."
+ 4. If retrieval_confidence is "medium", add a brief caveat recommending the user verify key details.
+ 5. Structure your answer with clear paragraphs. Use bullet points for lists of steps or options.
+ 6. If chunks from different agents contradict each other, note the discrepancy and present both views.
+ 7. Be concise: prefer 3-5 sentences over long paragraphs unless complexity demands more.
+
+ You will receive:
+ - original_query: the user's question
+ - retrieval_confidence: overall confidence level
+ - chunks: list of retrieved chunks with their source and text"""
+
+
+ GUARDRAIL_SYSTEM_PROMPT = """You are a guardrail agent for an Enterprise Knowledge Copilot.
+
+ Your job: evaluate whether the generated answer is fully grounded in the provided source chunks.
+
+ For each claim in the answer, check if it appears in or is directly inferrable from the provided chunks.
+
+ Scoring:
+ - 1.0: Every claim is directly supported by a chunk.
+ - 0.7-0.99: Most claims are supported; minor inferences acceptable.
+ - 0.5-0.69: Some claims lack clear chunk support; uncertain.
+ - 0.0-0.49: Significant claims are unsupported or hallucinated.
+
+ Rules:
+ - If score < 0.5, set escalate: true.
+ - Return ONLY valid JSON. No preamble. No markdown code fences.
+
+ Schema:
+ {
+   "score": <float between 0.0 and 1.0>,
+   "escalate": <true or false>,
+   "reasoning": "<one sentence explaining the score>"
+ }"""
+
+
+ def build_synthesiser_prompt(
+     query: str,
+     retrieval_confidence: str,
+     chunks_text: str,
+ ) -> str:
+     return f"""original_query: {query}
+
+ retrieval_confidence: {retrieval_confidence}
+
+ Retrieved chunks:
+ {chunks_text}
+
+ Generate your answer now."""
+
+
+ def build_guardrail_prompt(answer: str, chunks_text: str) -> str:
+     return f"""Answer to evaluate:
+ {answer}
+
+ Source chunks:
+ {chunks_text}
+
+ Evaluate grounding and return JSON now."""
+
+
+ def build_summariser_prompt(chunks_text: str, query: str) -> str:
+     return f"""Summarise the following retrieved chunks in relation to this query: {query}
+
+ Chunks:
+ {chunks_text}
+
+ Provide a concise summary (3-5 sentences) capturing the key points."""
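The planner prompt asks for a list of tasks whose `depends_on` fields decide what can run in parallel. As a rough illustration only, the sketch below shows one way a caller might validate such a plan and group agents into parallel "waves". The helper `plan_waves` and the sample plan are hypothetical and are not part of this commit; the real orchestration lives in the LangGraph code.

```python
import json

# Agent names taken from the planner prompt; everything else here is illustrative.
ALLOWED_AGENTS = {"doc_search", "ticket_lookup", "live_docs", "summariser"}


def plan_waves(raw: str) -> list[list[str]]:
    """Parse planner JSON and group agents into waves that can run in parallel."""
    plan = json.loads(raw)
    deps: dict[str, set[str]] = {}
    for task in plan["tasks"]:
        if task["agent"] not in ALLOWED_AGENTS:
            raise ValueError(f"unknown agent: {task['agent']}")
        deps[task["agent"]] = set(task.get("depends_on", []))

    waves: list[list[str]] = []
    done: set[str] = set()
    while len(done) < len(deps):
        # An agent is ready once every dependency has already completed.
        ready = sorted(a for a, d in deps.items() if a not in done and d <= done)
        if not ready:
            raise ValueError("cyclic or unsatisfied depends_on")
        waves.append(ready)
        done.update(ready)
    return waves


sample = json.dumps(
    {
        "tasks": [
            {"agent": "doc_search", "input": "retry policy runbook", "depends_on": []},
            {"agent": "ticket_lookup", "input": "open retry bugs", "depends_on": []},
            {"agent": "summariser", "input": "summarise findings", "depends_on": ["doc_search"]},
        ],
        "reasoning": "Internal docs and tickets are both relevant.",
    }
)
print(plan_waves(sample))  # [['doc_search', 'ticket_lookup'], ['summariser']]
```

Rejecting unknown agent names and unsatisfied dependencies up front keeps a malformed planner response from silently dropping a retrieval step.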
agent/tools/__init__.py ADDED
@@ -0,0 +1 @@
+ """Agent tools package."""
agent/tools/doc_search.py ADDED
@@ -0,0 +1,272 @@
+ """Qdrant hybrid retrieval with BGE-M3 embeddings, BM25, RRF fusion, and reranking."""
+
+ from __future__ import annotations
+
+ import logging
+ import pickle
+ from pathlib import Path
+ from typing import Optional
+
+ from FlagEmbedding import BGEM3FlagModel, FlagReranker
+ from gliner import GLiNER
+ from qdrant_client import AsyncQdrantClient
+ from qdrant_client.http import models as qmodels
+ from rank_bm25 import BM25Okapi
+
+ from agent.config import settings
+ from agent.models import RetrievedChunk
+
+ logger = logging.getLogger(__name__)
+
+ _PII_ENTITY_TYPES = [
+     "person",
+     "email",
+     "phone",
+     "ssn",
+     "credit_card",
+     "address",
+     "date_of_birth",
+ ]
+
+ # Singletons: loaded once on first call because the models are expensive to initialise
+ _embedding_model: Optional[BGEM3FlagModel] = None
+ _reranker: Optional[FlagReranker] = None
+ _gliner: Optional[GLiNER] = None
+ _bm25_index: Optional[BM25Okapi] = None
+ _bm25_corpus: Optional[list[str]] = None
+ _bm25_doc_ids: Optional[list[str]] = None
+
+
+ def _get_embedding_model() -> BGEM3FlagModel:
+     global _embedding_model
+     if _embedding_model is None:
+         logger.info("Loading BGE-M3 embedding model: %s", settings.bge_embedding_model)
+         _embedding_model = BGEM3FlagModel(
+             settings.bge_embedding_model, use_fp16=True
+         )
+     return _embedding_model
+
+
+ def _get_reranker() -> FlagReranker:
+     global _reranker
+     if _reranker is None:
+         logger.info("Loading BGE reranker: %s", settings.bge_reranker_model)
+         _reranker = FlagReranker(settings.bge_reranker_model, use_fp16=True)
+     return _reranker
+
+
+ def _get_gliner() -> GLiNER:
+     global _gliner
+     if _gliner is None:
+         logger.info("Loading GLiNER model: %s", settings.gliner_model)
+         _gliner = GLiNER.from_pretrained(settings.gliner_model)
+     return _gliner
+
+
+ def _load_bm25() -> tuple[Optional[BM25Okapi], Optional[list[str]], Optional[list[str]]]:
+     global _bm25_index, _bm25_corpus, _bm25_doc_ids
+     if _bm25_index is not None:
+         return _bm25_index, _bm25_corpus, _bm25_doc_ids
+
+     index_path = Path(settings.bm25_index_path)
+     if not index_path.exists():
+         logger.warning("BM25 index not found at %s; skipping BM25 retrieval", index_path)
+         return None, None, None
+
+     try:
+         # pickle.load can execute arbitrary code, so the index file must come
+         # from a trusted source (here, the ingestion job that writes it)
+         with index_path.open("rb") as f:
+             data = pickle.load(f)
+         _bm25_index = data["index"]
+         _bm25_corpus = data["corpus"]
+         _bm25_doc_ids = data["doc_ids"]
+         logger.info("Loaded BM25 index with %d documents", len(_bm25_doc_ids))
+     except Exception:
+         logger.exception("Failed to load BM25 index from %s", index_path)
+         return None, None, None
+
+     return _bm25_index, _bm25_corpus, _bm25_doc_ids
+
+
+ def _mask_pii(text: str) -> str:
+     try:
+         gliner = _get_gliner()
+         entities = gliner.predict_entities(text, _PII_ENTITY_TYPES, threshold=0.5)
+         # Iterate in reverse so substring replacements don't shift later indices
+         entities_sorted = sorted(entities, key=lambda e: e["start"], reverse=True)
+         masked = text
+         for ent in entities_sorted:
+             masked = masked[: ent["start"]] + "[REDACTED]" + masked[ent["end"] :]
+         return masked
+     except Exception:
+         logger.exception("GLiNER PII masking failed; using raw query")
+         return text
+
+
+ def _reciprocal_rank_fusion(
+     ranked_lists: list[list[str]], k: int = 60
+ ) -> dict[str, float]:
+     # Each list contributes 1 / (k + rank) per document, with rank starting at 1
+     scores: dict[str, float] = {}
+     for ranked in ranked_lists:
+         for rank, doc_id in enumerate(ranked):
+             scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
+     return scores
+
+
+ async def run_doc_search(query: str, team_id: str) -> list[RetrievedChunk]:
+     masked_query = _mask_pii(query)
+     logger.info("doc_search: masked query = %r", masked_query)
+
+     embed_model = _get_embedding_model()
+     output = embed_model.encode(
+         [masked_query],
+         return_dense=True,
+         return_sparse=True,
+         return_colbert_vecs=False,
+     )
+     dense_vector: list[float] = output["dense_vecs"][0].tolist()
+     # BGE-M3 keys lexical weights by token id; the ids may arrive as strings,
+     # so cast to the int indices and float values that SparseVector expects
+     sparse_weights = output["lexical_weights"][0]
+
+     sparse_indices = [int(token_id) for token_id in sparse_weights]
+     sparse_values = [float(weight) for weight in sparse_weights.values()]
+
+     # NOTE: the BM25 index is not filtered by team_id here, unlike the Qdrant queries
+     bm25_ranked_ids: list[str] = []
+     bm25, corpus, doc_ids = _load_bm25()
+     if bm25 is not None and doc_ids:
+         tokenized = masked_query.lower().split()
+         bm25_scores = bm25.get_scores(tokenized)
+         top_bm25_idx = sorted(
+             range(len(bm25_scores)), key=lambda i: bm25_scores[i], reverse=True
+         )[: settings.rrf_top_k]
+         bm25_ranked_ids = [doc_ids[i] for i in top_bm25_idx]
+
+     qdrant_ranked_ids: list[str] = []
+     sparse_ranked_ids: list[str] = []
+     qdrant_payload_map: dict[str, dict] = {}
+     qdrant_score_map: dict[str, float] = {}
+
+     client: Optional[AsyncQdrantClient] = None
+     try:
+         client = AsyncQdrantClient(host=settings.qdrant_host, port=settings.qdrant_port)
+         results = await client.search(
+             collection_name=settings.qdrant_collection,
+             query_vector=qmodels.NamedVector(
+                 name=settings.qdrant_dense_vector_name,
+                 vector=dense_vector,
+             ),
+             query_filter=qmodels.Filter(
+                 must=[
+                     qmodels.FieldCondition(
+                         key="team_id",
+                         match=qmodels.MatchValue(value=team_id),
+                     )
+                 ]
+             ),
+             limit=settings.rrf_top_k,
+             with_payload=True,
+         )
+         for hit in results:
+             doc_id = hit.payload.get("chunk_id", str(hit.id))
+             qdrant_ranked_ids.append(doc_id)
+             qdrant_payload_map[doc_id] = hit.payload
+             qdrant_score_map[doc_id] = hit.score
+
+         sparse_results = await client.search(
+             collection_name=settings.qdrant_collection,
+             query_vector=qmodels.NamedSparseVector(
+                 name=settings.qdrant_sparse_vector_name,
+                 vector=qmodels.SparseVector(
+                     indices=sparse_indices,
+                     values=sparse_values,
+                 ),
+             ),
+             query_filter=qmodels.Filter(
+                 must=[
+                     qmodels.FieldCondition(
+                         key="team_id",
+                         match=qmodels.MatchValue(value=team_id),
+                     )
+                 ]
+             ),
+             limit=settings.rrf_top_k,
+             with_payload=True,
+         )
+         for hit in sparse_results:
+             doc_id = hit.payload.get("chunk_id", str(hit.id))
+             sparse_ranked_ids.append(doc_id)
+             qdrant_payload_map.setdefault(doc_id, hit.payload)
+             qdrant_score_map.setdefault(doc_id, hit.score)
+
+     except Exception:
+         logger.exception("Qdrant search failed; continuing with any results already collected")
+     finally:
+         # Close the client on both the success and the failure path
+         if client is not None:
+             await client.close()
+
+     ranked_lists = [lst for lst in [qdrant_ranked_ids, sparse_ranked_ids, bm25_ranked_ids] if lst]
+     if not ranked_lists:
+         logger.warning("All retrieval sources returned empty; no results")
+         return []
+
+     rrf_scores = _reciprocal_rank_fusion(ranked_lists)
+     top_ids = sorted(rrf_scores, key=lambda x: rrf_scores[x], reverse=True)[: settings.rrf_top_k]
+
+     candidates: list[dict] = []
+     for doc_id in top_ids:
+         payload = qdrant_payload_map.get(doc_id)
+         if payload is None:
+             # BM25-only hit has no Qdrant payload, so reconstruct it from the corpus
+             if doc_ids and doc_id in doc_ids:
+                 idx = doc_ids.index(doc_id)
+                 text = corpus[idx] if corpus else ""
+                 payload = {"chunk_id": doc_id, "text": text, "source": "bm25", "source_type": "internal"}
+             else:
+                 continue
+         candidates.append({"id": doc_id, "payload": payload, "rrf_score": rrf_scores[doc_id]})
+
+     if not candidates:
+         return []
+
+     reranker = _get_reranker()
+     pairs = [(masked_query, c["payload"].get("text", "")) for c in candidates]
+     try:
+         raw_scores = reranker.compute_score(pairs, normalize=True)
+         # Some FlagEmbedding versions return a bare float for a single pair
+         rerank_scores: list[float] = (
+             [raw_scores] if isinstance(raw_scores, (int, float)) else list(raw_scores)
+         )
+     except Exception:
+         logger.exception("Reranker failed; falling back to RRF scores")
+         rerank_scores = [c["rrf_score"] for c in candidates]
+
+     for i, cand in enumerate(candidates):
+         cand["reranker_score"] = rerank_scores[i]
+
+     candidates.sort(key=lambda c: c["reranker_score"], reverse=True)
+     top_candidates = candidates[: settings.final_top_k]
+
+     chunks: list[RetrievedChunk] = []
+     for cand in top_candidates:
+         p = cand["payload"]
+         chunks.append(
+             RetrievedChunk(
+                 chunk_id=p.get("chunk_id", cand["id"]),
+                 text=p.get("text", ""),
+                 source=p.get("source", "unknown"),
+                 source_type=p.get("source_type", "internal"),
+                 score=cand["rrf_score"],
+                 reranker_score=cand["reranker_score"],
+             )
+         )
+
+     logger.info(
+         "doc_search: returning %d chunks, top reranker score=%.3f",
+         len(chunks),
+         chunks[0].reranker_score if chunks else 0.0,
+     )
+     return chunks
+
+
+ def compute_retrieval_confidence(chunks: list[RetrievedChunk]) -> str:
+     # Confidence is derived from the best reranker score only
+     if not chunks:
+         return "low"
+     top_score = chunks[0].reranker_score or 0.0
+     if top_score >= settings.reranker_high_threshold:
+         return "high"
+     if top_score >= settings.reranker_medium_threshold:
+         return "medium"
+     return "low"
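To make the fusion step in `doc_search.py` concrete, here is a small standalone worked example of reciprocal rank fusion using the same formula, score = sum of 1 / (k + rank) with rank starting at 1 and k = 60. The three input rankings and the document ids are made up for illustration; they are not taken from the project's data.

```python
def reciprocal_rank_fusion(ranked_lists: list[list[str]], k: int = 60) -> dict[str, float]:
    """Sum 1 / (k + rank) over every list a document appears in (rank starts at 1)."""
    scores: dict[str, float] = {}
    for ranked in ranked_lists:
        for rank, doc_id in enumerate(ranked):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank + 1)
    return scores


# Hypothetical rankings from the dense, sparse, and BM25 retrievers.
dense = ["a", "b", "c"]
sparse = ["b", "a", "d"]
bm25 = ["b", "d"]

scores = reciprocal_rank_fusion([dense, sparse, bm25])
ranking = sorted(scores, key=scores.get, reverse=True)
print(ranking)  # ['b', 'a', 'd', 'c']
```

Document `b` wins because it appears near the top of all three lists, while `c` appears only once. RRF uses ranks rather than raw scores, which avoids having to calibrate cosine similarity against BM25 scores.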
agent/tools/live_docs.py ADDED
@@ -0,0 +1,20 @@
+ """Live web documentation fetcher (Firecrawl + Tavily stub)."""
+
+ from __future__ import annotations
+
+ import logging
+
+ from agent.config import settings
+ from agent.models import RetrievedChunk
+
+ logger = logging.getLogger(__name__)
+
+
+ async def run_live_docs(query: str, team_id: str) -> list[RetrievedChunk]:
+     """Stub: returns empty until FIRECRAWL_API_KEY or TAVILY_API_KEY are configured."""
+     if not settings.firecrawl_api_key and not settings.tavily_api_key:
+         logger.info("live_docs: no API keys configured, returning empty results")
+         return []
+
+     logger.warning("live_docs: stub returning empty results")
+     return []
agent/tools/summariser.py ADDED
@@ -0,0 +1,31 @@
+ """Summariser tool: condenses large chunk sets using Gemini Flash."""
+
+ from __future__ import annotations
+
+ import logging
+
+ from agent.config import settings
+ from agent.models import RetrievedChunk
+ from agent.prompts import build_summariser_prompt
+
+ logger = logging.getLogger(__name__)
+
+
+ async def run_summariser(chunks: list[RetrievedChunk], query: str) -> str:
+     from agent.agents._gemini import call_gemini_text
+
+     if not chunks:
+         return ""
+
+     chunks_text = "\n\n".join(
+         f"[{c.source}] {c.text}" for c in chunks
+     )
+     prompt = build_summariser_prompt(chunks_text, query)
+
+     summary = await call_gemini_text(
+         model_name=settings.summariser_model,
+         system_prompt="You are a concise technical summariser. Summarise only what is in the provided text.",
+         user_message=prompt,
+     )
+     logger.info("summariser: produced %d-char summary", len(summary))
+     return summary
agent/tools/ticket_lookup.py ADDED
@@ -0,0 +1,20 @@
+ """Jira ticket lookup tool: interface stub, no live credentials yet."""
+
+ from __future__ import annotations
+
+ import logging
+
+ from agent.config import settings
+ from agent.models import RetrievedChunk
+
+ logger = logging.getLogger(__name__)
+
+
+ async def run_ticket_lookup(query: str, team_id: str) -> list[RetrievedChunk]:
+     """Stub: returns empty until JIRA_BASE_URL and JIRA_API_TOKEN are configured."""
+     if not settings.jira_base_url or not settings.jira_api_token:
+         logger.info("ticket_lookup: Jira credentials not configured, returning empty results")
+         return []
+
+     logger.warning("ticket_lookup: stub returning empty results")
+     return []
ingestion/__init__.py ADDED
@@ -0,0 +1,3 @@
+ from ingestion.api import router
+
+ __all__ = ["router"]
ingestion/api.py ADDED
@@ -0,0 +1,117 @@
+ from __future__ import annotations
+
+ import base64
+ import logging
+ from datetime import datetime
+
+ from fastapi import APIRouter, HTTPException, UploadFile
+
+ from ingestion.models import (
+     ConfluenceIngestRequest,
+     GithubIngestRequest,
+     IngestJobResponse,
+     IngestJobStatus,
+     IngestSourcePayload,
+ )
+
+ logger = logging.getLogger(__name__)
+
+ router = APIRouter(prefix="/ingest", tags=["ingest"])
+
+
+ def _dispatch(payload: IngestSourcePayload) -> IngestJobResponse:
+     """Queue the ingest task and record the job; the Celery task id doubles as the job id."""
+     from ingestion.jobs.ingest_job import run_ingest
+     from ingestion.models import IngestJobRecord
+     from ingestion.storage.supabase_store import get_client, upsert_job
+
+     task = run_ingest.delay(payload.model_dump())
+     job_id = task.id
+
+     record = IngestJobRecord(
+         job_id=job_id,
+         celery_task_id=task.id,
+         status=IngestJobStatus.pending,
+         source_type=payload.source_type,
+         team_id=payload.team_id,
+         created_at=datetime.utcnow(),
+     )
+     try:
+         upsert_job(record, client=get_client())
+     except Exception:
+         # The task is already queued, so a failed write is logged rather than raised
+         logger.exception("api: failed to persist job record for task %s", job_id)
+
+     return IngestJobResponse(job_id=job_id, status=IngestJobStatus.pending)
+
+
+ @router.post("/confluence", response_model=IngestJobResponse)
+ async def ingest_confluence(request: ConfluenceIngestRequest) -> IngestJobResponse:
+     payload = IngestSourcePayload(
+         source_type="confluence",
+         team_id=request.team_id,
+         params={"space_key": request.space_key, "page_ids": request.page_ids},
+     )
+     return _dispatch(payload)
+
+
+ @router.post("/github", response_model=IngestJobResponse)
+ async def ingest_github(request: GithubIngestRequest) -> IngestJobResponse:
+     payload = IngestSourcePayload(
+         source_type="github",
+         team_id=request.team_id,
+         params={
+             "repo_url": request.repo_url,
+             "path_filter": request.path_filter,
+             "branch": request.branch,
+         },
+     )
+     return _dispatch(payload)
+
+
+ @router.post("/upload", response_model=IngestJobResponse)
+ async def ingest_pdf(team_id: str, file: UploadFile) -> IngestJobResponse:
+     if not file.filename or not file.filename.lower().endswith(".pdf"):
+         raise HTTPException(status_code=400, detail="Only PDF files are accepted")
+
+     content = await file.read()
+     if not content:
+         raise HTTPException(status_code=400, detail="Uploaded file is empty")
+
+     # PDF bytes are not JSON-serialisable, so they are base64-encoded and sent inside the task payload.
+     # For production, upload to object storage (S3/GCS) and pass the URL instead.
+     payload = IngestSourcePayload(
+         source_type="pdf",
+         team_id=team_id,
+         params={
+             "filename": file.filename,
+             "content": base64.b64encode(content).decode(),
+         },
+     )
+     return _dispatch(payload)
+
+
+ @router.get("/jobs/{job_id}", response_model=IngestJobResponse)
+ async def get_job_status(job_id: str) -> IngestJobResponse:
+     from ingestion.storage.supabase_store import get_job
+
+     record = get_job(job_id)
+     if record is None:
+         # Fall back to Celery task state if the Supabase record is not yet written.
+         # Celery also reports PENDING for ids it has never seen, so unknown ids map to pending.
+         from ingestion.jobs.celery_app import celery_app
+
+         task = celery_app.AsyncResult(job_id)
+         state_map = {
+             "PENDING": IngestJobStatus.pending,
+             "STARTED": IngestJobStatus.running,
+             "SUCCESS": IngestJobStatus.completed,
+             "FAILURE": IngestJobStatus.failed,
+         }
+         status = state_map.get(task.state, IngestJobStatus.pending)
+         return IngestJobResponse(job_id=job_id, status=status)
+
+     return IngestJobResponse(
+         job_id=record["job_id"],
+         status=IngestJobStatus(record["status"]),
+     )
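The `/ingest/upload` endpoint base64-encodes the PDF because the Celery app is configured with JSON serialisation, and the worker decodes it again before building the PDF source. The standalone sketch below walks through that round trip with a made-up byte string; the variable names are illustrative and nothing here calls the real endpoint or task.

```python
import base64
import json

# Stand-in for uploaded file content; real PDFs contain arbitrary binary bytes.
pdf_bytes = b"%PDF-1.7\n\x00\xff binary payload"

# API side: raw bytes cannot go into JSON, so encode them as ASCII text first.
params = {
    "filename": "report.pdf",
    "content": base64.b64encode(pdf_bytes).decode("ascii"),
}
wire = json.dumps(params)  # what the broker would carry

# Worker side: parse the JSON and decode back to the original bytes.
received = json.loads(wire)
restored = base64.b64decode(received["content"])

print(restored == pdf_bytes)
print(len(received["content"]) / len(pdf_bytes))  # base64 costs roughly 4/3 in size
```

The size overhead, plus the fact that the whole file passes through Redis, is why the code comment recommends object storage and a URL for production use.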
ingestion/config.py ADDED
@@ -0,0 +1,56 @@
+ from pydantic_settings import BaseSettings, SettingsConfigDict
+
+
+ class IngestionSettings(BaseSettings):
+     model_config = SettingsConfigDict(
+         env_file=".env",
+         env_file_encoding="utf-8",
+         case_sensitive=False,
+         extra="ignore",
+     )
+
+     google_api_key: str = ""
+     cag_model: str = "gemini-2.5-pro"
+
+     qdrant_host: str = "localhost"
+     qdrant_port: int = 6333
+     qdrant_collection: str = "knowledge_base"
+     qdrant_dense_vector_name: str = "dense"
+     qdrant_sparse_vector_name: str = "sparse"
+     qdrant_dense_size: int = 1024
+
+     supabase_url: str = ""
+     supabase_key: str = ""
+
+     redis_url: str = "redis://localhost:6379/0"
+
+     bge_embedding_model: str = "BAAI/bge-m3"
+     bge_reranker_model: str = "BAAI/bge-reranker-v2-m3"
+     gliner_model: str = "urchade/gliner_mediumv2.1"
+     spacy_model: str = "en_core_web_sm"
+
+     bm25_index_path: str = "data/bm25_index.pkl"
+
+     embed_batch_size: int = 32
+     chunk_target_tokens: int = 512
+     chunk_max_tokens: int = 768
+     chunk_overlap_ratio: float = 0.15
+
+     confluence_base_url: str = ""
+     confluence_token: str = ""
+     confluence_email: str = ""
+
+     github_token: str = ""
+     github_api_url: str = "https://api.github.com"
+     github_path_filter: str = "docs/"
+     github_branch: str = "main"
+
+     jira_base_url: str = ""
+     jira_api_token: str = ""
+     jira_project_key: str = ""
+
+     cag_lookback_days: int = 14
+     cag_max_tokens: int = 50_000
+
+
+ settings = IngestionSettings()
ingestion/jobs/__init__.py ADDED
File without changes
ingestion/jobs/cag_job.py ADDED
@@ -0,0 +1,185 @@
+ from __future__ import annotations
+
+ import asyncio
+ import logging
+ from datetime import datetime, timedelta, timezone
+ from typing import Any
+
+ from ingestion.jobs.celery_app import celery_app
+
+ logger = logging.getLogger(__name__)
+
+ _CAG_SYSTEM_PROMPT = """You are a technical project analyst for an enterprise engineering team.
+
+ Given a team's recent Jira activity and GitHub commits from the last 14 days, produce a structured project snapshot.
+
+ Return a JSON object with:
+ {
+   "summary": "<2-3 sentence executive summary of team activity>",
+   "active_areas": ["<area>", ...],
+   "recent_issues": [{"key": "<JIRA-123>", "title": "...", "status": "..."}],
+   "recent_commits": [{"sha": "<short sha>", "message": "...", "repo": "..."}],
+   "blockers": ["<blocker if evident from tickets>"],
+   "generated_at": "<ISO datetime>"
+ }
+
+ Be concise. Do not hallucinate. Use only the data provided."""
+
+
+ @celery_app.task(name="ingestion.jobs.cag_job.run_cag")
+ def run_cag() -> dict[str, Any]:
+     # Celery tasks are synchronous, so drive the async pipeline with asyncio.run
+     return asyncio.run(_run_cag_async())
+
+
+ async def _run_cag_async() -> dict[str, Any]:
+     from ingestion.storage.supabase_store import get_all_teams, get_client, update_cag_snapshot
+
+     sb = get_client()
+     teams = get_all_teams(client=sb)
+     logger.info("cag_job: processing %d teams", len(teams))
+
+     results: dict[str, str] = {}
+     for team in teams:
+         team_id = team["team_id"]
+         try:
+             snapshot = await _build_team_snapshot(team_id, sb)
+             update_cag_snapshot(team_id, snapshot, client=sb)
+             results[team_id] = "ok"
+         except Exception:
+             # One failing team must not abort the snapshots for the others
+             logger.exception("cag_job: failed to build snapshot for team %s", team_id)
+             results[team_id] = "error"
+
+     return results
+
+
+ async def _build_team_snapshot(team_id: str, sb: Any) -> str:
+     from ingestion.config import settings
+
+     since = datetime.now(timezone.utc) - timedelta(days=settings.cag_lookback_days)
+     since_str = since.strftime("%Y-%m-%d")
+
+     jira_text = await _fetch_jira_activity(since_str)
+     github_text = await _fetch_github_activity(team_id, sb, since_str)
+
+     combined = f"Jira activity (last {settings.cag_lookback_days} days):\n{jira_text}\n\nGitHub commits (last {settings.cag_lookback_days} days):\n{github_text}"
+
+     # Truncate to stay under token budget; rough estimate is 4 chars/token
+     max_chars = settings.cag_max_tokens * 4
+     if len(combined) > max_chars:
+         combined = combined[:max_chars]
+         logger.warning("cag_job: truncated input for team %s to ~%d tokens", team_id, settings.cag_max_tokens)
+
+     snapshot = await _call_gemini(combined)
+     return snapshot
+
+
+ async def _fetch_jira_activity(since: str) -> str:
+     from ingestion.config import settings
+
+     if not settings.jira_base_url or not settings.jira_api_token:
+         return "(Jira not configured)"
+
+     import base64
+     import httpx
+
+     jql = f'project = "{settings.jira_project_key}" AND updated >= "{since}" ORDER BY updated DESC'
+     url = f"{settings.jira_base_url}/rest/api/3/search"
+     # Basic auth reuses confluence_email as the account email; there is no separate Jira email setting
+     credentials = base64.b64encode(
+         f"{settings.confluence_email}:{settings.jira_api_token}".encode()
+     ).decode()
+     headers = {"Authorization": f"Basic {credentials}", "Accept": "application/json"}
+
+     try:
+         async with httpx.AsyncClient(headers=headers, timeout=20) as client:
+             resp = await client.get(url, params={"jql": jql, "maxResults": 50, "fields": "summary,status,assignee,updated"})
+             resp.raise_for_status()
+             issues = resp.json().get("issues", [])
+
+         lines = []
+         for issue in issues:
+             f = issue["fields"]
+             status = f.get("status", {}).get("name", "?")
+             assignee = (f.get("assignee") or {}).get("displayName", "unassigned")
+             lines.append(f"- [{issue['key']}] {f.get('summary', '')} | {status} | {assignee}")
+         return "\n".join(lines) if lines else "(no recent Jira activity)"
+     except Exception:
+         logger.exception("cag_job: Jira fetch failed")
+         return "(Jira fetch failed)"
+
+
+ async def _fetch_github_activity(team_id: str, sb: Any, since: str) -> str:
+     from ingestion.config import settings
+
+     if not settings.github_token:
+         return "(GitHub not configured)"
+
+     import httpx
+
+     try:
+         result = (
+             sb.table("documents")
+             .select("source_url, metadata")
+             .eq("team_id", team_id)
+             .eq("source_type", "github")
+             .execute()
+         )
+         repos = result.data or []
+     except Exception:
+         logger.exception("cag_job: failed to fetch repos for team %s", team_id)
+         return "(GitHub repo lookup failed)"
+
+     headers = {
+         "Authorization": f"Bearer {settings.github_token}",
+         "Accept": "application/vnd.github+json",
+         "X-GitHub-Api-Version": "2022-11-28",
+     }
+
+     lines: list[str] = []
+     async with httpx.AsyncClient(headers=headers, timeout=20) as client:
+         # Many documents can share one repo, so query each repo only once
+         seen_repos: set[str] = set()
+         for doc in repos:
+             meta = doc.get("metadata") or {}
+             repo = meta.get("repo")
+             if not repo or repo in seen_repos:
+                 continue
+             seen_repos.add(repo)
+
+             url = f"{settings.github_api_url}/repos/{repo}/commits"
+             try:
+                 resp = await client.get(url, params={"since": f"{since}T00:00:00Z", "per_page": 20})
+                 resp.raise_for_status()
+                 for commit in resp.json():
+                     sha = commit["sha"][:7]
+                     message = commit["commit"]["message"].split("\n")[0]
+                     lines.append(f"- [{repo}] {sha} {message}")
+             except Exception:
+                 logger.exception("cag_job: commit fetch failed for %s", repo)
+
+     return "\n".join(lines) if lines else "(no recent GitHub activity)"
+
+
+ async def _call_gemini(user_message: str) -> str:
+     from langchain_core.messages import HumanMessage, SystemMessage
+     from langchain_google_genai import ChatGoogleGenerativeAI
+
+     from ingestion.config import settings
+
+     llm = ChatGoogleGenerativeAI(
+         model=settings.cag_model,
+         google_api_key=settings.google_api_key,
+         temperature=0.0,
+     )
+     messages = [SystemMessage(content=_CAG_SYSTEM_PROMPT), HumanMessage(content=user_message)]
+
+     # Up to three attempts, sleeping 1s then 2s between failures
+     for attempt in range(3):
+         try:
+             response = await llm.ainvoke(messages)
+             return response.content
+         except Exception:
+             if attempt == 2:
+                 raise
+             await asyncio.sleep(2 ** attempt)
+
+     raise RuntimeError("Unreachable")
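`_call_gemini` retries the model call up to three times with an exponentially growing sleep. The same pattern can be factored into a small reusable helper, sketched below under stated assumptions: `retry_async` and the `flaky` coroutine are hypothetical names introduced for illustration and do not exist in this commit.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    call: Callable[[], Awaitable[T]], attempts: int = 3, base_delay: float = 1.0
) -> T:
    """Await call(); on failure sleep base_delay * 2**attempt, re-raising on the last attempt."""
    for attempt in range(attempts):
        try:
            return await call()
        except Exception:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2**attempt)
    raise RuntimeError("attempts must be at least 1")


calls = {"n": 0}


async def flaky() -> str:
    # Fails twice, then succeeds, to exercise both retry sleeps.
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient")
    return "ok"


result = asyncio.run(retry_async(flaky, base_delay=0.01))
print(result, calls["n"])  # ok 3
```

A production version would probably catch only transient error types (timeouts, rate limits) and add jitter, since retrying on every exception also repeats calls that can never succeed, such as an invalid API key.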
ingestion/jobs/celery_app.py ADDED
@@ -0,0 +1,33 @@
+ from __future__ import annotations
+
+ from celery import Celery
+ from celery.schedules import crontab
+
+ from ingestion.config import settings
+
+ celery_app = Celery(
+     "ingestion",
+     broker=settings.redis_url,
+     backend=settings.redis_url,
+     include=[
+         "ingestion.jobs.ingest_job",
+         "ingestion.jobs.cag_job",
+     ],
+ )
+
+ celery_app.conf.update(
+     task_serializer="json",
+     result_serializer="json",
+     accept_content=["json"],
+     timezone="UTC",
+     enable_utc=True,
+     task_track_started=True,
+     result_expires=86400,
+ )
+
+ celery_app.conf.beat_schedule = {
+     "cag-nightly": {
+         "task": "ingestion.jobs.cag_job.run_cag",
+         "schedule": crontab(hour=2, minute=0),
+     }
+ }
ingestion/jobs/ingest_job.py ADDED
@@ -0,0 +1,124 @@
1
+ from __future__ import annotations
2
+
3
+ import asyncio
4
+ import logging
5
+ from datetime import datetime
6
+ from typing import Any
7
+
8
+ from celery import Task
9
+
10
+ from ingestion.jobs.celery_app import celery_app
11
+ from ingestion.models import IngestJobRecord, IngestJobStatus, IngestSourcePayload
12
+
13
+ logger = logging.getLogger(__name__)
14
+
15
+ _SOURCE_REGISTRY: dict[str, Any] = {}
16
+
17
+
18
+ def _get_source_registry() -> dict[str, Any]:
19
+ if not _SOURCE_REGISTRY:
20
+ from ingestion.sources.confluence import ConfluenceSource
21
+ from ingestion.sources.github import GithubSource
22
+ from ingestion.sources.jira import JiraSource
23
+ from ingestion.sources.pdf import PDFSource
24
+
25
+ _SOURCE_REGISTRY.update(
26
+ {
27
+ "confluence": ConfluenceSource,
28
+ "github": GithubSource,
29
+ "pdf": PDFSource,
30
+ "jira": JiraSource,
31
+ }
32
+ )
33
+ return _SOURCE_REGISTRY
34
+
35
+
36
+ @celery_app.task(bind=True, name="ingestion.jobs.ingest_job.run_ingest", max_retries=2)
37
+ def run_ingest(self: Task, payload: dict[str, Any]) -> dict[str, Any]:
38
+ job_id = self.request.id
39
+ return asyncio.run(_run_ingest_async(job_id, IngestSourcePayload(**payload)))
40
+
41
+
42
+ async def _run_ingest_async(job_id: str, payload: IngestSourcePayload) -> dict[str, Any]:
43
+ from ingestion.pipeline.chunker import chunk_document
44
+ from ingestion.pipeline.embedder import embed_chunks
45
+ from ingestion.pipeline.pii_masker import mask_pii
46
+ from ingestion.storage.bm25_store import rebuild_from_supabase
47
+ from ingestion.storage.qdrant_store import delete_chunks_for_doc, upsert_chunks as qdrant_upsert
48
+ from ingestion.storage.supabase_store import (
49
+ delete_chunks_for_doc as sb_delete_chunks,
50
+ get_client,
51
+ upsert_chunks as sb_upsert_chunks,
52
+ upsert_document,
53
+ upsert_job,
54
+ )
55
+
56
+ sb = get_client()
57
+ job = IngestJobRecord(
58
+ job_id=job_id,
59
+ celery_task_id=job_id,
60
+ status=IngestJobStatus.running,
61
+ source_type=payload.source_type,
62
+ team_id=payload.team_id,
63
+ )
64
+ upsert_job(job, client=sb)
65
+
66
+ total_chunks = 0
67
+
68
+ try:
69
+ registry = _get_source_registry()
70
+ source_cls = registry.get(payload.source_type)
71
+ if source_cls is None:
72
+ raise ValueError(f"Unknown source type: {payload.source_type}")
73
+
74
+ if payload.source_type == "github":
75
+ # GithubSource requires supabase_client for SHA change-detection
76
+ source = source_cls(team_id=payload.team_id, supabase_client=sb, **payload.params)
77
+ elif payload.source_type == "pdf":
78
+ # content arrives as base64 string because Celery serialises to JSON
79
+ import base64
80
+ params = dict(payload.params)
81
+ if isinstance(params.get("content"), str):
82
+ params["content"] = base64.b64decode(params["content"])
83
+ source = source_cls(team_id=payload.team_id, **params)
84
+ else:
85
+ source = source_cls(team_id=payload.team_id, **payload.params)
86
+
87
+ raw_docs = await source.fetch()
88
+ logger.info("ingest_job: fetched %d documents from %s", len(raw_docs), payload.source_type)
89
+
90
+ for doc in raw_docs:
91
+ # Delete stale vectors and chunk records before re-ingesting the same document
92
+ delete_chunks_for_doc(doc.doc_id)
93
+ sb_delete_chunks(doc.doc_id, client=sb)
94
+
95
+ chunks = chunk_document(doc)
96
+ if not chunks:
97
+ logger.warning("ingest_job: no chunks produced for doc_id=%s", doc.doc_id)
98
+ continue
99
+
100
+ # Mask PII in chunk text before embedding or storing
101
+ for chunk in chunks:
102
+ chunk.text = mask_pii(chunk.text)
103
+
104
+ embedded = embed_chunks(chunks)
105
+ qdrant_upsert(embedded)
106
+ sb_upsert_chunks(chunks, client=sb)
107
+ upsert_document(doc, client=sb)
108
+ total_chunks += len(chunks)
109
+ logger.info("ingest_job: ingested %d chunks for doc_id=%s", len(chunks), doc.doc_id)
110
+
111
+ rebuild_from_supabase()
112
+
113
+ job.status = IngestJobStatus.completed
114
+ job.completed_at = datetime.utcnow()
115
+ job.chunks_ingested = total_chunks
116
+
117
+ except Exception as exc:
118
+ logger.exception("ingest_job: job %s failed", job_id)
119
+ job.status = IngestJobStatus.failed
120
+ job.completed_at = datetime.utcnow()
121
+ job.error = str(exc)
122
+
123
+ upsert_job(job, client=sb)
124
+ return {"job_id": job_id, "status": job.status.value, "chunks_ingested": total_chunks}
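Note on the PDF branch in `_run_ingest_async`: `content` is decoded from base64 because a JSON task payload cannot carry raw bytes. A minimal sketch of that round trip, using only the standard library. The helper names `encode_pdf_params` and `decode_pdf_params` are hypothetical and are not part of this commit.

```python
import base64
import json
from typing import Any


def encode_pdf_params(filename: str, content: bytes) -> dict[str, Any]:
    """Producer side (hypothetical helper): make the params JSON-safe before enqueueing."""
    return {"filename": filename, "content": base64.b64encode(content).decode("ascii")}


def decode_pdf_params(params: dict[str, Any]) -> dict[str, Any]:
    """Worker side (hypothetical helper): restore raw bytes, mirroring the isinstance check in the job."""
    decoded = dict(params)
    if isinstance(decoded.get("content"), str):
        decoded["content"] = base64.b64decode(decoded["content"])
    return decoded


raw = b"%PDF-1.7 example bytes \x00\xff"
# json.dumps stands in for what a JSON-serialising broker would carry.
wire = json.dumps(encode_pdf_params("handbook.pdf", raw))
restored = decode_pdf_params(json.loads(wire))
```

Copying `params` before decoding keeps the original payload untouched, which matches the `dict(payload.params)` copy in the job.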
ingestion/models.py ADDED
@@ -0,0 +1,86 @@
1
+ from __future__ import annotations
2
+
3
+ from datetime import datetime
4
+ from enum import Enum
5
+ from typing import Any, Literal, Optional
6
+
7
+ from pydantic import BaseModel, Field
8
+
9
+
10
+ class RawDocument(BaseModel):
11
+ doc_id: str
12
+ title: str
13
+ content: str
14
+ source_url: str
15
+ source_type: str
16
+ team_id: str
17
+ metadata: dict[str, Any] = Field(default_factory=dict)
18
+ fetched_at: datetime = Field(default_factory=datetime.utcnow)
19
+
20
+
21
+ class DocumentChunk(BaseModel):
22
+ chunk_id: str
23
+ doc_id: str
24
+ text: str
25
+ source: str
26
+ source_type: str
27
+ team_id: str
28
+ chunk_index: int
29
+ metadata: dict[str, Any] = Field(default_factory=dict)
30
+
31
+
32
+ class EmbeddedChunk(BaseModel):
33
+ chunk_id: str
34
+ doc_id: str
35
+ text: str
36
+ source: str
37
+ source_type: str
38
+ team_id: str
39
+ chunk_index: int
40
+ dense_vector: list[float]
41
+ sparse_indices: list[int]
42
+ sparse_values: list[float]
43
+ metadata: dict[str, Any] = Field(default_factory=dict)
44
+
45
+
46
+ class IngestJobStatus(str, Enum):
47
+ pending = "pending"
48
+ running = "running"
49
+ completed = "completed"
50
+ failed = "failed"
51
+
52
+
53
+ class IngestJobRecord(BaseModel):
54
+ job_id: str
55
+ celery_task_id: str
56
+ status: IngestJobStatus
57
+ source_type: str
58
+ team_id: str
59
+ created_at: datetime = Field(default_factory=datetime.utcnow)
60
+ completed_at: Optional[datetime] = None
61
+ error: Optional[str] = None
62
+ chunks_ingested: int = 0
63
+
64
+
65
+ class IngestSourcePayload(BaseModel):
66
+ source_type: Literal["confluence", "github", "pdf", "jira"]
67
+ team_id: str
68
+ params: dict[str, Any] = Field(default_factory=dict)
69
+
70
+
71
+ class ConfluenceIngestRequest(BaseModel):
72
+ space_key: str
73
+ team_id: str
74
+ page_ids: Optional[list[str]] = None
75
+
76
+
77
+ class GithubIngestRequest(BaseModel):
78
+ repo_url: str
79
+ team_id: str
80
+ path_filter: str = "docs/"
81
+ branch: str = "main"
82
+
83
+
84
+ class IngestJobResponse(BaseModel):
85
+ job_id: str
86
+ status: IngestJobStatus
ingestion/pipeline/__init__.py ADDED
File without changes
ingestion/pipeline/chunker.py ADDED
@@ -0,0 +1,96 @@
1
+ from __future__ import annotations
2
+
3
+ import hashlib
4
+ import logging
5
+ from typing import Optional
6
+
7
+ import spacy
8
+
9
+ from ingestion.config import settings
10
+ from ingestion.models import DocumentChunk, RawDocument
11
+
12
+ logger = logging.getLogger(__name__)
13
+
14
+ _nlp: Optional[spacy.language.Language] = None
15
+
16
+
17
+ def _get_nlp() -> spacy.language.Language:
18
+ global _nlp
19
+ if _nlp is None:
20
+ logger.info("chunker: loading spacy model %s", settings.spacy_model)
21
+ _nlp = spacy.load(settings.spacy_model, disable=["ner", "parser"])
22
+ _nlp.enable_pipe("senter")
23
+ return _nlp
24
+
25
+
26
+ def _token_count(nlp: spacy.language.Language, text: str) -> int:
27
+ return len(nlp.tokenizer(text))
28
+
29
+
30
+ def chunk_document(doc: RawDocument) -> list[DocumentChunk]:
31
+ nlp = _get_nlp()
32
+ spacy_doc = nlp(doc.content)
33
+ sentences = [s.text.strip() for s in spacy_doc.sents if s.text.strip()]
34
+
35
+ if not sentences:
36
+ return []
37
+
38
+ token_counts = [_token_count(nlp, s) for s in sentences]
39
+ target = settings.chunk_target_tokens
40
+ hard_max = settings.chunk_max_tokens
41
+ overlap_budget = int(target * settings.chunk_overlap_ratio)
42
+
43
+ chunks: list[DocumentChunk] = []
44
+ chunk_start = 0
45
+
46
+ while chunk_start < len(sentences):
47
+ accumulated = 0
48
+ chunk_end = chunk_start
49
+
50
+ while chunk_end < len(sentences):
51
+ next_count = token_counts[chunk_end]
52
+ # Force-include at least one sentence even if it exceeds hard_max
53
+ if accumulated + next_count > hard_max and accumulated > 0:
54
+ break
55
+ accumulated += next_count
56
+ chunk_end += 1
57
+ if accumulated >= target:
58
+ break
59
+
60
+ text = " ".join(sentences[chunk_start:chunk_end])
61
+ chunk_id = hashlib.sha256(
62
+ f"{doc.doc_id}:{chunk_start}:{chunk_end}".encode()
63
+ ).hexdigest()
64
+ chunks.append(
65
+ DocumentChunk(
66
+ chunk_id=chunk_id,
67
+ doc_id=doc.doc_id,
68
+ text=text,
69
+ source=doc.source_url,
70
+ source_type=doc.source_type,
71
+ team_id=doc.team_id,
72
+ chunk_index=len(chunks),
73
+ metadata={**doc.metadata, "title": doc.title},
74
+ )
75
+ )
76
+
77
+ # Walk back from chunk_end to find the overlap window for the next chunk
78
+ overlap_tokens = 0
79
+ overlap_start = chunk_end
80
+ while overlap_start > chunk_start + 1:
81
+ candidate = overlap_start - 1
82
+ if overlap_tokens + token_counts[candidate] <= overlap_budget:
83
+ overlap_tokens += token_counts[candidate]
84
+ overlap_start = candidate
85
+ else:
86
+ break
87
+
88
+ # Stop once the last sentence is consumed; otherwise the overlap window would emit a redundant tail chunk
89
+ if chunk_end >= len(sentences):
90
+ break
91
+ # overlap_start is always > chunk_start here (the walk-back stops at chunk_start + 1), so the cursor advances
92
+
93
+ chunk_start = overlap_start
94
+
95
+ logger.debug("chunker: %s -> %d chunks", doc.doc_id, len(chunks))
96
+ return chunks
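Note on `chunk_document`: the packing loop is easier to follow with the spaCy dependency removed. The sketch below is a simplified stand-in, not the module itself. It assumes whitespace-split token counts instead of the spaCy tokenizer, uses small made-up limits, and the function name `chunk_sentences` is hypothetical. It stops as soon as the final sentence has been consumed, so the overlap window never produces a chunk that is fully contained in the previous one.

```python
def chunk_sentences(
    sentences: list[str],
    target: int = 8,
    hard_max: int = 12,
    overlap_budget: int = 3,
) -> list[list[str]]:
    """Greedy sentence packing with a trailing overlap window (illustrative limits)."""
    counts = [len(s.split()) for s in sentences]  # assumption: whitespace tokens
    chunks: list[list[str]] = []
    start = 0
    while start < len(sentences):
        total, end = 0, start
        while end < len(sentences):
            # Always take at least one sentence, even if it exceeds hard_max.
            if total + counts[end] > hard_max and total > 0:
                break
            total += counts[end]
            end += 1
            if total >= target:
                break
        chunks.append(sentences[start:end])
        if end >= len(sentences):
            break  # nothing left to chunk, so no overlap chunk is needed
        # Walk back from the chunk end while the overlap budget allows.
        overlap, next_start = 0, end
        while next_start > start + 1 and overlap + counts[next_start - 1] <= overlap_budget:
            overlap += counts[next_start - 1]
            next_start -= 1
        start = next_start  # always > start, so the loop makes progress
    return chunks


sentences = ["one two three", "four five six", "seven eight nine", "ten eleven twelve"]
chunks = chunk_sentences(sentences)
```

With these inputs the first chunk holds sentences 0 to 2, and the second chunk restarts at sentence 2 because its three tokens fit the overlap budget.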
ingestion/pipeline/embedder.py ADDED
@@ -0,0 +1,75 @@
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ from typing import Optional
5
+
6
+ from FlagEmbedding import BGEM3FlagModel
7
+
8
+ from ingestion.config import settings
9
+ from ingestion.models import DocumentChunk, EmbeddedChunk
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+ # Loaded lazily on first use and cached at module level: BGE-M3 is large, so per-request init would add unacceptable latency
14
+ _model: Optional[BGEM3FlagModel] = None
15
+
16
+
17
+ def _get_model() -> BGEM3FlagModel:
18
+ global _model
19
+ if _model is None:
20
+ logger.info("embedder: loading BGE-M3 model %s", settings.bge_embedding_model)
21
+ _model = BGEM3FlagModel(settings.bge_embedding_model, use_fp16=True)
22
+ return _model
23
+
24
+
25
+ def embed_chunks(chunks: list[DocumentChunk]) -> list[EmbeddedChunk]:
26
+ if not chunks:
27
+ return []
28
+
29
+ model = _get_model()
30
+ batch_size = settings.embed_batch_size
31
+ embedded: list[EmbeddedChunk] = []
32
+
33
+ for batch_start in range(0, len(chunks), batch_size):
34
+ batch = chunks[batch_start : batch_start + batch_size]
35
+ texts = [c.text for c in batch]
36
+
37
+ output = model.encode(
38
+ texts,
39
+ return_dense=True,
40
+ return_sparse=True,
41
+ return_colbert_vecs=False,
42
+ batch_size=batch_size,
43
+ )
44
+
45
+ for i, chunk in enumerate(batch):
46
+ dense_vector: list[float] = output["dense_vecs"][i].tolist()
47
+ # lexical_weights maps token id -> weight; keys may arrive as str and values as NumPy floats, so cast explicitly
48
+ sparse_weights = output["lexical_weights"][i]
49
+ sparse_indices = [int(k) for k in sparse_weights]
50
+ sparse_values = [float(v) for v in sparse_weights.values()]
51
+
52
+ embedded.append(
53
+ EmbeddedChunk(
54
+ chunk_id=chunk.chunk_id,
55
+ doc_id=chunk.doc_id,
56
+ text=chunk.text,
57
+ source=chunk.source,
58
+ source_type=chunk.source_type,
59
+ team_id=chunk.team_id,
60
+ chunk_index=chunk.chunk_index,
61
+ dense_vector=dense_vector,
62
+ sparse_indices=sparse_indices,
63
+ sparse_values=sparse_values,
64
+ metadata=chunk.metadata,
65
+ )
66
+ )
67
+
68
+ logger.debug(
69
+ "embedder: embedded batch %d-%d of %d",
70
+ batch_start,
71
+ batch_start + len(batch),
72
+ len(chunks),
73
+ )
74
+
75
+ return embedded
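Note on the sparse output in `embed_chunks`: the per-chunk lexical weights are a mapping from token id to weight, which is split into parallel `indices` and `values` lists for the sparse vector. A small sketch of that split follows. It assumes the mapping may use string keys, and the helper name `split_sparse` is hypothetical.

```python
from typing import Mapping, Union


def split_sparse(weights: Mapping[Union[int, str], float]) -> tuple[list[int], list[float]]:
    """Turn {token_id: weight} into parallel index/value lists with plain Python types."""
    indices = [int(k) for k in weights]  # keys may be str or int
    values = [float(v) for v in weights.values()]  # same iteration order as the keys
    return indices, values


indices, values = split_sparse({"101": 0.25, "2054": 0.5})
```

Casting to built-in `int` and `float` keeps the lists valid for typed models and JSON serialisation regardless of what numeric types the encoder returns.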
ingestion/pipeline/pii_masker.py ADDED
@@ -0,0 +1,47 @@
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ from typing import Optional
5
+
6
+ from gliner import GLiNER
7
+
8
+ from ingestion.config import settings
9
+
10
+ logger = logging.getLogger(__name__)
11
+
12
+ _PII_ENTITY_TYPES = [
13
+ "person",
14
+ "email",
15
+ "phone",
16
+ "ssn",
17
+ "credit_card",
18
+ "address",
19
+ "date_of_birth",
20
+ ]
21
+
22
+ _model: Optional[GLiNER] = None
23
+
24
+
25
+ def _get_model() -> GLiNER:
26
+ global _model
27
+ if _model is None:
28
+ logger.info("pii_masker: loading GLiNER model %s", settings.gliner_model)
29
+ _model = GLiNER.from_pretrained(settings.gliner_model)
30
+ return _model
31
+
32
+
33
+ def mask_pii(text: str) -> str:
34
+ try:
35
+ model = _get_model()
36
+ entities = model.predict_entities(text, _PII_ENTITY_TYPES, threshold=0.5)
37
+ # iterate reverse so substring replacements don't shift later indices
38
+ for ent in sorted(entities, key=lambda e: e["start"], reverse=True):
39
+ text = text[: ent["start"]] + "[REDACTED]" + text[ent["end"] :]
40
+ return text
41
+ except Exception:
42
+ logger.exception("pii_masker: masking failed; using original text")
43
+ return text
44
+
45
+
46
+ def mask_chunks(texts: list[str]) -> list[str]:
47
+ return [mask_pii(t) for t in texts]
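Note on `mask_pii`: spans are replaced from right to left so that earlier offsets stay valid after each substitution. The sketch below shows that step on hand-written spans, with no model involved. The entity shape (dicts with `start` and `end` offsets) follows how the code above reads them, and the helper names are hypothetical.

```python
def span_of(text: str, needle: str, label: str) -> dict:
    """Build a fake entity dict (hypothetical helper standing in for model output)."""
    start = text.index(needle)
    return {"start": start, "end": start + len(needle), "label": label}


def redact_spans(text: str, entities: list[dict]) -> str:
    """Replace each [start, end) span with a placeholder, rightmost span first."""
    for ent in sorted(entities, key=lambda e: e["start"], reverse=True):
        text = text[: ent["start"]] + "[REDACTED]" + text[ent["end"] :]
    return text


sample = "Contact Jane Doe at jane@example.com today."
spans = [
    span_of(sample, "Jane Doe", "person"),
    span_of(sample, "jane@example.com", "email"),
]
masked = redact_spans(sample, spans)
```

Processing left to right would shift every later offset by the length difference between the original text and the placeholder. This sketch assumes the spans do not overlap.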
ingestion/pipeline/reranker.py ADDED
@@ -0,0 +1,32 @@
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ from typing import Optional
5
+
6
+ from FlagEmbedding import FlagReranker
7
+
8
+ from ingestion.config import settings
9
+
10
+ logger = logging.getLogger(__name__)
11
+
12
+ _reranker: Optional[FlagReranker] = None
13
+
14
+
15
+ def _get_reranker() -> FlagReranker:
16
+ global _reranker
17
+ if _reranker is None:
18
+ logger.info("reranker: loading model %s", settings.bge_reranker_model)
19
+ _reranker = FlagReranker(settings.bge_reranker_model, use_fp16=True)
20
+ return _reranker
21
+
22
+
23
+ def rerank(query: str, texts: list[str], normalize: bool = True) -> list[float]:
24
+ if not texts:
25
+ return []
26
+ reranker = _get_reranker()
27
+ pairs = [(query, t) for t in texts]
28
+ try:
29
+ return reranker.compute_score(pairs, normalize=normalize)
30
+ except Exception:
31
+ logger.exception("reranker: compute_score failed")
32
+ return [0.0] * len(texts)
ingestion/sources/__init__.py ADDED
File without changes
ingestion/sources/base.py ADDED
@@ -0,0 +1,15 @@
1
+ from __future__ import annotations
2
+
3
+ from abc import ABC, abstractmethod
4
+
5
+ from ingestion.models import RawDocument
6
+
7
+
8
+ class BaseSource(ABC):
9
+ @abstractmethod
10
+ async def fetch(self) -> list[RawDocument]:
11
+ ...
12
+
13
+ @abstractmethod
14
+ def source_type(self) -> str:
15
+ ...
ingestion/sources/confluence.py ADDED
@@ -0,0 +1,117 @@
1
+ from __future__ import annotations
2
+
3
+ import hashlib
4
+ import logging
5
+ import re
6
+ from datetime import datetime
7
+ from typing import Optional
8
+
9
+ import httpx
10
+
11
+ from ingestion.config import settings
12
+ from ingestion.models import RawDocument
13
+ from ingestion.sources.base import BaseSource
14
+
15
+ logger = logging.getLogger(__name__)
16
+
17
+ _HTML_TAG_RE = re.compile(r"<[^>]+>")
18
+ _WHITESPACE_RE = re.compile(r"\s{2,}")
19
+
20
+
21
+ def _strip_html(html: str) -> str:
22
+ text = _HTML_TAG_RE.sub(" ", html)
23
+ return _WHITESPACE_RE.sub(" ", text).strip()
24
+
25
+
26
+ class ConfluenceSource(BaseSource):
27
+ def __init__(
28
+ self,
29
+ team_id: str,
30
+ space_key: str,
31
+ page_ids: Optional[list[str]] = None,
32
+ base_url: str = "",
33
+ token: str = "",
34
+ email: str = "",
35
+ ) -> None:
36
+ self._team_id = team_id
37
+ self._space_key = space_key
38
+ self._page_ids = page_ids
39
+ self._base_url = base_url or settings.confluence_base_url
40
+ self._token = token or settings.confluence_token
41
+ self._email = email or settings.confluence_email
42
+
43
+ def source_type(self) -> str:
44
+ return "confluence"
45
+
46
+ def _auth_headers(self) -> dict[str, str]:
47
+ import base64
48
+ credentials = base64.b64encode(f"{self._email}:{self._token}".encode()).decode()
49
+ return {"Authorization": f"Basic {credentials}", "Accept": "application/json"}
50
+
51
+ async def fetch(self) -> list[RawDocument]:
52
+ if not self._base_url or not self._token:
53
+ logger.warning("confluence: credentials not configured, returning empty")
54
+ return []
55
+
56
+ async with httpx.AsyncClient(headers=self._auth_headers(), timeout=30) as client:
57
+ if self._page_ids:
58
+ return await self._fetch_pages(client, self._page_ids)
59
+ return await self._fetch_space(client)
60
+
61
+ async def _fetch_space(self, client: httpx.AsyncClient) -> list[RawDocument]:
62
+ docs: list[RawDocument] = []
63
+ url = f"{self._base_url}/wiki/rest/api/content"
64
+ params: dict = {"spaceKey": self._space_key, "expand": "body.storage", "limit": 50, "start": 0}
65
+
66
+ while True:
67
+ resp = await client.get(url, params=params)
68
+ resp.raise_for_status()
69
+ data = resp.json()
70
+ for page in data.get("results", []):
71
+ doc = self._page_to_raw_document(page)
72
+ if doc:
73
+ docs.append(doc)
74
+ if data.get("_links", {}).get("next"):
75
+ params["start"] += params["limit"]
76
+ else:
77
+ break
78
+
79
+ logger.info("confluence: fetched %d pages from space %s", len(docs), self._space_key)
80
+ return docs
81
+
82
+ async def _fetch_pages(self, client: httpx.AsyncClient, page_ids: list[str]) -> list[RawDocument]:
83
+ docs: list[RawDocument] = []
84
+ for page_id in page_ids:
85
+ url = f"{self._base_url}/wiki/rest/api/content/{page_id}"
86
+ try:
87
+ resp = await client.get(url, params={"expand": "body.storage"})
88
+ resp.raise_for_status()
89
+ doc = self._page_to_raw_document(resp.json())
90
+ if doc:
91
+ docs.append(doc)
92
+ except Exception:
93
+ logger.exception("confluence: failed to fetch page %s", page_id)
94
+ return docs
95
+
96
+ def _page_to_raw_document(self, page: dict) -> Optional[RawDocument]:
97
+ try:
98
+ page_id = page["id"]
99
+ title = page.get("title", "Untitled")
100
+ html_body = page.get("body", {}).get("storage", {}).get("value", "")
101
+ content = _strip_html(html_body)
102
+ if not content.strip():
103
+ return None
104
+ source_url = f"{self._base_url}/wiki/spaces/{self._space_key}/pages/{page_id}"
105
+ doc_id = hashlib.sha256(f"confluence:{page_id}".encode()).hexdigest()
106
+ return RawDocument(
107
+ doc_id=doc_id,
108
+ title=title,
109
+ content=content,
110
+ source_url=source_url,
111
+ source_type="confluence",
112
+ team_id=self._team_id,
113
+ metadata={"page_id": page_id, "space_key": self._space_key},
114
+ )
115
+ except Exception:
116
+ logger.exception("confluence: failed to parse page payload")
117
+ return None
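Note on `_strip_html`: the regex approach removes tags but leaves HTML entities such as `&amp;` in the text. The sketch below is one possible variant that also decodes entities with the standard library. The `html.unescape` step is an addition for illustration and is not in the commit.

```python
import html
import re

_HTML_TAG_RE = re.compile(r"<[^>]+>")
_WHITESPACE_RE = re.compile(r"\s{2,}")


def strip_html(markup: str) -> str:
    """Drop tags, decode entities, and collapse whitespace runs."""
    text = _HTML_TAG_RE.sub(" ", markup)
    text = html.unescape(text)  # assumption: decoding entities is desired here
    return _WHITESPACE_RE.sub(" ", text).strip()


plain = strip_html("<p>Deploy &amp; rollback <b>steps</b></p>")
```

Decoding after tag removal avoids turning an escaped `&lt;b&gt;` in the page text into something the tag regex would then delete.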
ingestion/sources/github.py ADDED
@@ -0,0 +1,163 @@
1
+ from __future__ import annotations
2
+
3
+ import base64
4
+ import hashlib
5
+ import logging
6
+ from typing import Optional
7
+ from urllib.parse import urlparse
8
+
9
+ import httpx
10
+ from supabase import Client
11
+
12
+ from ingestion.config import settings
13
+ from ingestion.models import RawDocument
14
+ from ingestion.sources.base import BaseSource
15
+
16
+ logger = logging.getLogger(__name__)
17
+
18
+
19
+ def _parse_owner_repo(repo_url: str) -> tuple[str, str]:
20
+ path = urlparse(repo_url).path.strip("/")
21
+ parts = path.split("/")
22
+ if len(parts) < 2:
23
+ raise ValueError(f"Cannot parse owner/repo from URL: {repo_url}")
24
+ return parts[0], parts[1]
25
+
26
+
27
+ class GithubSource(BaseSource):
28
+ def __init__(
29
+ self,
30
+ team_id: str,
31
+ repo_url: str,
32
+ supabase_client: Client,
33
+ path_filter: str = "",
34
+ branch: str = "",
35
+ token: str = "",
36
+ ) -> None:
37
+ self._team_id = team_id
38
+ self._repo_url = repo_url
39
+ self._supabase = supabase_client
40
+ self._path_filter = path_filter or settings.github_path_filter
41
+ self._branch = branch or settings.github_branch
42
+ self._token = token or settings.github_token
43
+ self._owner, self._repo = _parse_owner_repo(repo_url)
44
+
45
+ def source_type(self) -> str:
46
+ return "github"
47
+
48
+ def _headers(self) -> dict[str, str]:
49
+ h = {"Accept": "application/vnd.github+json", "X-GitHub-Api-Version": "2022-11-28"}
50
+ if self._token:
51
+ h["Authorization"] = f"Bearer {self._token}"
52
+ return h
53
+
54
+ async def fetch(self) -> list[RawDocument]:
55
+ async with httpx.AsyncClient(headers=self._headers(), timeout=30) as client:
56
+ latest_sha = await self._get_latest_commit_sha(client)
57
+ stored_sha = self._get_stored_sha()
58
+
59
+ if latest_sha and latest_sha == stored_sha:
60
+ logger.info("github: %s/%s unchanged at SHA %s, skipping", self._owner, self._repo, latest_sha)
61
+ return []
62
+
63
+ docs = await self._fetch_markdown_files(client, latest_sha or self._branch)
64
+
65
+ if docs and latest_sha:
66
+ self._store_sha(latest_sha)
67
+
68
+ logger.info("github: fetched %d documents from %s/%s", len(docs), self._owner, self._repo)
69
+ return docs
70
+
71
+ async def _get_latest_commit_sha(self, client: httpx.AsyncClient) -> Optional[str]:
72
+ url = f"{settings.github_api_url}/repos/{self._owner}/{self._repo}/commits"
73
+ params = {"path": self._path_filter, "per_page": 1, "sha": self._branch}
74
+ try:
75
+ resp = await client.get(url, params=params)
76
+ resp.raise_for_status()
77
+ commits = resp.json()
78
+ return commits[0]["sha"] if commits else None
79
+ except Exception:
80
+ logger.exception("github: failed to get latest commit SHA")
81
+ return None
82
+
83
+ def _get_stored_sha(self) -> Optional[str]:
84
+ repo_doc_id = self._repo_doc_id()
85
+ try:
86
+ result = (
87
+ self._supabase.table("documents")
88
+ .select("last_commit_sha")
89
+ .eq("doc_id", repo_doc_id)
90
+ .maybe_single()
91
+ .execute()
92
+ )
93
+ return result.data["last_commit_sha"] if result.data else None
94
+ except Exception:
95
+ logger.exception("github: failed to fetch stored SHA")
96
+ return None
97
+
98
+ def _store_sha(self, sha: str) -> None:
99
+ repo_doc_id = self._repo_doc_id()
100
+ try:
101
+ self._supabase.table("documents").upsert(
102
+ {
103
+ "doc_id": repo_doc_id,
104
+ "title": f"{self._owner}/{self._repo}",
105
+ "source_url": self._repo_url,
106
+ "source_type": "github",
107
+ "team_id": self._team_id,
108
+ "last_commit_sha": sha,
109
+ },
110
+ on_conflict="doc_id",
111
+ ).execute()
112
+ except Exception:
113
+ logger.exception("github: failed to store new SHA")
114
+
115
+ def _repo_doc_id(self) -> str:
116
+ return hashlib.sha256(f"github:repo:{self._owner}/{self._repo}".encode()).hexdigest()
117
+
118
+ async def _fetch_markdown_files(self, client: httpx.AsyncClient, ref: str) -> list[RawDocument]:
119
+ url = f"{settings.github_api_url}/repos/{self._owner}/{self._repo}/git/trees/{ref}"
120
+ try:
121
+ resp = await client.get(url, params={"recursive": "1"})
122
+ resp.raise_for_status()
123
+ tree = resp.json().get("tree", [])
124
+ except Exception:
125
+ logger.exception("github: failed to fetch file tree")
126
+ return []
127
+
128
+ md_paths = [
129
+ item["path"]
130
+ for item in tree
131
+ if item["type"] == "blob"
132
+ and item["path"].endswith(".md")
133
+ and item["path"].startswith(self._path_filter)
134
+ ]
135
+
136
+ docs: list[RawDocument] = []
137
+ for path in md_paths:
138
+ doc = await self._fetch_file(client, path)
139
+ if doc:
140
+ docs.append(doc)
141
+ return docs
142
+
143
+ async def _fetch_file(self, client: httpx.AsyncClient, path: str) -> Optional[RawDocument]:
144
+ url = f"{settings.github_api_url}/repos/{self._owner}/{self._repo}/contents/{path}"
145
+ try:
146
+ resp = await client.get(url, params={"ref": self._branch})
147
+ resp.raise_for_status()
148
+ data = resp.json()
149
+ content = base64.b64decode(data["content"]).decode("utf-8", errors="replace")
150
+ doc_id = hashlib.sha256(f"github:{self._owner}/{self._repo}/{path}".encode()).hexdigest()
151
+ source_url = data.get("html_url", f"{self._repo_url}/blob/{self._branch}/{path}")
152
+ return RawDocument(
153
+ doc_id=doc_id,
154
+ title=path.split("/")[-1].removesuffix(".md"),
155
+ content=content,
156
+ source_url=source_url,
157
+ source_type="github",
158
+ team_id=self._team_id,
159
+ metadata={"repo": f"{self._owner}/{self._repo}", "path": path, "branch": self._branch},
160
+ )
161
+ except Exception:
162
+ logger.exception("github: failed to fetch file %s", path)
163
+ return None
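Note on `_parse_owner_repo`: the helper takes the first two path segments of the repository URL. The sketch below is a slightly stricter variant that also accepts clone-style URLs ending in `.git` and rejects empty segments. Both behaviours are assumptions for illustration, not part of the commit.

```python
from urllib.parse import urlparse


def parse_owner_repo(repo_url: str) -> tuple[str, str]:
    """Extract (owner, repo) from a GitHub-style URL path."""
    parts = urlparse(repo_url).path.strip("/").split("/")
    if len(parts) < 2 or not parts[0] or not parts[1]:
        raise ValueError(f"Cannot parse owner/repo from URL: {repo_url}")
    # assumption: a trailing ".git" should not become part of the repo name
    return parts[0], parts[1].removesuffix(".git")


owner, repo = parse_owner_repo("https://github.com/acme/handbook.git")
```

Extra path segments such as `/tree/main/docs` are ignored because only the first two are read.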
ingestion/sources/jira.py ADDED
@@ -0,0 +1,69 @@
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+
5
+ from ingestion.config import settings
6
+ from ingestion.models import RawDocument
7
+ from ingestion.sources.base import BaseSource
8
+
9
+ logger = logging.getLogger(__name__)
10
+
11
+
12
+ class JiraSource(BaseSource):
13
+ def __init__(
14
+ self,
15
+ team_id: str,
16
+ project_key: str = "",
17
+ lookback_days: int = 30,
18
+ base_url: str = "",
19
+ api_token: str = "",
20
+ ) -> None:
21
+ self._team_id = team_id
22
+ self._project_key = project_key or settings.jira_project_key
23
+ self._lookback_days = lookback_days
24
+ self._base_url = base_url or settings.jira_base_url
25
+ self._api_token = api_token or settings.jira_api_token
26
+
27
+ def source_type(self) -> str:
28
+ return "jira"
29
+
30
+ async def fetch(self) -> list[RawDocument]:
31
+ """Stub: always returns an empty list until the Jira fetch below is implemented."""
32
+ if not self._base_url or not self._api_token:
33
+ logger.info("jira: credentials not configured, returning empty")
34
+ return []
35
+
36
+ logger.warning("jira: fetch stub returning empty results")
37
+ return []
38
+
39
+ # Real implementation shape (uncomment when credentials available):
40
+ #
41
+ # since = (datetime.utcnow() - timedelta(days=self._lookback_days)).strftime("%Y-%m-%d")
42
+ # jql = f'project = "{self._project_key}" AND updated >= "{since}" ORDER BY updated DESC'
43
+ # url = f"{self._base_url}/rest/api/3/search"
44
+ # import base64
45
+ # headers = {
46
+ # "Authorization": "Basic " + base64.b64encode(
47
+ # f"{settings.confluence_email}:{self._api_token}".encode()
48
+ # ).decode(),
49
+ # "Accept": "application/json",
50
+ # }
51
+ # async with httpx.AsyncClient(headers=headers, timeout=30) as client:
52
+ # resp = await client.get(url, params={"jql": jql, "maxResults": 100, "fields": "summary,description,status,assignee,updated"})
53
+ # resp.raise_for_status()
54
+ # issues = resp.json().get("issues", [])
55
+ # docs = []
56
+ # for issue in issues:
57
+ # f = issue["fields"]
58
+ # text = f"{f.get('summary', '')}\n{f.get('description') or ''}"
59
+ # doc_id = hashlib.sha256(f"jira:{issue['key']}".encode()).hexdigest()
60
+ # docs.append(RawDocument(
61
+ # doc_id=doc_id,
62
+ # title=f"{issue['key']}: {f.get('summary', '')}",
63
+ # content=text,
64
+ # source_url=f"{self._base_url}/browse/{issue['key']}",
65
+ # source_type="jira",
66
+ # team_id=self._team_id,
67
+ # metadata={"issue_key": issue["key"], "status": f.get("status", {}).get("name")},
68
+ # ))
69
+ # return docs
ingestion/sources/pdf.py ADDED
@@ -0,0 +1,51 @@
1
+ from __future__ import annotations
2
+
3
+ import hashlib
4
+ import logging
5
+
6
+ import fitz # PyMuPDF
7
+
8
+ from ingestion.models import RawDocument
9
+ from ingestion.sources.base import BaseSource
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+
14
+ class PDFSource(BaseSource):
15
+ def __init__(self, team_id: str, filename: str, content: bytes) -> None:
16
+ self._team_id = team_id
17
+ self._filename = filename
18
+ self._content = content
19
+
20
+ def source_type(self) -> str:
21
+ return "pdf"
22
+
23
+ async def fetch(self) -> list[RawDocument]:
24
+ try:
25
+ doc = fitz.open(stream=self._content, filetype="pdf")
26
+ pages_text: list[str] = []
27
+ for page in doc:
28
+ pages_text.append(page.get_text())
29
+ doc.close()
30
+
31
+ full_text = "\n\n".join(t for t in pages_text if t.strip())
32
+ if not full_text.strip():
33
+ logger.warning("pdf: no text extracted from %s", self._filename)
34
+ return []
35
+
36
+ doc_id = hashlib.sha256(self._content).hexdigest()
37
+ title = self._filename.removesuffix(".pdf")
38
+ return [
39
+ RawDocument(
40
+ doc_id=doc_id,
41
+ title=title,
42
+ content=full_text,
43
+ source_url=self._filename,
44
+ source_type="pdf",
45
+ team_id=self._team_id,
46
+ metadata={"filename": self._filename, "pages": len(pages_text)},
47
+ )
48
+ ]
49
+ except Exception:
50
+ logger.exception("pdf: failed to parse %s", self._filename)
51
+ return []
ingestion/storage/__init__.py ADDED
File without changes
ingestion/storage/bm25_store.py ADDED
@@ -0,0 +1,44 @@
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ import pickle
5
+ from pathlib import Path
6
+
7
+ from rank_bm25 import BM25Okapi
8
+
9
+ from ingestion.config import settings
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+
14
+ def rebuild_bm25_index(chunk_ids: list[str], texts: list[str]) -> None:
15
+ if not chunk_ids or not texts:
16
+ logger.warning("bm25_store: no chunks provided, skipping rebuild")
17
+ return
18
+
19
+ if len(chunk_ids) != len(texts):
20
+ raise ValueError("chunk_ids and texts must have equal length")
21
+
22
+ tokenized_corpus = [t.lower().split() for t in texts]
23
+ index = BM25Okapi(tokenized_corpus)
24
+
25
+ index_path = Path(settings.bm25_index_path)
26
+ index_path.parent.mkdir(parents=True, exist_ok=True)
27
+
28
+ with index_path.open("wb") as f:
29
+ pickle.dump({"index": index, "corpus": texts, "doc_ids": chunk_ids}, f)
30
+
31
+ logger.info("bm25_store: rebuilt index with %d documents -> %s", len(chunk_ids), index_path)
32
+
33
+
34
+ def rebuild_from_supabase() -> None:
35
+ from ingestion.storage.supabase_store import get_all_chunks
36
+
37
+ rows = get_all_chunks()
38
+ if not rows:
39
+ logger.warning("bm25_store: no chunks in Supabase, skipping rebuild")
40
+ return
41
+
42
+ chunk_ids = [r["chunk_id"] for r in rows]
43
+ texts = [r["text"] for r in rows]
44
+ rebuild_bm25_index(chunk_ids, texts)
ingestion/storage/qdrant_store.py ADDED
@@ -0,0 +1,101 @@
1
+ from __future__ import annotations
2
+
3
+ import logging
4
+ import uuid
5
+
6
+ from qdrant_client import QdrantClient
7
+ from qdrant_client.http import models as qmodels
8
+
9
+ from ingestion.config import settings
10
+ from ingestion.models import EmbeddedChunk
11
+
12
+ logger = logging.getLogger(__name__)
13
+
14
+
15
+ def _chunk_uuid(chunk_id: str) -> str:
16
+ # stable UUID derived from chunk_id so re-ingestion overwrites the same point
17
+ return str(uuid.uuid5(uuid.NAMESPACE_DNS, chunk_id))
18
+
19
+
20
+ def _get_client() -> QdrantClient:
21
+ return QdrantClient(host=settings.qdrant_host, port=settings.qdrant_port)
22
+
23
+
24
+ def ensure_collection_exists() -> None:
25
+ client = _get_client()
26
+ existing = {c.name for c in client.get_collections().collections}
27
+ if settings.qdrant_collection in existing:
28
+ return
29
+
30
+ client.create_collection(
31
+ collection_name=settings.qdrant_collection,
32
+ vectors_config={
33
+ settings.qdrant_dense_vector_name: qmodels.VectorParams(
34
+ size=settings.qdrant_dense_size,
35
+ distance=qmodels.Distance.COSINE,
36
+ )
37
+ },
38
+ sparse_vectors_config={
39
+ settings.qdrant_sparse_vector_name: qmodels.SparseVectorParams(
40
+ index=qmodels.SparseIndexParams(on_disk=False)
41
+ )
42
+ },
43
+ )
44
+ logger.info("qdrant_store: created collection %s", settings.qdrant_collection)
45
+
46
+
47
+ def upsert_chunks(chunks: list[EmbeddedChunk]) -> None:
48
+ if not chunks:
49
+ return
50
+
51
+ try:
52
+ ensure_collection_exists()
53
+ client = _get_client()
54
+
55
+ points = [
56
+ qmodels.PointStruct(
57
+ id=_chunk_uuid(chunk.chunk_id),
58
+ vector={
59
+ settings.qdrant_dense_vector_name: chunk.dense_vector,
60
+ settings.qdrant_sparse_vector_name: qmodels.SparseVector(
61
+ indices=chunk.sparse_indices,
62
+ values=chunk.sparse_values,
63
+ ),
64
+ },
65
+ payload={
66
+ "chunk_id": chunk.chunk_id,
67
+ "doc_id": chunk.doc_id,
68
+ "text": chunk.text,
69
+ "source": chunk.source,
70
+ "source_type": chunk.source_type,
71
+ "team_id": chunk.team_id,
72
+ "chunk_index": chunk.chunk_index,
73
+ **chunk.metadata,
74
+ },
75
+ )
76
+ for chunk in chunks
77
+ ]
78
+
79
+ client.upsert(collection_name=settings.qdrant_collection, points=points)
80
+ logger.info("qdrant_store: upserted %d points", len(points))
81
+
82
+ except Exception:
83
+ logger.exception("qdrant_store: upsert failed")
84
+ raise
85
+
86
+
+ def delete_chunks_for_doc(doc_id: str) -> None:
+     try:
+         client = _get_client()
+         client.delete(
+             collection_name=settings.qdrant_collection,
+             points_selector=qmodels.FilterSelector(
+                 filter=qmodels.Filter(
+                     must=[qmodels.FieldCondition(key="doc_id", match=qmodels.MatchValue(value=doc_id))]
+                 )
+             ),
+         )
+         logger.info("qdrant_store: deleted points for doc_id=%s", doc_id)
+     except Exception:
+         logger.exception("qdrant_store: delete failed for doc_id=%s", doc_id)
+         raise
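
`upsert_chunks` derives each point ID with `_chunk_uuid(chunk.chunk_id)`. Qdrant accepts only unsigned integers or UUIDs as point IDs, so a string chunk ID has to be mapped onto one of those, and the mapping has to be stable for re-ingestion to overwrite points rather than duplicate them. The sketch below shows one plausible mapping using a name-based UUID (version 5). The namespace constant and the name `chunk_uuid_sketch` are illustrative assumptions; the actual `_chunk_uuid` in this commit may work differently.

```python
import uuid

# Hypothetical namespace: any fixed UUID works, as long as it never changes
# between ingestion runs.
CHUNK_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "enterprise-knowledge-copilot/chunks")


def chunk_uuid_sketch(chunk_id: str) -> str:
    """Map an arbitrary string chunk ID to a stable UUID string.

    uuid5 hashes the namespace and the name, so the same chunk_id
    always yields the same UUID.
    """
    return str(uuid.uuid5(CHUNK_NAMESPACE, chunk_id))
```

Because the ID is a pure function of `chunk_id`, re-ingesting an unchanged chunk would target the same point on upsert.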
ingestion/storage/supabase_store.py ADDED
@@ -0,0 +1,138 @@
+ from __future__ import annotations
+
+ import logging
+ from datetime import datetime
+ from typing import Any, Optional
+
+ from supabase import Client, create_client
+
+ from ingestion.config import settings
+ from ingestion.models import DocumentChunk, IngestJobRecord, IngestJobStatus, RawDocument
+
+ logger = logging.getLogger(__name__)
+
+
+ def get_client() -> Client:
+     return create_client(settings.supabase_url, settings.supabase_key)
+
+
+ def upsert_document(doc: RawDocument, client: Optional[Client] = None) -> None:
+     sb = client or get_client()
+     try:
+         sb.table("documents").upsert(
+             {
+                 "doc_id": doc.doc_id,
+                 "title": doc.title,
+                 "source_url": doc.source_url,
+                 "source_type": doc.source_type,
+                 "team_id": doc.team_id,
+                 "metadata": doc.metadata,
+                 "last_commit_sha": doc.metadata.get("last_commit_sha"),
+                 "updated_at": datetime.utcnow().isoformat(),
+             },
+             on_conflict="doc_id",
+         ).execute()
+     except Exception:
+         logger.exception("supabase_store: failed to upsert document %s", doc.doc_id)
+         raise
+
+
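
`upsert_document` stamps `updated_at` with `datetime.utcnow().isoformat()`, which produces a naive timestamp with no UTC offset (`utcnow()` is also deprecated as of Python 3.12). Since `updated_at` is a `timestamptz` column, PostgreSQL interprets an offset-free string in the session's time zone. A minimal sketch of an offset-aware alternative follows; the helper name `utc_now_iso` is hypothetical and not part of this commit.

```python
from datetime import datetime, timezone


def utc_now_iso() -> str:
    """Return the current time as an ISO 8601 string with an explicit UTC offset."""
    # datetime.now(timezone.utc) is timezone-aware, so isoformat() appends "+00:00".
    return datetime.now(timezone.utc).isoformat()
```

The same helper would apply to `snapshot_at` in `update_cag_snapshot`.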
+ def upsert_chunks(chunks: list[DocumentChunk], client: Optional[Client] = None) -> None:
+     if not chunks:
+         return
+     sb = client or get_client()
+     try:
+         rows = [
+             {
+                 "chunk_id": c.chunk_id,
+                 "doc_id": c.doc_id,
+                 "text": c.text,
+                 "source": c.source,
+                 "source_type": c.source_type,
+                 "team_id": c.team_id,
+                 "chunk_index": c.chunk_index,
+             }
+             for c in chunks
+         ]
+         sb.table("chunks").upsert(rows, on_conflict="chunk_id").execute()
+         logger.info("supabase_store: upserted %d chunks", len(rows))
+     except Exception:
+         logger.exception("supabase_store: failed to upsert chunks")
+         raise
+
+
+ def delete_chunks_for_doc(doc_id: str, client: Optional[Client] = None) -> None:
+     sb = client or get_client()
+     try:
+         sb.table("chunks").delete().eq("doc_id", doc_id).execute()
+     except Exception:
+         logger.exception("supabase_store: failed to delete chunks for doc_id=%s", doc_id)
+         raise
+
+
+ def upsert_job(record: IngestJobRecord, client: Optional[Client] = None) -> None:
+     sb = client or get_client()
+     try:
+         sb.table("ingest_jobs").upsert(
+             {
+                 "job_id": record.job_id,
+                 "celery_task_id": record.celery_task_id,
+                 "status": record.status.value,
+                 "source_type": record.source_type,
+                 "team_id": record.team_id,
+                 "created_at": record.created_at.isoformat(),
+                 "completed_at": record.completed_at.isoformat() if record.completed_at else None,
+                 "error": record.error,
+                 "chunks_ingested": record.chunks_ingested,
+             },
+             on_conflict="job_id",
+         ).execute()
+     except Exception:
+         logger.exception("supabase_store: failed to upsert job %s", record.job_id)
+
+
+ def get_job(job_id: str, client: Optional[Client] = None) -> Optional[dict[str, Any]]:
+     sb = client or get_client()
+     try:
+         result = sb.table("ingest_jobs").select("*").eq("job_id", job_id).maybe_single().execute()
+         return result.data
+     except Exception:
+         logger.exception("supabase_store: failed to get job %s", job_id)
+         return None
+
+
+ def get_all_teams(client: Optional[Client] = None) -> list[dict[str, Any]]:
+     sb = client or get_client()
+     try:
+         result = sb.table("teams").select("team_id").execute()
+         return result.data or []
+     except Exception:
+         logger.exception("supabase_store: failed to fetch teams")
+         return []
+
+
+ def update_cag_snapshot(team_id: str, snapshot: str, client: Optional[Client] = None) -> None:
+     sb = client or get_client()
+     try:
+         sb.table("teams").upsert(
+             {
+                 "team_id": team_id,
+                 "cag_snapshot": snapshot,
+                 "snapshot_at": datetime.utcnow().isoformat(),
+             },
+             on_conflict="team_id",
+         ).execute()
+         logger.info("supabase_store: updated CAG snapshot for team %s", team_id)
+     except Exception:
+         logger.exception("supabase_store: failed to update CAG snapshot for team %s", team_id)
+         raise
+
+
+ def get_all_chunks(client: Optional[Client] = None) -> list[dict[str, Any]]:
+     sb = client or get_client()
+     try:
+         result = sb.table("chunks").select("chunk_id, text").execute()
+         return result.data or []
+     except Exception:
+         logger.exception("supabase_store: failed to fetch all chunks")
+         return []
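
`get_all_chunks` issues a single unbounded `select`. Supabase projects typically cap the number of rows returned per API request (1,000 by default in the project's API settings), so on a large `chunks` table this call may return only the first page without raising an error. The sketch below shows a generic offset-pagination loop that could wrap such a query. `fetch_page` is a hypothetical callable standing in for a ranged query (for example a `select(...)` followed by a range call on the query builder), and the default page size is an assumption.

```python
from typing import Any, Callable

Row = dict[str, Any]


def fetch_all_rows(
    fetch_page: Callable[[int, int], list[Row]],
    page_size: int = 1000,
) -> list[Row]:
    """Collect every row by requesting fixed-size pages until one comes back short.

    fetch_page(offset, limit) must return at most `limit` rows starting at `offset`.
    """
    if page_size <= 0:
        raise ValueError("page_size must be positive")

    rows: list[Row] = []
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        rows.extend(page)
        # A short (or empty) page means the table has been exhausted.
        if len(page) < page_size:
            return rows
        offset += page_size
```

A stable ordering (for instance by `chunk_id`) would be needed in the real query so that pages do not overlap or skip rows while ingestion is running.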
main.py ADDED
@@ -0,0 +1,19 @@
+ import logging
+
+ from fastapi import FastAPI
+
+ from agent.api import router as agent_router
+ from ingestion.api import router as ingestion_router
+
+ logging.basicConfig(
+     level=logging.INFO,
+     format="%(asctime)s %(levelname)s %(name)s: %(message)s",
+ )
+
+ app = FastAPI(
+     title="Enterprise Knowledge Copilot",
+     version="0.1.0",
+ )
+
+ app.include_router(agent_router)
+ app.include_router(ingestion_router)
requirements.txt CHANGED
@@ -1,9 +1,64 @@
- redis==5.0.1
- celery==5.3.4
- pydantic==2.5.0
- pydantic-settings==2.1.0
+ # Core orchestration
+ langgraph==0.2.55
+ langchain-core==0.3.29
+ langchain-google-genai==2.0.9
+
+ # Embeddings and reranking
+ FlagEmbedding==1.2.11
+
+ # GLiNER for PII masking
+ gliner==0.2.13
+
+ # Vector database
+ qdrant-client==1.12.1
+
+ # BM25
+ rank-bm25==0.2.2
+
+ # API framework
+ fastapi==0.115.6
+ uvicorn[standard]==0.32.1
+
+ # Pydantic
+ pydantic==2.10.4
+ pydantic-settings==2.7.0
+
+ # Environment management
+ python-dotenv==1.0.1
+
+ # HTTP client
+ httpx==0.28.1
+ requests==2.31.0
  aiohttp==3.9.1
+
+ # Google AI SDK (transitive, pinned for stability)
+ google-generativeai==0.8.3
+ google-auth==2.37.0
+
+ # Numpy (required by FlagEmbedding)
+ numpy==1.26.4
+
+ # PyTorch CPU (FlagEmbedding dependency - use GPU build if available)
+ torch==2.5.1
+
+ # Transformers (FlagEmbedding dependency)
+ transformers==4.47.1
+
+ # Ingestion - document parsing
+ PyMuPDF==1.24.14
+
+ # Ingestion - NLP / chunking
+ spacy==3.8.3
+ # Run after install: python -m spacy download en_core_web_sm
+
+ # Ingestion - Supabase
+ supabase==2.11.0
+
+ # Ingestion - background jobs
+ celery==5.4.0
+ celery[redis]==5.4.0
+ redis==5.2.1
+
+ # Existing project dependencies
  beautifulsoup4==4.12.2
  docling==0.7.0
- requests==2.31.0
- python-dotenv==1.0.0
supabase/schema.sql ADDED
@@ -0,0 +1,81 @@
+ -- Documents table: one row per ingested document (PDF, Confluence page, GitHub file, Jira issue)
+ create table if not exists documents (
+     id uuid primary key default gen_random_uuid(),
+     doc_id text not null unique,
+     title text not null,
+     source_url text not null,
+     source_type text not null,
+     team_id text not null,
+     metadata jsonb not null default '{}',
+     last_commit_sha text,
+     created_at timestamptz not null default now(),
+     updated_at timestamptz not null default now()
+ );
+
+ create index if not exists documents_team_id_idx on documents (team_id);
+ create index if not exists documents_source_type_idx on documents (source_type);
+ create index if not exists documents_team_source_idx on documents (team_id, source_type);
+ -- Lets the CAG job find repos for a team without a full scan; a default GIN index serves containment (metadata @> ...) lookups, not metadata->>'repo' equality
+ create index if not exists documents_metadata_gin_idx on documents using gin (metadata);
+
+
+ -- Chunks table: one row per text chunk produced by the chunker
+ create table if not exists chunks (
+     id uuid primary key default gen_random_uuid(),
+     chunk_id text not null unique,
+     doc_id text not null references documents (doc_id) on delete cascade,
+     text text not null,
+     source text not null,
+     source_type text not null,
+     team_id text not null,
+     chunk_index integer not null,
+     created_at timestamptz not null default now()
+ );
+
+ create index if not exists chunks_doc_id_idx on chunks (doc_id);
+ create index if not exists chunks_team_id_idx on chunks (team_id);
+
+
+ -- Teams table: one row per tenant team
+ create table if not exists teams (
+     team_id text primary key,
+     cag_snapshot text,
+     snapshot_at timestamptz,
+     created_at timestamptz not null default now()
+ );
+
+
+ -- Ingest jobs table: tracks Celery task state for the API
+ create table if not exists ingest_jobs (
+     job_id text primary key,
+     celery_task_id text not null,
+     status text not null default 'pending',
+     source_type text not null,
+     team_id text not null,
+     chunks_ingested integer not null default 0,
+     error text,
+     created_at timestamptz not null default now(),
+     completed_at timestamptz
+ );
+
+ create index if not exists ingest_jobs_team_id_idx on ingest_jobs (team_id);
+ create index if not exists ingest_jobs_status_idx on ingest_jobs (status);
+
+
+ -- RLS: each team only sees its own documents, chunks, and ingest jobs
+ alter table documents enable row level security;
+ alter table chunks enable row level security;
+ alter table ingest_jobs enable row level security;
+
+ -- Service role bypasses RLS; these policies cover the anon / authenticated roles
+ create policy "team isolation - documents"
+     on documents for all
+     using (team_id = current_setting('app.team_id', true));
+
+ create policy "team isolation - chunks"
+     on chunks for all
+     using (team_id = current_setting('app.team_id', true));
+
+ create policy "team isolation - ingest_jobs"
+     on ingest_jobs for all
+     using (team_id = current_setting('app.team_id', true));
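
The three policies compare `team_id` against `current_setting('app.team_id', true)`, so a session that is subject to RLS only sees rows once that setting has been populated. When it is unset, `current_setting(..., true)` returns NULL and the policies match nothing. The sketch below shows how a direct database connection might set the value for the duration of one transaction using PostgreSQL's `set_config`. It assumes a DB-API style connection with `%s` placeholders (as in psycopg), the helper name `team_scope` is hypothetical, and it does not cover requests made through the Supabase REST API.

```python
from contextlib import contextmanager
from typing import Any, Iterator

# The third argument (true) makes the setting local to the current transaction,
# so it is discarded on commit or rollback.
SET_TEAM_SQL = "select set_config('app.team_id', %s, true)"


@contextmanager
def team_scope(conn: Any, team_id: str) -> Iterator[Any]:
    """Yield a cursor whose transaction has app.team_id set to team_id."""
    cur = conn.cursor()
    try:
        cur.execute(SET_TEAM_SQL, (team_id,))
        yield cur
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

Queries run on the yielded cursor, such as a `select` from `chunks`, would then be filtered by the policies to the given team.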