Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| from notion_client import Client | |
| from datetime import datetime | |
| from dotenv import load_dotenv | |
| from state import state | |
| import pandas as pd | |
| import json | |
| load_dotenv() | |
def extract_rich_text(text, max_len=2000):
    """Convert markdown text with inline ``**bold**`` into Notion rich-text objects.

    Args:
        text: Markdown source for one line or table cell. Falsy values
            (``None``, ``""``) produce a single empty text object.
        max_len: Maximum number of characters placed in one text object.
            Defaults to 2000, the per-object content limit documented by the
            Notion API. Pass ``None`` or ``0`` to disable splitting.

    Returns:
        A non-empty list of ``{"type": "text", ...}`` dicts. Bold runs carry
        ``{"bold": True}`` annotations; plain runs carry empty annotations.
    """
    if not text:
        return [{"type": "text", "text": {"content": ""}}]

    segments = []
    # re.split with one capture group alternates plain text (even indices)
    # and the captured bold text (odd indices).
    for index, part in enumerate(re.split(r'\*\*(.*?)\*\*', text)):
        if not part:
            # Skip the empty strings re.split yields around/between markers.
            continue
        is_bold = index % 2 == 1
        step = max_len if max_len and max_len > 0 else len(part)
        for start in range(0, len(part), step):
            segments.append({
                "type": "text",
                "text": {"content": part[start:start + step]},
                # Fresh dict per segment so callers can mutate one safely.
                "annotations": {"bold": True} if is_bold else {},
            })

    return segments or [{"type": "text", "text": {"content": ""}}]
# Ordered (pattern, Notion block type) pairs tried against each stripped line.
# Notion only has three heading levels, so h4-h6 are folded into heading_3.
_LINE_BLOCK_PATTERNS = (
    (re.compile(r'^# (.+)$'), "heading_1"),
    (re.compile(r'^## (.+)$'), "heading_2"),
    (re.compile(r'^### (.+)$'), "heading_3"),
    (re.compile(r'^#{4,6} (.+)$'), "heading_3"),
    (re.compile(r'^[-*+] (.+)$'), "bulleted_list_item"),
    (re.compile(r'^\d+[.)] (.+)$'), "numbered_list_item"),
)


def markdown_to_notion_blocks(markdown):
    """Convert line-oriented markdown text into a list of Notion blocks.

    Each non-blank line becomes exactly one block: a heading, a bulleted or
    numbered list item, or (by default) a paragraph. Lines are stripped before
    matching, so list nesting is flattened.

    Args:
        markdown: Markdown text. ``None`` or an empty string yields ``[]``.

    Returns:
        A list of Notion block dicts whose ``rich_text`` is produced by
        ``extract_rich_text``.
    """
    if not markdown:
        return []

    blocks = []
    for raw_line in markdown.split('\n'):
        line = raw_line.strip()
        if not line:
            continue

        block_type, content = "paragraph", line
        for pattern, candidate_type in _LINE_BLOCK_PATTERNS:
            match = pattern.match(line)
            if match:
                block_type, content = candidate_type, match.group(1).strip()
                break

        blocks.append({
            "object": "block",
            "type": block_type,
            block_type: {
                "rich_text": extract_rich_text(content)
            }
        })
    return blocks
def is_separator_row(row):
    """Return True when *row* is a markdown table separator such as ``|---|:-:|``.

    The row must be wrapped in outer pipes, and every cell must be non-empty
    and made up solely of dashes, colons and whitespace with at least one dash.
    """
    trimmed = row.strip()
    if not trimmed.startswith('|') or not trimmed.endswith('|'):
        return False

    # Inspect each cell between the outer pipes; one bad cell disqualifies the row.
    for raw_cell in trimmed[1:-1].split('|'):
        cell = raw_cell.strip()
        if not cell or re.match(r'^[\s:]*-[-\s:]*$', cell) is None:
            return False
    return True
def parse_table_row(row):
    """Split a pipe-wrapped markdown table row into its stripped cell strings.

    Returns an empty list when the row does not both start and end with ``|``.
    """
    trimmed = row.strip()
    if trimmed.startswith('|') and trimmed.endswith('|'):
        # Drop the outer pipes, then cut on the inner ones.
        inner = trimmed[1:-1]
        return list(map(str.strip, inner.split('|')))
    return []
def markdown_table_to_notion_blocks(markdown_table):
    """Build Notion ``table`` blocks from a markdown table string.

    The first row is used as the column header. Data rows are emitted in
    groups of 99 per table block, each group preceded by a copy of the header
    row. Returns ``[]`` when no separator row is found between a header and
    at least one following row.
    """
    print("Processing markdown table...")
    lines = [
        stripped
        for stripped in (raw.strip() for raw in markdown_table.split('\n'))
        if stripped
    ]

    # Locate the first separator row; it must sit strictly inside the table.
    sep_at = -1
    for position, line in enumerate(lines):
        if is_separator_row(line):
            sep_at = position
            break
    if not 0 < sep_at < len(lines) - 1:
        print("Invalid table structure: missing proper separator row")
        return []

    table_rows = [
        parse_table_row(line)
        for position, line in enumerate(lines)
        if position != sep_at
    ]
    if not table_rows:
        return []

    header_cells, *body_rows = table_rows
    width = len(header_cells)

    def as_row(cells):
        return {"type": "table_row", "table_row": {"cells": cells}}

    tables = []
    for offset in range(0, len(body_rows), 99):
        # Every table block starts with its own header row.
        children = [as_row([extract_rich_text(title) for title in header_cells])]
        for cells in body_rows[offset:offset + 99]:
            # Pad short rows with blanks and truncate long ones to the header width.
            padded = (cells + [''] * width)[:width]
            children.append(as_row([
                extract_rich_text(value.replace('<br>', '\n')) for value in padded
            ]))
        tables.append({
            "object": "block",
            "type": "table",
            "table": {
                "table_width": width,
                "has_column_header": True,
                "has_row_header": False,
                "children": children
            }
        })
    return tables
# A markdown table: header row, separator row, then one or more data rows.
# Every row must be terminated by a newline.
_MIXED_TABLE_PATTERN = re.compile(
    r'(\|.+\|[ \t]*\r?\n'            # Header row
    r'\|[-: |]+\|[ \t]*\r?\n'        # Separator row
    r'(?:\|.+\|[ \t]*\r?\n)+)',      # Data row(s)
    re.MULTILINE
)


def process_mixed_content(content):
    """Convert markdown that mixes prose and tables into Notion blocks.

    Tables are located with a regex and converted by
    ``markdown_table_to_notion_blocks``; the text between them goes through
    ``markdown_to_notion_blocks``. Source order is preserved.

    Args:
        content: Markdown text. ``None`` or an empty string yields ``[]``.

    Returns:
        A list of Notion block dicts.
    """
    if not content:
        return []

    # The table regex needs every row to end in a newline. Text that does not
    # (such as ``DataFrame.to_markdown()`` output) would otherwise lose its
    # final table row, or the whole table if it has only one data row.
    if not content.endswith('\n'):
        content = content + '\n'

    parts = []
    last_end = 0
    for match in _MIXED_TABLE_PATTERN.finditer(content):
        # Text that precedes this table.
        if match.start() > last_end:
            non_table = content[last_end:match.start()].strip()
            if non_table:
                parts.append(('text', non_table))
        parts.append(('table', match.group(1)))
        last_end = match.end()

    # Text that follows the last table (or all of it when no table matched).
    if last_end < len(content):
        remaining = content[last_end:].strip()
        if remaining:
            parts.append(('text', remaining))

    blocks = []
    for part_type, part_content in parts:
        if part_type == 'text':
            blocks.extend(markdown_to_notion_blocks(part_content))
        elif part_type == 'table':
            blocks.extend(markdown_table_to_notion_blocks(part_content))
    return blocks
def mandays_markdown_to_notion_blocks(markdown):
    """Convert mandays markdown (a table or plain text) into Notion blocks.

    If the first line contains a pipe, the input is treated as a markdown
    table: line 0 is the header, line 1 is assumed to be the separator and is
    skipped, and the remaining non-blank lines are data rows. Rows whose cell
    count differs from the header are dropped. The rows are wrapped in
    ``table`` blocks (header row plus up to 99 data rows each), because Notion
    only accepts ``table_row`` blocks as children of a ``table`` and expects
    each cell to be its own rich-text array. Empty cells are kept so that
    sparse rows stay aligned with the header.

    Otherwise every non-blank line becomes a paragraph block.

    Args:
        markdown: Markdown text. ``None`` or an empty string yields ``[]``.

    Returns:
        A list of Notion block dicts; ``[]`` when a table has no usable data rows.
    """
    def text_object(value):
        return {"type": "text", "text": {"content": value}}

    def split_row(line):
        # Remove at most one outer pipe per side so empty cells survive.
        trimmed = line.strip()
        if trimmed.startswith('|'):
            trimmed = trimmed[1:]
        if trimmed.endswith('|'):
            trimmed = trimmed[:-1]
        return [cell.strip() for cell in trimmed.split('|')]

    def table_row(cells):
        return {
            "type": "table_row",
            "table_row": {"cells": [[text_object(cell)] for cell in cells]},
        }

    if not markdown:
        return []

    lines = markdown.split('\n')

    if '|' not in lines[0]:
        # Plain text: one paragraph per non-blank line, content left unstripped.
        return [
            {
                "object": "block",
                "type": "paragraph",
                "paragraph": {"rich_text": [text_object(line)]},
            }
            for line in lines
            if line.strip()
        ]

    headers = split_row(lines[0])
    data_rows = [
        cells
        for cells in (split_row(line) for line in lines[2:] if line.strip())
        if len(cells) == len(headers)
    ]
    if not data_rows:
        return []

    blocks = []
    # 99 data rows + 1 header row keeps each table within 100 children.
    for start in range(0, len(data_rows), 99):
        children = [table_row(headers)]
        children.extend(table_row(cells) for cells in data_rows[start:start + 99])
        blocks.append({
            "object": "block",
            "type": "table",
            "table": {
                "table_width": len(headers),
                "has_column_header": True,
                "has_row_header": False,
                "children": children,
            },
        })
    return blocks
def upload_blocks_to_notion(notion, page_id, blocks, chunk_size=95):
    """Append *blocks* to the Notion page *page_id* in batches of *chunk_size*.

    One ``notion.blocks.children.append`` call is made per batch, and the
    1-based range of uploaded blocks is printed after each call.
    """
    total = len(blocks)
    for offset in range(0, total, chunk_size):
        batch = blocks[offset:offset + chunk_size]
        notion.blocks.children.append(page_id, children=batch)
        first, last = offset + 1, offset + len(batch)
        print(f"Uploaded blocks {first} to {last}")
def _create_notion_subpage(notion, parent_page_id, title):
    """Create an empty child page titled *title* under *parent_page_id*; return the API response."""
    return notion.pages.create(
        parent={"page_id": parent_page_id},
        properties={"title": [{"type": "text", "text": {"content": title}}]}
    )


def _upload_records_subpage(notion, parent_page_id, title, records):
    """Upload *records* as a table on a new subpage; return False if the DataFrame is empty."""
    df = pd.DataFrame(records)
    if df.empty:
        return False
    blocks = markdown_table_to_notion_blocks(df.to_markdown(index=False))
    subpage = _create_notion_subpage(notion, parent_page_id, title)
    upload_blocks_to_notion(notion, subpage["id"], blocks)
    return True


def _content_to_markdown(content):
    """Render DataFrames and lists as markdown text; return any other value unchanged."""
    if isinstance(content, pd.DataFrame):
        return content.to_markdown()
    if isinstance(content, list):
        if all(isinstance(item, dict) for item in content):
            # A list of dicts is treated as table records.
            return pd.DataFrame(content).to_markdown()
        return "\n".join(str(item) for item in content)
    return content


def _tech_sow_to_markdown(content):
    """Turn a Tech SOW JSON object (text or dict) into ``## Section`` markdown.

    Raises ``json.JSONDecodeError`` for invalid JSON text and ``TypeError``
    for values ``json.loads`` cannot accept. JSON that is not an object is
    returned as the original content.
    """
    parsed = content if isinstance(content, dict) else json.loads(content)
    if isinstance(parsed, dict):
        return "\n\n".join(
            f"## {key.replace('_', ' ').title()}\n{value}"
            for key, value in parsed.items()
        )
    return content


def upload_to_notion(project_name):
    """Upload the current quotation project's generated results to Notion.

    Reads ``NOTION_TOKEN`` and ``NOTION_PAGE_ID`` from the environment,
    creates a timestamped page under the configured parent page, then adds
    one subpage per non-``None`` generated attribute and one per mandays
    result table. A failure on a single attribute is printed and skipped;
    a failure in the mandays section is printed and ends that section.

    Args:
        project_name: Prefix for the page title; ``"Quotation"`` is used when falsy.

    Returns:
        A human-readable status string (success with link, or failure reason).
    """
    project = state.quotation_project
    try:
        notion_token = os.getenv("NOTION_TOKEN")
        parent_page_id = os.getenv("NOTION_PAGE_ID")
        # Check the raw settings: a Client instance is truthy even without a token.
        if not notion_token or not parent_page_id:
            return "Notion credentials or parent page ID not configured. Check your .env file."
        notion = Client(auth=notion_token)

        # Create the page that will hold all subpages.
        timestamp = datetime.now().strftime("%y%m%d_%H%M%S")
        page_title = f"{project_name}_{timestamp}" if project_name else f"Quotation_{timestamp}"
        parent_page = _create_notion_subpage(notion, parent_page_id, page_title)
        parent_page_id = parent_page["id"]
        print(f"Created parent Notion page with ID: {parent_page_id}")

        attributes_to_upload = [
            "generated_prd",
            "generated_plan_test_components",
            "generated_page_dev_components",
            "generated_engage_dev_components",
            "generated_intent_list",
            "reformatted_dev_components",
            "generated_mvp_prd",
            "combined_cost_summary",
            "generated_BD_SOW",
            "generated_Tech_SOW"
        ]

        for attr_name in attributes_to_upload:
            content = getattr(project, attr_name, None)
            if content is None:
                continue

            # The Tech SOW holds a JSON object; flatten it to markdown sections.
            if attr_name == "generated_Tech_SOW":
                try:
                    content = _tech_sow_to_markdown(content)
                except (json.JSONDecodeError, TypeError) as e:
                    print(f"Error parsing JSON for {attr_name}: {e}")
                    continue

            print(f"Creating subpage for attribute '{attr_name}'")
            try:
                subpage = _create_notion_subpage(notion, parent_page_id, attr_name)
                content = _content_to_markdown(content)
                blocks = process_mixed_content(content) or markdown_to_notion_blocks(content)
                if blocks:
                    upload_blocks_to_notion(notion, subpage["id"], blocks)
                else:
                    print(f"No blocks generated for subpage '{attr_name}'")
            except Exception as e:
                # Best effort: report and move on to the next attribute.
                print(f"Error processing attribute '{attr_name}': {e}")

        # Upload mandays results.
        try:
            # getattr keeps a missing mandays_results from skipping the MVP results.
            for result in getattr(project, "mandays_results", None) or []:
                function_name = result['function_name']
                result_data = result['result']
                # Expect {function_name: records}.
                if isinstance(result_data, dict) and function_name in result_data:
                    if _upload_records_subpage(
                        notion, parent_page_id,
                        f"{function_name}_mandays", result_data[function_name],
                    ):
                        print(f"Uploaded {function_name}_mandays to Notion")
                else:
                    print(f"Unexpected result data format for {function_name}.")

            for result in getattr(project, "mvp_mandays_results", None) or []:
                function_name = result['function_name']
                result_data = result['result']
                if not isinstance(result_data, dict):
                    print(f"Unexpected result data format for {function_name}.")
                    continue
                # Expect {section_name: list of records}.
                for section_name, records in result_data.items():
                    if not isinstance(records, list):
                        print(f"Unexpected data format for {section_name} in {function_name}.")
                        continue
                    if _upload_records_subpage(
                        notion, parent_page_id,
                        f"{function_name}_{section_name}", records,
                    ):
                        print(f"Uploaded {function_name}_{section_name}_mvp_mandays to Notion")
        except Exception as e:
            print(f"Failed to upload mandays results to Notion: {e}")

        return f"Successfully uploaded to Notion page: {page_title}. Link: https://www.notion.so/{parent_page_id}"
    except Exception as e:
        print(f"Notion upload error: {str(e)}")
        return f"Failed to upload to Notion. Error: {str(e)}"