Spaces:
Runtime error
Runtime error
| import os | |
| import io | |
| import markdown | |
| import pandas as pd | |
| from datetime import datetime | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaIoBaseUpload | |
| from google.oauth2 import service_account | |
| from googleapiclient.errors import HttpError | |
| from docx import Document | |
| import re | |
| from datetime import datetime | |
| import gradio as gr | |
| from state import state | |
| import json | |
# Path to the Service Account JSON key file used for Drive authentication.
SERVICE_ACCOUNT_FILE = 'gdrive_service_account.json'
# OAuth scopes. For full Drive access, use 'https://www.googleapis.com/auth/drive'.
# For access limited to files created by the app, use 'https://www.googleapis.com/auth/drive.file'.
SCOPES = ['https://www.googleapis.com/auth/drive']
# ID of the Google Drive folder shared with the Service Account; all uploads
# land in timestamped subfolders under this folder.
DRIVE_FOLDER_ID = '1ksgImxpdYor73BkYnp60oAbRiE1nHAyz'
def authenticate_drive_service():
    """Authenticate with the Service Account key and return a Drive v3 service."""
    print("Authenticating Drive service...")
    creds = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE, scopes=SCOPES)
    # To impersonate a user (requires domain-wide delegation):
    # creds = creds.with_subject('user@yourdomain.com')
    drive = build('drive', 'v3', credentials=creds)
    print("Drive service authenticated.")
    return drive
def add_bold_text(paragraph, text):
    """
    Append *text* to *paragraph*, converting **...** spans into bold runs.

    The ** markers are stripped; everything outside them is added as
    plain (non-bold) runs.
    """
    for segment in re.split(r'(\*\*.+?\*\*)', text):
        if segment.startswith('**') and segment.endswith('**'):
            bold_run = paragraph.add_run(segment[2:-2])
            bold_run.bold = True
        else:
            paragraph.add_run(segment)
def process_table(doc, table_rows):
    """
    Parse a run of Markdown table lines and append a Word table to *doc*.

    Detects an optional separator row (|---|:--:|), renders the first row
    as a bold header, supports fully-bold cells (**text**) and <br> line
    breaks inside data cells.

    Args:
        doc: python-docx Document to append the table to.
        table_rows: List of raw Markdown table lines, each starting and
            ending with '|'.
    """
    # BUG FIX: an empty row list previously crashed with IndexError when
    # cleaned_rows[0] was indexed below (the original guard was commented out).
    if not table_rows:
        return
    # Clean up table rows into lists of (text, is_bold) cell tuples.
    cleaned_rows = []
    for row in table_rows:
        # Remove leading/trailing pipes and split by pipes.
        cells = [cell.strip() for cell in row.strip().strip('|').split('|')]
        processed_cells = []
        for cell in cells:
            # A fully-bold cell is wrapped as **text** (at least 4 chars).
            is_fully_bold = cell.startswith('**') and cell.endswith('**') and len(cell) >= 4
            if is_fully_bold:
                content = cell[2:-2].strip()
                processed_cells.append((content, True))
            else:
                processed_cells.append((cell, False))
        cleaned_rows.append(processed_cells)
    # A separator row contains only '-', ':' and spaces in every cell.
    is_separator = all(cell[0].replace('-', '').replace(':', '').replace(' ', '') == ''
                       for cell in cleaned_rows[1]) if len(cleaned_rows) > 1 else False
    if is_separator:
        header_row = cleaned_rows[0]
        data_rows = cleaned_rows[2:]  # Skip header and separator.
    else:
        # No separator: first row is the header, the rest are data.
        header_row = cleaned_rows[0]
        data_rows = cleaned_rows[1:]
    num_cols = len(header_row)
    table = doc.add_table(rows=1, cols=num_cols)
    table.style = 'Table Grid'
    # Header row: always rendered bold regardless of ** markers.
    header_cells = table.rows[0].cells
    for i, (cell_content, is_bold) in enumerate(header_row):
        if i < len(header_cells):
            paragraph = header_cells[i].paragraphs[0]
            # NOTE(review): paragraph.runs returns a fresh list in python-docx,
            # so .clear() does not remove runs from the XML; new cells have no
            # runs anyway. Kept for parity with the original.
            paragraph.runs.clear()
            run = paragraph.add_run(cell_content)
            run.bold = True
    # Data rows: extra cells beyond the header width are silently dropped.
    for row_data in data_rows:
        row_cells = table.add_row().cells
        for i, (cell_content, is_bold) in enumerate(row_data):
            if i < len(row_cells):
                paragraph = row_cells[i].paragraphs[0]
                paragraph.runs.clear()
                # <br> inside a cell becomes a soft line break.
                if '<br>' in cell_content:
                    parts = cell_content.split('<br>')
                    for j, part in enumerate(parts):
                        run = paragraph.add_run(part.strip())
                        run.bold = is_bold
                        if j < len(parts) - 1:
                            run.add_break()
                else:
                    run = paragraph.add_run(cell_content)
                    run.bold = is_bold
def process_non_table_line(doc, stripped_line):
    """
    Render a single non-table Markdown line into the Word document.

    Handles ATX headings (levels 4-6 clamped to Word level 3), bullet and
    numbered list items, horizontal rules, and plain paragraphs with
    **bold** span support.
    """
    # Headings: count the run of leading '#' characters.
    if stripped_line.startswith('#'):
        level = len(stripped_line) - len(stripped_line.lstrip('#'))
        heading_text = stripped_line[level:].strip()
        if 1 <= level <= 6:
            heading = doc.add_paragraph()
            heading.style = f'Heading {min(level, 3)}'
            add_bold_text(heading, heading_text)
        return
    # Unordered list items.
    if stripped_line.startswith(('* ', '- ')):
        bullet = doc.add_paragraph(style='List Bullet')
        add_bold_text(bullet, stripped_line[2:].strip())
        return
    # Ordered list items ("1. item").
    if re.match(r'^\d+\.\s', stripped_line):
        numbered = doc.add_paragraph(style='List Number')
        add_bold_text(numbered, re.sub(r'^\d+\.\s', '', stripped_line))
        return
    # Horizontal rules become an empty paragraph with a line break.
    if stripped_line in ('---', '***'):
        doc.add_paragraph().add_run().add_break()
        return
    # Everything else is a plain paragraph.
    body = doc.add_paragraph()
    add_bold_text(body, stripped_line)
def convert_df_to_text(df):
    """
    Convert a DataFrame to a pipe-delimited Markdown table string.

    The header row is bolded with ** markers, followed by a dash
    separator row and one row per DataFrame record.

    Args:
        df: pandas.DataFrame, or None.

    Returns:
        str: The Markdown table, or "" for a None/empty DataFrame.
    """
    if df is None or df.empty:
        return ""
    # Header cells with bold markers.
    header = " | ".join(f"**{str(col)}**" for col in df.columns)
    # Separator row; dash count per cell is cosmetic only.
    separator = "|".join("-" * len(col) for col in df.columns)
    rows = []
    for _, row in df.iterrows():
        # BUG FIX: the original iterated row.items(), which yields
        # (index, value) pairs and rendered every cell as a stringified
        # tuple like "('A', 1)". Iterate the cell values instead.
        formatted_row = " | ".join(str(val) for val in row)
        rows.append(f"| {formatted_row} |")
    return f"| {header} |\n|{separator}|\n" + "\n".join(rows)
def convert_md_to_docx(md_content):
    """
    Convert Markdown content to DOCX bytes using python-docx.

    Streams through the Markdown line by line, batching consecutive table
    lines (|...|) into process_table() and delegating all other lines to
    process_non_table_line().

    Args:
        md_content: Markdown source string.

    Returns:
        bytes: The serialized .docx document.
    """
    doc = Document()
    lines = md_content.split('\n')
    in_table = False
    table_rows = []
    line_index = 0
    while line_index < len(lines):
        stripped_line = lines[line_index].strip()
        # Accumulate table lines until the table ends.
        if stripped_line.startswith('|') and stripped_line.endswith('|'):
            if not in_table:
                in_table = True
                table_rows = []  # Reset rows when a new table starts.
            table_rows.append(stripped_line)
            line_index += 1
            continue
        # First non-table line after a table: flush the accumulated rows.
        if in_table:
            if table_rows:
                process_table(doc, table_rows)
                table_rows = []
            in_table = False
            # BUG FIX: the original added an empty paragraph here for a
            # blank line AND again in the branch below (in_table had just
            # been set False), producing duplicate empty paragraphs after
            # every table. The single branch below now handles it.
        if stripped_line:
            process_non_table_line(doc, stripped_line)
        else:
            doc.add_paragraph()
        line_index += 1
    # Flush a table that runs to the very end of the document.
    if in_table and table_rows:
        process_table(doc, table_rows)
    # Serialize the document to bytes.
    output = io.BytesIO()
    doc.save(output)
    output.seek(0)
    return output.getvalue()
def determine_mime_type(filename):
    """Determine upload and Drive-conversion MIME types from a file extension.

    Args:
        filename: Name of the file being uploaded; only its extension matters.

    Returns:
        tuple[str, str | None]: (upload MIME type, target Google Drive MIME
        type for conversion, or None to upload without conversion).
    """
    # BUG FIX: this log line previously printed the literal "(unknown)"
    # instead of interpolating the filename.
    print(f"Determining MIME type for {filename}...")
    ext = os.path.splitext(filename)[1].lower()
    if ext == '.md':
        # Markdown is pre-converted to DOCX, then converted to a Google Doc.
        mime_type = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        drive_mime = 'application/vnd.google-apps.document'
    elif ext == '.txt':
        # Convert plain text to Google Docs.
        mime_type = 'text/plain'
        drive_mime = 'application/vnd.google-apps.document'
    elif ext == '.csv':
        # Convert CSV to Google Sheets.
        mime_type = 'text/csv'
        drive_mime = 'application/vnd.google-apps.spreadsheet'
    else:
        # Default to binary upload without conversion.
        mime_type = 'application/octet-stream'
        drive_mime = None
    print(f"MIME type determined: {mime_type}, Drive MIME: {drive_mime}")
    return mime_type, drive_mime
def upload_content(service, folder_id, filename, content):
    """
    Upload content to Google Drive, converting to a Google Doc/Sheet
    where the filename extension warrants it.

    Markdown content is first converted to DOCX bytes; strings that parse
    as JSON are promoted to dict/list payloads; DataFrames are uploaded
    as CSV.

    Args:
        service: Authenticated Drive v3 service.
        folder_id: Destination Drive folder ID.
        filename: Source filename; its extension selects the MIME types.
        content: str, bytes, dict, list, or pandas.DataFrame payload.

    Raises:
        ValueError: If the content type is not supported.
        Exception: Re-raises any error from the Drive API after logging.
    """
    # BUG FIX: log lines in this function previously printed the literal
    # "(unknown)" instead of interpolating the filename.
    print(f"Initiating upload process for {filename} to Google Drive...")
    mime_type, drive_mime = determine_mime_type(filename)
    print(f"Determined MIME type: {mime_type}, Drive MIME: {drive_mime}")
    if filename.endswith('.md'):
        # Markdown is converted to DOCX bytes so Drive can import it as a Doc.
        content = convert_md_to_docx(content)
        if content is None:
            print("Failed to convert Markdown to DOCX.")
            return
    # Drive displays the name without the source extension.
    file_metadata = {
        'name': os.path.splitext(filename)[0],
        'parents': [folder_id],
    }
    if drive_mime:
        file_metadata['mimeType'] = drive_mime
    print(f"Prepared file metadata: {file_metadata}")
    try:
        # Strings containing JSON are promoted to dict/list payloads.
        if isinstance(content, str):
            try:
                parsed_content = json.loads(content)
                if isinstance(parsed_content, (dict, list)):
                    content = parsed_content
            except json.JSONDecodeError:
                pass
        # Build the upload media according to the (possibly promoted) type.
        if isinstance(content, pd.DataFrame):
            # DataFrames are serialized to CSV.
            csv_content = content.to_csv(index=False)
            media = MediaIoBaseUpload(
                io.BytesIO(csv_content.encode('utf-8')),
                mimetype=mime_type,
                resumable=True
            )
        elif isinstance(content, str):
            if mime_type == 'text/plain':
                media = MediaIoBaseUpload(
                    io.BytesIO(content.encode('utf-8')),
                    mimetype=mime_type,
                    resumable=True
                )
            else:
                # Non-plain string content uploaded with the detected type.
                media = MediaIoBaseUpload(
                    io.BytesIO(content if isinstance(content, bytes) else content.encode('utf-8')),
                    mimetype=mime_type,
                    resumable=True
                )
        elif isinstance(content, bytes):
            # Binary content (e.g. the DOCX produced above).
            media = MediaIoBaseUpload(
                io.BytesIO(content),
                mimetype=mime_type,
                resumable=True
            )
        elif isinstance(content, dict):
            # Dicts are uploaded as pretty-printed JSON.
            json_content = json.dumps(content, indent=2)
            media = MediaIoBaseUpload(
                io.BytesIO(json_content.encode('utf-8')),
                mimetype='application/json',
                resumable=True
            )
        elif isinstance(content, list):
            # Lists are uploaded as one item per line of plain text.
            list_content = "\n".join(str(item) for item in content)
            media = MediaIoBaseUpload(
                io.BytesIO(list_content.encode('utf-8')),
                mimetype='text/plain',
                resumable=True
            )
        else:
            raise ValueError("Unsupported content type for upload.")
        print(f"Prepared media for upload: {media}")
        file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id'
        ).execute()
        print(f"Successfully uploaded {filename} to Google Drive with ID: {file.get('id')}")
    except Exception as e:
        print(f"An error occurred while uploading {filename}: {e}")
        raise e
def upload_to_gdrive(project_name, progress=gr.Progress()):
    """
    Upload all generated quotation artifacts for the current project to a
    new timestamped subfolder inside the shared Drive folder.

    Args:
        project_name: Base name for the Drive subfolder; falls back to
            "Final Quotation" when falsy.
        progress: Gradio progress reporter (gradio convention: default
            instance in the signature).

    Returns:
        str: A success message containing the folder URL, or an error
        message describing what failed.
    """
    print("Starting upload to Google Drive...")
    service = authenticate_drive_service()
    project = state.quotation_project
    if project is None:
        # BUG FIX: the original only logged here and fell through, creating
        # an empty folder and returning a misleading success message.
        print("Error: quotation_project is not set")
        return "Quotation project is not set."
    parent_folder_id = DRIVE_FOLDER_ID
    if not parent_folder_id:
        return "Drive folder ID is not set."
    if not project_name:
        project_name = "Final Quotation"
    # Create a timestamped subfolder for this upload run.
    folder_metadata = {
        'name': f'{project_name}_{datetime.now().strftime("%y%m%d_%H%M%S")}',
        'mimeType': 'application/vnd.google-apps.folder',
        'parents': [parent_folder_id]
    }
    subfolder = service.files().create(body=folder_metadata, fields='id').execute()
    subfolder_id = subfolder.get('id')
    print(f"Created subfolder with ID: {subfolder_id}")
    progress(0.1, "Created subfolder and preparing files to upload.")
    try:
        # Project attributes to upload, mapped to their target filenames.
        attributes_to_upload = {
            "generated_prd": "PRD.md",
            "generated_plan_test_components": "plan_test_components.md",
            "generated_page_dev_components": "page_dev_components.md",
            "generated_engage_dev_components": "engage_dev_components.md",
            "generated_intent_list": "intent_list.md",
            "reformatted_dev_components": "reformatted_dev_components.md",
            "generated_mvp_prd": "MVP_prd.md",
            "combined_cost_summary": "cost_summary.md",
            "generated_BD_SOW": "BD_SOW.md",
            "generated_Tech_SOW": "Tech_SOW.md"
        }
        for attr, filename in attributes_to_upload.items():
            try:
                content = getattr(project, attr, None)
                # BUG FIX: log lines in this loop previously printed the
                # literal "(unknown)" instead of interpolating the filename.
                print(f"Uploading {attr} with filename {filename}...")
                if not content:
                    print(f"No content found for {attr}")
                    continue
                # A list of dicts becomes a DataFrame (uploaded as CSV).
                if isinstance(content, list) and all(isinstance(i, dict) for i in content):
                    content = pd.DataFrame(content)
                # Tech SOW may arrive as a JSON string; render it as Markdown.
                if attr == "generated_Tech_SOW" and isinstance(content, str):
                    # BUG FIX: parsed_result was previously referenced even
                    # when content was not a string, raising NameError (or
                    # reusing a stale value from an earlier iteration).
                    try:
                        parsed_result = json.loads(content)
                    except json.JSONDecodeError as e:
                        print(f"Error parsing JSON for {attr}: {e}")
                        continue  # Skip this attribute if parsing fails.
                    if isinstance(parsed_result, dict):
                        content = "\n\n".join(
                            f"## {key.replace('_', ' ').title()}\n{value}"
                            for key, value in parsed_result.items()
                        )
                print(f"Content for {filename}: {content}")
                upload_content(service, subfolder_id, filename, content)
                progress(0.1, f"Uploaded {filename}")
            except Exception as e:
                print(f"Failed to upload {filename}: {e}")
        # Handle the mandays CSV results (best effort, never fatal).
        try:
            if project.mandays_results:
                for result in project.mandays_results:
                    function_name = result['function_name']
                    result_data = result['result']
                    # Expected shape: {function_name: [ {col: val, ...}, ... ]}
                    if isinstance(result_data, dict) and function_name in result_data:
                        df = pd.DataFrame(result_data[function_name])
                        if not df.empty:
                            csv_content = df.to_csv(index=False)
                            upload_content(service, subfolder_id, f"{function_name}.csv", csv_content)
                            progress(0.1, f"Uploaded {function_name}.csv")
                    else:
                        print(f"Unexpected result data format for {function_name}.")
            if project.mvp_mandays_results:
                for result in project.mvp_mandays_results:
                    function_name = result['function_name']
                    result_data = result['result']
                    # Expected shape: {section_name: [ {col: val, ...}, ... ]}
                    if isinstance(result_data, dict):
                        for section_name, records in result_data.items():
                            if isinstance(records, list):
                                df = pd.DataFrame(records)
                                if not df.empty:
                                    csv_content = df.to_csv(index=False)
                                    upload_content(service, subfolder_id,
                                                   f"{function_name}_{section_name}.csv", csv_content)
                                    progress(0.1, f"Uploaded {function_name}_{section_name}.csv")
                            else:
                                print(f"Unexpected data format for {section_name} in {function_name}.")
                    else:
                        print(f"Unexpected result data format for {function_name}.")
        except Exception as e:
            print(f"Failed to upload mandays results: {e}")
        folder_url = f"https://drive.google.com/drive/folders/{subfolder_id}"
        progress(1.0, "Upload complete")
        return f"All files uploaded successfully. Folder URL: {folder_url}"
    except Exception as e:
        print(f"An error occurred: {e}")
        return f"Failed to upload files. Error: {e}"
def upload_combined_content(service, subfolder_id, combined_cost_summary, generated_plan_test_components, reformatted_dev_components, generated_intent_list):
    """
    Combine the quotation sections into a single Markdown document and
    upload it to Google Drive as "quotation_document.md".

    Args:
        service: Authenticated Drive v3 service.
        subfolder_id: Destination Drive folder ID.
        combined_cost_summary: Markdown for the cost summary section.
        generated_plan_test_components: Markdown for planning/testing.
        reformatted_dev_components: Markdown for development components.
        generated_intent_list: Markdown for the intent list.
    """
    # Combine the content into a single Markdown string. The literal's
    # exact line layout (leading/trailing newlines included) is the output.
    combined_content = f"""
# Final Cost Summary
{combined_cost_summary}
# Final Planning and Testing Component
{generated_plan_test_components}
# Final Development Component
{reformatted_dev_components}
# Final Intent List
{generated_intent_list}
"""
    # Upload the combined content as a Markdown file (converted to a Google
    # Doc by upload_content via the .md extension).
    upload_content(service, subfolder_id, "quotation_document.md", combined_content)