# backend/scripts/load_notion.py
# Author: anujjoshi3105 | commit "initial" (22dcdfd)
import sys
import os
import json
import logging
from pathlib import Path
from typing import List
from dotenv import load_dotenv
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import NotionDBLoader
from langchain_chroma import Chroma
from langchain_community.vectorstores.utils import filter_complex_metadata
from langchain_core.prompts import ChatPromptTemplate
# Add src to path
sys.path.append(str(Path(__file__).parent.parent / "src"))
from core.settings import settings
from core.llm import get_model
from core.embeddings import get_embeddings
load_dotenv()
# Config
# Credentials / identity come from the process environment.
PORTFOLIO_OWNER = os.environ.get("PORTFOLIO_OWNER")
NOTION_TOKEN = os.environ.get("NOTION_TOKEN")

# Where the Chroma collection is persisted (path is relative to the CWD).
CHROMA_PERSIST_DIR = "./chroma_db"
CHROMA_COLLECTION = "portfolio"

# Logical database name -> name of the environment variable that holds the
# corresponding Notion database id.
DB_ENV_MAP = dict(
    education="NOTION_EDUCATION_ID",
    experience="NOTION_EXPERIENCE_ID",
    projects="NOTION_PROJECT_ID",
    testimonials="NOTION_TESTIMONIAL_ID",
    blog="NOTION_BLOG_ID",
)

# Module logger; records are emitted at INFO and above with a timestamp prefix.
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)
# MODELS
# Both objects are created once, at import time, from the project settings.
# LLM is the model invoked by enrich_and_normalize(); EMBEDDINGS is the
# embedding function handed to Chroma in run_ingestion().
LLM = get_model(settings.DEFAULT_MODEL)
EMBEDDINGS = get_embeddings(settings.DEFAULT_EMBEDDING_MODEL)
# PROMPTS
# System message of ENRICH_PROMPT. `{owner}` is a template placeholder that is
# filled in when the prompt is formatted; the rest of the text is sent to the
# model as written, so treat every character here as runtime data.
SYSTEM_PROMPT = """
You are a portfolio knowledge-base curator.
All content is about {owner} and written in third person.
The output is stored in a vector database.
Task:
Format the provided input into a clean, concise Markdown entry.
Rules:
- Output valid Markdown only
- Start with an H2 or H3 title
- Use short, factual bullet points
- Write ONLY what is explicitly present in the input
- Preserve original wording exactly
- Do NOT infer, normalize, summarize, interpret, or embellish
- Do NOT add, remove, rename, or assume any fields
- If something is not explicitly stated, do not include it
Formatting:
- Present the information clearly and concisely
- Do not introduce structure beyond what the input supports
- Do not guess labels or categories
Images:
- Render image immediately after the title if a valid image URL exists in metadata
Metadata:
- Always include a final "Metadata" section
- Include all provided metadata exactly as given
"""
def _required_fields(*fields: str) -> str:
    """Render a per-database strategy snippet.

    The snippet is a leading newline, the fixed "Required fields" header line,
    and one "- <field>" bullet line per entry in *fields*, each newline-terminated.
    """
    bullets = "".join(f"- {field}\n" for field in fields)
    return "\nRequired fields (must be present if available):\n" + bullets


# Strategy text injected into the human message, keyed by the same logical
# database names used in DB_ENV_MAP.
DB_STRATEGY_PROMPTS = {
    "education": _required_fields(
        "Education level",
        "Course or board name",
        "Institution and location",
        "Start and end year or completion year",
        "1–2 factual achievements or distinctions explicitly stated in the input",
    ),
    "experience": _required_fields(
        "Role",
        "Organization",
        "Location",
        "Duration",
        "Up to 3 responsibilities or contributions explicitly stated in the input",
    ),
    "projects": _required_fields(
        "Project name",
        "Objective or problem statement",
        "Up to 3 explicitly stated technologies, features, or outcomes",
    ),
    "testimonials": _required_fields(
        "Author",
        "Context",
        "Verbatim feedback lines from the input (no paraphrasing)",
    ),
    "blog": _required_fields(
        "Blog title",
        "Description",
        "Explicitly listed topics, tags, or technologies",
    ),
}
# Human turn of the enrichment prompt. The placeholders {owner}, {source_db},
# {strategy} and {payload} are supplied by enrich_and_normalize().
_HUMAN_TEMPLATE = """
Owner: {owner}
Source: {source_db}
Strategy:
{strategy}
Raw Input:
{payload}
Task:
Extract one structured Markdown entry.
"""

# Two-message chat prompt: the curator rules as the system message, followed by
# the per-record human message.
_ENRICH_MESSAGES = [
    ("system", SYSTEM_PROMPT),
    ("human", _HUMAN_TEMPLATE),
]
ENRICH_PROMPT = ChatPromptTemplate.from_messages(_ENRICH_MESSAGES)
# ENRICHMENT
def _message_text(content) -> str:
    """Flatten a chat-model ``content`` value into plain text.

    Chat models may return either a plain string or a list of content blocks
    (strings and/or dicts such as ``{"type": "text", "text": "..."}``).
    Non-text blocks are ignored.
    """
    if isinstance(content, str):
        return content
    pieces = []
    for block in content or []:
        if isinstance(block, str):
            pieces.append(block)
        elif isinstance(block, dict) and block.get("type") == "text":
            pieces.append(str(block.get("text", "")))
    return "".join(pieces)


def _raw_record_markdown(payload_json: str) -> str:
    """Return the fallback entry: the serialized record in a fenced JSON block."""
    return f"## Raw Record\n\n```json\n{payload_json}\n```"


def enrich_and_normalize(docs: List[Document], source_db: str) -> List[Document]:
    """Turn raw records into Markdown entries using the LLM.

    Each document's content and metadata are serialized to JSON and sent to
    ``LLM`` through ``ENRICH_PROMPT`` together with the strategy text for
    *source_db*. If the call fails, or the model returns no text, the record
    is stored as a fenced JSON "Raw Record" instead, so no input is dropped.

    Args:
        docs: Records to process.
        source_db: Key into ``DB_STRATEGY_PROMPTS`` selecting the strategy text.

    Returns:
        One new ``Document`` per input, in input order, carrying the generated
        Markdown (or the raw-record fallback) and the input's metadata.

    Raises:
        KeyError: If *source_db* has no entry in ``DB_STRATEGY_PROMPTS``.
    """
    strategy = DB_STRATEGY_PROMPTS[source_db]
    total = len(docs)
    processed_docs = []
    for idx, doc in enumerate(docs, start=1):
        logger.info("Enriching [%s] document %d/%d", source_db, idx, total)
        print(f"Enriching [{source_db}] document {idx}/{total}")

        payload = {
            "content": doc.page_content,
            "metadata": doc.metadata,
        }
        # Serialized once: used both in the prompt and in the fallback entry.
        payload_json = json.dumps(payload, indent=2, default=str)

        try:
            response = LLM.invoke(
                ENRICH_PROMPT.format_messages(
                    owner=PORTFOLIO_OWNER,
                    source_db=source_db,
                    strategy=strategy,
                    payload=payload_json,
                )
            )
            # content may be a list of blocks rather than a str.
            markdown = _message_text(response.content).strip()
        except Exception as e:
            # Deliberate best-effort: one failing record must not abort the batch.
            logger.exception("LLM failure: %s", e)
            print(f"LLM failure: {e}")
            markdown = ""
        else:
            if markdown:
                logger.info("[CONTENT]: %s", markdown)
                print(f"[CONTENT]: {markdown}")
            else:
                logger.warning(
                    "LLM returned no text for [%s] document %d/%d; storing raw record",
                    source_db, idx, total,
                )
                print(
                    f"LLM returned no text for [{source_db}] document "
                    f"{idx}/{total}; storing raw record"
                )

        if not markdown:
            # An empty page would produce no chunks downstream and the record
            # would vanish from the store; keep the raw data instead.
            markdown = _raw_record_markdown(payload_json)

        processed_docs.append(
            Document(
                page_content=markdown,
                metadata=doc.metadata or {},
            )
        )
    return processed_docs
# INGESTION
def run_ingestion():
    """Load every configured Notion database, enrich it, and store it in Chroma.

    For each entry in ``DB_ENV_MAP`` whose environment variable is set, the
    database is loaded with ``NotionDBLoader``, passed through
    ``enrich_and_normalize``, and split into chunks (900 characters, 100
    overlap). All chunks are then written to the ``CHROMA_COLLECTION``
    collection persisted under ``CHROMA_PERSIST_DIR``.

    A database that fails to load is logged and skipped so that work already
    done for other databases is still ingested. The function returns ``None``
    in all cases; it returns early (after logging) when ``NOTION_TOKEN`` is
    missing or when no chunks were produced.
    """
    if not NOTION_TOKEN:
        logger.error("NOTION_TOKEN is missing.")
        print("NOTION_TOKEN is missing.")
        return

    if not PORTFOLIO_OWNER:
        # The owner is interpolated into the prompts; when unset it is
        # formatted as the string "None".
        logger.warning("PORTFOLIO_OWNER is not set; prompts will name the owner as 'None'.")
        print("PORTFOLIO_OWNER is not set; prompts will name the owner as 'None'.")

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=900,
        chunk_overlap=100,
    )
    all_chunks = []

    for db_name, env_key in DB_ENV_MAP.items():
        db_id = os.getenv(env_key)
        if not db_id:
            logger.info("Skipping %s: %s is not set", db_name, env_key)
            print(f"Skipping {db_name}: {env_key} is not set")
            continue

        logger.info("Loading %s from Notion", db_name)
        print(f"Loading {db_name} from Notion")
        loader = NotionDBLoader(
            integration_token=NOTION_TOKEN,
            database_id=db_id,
        )
        try:
            raw_docs = loader.load()
        except Exception as e:
            # Best-effort per database: a failure here must not throw away the
            # (LLM-enriched) chunks already collected from other databases.
            logger.exception("Failed to load %s from Notion: %s", db_name, e)
            print(f"Failed to load {db_name} from Notion: {e}")
            continue

        if not raw_docs:
            logger.warning("No records found for %s", db_name)
            print(f"No records found for {db_name}")
            continue

        enriched = enrich_and_normalize(raw_docs, db_name)
        all_chunks.extend(splitter.split_documents(enriched))

    if not all_chunks:
        logger.error("No documents to ingest.")
        print("No documents to ingest.")
        return

    # Strip metadata values the vector store cannot hold before writing.
    safe_docs = filter_complex_metadata(all_chunks)
    # NOTE(review): this adds to whatever is already persisted in
    # CHROMA_PERSIST_DIR; re-running the script presumably duplicates entries
    # unless the directory is cleared first - confirm and decide on an id scheme.
    Chroma.from_documents(
        documents=safe_docs,
        embedding=EMBEDDINGS,
        persist_directory=CHROMA_PERSIST_DIR,
        collection_name=CHROMA_COLLECTION,
    )
    logger.info("Ingestion completed successfully.")
    print("Ingestion completed successfully.")


# Run the ingestion only when this file is executed as a script.
if __name__ == "__main__":
    run_ingestion()