"""Ingest Notion portfolio databases into a persistent Chroma vector store.

Each configured Notion database is loaded, normalized into a clean Markdown
entry by an LLM, split into chunks, and embedded into a Chroma collection
on disk.  Run as a script: ``python <this file>``.
"""

import json
import logging
import os
import sys
from pathlib import Path
from typing import List

from dotenv import load_dotenv
from langchain_chroma import Chroma
from langchain_community.document_loaders import NotionDBLoader
from langchain_community.vectorstores.utils import filter_complex_metadata
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Make the project "src" package importable when this file runs as a script.
sys.path.append(str(Path(__file__).parent.parent / "src"))

from core.settings import settings
from core.llm import get_model
from core.embeddings import get_embeddings

load_dotenv()

# --- Configuration ----------------------------------------------------------
PORTFOLIO_OWNER = os.getenv("PORTFOLIO_OWNER")
NOTION_TOKEN = os.getenv("NOTION_TOKEN")
CHROMA_PERSIST_DIR = "./chroma_db"
CHROMA_COLLECTION = "portfolio"

# Logical database name -> environment variable holding its Notion database id.
DB_ENV_MAP = {
    "education": "NOTION_EDUCATION_ID",
    "experience": "NOTION_EXPERIENCE_ID",
    "projects": "NOTION_PROJECT_ID",
    "testimonials": "NOTION_TESTIMONIAL_ID",
    "blog": "NOTION_BLOG_ID",
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)
logger = logging.getLogger(__name__)

# --- Models -----------------------------------------------------------------
# Instantiated once at import time; enrichment and embedding reuse them.
LLM = get_model(settings.DEFAULT_MODEL)
EMBEDDINGS = get_embeddings(settings.DEFAULT_EMBEDDING_MODEL)

# --- Prompts ----------------------------------------------------------------
SYSTEM_PROMPT = """
You are a portfolio knowledge-base curator.
All content is about {owner} and written in third person.
The output is stored in a vector database.

Task: Format the provided input into a clean, concise Markdown entry.

Rules:
- Output valid Markdown only
- Start with an H2 or H3 title
- Use short, factual bullet points
- Write ONLY what is explicitly present in the input
- Preserve original wording exactly
- Do NOT infer, normalize, summarize, interpret, or embellish
- Do NOT add, remove, rename, or assume any fields
- If something is not explicitly stated, do not include it

Formatting:
- Present the information clearly and concisely
- Do not introduce structure beyond what the input supports
- Do not guess labels or categories

Images:
- Render image immediately after the title if a valid image URL exists in metadata

Metadata:
- Always include a final "Metadata" section
- Include all provided metadata exactly as given
"""

# Per-database field requirements appended to the base prompt as "strategy".
DB_STRATEGY_PROMPTS = {
    "education": """
Required fields (must be present if available):
- Education level
- Course or board name
- Institution and location
- Start and end year or completion year
- 1–2 factual achievements or distinctions explicitly stated in the input
""",
    "experience": """
Required fields (must be present if available):
- Role
- Organization
- Location
- Duration
- Up to 3 responsibilities or contributions explicitly stated in the input
""",
    "projects": """
Required fields (must be present if available):
- Project name
- Objective or problem statement
- Up to 3 explicitly stated technologies, features, or outcomes
""",
    "testimonials": """
Required fields (must be present if available):
- Author
- Context
- Verbatim feedback lines from the input (no paraphrasing)
""",
    "blog": """
Required fields (must be present if available):
- Blog title
- Description
- Explicitly listed topics, tags, or technologies
""",
}

ENRICH_PROMPT = ChatPromptTemplate.from_messages(
    [
        ("system", SYSTEM_PROMPT),
        (
            "human",
            """
Owner: {owner}
Source: {source_db}
Strategy: {strategy}

Raw Input:
{payload}

Task: Extract one structured Markdown entry.
""",
        ),
    ]
)


# --- Enrichment -------------------------------------------------------------
def enrich_and_normalize(docs: List[Document], source_db: str) -> List[Document]:
    """Rewrite each raw Notion document as a normalized Markdown entry.

    Args:
        docs: Raw documents loaded from one Notion database.
        source_db: Logical database name (a key of ``DB_STRATEGY_PROMPTS``);
            unknown names fall back to an empty strategy instead of raising.

    Returns:
        New ``Document`` objects whose ``page_content`` is the LLM-produced
        Markdown (or a fenced-JSON dump of the raw record when the LLM call
        fails), with the original metadata preserved.
    """
    # .get() instead of [] so an unmapped database degrades gracefully.
    strategy = DB_STRATEGY_PROMPTS.get(source_db, "")
    processed_docs: List[Document] = []

    for idx, doc in enumerate(docs, start=1):
        logger.info("Enriching [%s] document %d/%d", source_db, idx, len(docs))

        payload = {
            "content": doc.page_content,
            "metadata": doc.metadata,
        }
        # default=str so non-JSON-serializable metadata (dates, UUIDs) is
        # stringified rather than aborting the whole ingestion run.
        payload_json = json.dumps(payload, indent=2, default=str)

        try:
            response = LLM.invoke(
                ENRICH_PROMPT.format_messages(
                    owner=PORTFOLIO_OWNER,
                    source_db=source_db,
                    strategy=strategy,
                    payload=payload_json,
                )
            )
            markdown = response.content.strip()
            logger.info("[CONTENT]: %s", markdown)
        except Exception:
            # Best-effort fallback: keep the record searchable as raw JSON
            # instead of dropping it.  logger.exception records the traceback.
            logger.exception("LLM failure while enriching [%s] doc %d", source_db, idx)
            markdown = (
                "## Raw Record\n\n"
                "```json\n"
                f"{payload_json}\n"
                "```"
            )

        processed_docs.append(
            Document(
                page_content=markdown,
                metadata=doc.metadata or {},
            )
        )

    return processed_docs


# --- Ingestion --------------------------------------------------------------
def run_ingestion() -> None:
    """Load every configured Notion database and embed it into Chroma.

    Skips databases whose id env var is unset and databases that return no
    records.  Exits early (with an error log) when ``NOTION_TOKEN`` is
    missing or no documents were produced at all.
    """
    if not NOTION_TOKEN:
        logger.error("NOTION_TOKEN is missing.")
        return
    if not PORTFOLIO_OWNER:
        # Not fatal, but the prompt would render the owner as "None".
        logger.warning("PORTFOLIO_OWNER is not set; prompts will lack an owner name.")

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=900,
        chunk_overlap=100,
    )

    all_chunks: List[Document] = []

    for db_name, env_key in DB_ENV_MAP.items():
        db_id = os.getenv(env_key)
        if not db_id:
            # Database not configured for this deployment — skip silently.
            continue

        logger.info("Loading %s from Notion", db_name)
        loader = NotionDBLoader(
            integration_token=NOTION_TOKEN,
            database_id=db_id,
        )
        raw_docs = loader.load()

        if not raw_docs:
            logger.warning("No records found for %s", db_name)
            continue

        enriched = enrich_and_normalize(raw_docs, db_name)
        all_chunks.extend(splitter.split_documents(enriched))

    if not all_chunks:
        logger.error("No documents to ingest.")
        return

    # Chroma rejects non-primitive metadata values; strip them first.
    safe_docs = filter_complex_metadata(all_chunks)

    Chroma.from_documents(
        documents=safe_docs,
        embedding=EMBEDDINGS,
        persist_directory=CHROMA_PERSIST_DIR,
        collection_name=CHROMA_COLLECTION,
    )

    logger.info("Ingestion completed successfully.")


if __name__ == "__main__":
    run_ingestion()