QuotationChatbot_v5 / notion.py
ICAS03
- fix Tech SOW againnnn
11c3743
import os
import re
from notion_client import Client
from datetime import datetime
from dotenv import load_dotenv
from state import state
import pandas as pd
import json
load_dotenv()
def extract_rich_text(text):
    """Convert markdown text with inline ``**bold**`` spans to Notion rich text.

    Args:
        text: Markdown string. May be empty or ``None``.

    Returns:
        A non-empty list of Notion ``text`` rich-text objects. Text captured
        between ``**`` markers carries ``{"bold": True}`` annotations; all
        other text carries an empty annotations dict. Empty input (or input
        made only of empty bold markers) yields a single object with empty
        content, which has no ``annotations`` key.
    """
    if not text:
        return [{"type": "text", "text": {"content": ""}}]

    # Notion rejects a text object whose content is longer than 2000
    # characters, so longer runs are emitted as consecutive objects.
    max_content_length = 2000

    rich_text = []
    # With a single capture group, re.split alternates plain text (even
    # indexes) with the captured bold text (odd indexes).
    parts = re.split(r'\*\*(.*?)\*\*', text)
    for i, part in enumerate(parts):
        # Text that starts/ends with a bold span produces empty fragments;
        # they add nothing to the rendered output, so drop them.
        if not part:
            continue
        is_bold = i % 2 == 1
        for start in range(0, len(part), max_content_length):
            rich_text.append({
                "type": "text",
                "text": {"content": part[start:start + max_content_length]},
                # Fresh dict per segment so segments never share state.
                "annotations": {"bold": True} if is_bold else {},
            })

    return rich_text or [{"type": "text", "text": {"content": ""}}]
def markdown_to_notion_blocks(markdown):
    """Turn line-oriented markdown into a list of Notion block dicts.

    Each non-blank line (after stripping) becomes exactly one block:
    ``#``/``##``/``###`` headings map to ``heading_1``..``heading_3``,
    ``####`` is also mapped to ``heading_3``, ``- `` lines become
    ``bulleted_list_item`` blocks, and anything else becomes a paragraph.
    Blank lines are dropped.
    """
    # Ordered (regex, Notion block type) rules; the first match wins.
    line_rules = (
        (r'^# (.+)$', "heading_1"),
        (r'^## (.+)$', "heading_2"),
        (r'^### (.+)$', "heading_3"),
        (r'^#### (.+)$', "heading_3"),  # h4 is folded into h3
        (r'^- (.+)$', "bulleted_list_item"),
    )

    def classify(stripped_line):
        """Return (block type, text) for one already-stripped line."""
        for rule, kind in line_rules:
            hit = re.match(rule, stripped_line)
            if hit:
                return kind, hit.group(1).strip()
        return "paragraph", stripped_line

    blocks = []
    for raw_line in markdown.split('\n'):
        stripped_line = raw_line.strip()
        if not stripped_line:
            continue
        kind, body = classify(stripped_line)
        blocks.append({
            "object": "block",
            "type": kind,
            kind: {
                "rich_text": extract_rich_text(body)
            }
        })
    return blocks
def is_separator_row(row):
    """Return True when *row* is a markdown table separator line.

    The stripped row must start and end with a pipe, and every cell between
    the outer pipes must be non-empty and consist only of dashes, colons and
    whitespace with at least one dash (e.g. ``|---|:--:|``).
    """
    trimmed = row.strip()
    if not trimmed.startswith('|') or not trimmed.endswith('|'):
        return False

    for raw_cell in trimmed[1:-1].split('|'):
        cell = raw_cell.strip()
        # An empty cell or any character outside "-", ":" and whitespace
        # disqualifies the whole row.
        if not cell or re.match(r'^[\s:]*-[-\s:]*$', cell) is None:
            return False
    return True
def parse_table_row(row):
    """Split one markdown table row into its stripped cell strings.

    Returns an empty list when the stripped row is not wrapped in pipes.
    Empty cells are kept as empty strings.
    """
    trimmed = row.strip()
    wrapped_in_pipes = trimmed.startswith('|') and trimmed.endswith('|')
    if not wrapped_in_pipes:
        return []

    # Drop the outer pipes, then split on the inner ones.
    inner = trimmed[1:-1]
    return list(map(str.strip, inner.split('|')))
def markdown_table_to_notion_blocks(markdown_table):
    """Build Notion ``table`` blocks from a markdown table string.

    The first separator row found splits the header from the data. It must
    be neither the first nor the last non-blank line, otherwise an empty
    list is returned. Data rows are emitted in groups of 99, each group in
    its own table block that repeats the header row. Rows are truncated or
    padded with empty cells to the header width, and ``<br>`` in data cells
    is turned into a newline.
    """
    print("Processing markdown table...")
    rows = [stripped for stripped in (raw.strip() for raw in markdown_table.split('\n')) if stripped]

    # Locate the first separator row (-1 when there is none).
    separator_index = -1
    for position, candidate in enumerate(rows):
        if is_separator_row(candidate):
            separator_index = position
            break

    has_header = separator_index > 0
    has_body = separator_index < len(rows) - 1
    if not (has_header and has_body):
        print("Invalid table structure: missing proper separator row")
        return []

    # Everything except the separator line is parsed into cells.
    parsed_rows = [
        parse_table_row(line)
        for line in rows[:separator_index] + rows[separator_index + 1:]
    ]
    if not parsed_rows:
        return []

    headers, *data_rows = parsed_rows
    col_count = len(headers)

    def as_table_row(cells):
        """Wrap a list of rich-text cells in a Notion table_row block."""
        return {
            "type": "table_row",
            "table_row": {
                "cells": cells
            }
        }

    def fit_to_width(row):
        """Truncate or right-pad *row* so it has exactly col_count cells."""
        if len(row) > col_count:
            return row[:col_count]
        return row + [''] * (col_count - len(row))

    blocks = []
    for offset in range(0, len(data_rows), 99):
        # Every table block starts with its own copy of the header row.
        children = [as_table_row([extract_rich_text(header) for header in headers])]
        for row in data_rows[offset:offset + 99]:
            children.append(as_table_row(
                [extract_rich_text(cell.replace('<br>', '\n')) for cell in fit_to_width(row)]
            ))
        blocks.append({
            "object": "block",
            "type": "table",
            "table": {
                "table_width": col_count,
                "has_column_header": True,
                "has_row_header": False,
                "children": children
            }
        })
    return blocks
def process_mixed_content(content):
    """Convert markdown that mixes prose and pipe tables into Notion blocks.

    Tables (header row, separator row, one or more data rows) are located
    with a regex and handed to ``markdown_table_to_notion_blocks``; the text
    before, between and after them goes to ``markdown_to_notion_blocks``.
    Document order is preserved.

    Args:
        content: Markdown string. Empty or ``None`` input yields ``[]``.

    Returns:
        A list of Notion block dicts (possibly empty).
    """
    if not content:
        return []

    # The final data row may be terminated by the end of the string instead
    # of a newline; e.g. DataFrame.to_markdown() emits no trailing newline.
    # Without the \Z alternative that last row would fall out of the table
    # (or a one-data-row table would not match at all).
    table_pattern = re.compile(
        r'(\|.+\|[ \t]*\r?\n'               # Header row
        r'\|[-: |]+\|[ \t]*\r?\n'           # Separator row
        r'(?:\|.+\|[ \t]*(?:\r?\n|\Z))+)'   # Data row(s)
    )

    blocks = []
    cursor = 0  # index just past the previously consumed table
    for match in table_pattern.finditer(content):
        leading_text = content[cursor:match.start()].strip()
        if leading_text:
            blocks.extend(markdown_to_notion_blocks(leading_text))
        blocks.extend(markdown_table_to_notion_blocks(match.group(1)))
        cursor = match.end()

    # Whatever follows the last table (or the whole input if none matched).
    trailing_text = content[cursor:].strip()
    if trailing_text:
        blocks.extend(markdown_to_notion_blocks(trailing_text))

    return blocks
def mandays_markdown_to_notion_blocks(markdown):
    """Convert mandays markdown into a flat list of Notion blocks.

    If the first line contains a pipe, the input is treated as a markdown
    table: the first line supplies the header cells, the second line is
    skipped as the separator, and every later non-blank line whose number of
    non-empty cells equals the number of non-empty header cells becomes one
    ``table_row`` block. Rows with a different cell count are dropped.
    Otherwise every non-blank line becomes a ``paragraph`` block with the
    line's text unstripped.

    NOTE(review): rows are returned bare and each cell is a single text
    object, whereas ``markdown_table_to_notion_blocks`` nests rows inside a
    ``table`` block and makes every cell a list of rich-text objects.
    Confirm what callers expect before sending this output to the API.

    Args:
        markdown: Markdown string.

    Returns:
        A list of Notion block dicts (possibly empty).
    """
    lines = markdown.split('\n')

    if not (lines and '|' in lines[0]):
        # Plain text: one paragraph per non-blank line.
        return [
            {
                "object": "block",
                "type": "paragraph",
                "paragraph": {
                    "rich_text": [{"type": "text", "text": {"content": line}}]
                }
            }
            for line in lines
            if line.strip()
        ]

    # Only the header *count* matters: it is used to reject ragged rows.
    header_count = sum(1 for header in lines[0].split('|') if header.strip())

    blocks = []
    for line in lines[2:]:  # Skip header and separator
        if not line.strip():
            continue
        cells = [cell.strip() for cell in line.split('|') if cell.strip()]
        if len(cells) != header_count:
            continue
        blocks.append({
            "object": "block",
            "type": "table_row",
            "table_row": {
                "cells": [{"type": "text", "text": {"content": cell}} for cell in cells]
            }
        })
    return blocks
def upload_blocks_to_notion(notion, page_id, blocks, chunk_size=95):
    """Append *blocks* to a Notion page in batches of *chunk_size*.

    Each batch is sent with ``notion.blocks.children.append`` and a progress
    line with the 1-based range of uploaded blocks is printed afterwards.
    Nothing is sent when *blocks* is empty.
    """
    total = len(blocks)
    for i in range(0, total, chunk_size):
        window = slice(i, i + chunk_size)
        chunk = blocks[window]
        notion.blocks.children.append(page_id, children=chunk)
        print(f"Uploaded blocks {i+1} to {i+len(chunk)}")
def _upload_records_subpage(notion, parent_page_id, title, records, log_label):
    """Upload *records* as a table on a new subpage titled *title*.

    Nothing is created when the records form an empty DataFrame.
    """
    df = pd.DataFrame(records)
    if df.empty:
        return
    blocks = markdown_table_to_notion_blocks(df.to_markdown(index=False))
    subpage = notion.pages.create(
        parent={"page_id": parent_page_id},
        properties={"title": [{"type": "text", "text": {"content": title}}]}
    )
    upload_blocks_to_notion(notion, subpage["id"], blocks)
    print(f"Uploaded {log_label} to Notion")


def upload_to_notion(project_name):
    """Upload generation results to Notion as subpages.

    Creates one timestamped page under the page named by the
    ``NOTION_PAGE_ID`` environment variable, then one subpage per non-None
    project attribute in a fixed list, followed by one subpage per mandays /
    MVP mandays result table.

    Args:
        project_name: Prefix for the parent page title. When falsy, the
            title falls back to ``Quotation_<timestamp>``.

    Returns:
        A human-readable status string: a success message with a link, a
        configuration hint when ``NOTION_TOKEN`` or ``NOTION_PAGE_ID`` is
        missing, or a failure message containing the error text. Errors
        inside the outer ``try`` are reported this way, not raised.
    """
    project = state.quotation_project
    try:
        # Check the token itself: a Client instance is always truthy, so
        # testing the client would never detect a missing credential.
        notion_token = os.getenv("NOTION_TOKEN")
        parent_page_id = os.getenv("NOTION_PAGE_ID")
        if not notion_token or not parent_page_id:
            return "Notion credentials or parent page ID not configured. Check your .env file."
        notion = Client(auth=notion_token)

        # Create parent page
        timestamp = datetime.now().strftime("%y%m%d_%H%M%S")
        page_title = f"{project_name}_{timestamp}" if project_name else f"Quotation_{timestamp}"
        parent_page = notion.pages.create(
            parent={"page_id": parent_page_id},
            properties={"title": [{"type": "text", "text": {"content": page_title}}]}
        )
        # From here on, subpages hang off the page that was just created.
        parent_page_id = parent_page["id"]
        print(f"Created parent Notion page with ID: {parent_page_id}")

        # Define the attributes to upload
        attributes_to_upload = [
            "generated_prd",
            "generated_plan_test_components",
            "generated_page_dev_components",
            "generated_engage_dev_components",
            "generated_intent_list",
            "reformatted_dev_components",
            "generated_mvp_prd",
            "combined_cost_summary",
            "generated_BD_SOW",
            "generated_Tech_SOW"
        ]

        # Iterate over selected attributes of the project
        for attr_name in attributes_to_upload:
            content = getattr(project, attr_name, None)
            if content is None:
                continue

            # Handle JSON-like content for generated_Tech_SOW
            if attr_name == "generated_Tech_SOW":
                try:
                    # Accept an already-parsed dict as well as a JSON string.
                    parsed_result = content if isinstance(content, dict) else json.loads(content)
                except (json.JSONDecodeError, TypeError) as e:
                    # TypeError covers non-string payloads; skipping this one
                    # attribute keeps the rest of the upload going.
                    print(f"Error parsing JSON for {attr_name}: {e}")
                    continue
                # Convert the JSON content to a formatted markdown string
                if isinstance(parsed_result, dict):
                    content = "\n\n".join(
                        f"## {str(key).replace('_', ' ').title()}\n{value}"
                        for key, value in parsed_result.items()
                    )

            print(f"Creating subpage for attribute '{attr_name}'")
            try:
                # Create subpage
                subpage = notion.pages.create(
                    parent={"page_id": parent_page_id},
                    properties={"title": [{"type": "text", "text": {"content": attr_name}}]}
                )

                # Normalize tabular / list content to markdown text.
                if isinstance(content, pd.DataFrame):
                    content = content.to_markdown()
                elif isinstance(content, list) and all(isinstance(i, dict) for i in content):
                    # Convert list of dictionaries to DataFrame
                    content = pd.DataFrame(content).to_markdown()
                elif isinstance(content, list):
                    # Convert list to a string representation
                    content = "\n".join(str(item) for item in content)

                blocks = process_mixed_content(content) or markdown_to_notion_blocks(content)
                if blocks:
                    upload_blocks_to_notion(notion, subpage["id"], blocks)
                else:
                    print(f"No blocks generated for subpage '{attr_name}'")
            except Exception as e:
                # Best effort: one failing attribute must not stop the others.
                print(f"Error processing attribute '{attr_name}': {e}")

        # Upload mandays results
        try:
            if project.mandays_results:
                for result in project.mandays_results:
                    function_name = result['function_name']
                    result_data = result['result']
                    # The records are expected under a key named after the function.
                    if isinstance(result_data, dict) and function_name in result_data:
                        _upload_records_subpage(
                            notion,
                            parent_page_id,
                            f"{function_name}_mandays",
                            result_data[function_name],
                            f"{function_name}_mandays",
                        )
                    else:
                        print(f"Unexpected result data format for {function_name}.")

            if project.mvp_mandays_results:
                for result in project.mvp_mandays_results:
                    function_name = result['function_name']
                    result_data = result['result']
                    if isinstance(result_data, dict):
                        for section_name, records in result_data.items():
                            if isinstance(records, list):
                                _upload_records_subpage(
                                    notion,
                                    parent_page_id,
                                    f"{function_name}_{section_name}",
                                    records,
                                    f"{function_name}_{section_name}_mvp_mandays",
                                )
                            else:
                                print(f"Unexpected data format for {section_name} in {function_name}.")
                    else:
                        print(f"Unexpected result data format for {function_name}.")
        except Exception as e:
            # Mandays tables are optional extras; report and still succeed.
            print(f"Failed to upload mandays results to Notion: {e}")

        return f"Successfully uploaded to Notion page: {page_title}. Link: https://www.notion.so/{parent_page_id}"
    except Exception as e:
        print(f"Notion upload error: {str(e)}")
        return f"Failed to upload to Notion. Error: {str(e)}"