"""Upload quotation artifacts (Markdown, CSV, DataFrames) to Google Drive.

Markdown content is converted to DOCX via python-docx so Drive can import it
as a native Google Doc; CSV content is imported as a Google Sheet.
"""

import os
import io
import markdown
import pandas as pd
from datetime import datetime
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload
from google.oauth2 import service_account
from googleapiclient.errors import HttpError
from docx import Document
import re
import gradio as gr
from state import state
import json

# Path to your Service Account key file
SERVICE_ACCOUNT_FILE = 'gdrive_service_account.json'

# Define the scopes. For full Drive access, use 'https://www.googleapis.com/auth/drive'
# For access limited to files created by the app, use 'https://www.googleapis.com/auth/drive.file'
SCOPES = ['https://www.googleapis.com/auth/drive']

# ID of the Google Drive folder shared with the Service Account
DRIVE_FOLDER_ID = '1ksgImxpdYor73BkYnp60oAbRiE1nHAyz'


def authenticate_drive_service():
    """Authenticate using the Service Account key file and return a Drive v3 service."""
    print("Authenticating Drive service...")
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE, scopes=SCOPES)
    # If you need to impersonate a user (optional, requires domain-wide delegation)
    # credentials = credentials.with_subject('user@yourdomain.com')
    service = build('drive', 'v3', credentials=credentials)
    print("Drive service authenticated.")
    return service


def add_bold_text(paragraph, text):
    """Add *text* to *paragraph*, rendering **...** spans as bold runs.

    The ** markers are stripped; everything else is added as plain runs.
    """
    parts = re.split(r'(\*\*.+?\*\*)', text)
    for part in parts:
        if part.startswith('**') and part.endswith('**'):
            paragraph.add_run(part[2:-2]).bold = True
        else:
            paragraph.add_run(part)


def process_table(doc, table_rows):
    """Render a Markdown pipe-table (list of `|...|` lines) into *doc*.

    The first row becomes a bold header; a `---`/`:---:` separator row, if
    present, is skipped. Fully-bold cells (`**x**`) keep their bold flag, and
    embedded newlines inside a cell become line breaks within the Word cell.
    """
    if not table_rows:  # nothing to render; avoids IndexError on cleaned_rows[0]
        return

    # Clean up table rows: strip the outer pipes, split cells, and record
    # for each cell whether it is fully bold (wrapped in **...**).
    cleaned_rows = []
    for row in table_rows:
        cells = [cell.strip() for cell in row.strip().strip('|').split('|')]
        processed_cells = []
        for cell in cells:
            is_fully_bold = cell.startswith('**') and cell.endswith('**') and len(cell) >= 4
            if is_fully_bold:
                # Remove the bold markers, keep the bold flag
                processed_cells.append((cell[2:-2].strip(), True))
            else:
                processed_cells.append((cell, False))
        cleaned_rows.append(processed_cells)

    # Determine if the second row is a Markdown separator (only -, : and spaces)
    is_separator = all(
        cell[0].replace('-', '').replace(':', '').replace(' ', '') == ''
        for cell in cleaned_rows[1]
    ) if len(cleaned_rows) > 1 else False

    header_row = cleaned_rows[0]
    if is_separator:
        data_rows = cleaned_rows[2:]  # skip header and separator
    else:
        data_rows = cleaned_rows[1:]  # no separator: everything after header is data

    # Create the table sized to the header's column count
    num_cols = len(header_row)
    table = doc.add_table(rows=1, cols=num_cols)
    table.style = 'Table Grid'

    # Header row: always bold, regardless of per-cell markers
    header_cells = table.rows[0].cells
    for i, (cell_content, is_bold) in enumerate(header_row):
        if i < len(header_cells):
            paragraph = header_cells[i].paragraphs[0]
            paragraph.runs.clear()  # clear the default empty run
            run = paragraph.add_run(cell_content)
            run.bold = True

    # Data rows: honor per-cell bold flags; rows wider than the header are truncated
    for row_data in data_rows:
        row_cells = table.add_row().cells
        for i, (cell_content, is_bold) in enumerate(row_data):
            if i < len(row_cells):
                paragraph = row_cells[i].paragraphs[0]
                paragraph.runs.clear()
                if '\n' in cell_content:
                    # Multi-line cell: one run per line with explicit breaks between
                    parts = cell_content.split('\n')
                    for j, part in enumerate(parts):
                        run = paragraph.add_run(part.strip())
                        run.bold = is_bold
                        if j < len(parts) - 1:
                            run.add_break()
                else:
                    run = paragraph.add_run(cell_content)
                    run.bold = is_bold


def process_non_table_line(doc, stripped_line):
    """Render one non-table Markdown line into *doc*.

    Handles ATX headings (levels 4-6 clamped to Word's 'Heading 3'), bullet
    and numbered lists, horizontal rules, and plain paragraphs; inline bold
    is delegated to add_bold_text().
    """
    # Handle headings (count leading '#' characters)
    if stripped_line.startswith('#'):
        heading_level = 0
        for char in stripped_line:
            if char == '#':
                heading_level += 1
            else:
                break
        heading_text = stripped_line[heading_level:].strip()
        if 1 <= heading_level <= 6:
            # Map heading levels 4-6 to the maximum style level used here (3)
            mapped_level = min(heading_level, 3)
            heading = doc.add_paragraph()
            heading.style = f'Heading {mapped_level}'
            add_bold_text(heading, heading_text)
            return

    # Handle unordered lists
    if stripped_line.startswith(('* ', '- ')):
        paragraph = doc.add_paragraph(style='List Bullet')
        add_bold_text(paragraph, stripped_line[2:].strip())
        return

    # Handle ordered lists ("1. text")
    if re.match(r'^\d+\.\s', stripped_line):
        paragraph = doc.add_paragraph(style='List Number')
        add_bold_text(paragraph, re.sub(r'^\d+\.\s', '', stripped_line))
        return

    # Handle horizontal rules (rendered as an empty line break)
    if stripped_line in ('---', '***'):
        doc.add_paragraph().add_run().add_break()
        return

    # Handle regular paragraphs
    paragraph = doc.add_paragraph()
    add_bold_text(paragraph, stripped_line)


def convert_df_to_text(df):
    """Convert a DataFrame to a Markdown pipe-table string.

    Returns "" for None/empty input. Column headers are rendered bold.
    """
    if df is None or df.empty:
        return ""

    # Header row with bold column names
    header = " | ".join(f"**{str(col)}**" for col in df.columns)

    # Separator row; str() guards against non-string column labels
    separator = "|".join("-" * len(str(col)) for col in df.columns)

    # Data rows: iterate cell VALUES (row.items() would yield (label, value)
    # tuples and stringify them as "('col', val)")
    rows = []
    for _, row in df.iterrows():
        formatted_row = " | ".join(str(val) for val in row)
        rows.append(f"| {formatted_row} |")

    return f"| {header} |\n|{separator}|\n" + "\n".join(rows)


def convert_md_to_docx(md_content):
    """Convert Markdown text to a DOCX document and return it as bytes.

    Consecutive `|...|` lines are collected and rendered as tables; all other
    lines go through process_non_table_line().
    """
    doc = Document()
    lines = md_content.split('\n')
    in_table = False
    table_rows = []
    line_index = 0

    while line_index < len(lines):
        stripped_line = lines[line_index].strip()

        # Accumulate table lines until a non-table line appears
        if stripped_line.startswith('|') and stripped_line.endswith('|'):
            if not in_table:
                in_table = True
                table_rows = []  # reset when a new table starts
            table_rows.append(stripped_line)
            line_index += 1
            continue

        # First non-table line after a table: flush the accumulated rows
        if in_table:
            if table_rows:
                process_table(doc, table_rows)
                table_rows = []
            in_table = False
            # NOTE(review): an empty line right after a table falls through to
            # the elif below as well, producing two empty paragraphs — kept
            # as-is to preserve the original output.
            if not stripped_line:
                doc.add_paragraph()

        # Handle non-table content
        if stripped_line:
            process_non_table_line(doc, stripped_line)
        elif not in_table:
            # Blank line outside a table -> empty paragraph
            doc.add_paragraph()
        line_index += 1

    # Flush a table that runs to the end of the document
    if in_table and table_rows:
        process_table(doc, table_rows)

    # Serialize the document to bytes
    output = io.BytesIO()
    doc.save(output)
    output.seek(0)
    return output.getvalue()


def determine_mime_type(filename):
    """Return (upload MIME type, Drive target MIME type) for *filename*.

    The Drive MIME type, when not None, asks Drive to convert the upload into
    a native Google Doc/Sheet.
    """
    print(f"Determining MIME type for {filename}...")
    ext = os.path.splitext(filename)[1].lower()
    if ext == '.md':
        # Markdown is pre-converted to DOCX, then imported as a Google Doc
        mime_type = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        drive_mime = 'application/vnd.google-apps.document'
    elif ext == '.txt':
        # Convert plain text to Google Docs
        mime_type = 'text/plain'
        drive_mime = 'application/vnd.google-apps.document'
    elif ext == '.csv':
        # Convert CSV to Google Sheets
        mime_type = 'text/csv'
        drive_mime = 'application/vnd.google-apps.spreadsheet'
    else:
        # Default to binary upload without conversion
        mime_type = 'application/octet-stream'
        drive_mime = None
    print(f"MIME type determined: {mime_type}, Drive MIME: {drive_mime}")
    return mime_type, drive_mime


def upload_content(service, folder_id, filename, content):
    """Upload *content* into Drive folder *folder_id* as *filename*.

    Accepts str / bytes / dict / list / DataFrame content; Markdown is first
    converted to DOCX. Re-raises any upload error after logging it.
    """
    print(f"Initiating upload process for {filename} to Google Drive...")
    mime_type, drive_mime = determine_mime_type(filename)
    print(f"Determined MIME type: {mime_type}, Drive MIME: {drive_mime}")

    if filename.endswith('.md'):
        content = convert_md_to_docx(content)
        if content is None:
            print("Failed to convert Markdown to DOCX.")
            return

    # File metadata: name without extension; Drive converts when mimeType is set
    file_metadata = {
        'name': os.path.splitext(filename)[0],
        'parents': [folder_id],
    }
    if drive_mime:
        file_metadata['mimeType'] = drive_mime
    print(f"Prepared file metadata: {file_metadata}")

    try:
        # If the content is a JSON string, promote it to a dict/list so the
        # structured branches below handle it
        if isinstance(content, str):
            try:
                parsed_content = json.loads(content)
                if isinstance(parsed_content, (dict, list)):
                    content = parsed_content
            except json.JSONDecodeError:
                pass

        # Build the upload media according to the (possibly promoted) content type
        if isinstance(content, pd.DataFrame):
            csv_content = content.to_csv(index=False)
            media = MediaIoBaseUpload(
                io.BytesIO(csv_content.encode('utf-8')),
                mimetype=mime_type, resumable=True)
        elif isinstance(content, str):
            if mime_type == 'text/plain':
                media = MediaIoBaseUpload(
                    io.BytesIO(content.encode('utf-8')),
                    mimetype=mime_type, resumable=True)
            else:
                # Non-plain string content (e.g. CSV text)
                media = MediaIoBaseUpload(
                    io.BytesIO(content if isinstance(content, bytes) else content.encode('utf-8')),
                    mimetype=mime_type, resumable=True)
        elif isinstance(content, bytes):
            # Already-binary content (e.g. DOCX from convert_md_to_docx)
            media = MediaIoBaseUpload(
                io.BytesIO(content), mimetype=mime_type, resumable=True)
        elif isinstance(content, dict):
            json_content = json.dumps(content, indent=2)
            media = MediaIoBaseUpload(
                io.BytesIO(json_content.encode('utf-8')),
                mimetype='application/json', resumable=True)
        elif isinstance(content, list):
            list_content = "\n".join(str(item) for item in content)
            media = MediaIoBaseUpload(
                io.BytesIO(list_content.encode('utf-8')),
                mimetype='text/plain', resumable=True)
        else:
            raise ValueError("Unsupported content type for upload.")

        print(f"Prepared media for upload: {media}")
        uploaded_file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id'
        ).execute()
        print(f"Successfully uploaded {filename} to Google Drive with ID: {uploaded_file.get('id')}")
    except Exception as e:
        print(f"An error occurred while uploading {filename}: {e}")
        raise e


def upload_to_gdrive(project_name, progress=gr.Progress()):
    """Upload all quotation artifacts into a new timestamped Drive subfolder.

    Returns a user-facing status string containing the folder URL on success.
    Per-file failures are logged and skipped; only folder-level errors abort.
    """
    print("Starting upload to Google Drive...")
    service = authenticate_drive_service()
    project = state.quotation_project
    if project is None:
        # Nothing to upload; bail out instead of reporting a bogus success
        print("Error: quotation_project is not set")
        return "Error: quotation project is not set."

    parent_folder_id = DRIVE_FOLDER_ID
    if not parent_folder_id:
        return "Drive folder ID is not set."
    if not project_name:
        project_name = "Final Quotation"

    # Create a timestamped subfolder for this upload batch
    folder_metadata = {
        'name': f'{project_name}_{datetime.now().strftime("%y%m%d_%H%M%S")}',
        'mimeType': 'application/vnd.google-apps.folder',
        'parents': [parent_folder_id]
    }
    subfolder = service.files().create(body=folder_metadata, fields='id').execute()
    subfolder_id = subfolder.get('id')
    print(f"Created subfolder with ID: {subfolder_id}")
    progress(0.1, "Created subfolder and preparing files to upload.")

    try:
        # Project attributes to upload, mapped to their target filenames
        attributes_to_upload = {
            "generated_prd": "PRD.md",
            "generated_plan_test_components": "plan_test_components.md",
            "generated_page_dev_components": "page_dev_components.md",
            "generated_engage_dev_components": "engage_dev_components.md",
            "generated_intent_list": "intent_list.md",
            "reformatted_dev_components": "reformatted_dev_components.md",
            "generated_mvp_prd": "MVP_prd.md",
            "combined_cost_summary": "cost_summary.md",
            "generated_BD_SOW": "BD_SOW.md",
            "generated_Tech_SOW": "Tech_SOW.md"
        }

        # Upload each attribute; a failure on one file does not stop the rest
        for attr, filename in attributes_to_upload.items():
            try:
                content = getattr(project, attr, None)
                print(f"Uploading {attr} with filename {filename}...")
                if content:
                    # Convert list-of-dicts to DataFrame so it uploads as CSV
                    if isinstance(content, list) and all(isinstance(i, dict) for i in content):
                        content = pd.DataFrame(content)

                    # generated_Tech_SOW may arrive as a JSON string; render it
                    # as markdown sections instead of raw JSON
                    if attr == "generated_Tech_SOW":
                        if isinstance(content, str):
                            try:
                                parsed_result = json.loads(content)
                            except json.JSONDecodeError as e:
                                print(f"Error parsing JSON for {attr}: {e}")
                                continue  # skip this attribute if parsing fails
                            if isinstance(parsed_result, dict):
                                content = "\n\n".join(
                                    f"## {key.replace('_', ' ').title()}\n{value}"
                                    for key, value in parsed_result.items())

                    print(f"Content for {filename}: {content}")
                    upload_content(service, subfolder_id, filename, content)
                    progress(0.1, f"Uploaded {filename}")
                else:
                    print(f"No content found for {attr}")
            except Exception as e:
                print(f"Failed to upload {filename}: {e}")

        # Handle the mandays CSV results
        try:
            if project.mandays_results:
                for result in project.mandays_results:
                    function_name = result['function_name']
                    result_data = result['result']
                    # Expected shape: {function_name: [records...]}
                    if isinstance(result_data, dict) and function_name in result_data:
                        df = pd.DataFrame(result_data[function_name])
                        if not df.empty:
                            csv_content = df.to_csv(index=False)
                            upload_content(service, subfolder_id,
                                           f"{function_name}.csv", csv_content)
                            progress(0.1, f"Uploaded {function_name}.csv")
                    else:
                        print(f"Unexpected result data format for {function_name}.")

            if project.mvp_mandays_results:
                for result in project.mvp_mandays_results:
                    function_name = result['function_name']
                    result_data = result['result']
                    # Expected shape: {section_name: [records...], ...}
                    if isinstance(result_data, dict):
                        for section_name, records in result_data.items():
                            if isinstance(records, list):
                                df = pd.DataFrame(records)
                                if not df.empty:
                                    csv_content = df.to_csv(index=False)
                                    upload_content(service, subfolder_id,
                                                   f"{function_name}_{section_name}.csv",
                                                   csv_content)
                                    progress(0.1, f"Uploaded {function_name}_{section_name}.csv")
                            else:
                                print(f"Unexpected data format for {section_name} in {function_name}.")
                    else:
                        print(f"Unexpected result data format for {function_name}.")
        except Exception as e:
            print(f"Failed to upload mandays results: {e}")

        folder_url = f"https://drive.google.com/drive/folders/{subfolder_id}"
        progress(1.0, "Upload complete")
        return f"All files uploaded successfully. Folder URL: {folder_url}"
    except Exception as e:
        print(f"An error occurred: {e}")
        return f"Failed to upload files.\nError: {e}"


def upload_combined_content(service, subfolder_id, combined_cost_summary,
                            generated_plan_test_components,
                            reformatted_dev_components, generated_intent_list):
    """Combine the quotation sections into one Markdown document and upload it."""
    combined_content = f"""
# Final Cost Summary
{combined_cost_summary}

# Final Planning and Testing Component
{generated_plan_test_components}

# Final Development Component
{reformatted_dev_components}

# Final Intent List
{generated_intent_list}
"""
    upload_content(service, subfolder_id, "quotation_document.md", combined_content)