|
|
import sys |
|
|
import os |
|
|
import json |
|
|
import logging |
|
|
from pathlib import Path |
|
|
from typing import List |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
from langchain_core.documents import Document |
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
|
from langchain_community.document_loaders import NotionDBLoader |
|
|
from langchain_chroma import Chroma |
|
|
from langchain_community.vectorstores.utils import filter_complex_metadata |
|
|
from langchain_core.prompts import ChatPromptTemplate |
|
|
|
|
|
|
|
|
# Make the project's src/ directory importable when this script is run
# directly (script lives one level below the repo root, next to src/).
sys.path.append(str(Path(__file__).parent.parent / "src"))

from core.settings import settings
from core.llm import get_model
from core.embeddings import get_embeddings

# Pull variables from a local .env file into the process environment
# before any os.getenv() reads below.
load_dotenv()
|
|
|
|
|
|
|
|
# Name of the portfolio subject; interpolated into every prompt as {owner}.
PORTFOLIO_OWNER = os.getenv("PORTFOLIO_OWNER")
# Notion integration token; required by NotionDBLoader. Checked in run_ingestion().
NOTION_TOKEN = os.getenv("NOTION_TOKEN")

# On-disk location and collection name for the persisted Chroma vector store.
CHROMA_PERSIST_DIR = "./chroma_db"
CHROMA_COLLECTION = "portfolio"

# Maps a logical database name to the env var that holds its Notion database ID.
# Databases whose env var is unset are skipped during ingestion.
DB_ENV_MAP = {
    "education": "NOTION_EDUCATION_ID",
    "experience": "NOTION_EXPERIENCE_ID",
    "projects": "NOTION_PROJECT_ID",
    "testimonials": "NOTION_TESTIMONIAL_ID",
    "blog": "NOTION_BLOG_ID",
}

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger(__name__)

# Model handles are created once at import time and shared by all functions.
# NOTE(review): this makes importing the module perform model initialization —
# confirm that is intended if this file is ever imported rather than run.
LLM = get_model(settings.DEFAULT_MODEL)
EMBEDDINGS = get_embeddings(settings.DEFAULT_EMBEDDING_MODEL)
|
|
|
|
|
|
|
|
# Shared system message for the enrichment LLM. {owner} is filled in by
# ENRICH_PROMPT.format_messages(); the rules forbid the model from inventing
# content so the vector store only holds facts present in the Notion records.
SYSTEM_PROMPT = """
You are a portfolio knowledge-base curator.
All content is about {owner} and written in third person.
The output is stored in a vector database.

Task:
Format the provided input into a clean, concise Markdown entry.

Rules:
- Output valid Markdown only
- Start with an H2 or H3 title
- Use short, factual bullet points
- Write ONLY what is explicitly present in the input
- Preserve original wording exactly
- Do NOT infer, normalize, summarize, interpret, or embellish
- Do NOT add, remove, rename, or assume any fields
- If something is not explicitly stated, do not include it

Formatting:
- Present the information clearly and concisely
- Do not introduce structure beyond what the input supports
- Do not guess labels or categories

Images:
- Render image immediately after the title if a valid image URL exists in metadata

Metadata:
- Always include a final "Metadata" section
- Include all provided metadata exactly as given
"""
|
|
|
|
|
# Per-database extraction instructions appended to the prompt as {strategy}.
# Keys mirror DB_ENV_MAP; each value lists the fields the model should emit
# when (and only when) they appear in the raw record.
DB_STRATEGY_PROMPTS = {
    "education": """
Required fields (must be present if available):
- Education level
- Course or board name
- Institution and location
- Start and end year or completion year
- 1–2 factual achievements or distinctions explicitly stated in the input
""",
    "experience": """
Required fields (must be present if available):
- Role
- Organization
- Location
- Duration
- Up to 3 responsibilities or contributions explicitly stated in the input
""",
    "projects": """
Required fields (must be present if available):
- Project name
- Objective or problem statement
- Up to 3 explicitly stated technologies, features, or outcomes
""",
    "testimonials": """
Required fields (must be present if available):
- Author
- Context
- Verbatim feedback lines from the input (no paraphrasing)
""",
    "blog": """
Required fields (must be present if available):
- Blog title
- Description
- Explicitly listed topics, tags, or technologies
""",
}
|
|
|
|
|
# Two-message chat template: the shared system rules plus a human turn that
# carries the per-database strategy and the raw JSON payload of one record.
ENRICH_PROMPT = ChatPromptTemplate.from_messages(
    [
        ("system", SYSTEM_PROMPT),
        (
            "human",
            """
Owner: {owner}
Source: {source_db}

Strategy:
{strategy}

Raw Input:
{payload}

Task:
Extract one structured Markdown entry.
""",
        ),
    ]
)
|
|
|
|
|
|
|
|
def enrich_and_normalize(docs: List[Document], source_db: str) -> List[Document]:
    """Rewrite each raw Notion document into a normalized Markdown entry via the LLM.

    Args:
        docs: Raw documents loaded from one Notion database.
        source_db: Key into DB_STRATEGY_PROMPTS identifying the source
            database (e.g. "projects").

    Returns:
        New Documents whose page_content is LLM-formatted Markdown (or a raw
        JSON fallback when the LLM call fails); each original document's
        metadata is carried over unchanged.
    """
    # .get() so an unknown source_db degrades to an empty strategy section
    # instead of raising KeyError halfway through an ingestion run.
    strategy = DB_STRATEGY_PROMPTS.get(source_db, "")
    processed_docs: List[Document] = []

    for idx, doc in enumerate(docs, start=1):
        # Lazy %-args: the message is only built if INFO logging is enabled.
        logger.info("Enriching [%s] document %d/%d", source_db, idx, len(docs))

        payload = {
            "content": doc.page_content,
            "metadata": doc.metadata,
        }
        # default=str: Notion metadata may contain dates or other objects that
        # are not JSON-serializable; stringifying is fine for prompt input.
        # Serialized once and reused by both the success and fallback paths.
        payload_json = json.dumps(payload, indent=2, default=str)

        try:
            response = LLM.invoke(
                ENRICH_PROMPT.format_messages(
                    owner=PORTFOLIO_OWNER,
                    source_db=source_db,
                    strategy=strategy,
                    payload=payload_json,
                )
            )
            markdown = response.content.strip()
            logger.info("[CONTENT]: %s", markdown)
        except Exception:
            # logger.exception records the full traceback. Fallback: store the
            # raw record verbatim so the vector store still gets a searchable
            # entry for this document.
            logger.exception("LLM failure on [%s] document %d", source_db, idx)
            markdown = (
                "## Raw Record\n\n"
                "```json\n"
                f"{payload_json}\n"
                "```"
            )

        processed_docs.append(
            Document(
                page_content=markdown,
                metadata=doc.metadata or {},
            )
        )

    return processed_docs
|
|
|
|
|
|
|
|
def run_ingestion():
    """Load every configured Notion database, enrich, chunk, and persist to Chroma.

    Side effects: reads environment variables, calls the Notion API and the
    LLM, and writes the vector store under CHROMA_PERSIST_DIR. Returns None.
    """
    if not NOTION_TOKEN:
        logger.error("NOTION_TOKEN is missing.")
        return
    if not PORTFOLIO_OWNER:
        # Prompts interpolate {owner}; warn but proceed — entries are still usable.
        logger.warning("PORTFOLIO_OWNER is not set; prompts will render it as None.")

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=900,
        chunk_overlap=100,
    )

    all_chunks = []

    for db_name, env_key in DB_ENV_MAP.items():
        db_id = os.getenv(env_key)
        if not db_id:
            # Database not configured for this deployment; skip it.
            continue

        logger.info("Loading %s from Notion", db_name)

        loader = NotionDBLoader(
            integration_token=NOTION_TOKEN,
            database_id=db_id,
        )

        try:
            raw_docs = loader.load()
        except Exception:
            # One unreachable/misconfigured database should not abort the
            # whole ingestion run — log the traceback and move on.
            logger.exception("Failed to load %s from Notion; skipping.", db_name)
            continue

        if not raw_docs:
            logger.warning("No records found for %s", db_name)
            continue

        enriched = enrich_and_normalize(raw_docs, db_name)
        all_chunks.extend(splitter.split_documents(enriched))

    if not all_chunks:
        logger.error("No documents to ingest.")
        return

    # Chroma only accepts scalar metadata values; strip lists/dicts/objects.
    safe_docs = filter_complex_metadata(all_chunks)

    Chroma.from_documents(
        documents=safe_docs,
        embedding=EMBEDDINGS,
        persist_directory=CHROMA_PERSIST_DIR,
        collection_name=CHROMA_COLLECTION,
    )

    logger.info("Ingestion completed successfully.")
|
|
|
|
|
# Script entry point: run the full Notion -> Chroma ingestion pipeline.
if __name__ == "__main__":
    run_ingestion()