import os
import io
import markdown
import pandas as pd
from datetime import datetime
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload
from google.oauth2 import service_account
from googleapiclient.errors import HttpError
from docx import Document
import re
from datetime import datetime
import gradio as gr
from state import state
import json
# Path to the Service Account JSON key file used to authenticate with Drive.
SERVICE_ACCOUNT_FILE = 'gdrive_service_account.json'
# OAuth scopes. 'https://www.googleapis.com/auth/drive' grants full Drive access.
# For access limited to files created by the app, use 'https://www.googleapis.com/auth/drive.file'
SCOPES = ['https://www.googleapis.com/auth/drive']
# ID of the Google Drive folder shared with the Service Account; upload_to_gdrive
# creates a timestamped subfolder under this folder for each run.
DRIVE_FOLDER_ID = '1ksgImxpdYor73BkYnp60oAbRiE1nHAyz'
def authenticate_drive_service():
    """Build and return a Drive v3 client authenticated with the service-account key."""
    print("Authenticating Drive service...")
    creds = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE,
        scopes=SCOPES,
    )
    # Optional domain-wide delegation (requires admin-side configuration):
    # creds = creds.with_subject('user@yourdomain.com')
    drive = build('drive', 'v3', credentials=creds)
    print("Drive service authenticated.")
    return drive
def add_bold_text(paragraph, text):
    """
    Append *text* to *paragraph*, rendering **...** spans as bold runs.

    The ** markers are stripped from bold spans; all other text is added
    as plain runs.
    """
    for segment in re.split(r'(\*\*.+?\*\*)', text):
        if segment.startswith('**') and segment.endswith('**'):
            paragraph.add_run(segment[2:-2]).bold = True
        else:
            paragraph.add_run(segment)
def process_table(doc, table_rows):
    """
    Render a Markdown table into a Word table appended to *doc*.

    Args:
        doc: python-docx Document (or compatible object) to append to.
        table_rows: raw Markdown lines, each starting and ending with '|'.

    The first row becomes the (always-bold) header; a Markdown separator row
    (dashes/colons) after it is skipped if present. Cells wrapped entirely in
    ** are rendered bold.
    """
    # Robustness: nothing to render from an empty table.
    if not table_rows:
        return
    # Parse every row into a list of (text, is_bold) cell tuples.
    cleaned_rows = []
    for row in table_rows:
        # Remove leading/trailing pipes, then split into cells.
        cells = [cell.strip() for cell in row.strip().strip('|').split('|')]
        processed_cells = []
        for cell in cells:
            # A cell such as **Total** is rendered fully bold (len >= 4 so
            # bare "**"/"***" is not treated as a bold span).
            is_fully_bold = cell.startswith('**') and cell.endswith('**') and len(cell) >= 4
            if is_fully_bold:
                processed_cells.append((cell[2:-2].strip(), True))
            else:
                processed_cells.append((cell, False))
        cleaned_rows.append(processed_cells)
    # Detect the Markdown separator row (only '-', ':' and spaces) so it is
    # not emitted as a data row.
    is_separator = all(cell[0].replace('-', '').replace(':', '').replace(' ', '') == ''
                       for cell in cleaned_rows[1]) if len(cleaned_rows) > 1 else False
    header_row = cleaned_rows[0]
    # Skip the separator row when present; otherwise everything after the
    # header is data.
    data_rows = cleaned_rows[2:] if is_separator else cleaned_rows[1:]
    num_cols = len(header_row)
    table = doc.add_table(rows=1, cols=num_cols)
    table.style = 'Table Grid'
    # Header row: always bold, regardless of per-cell markers.
    header_cells = table.rows[0].cells
    for i, (cell_content, is_bold) in enumerate(header_row):
        if i < len(header_cells):
            paragraph = header_cells[i].paragraphs[0]
            paragraph.runs.clear()  # drop the default empty run
            run = paragraph.add_run(cell_content)
            run.bold = True
    # Data rows.
    for row_data in data_rows:
        row_cells = table.add_row().cells
        for i, (cell_content, is_bold) in enumerate(row_data):
            if i < len(row_cells):
                paragraph = row_cells[i].paragraphs[0]
                paragraph.runs.clear()
                # BUG FIX: the original source had a raw newline inside the
                # string literal here (a SyntaxError); '\n' is the intended
                # value. Multi-line cells get a soft line break per part.
                if '\n' in cell_content:
                    parts = cell_content.split('\n')
                    for j, part in enumerate(parts):
                        run = paragraph.add_run(part.strip())
                        run.bold = is_bold
                        # Soft break between parts, not after the last one.
                        if j < len(parts) - 1:
                            run.add_break()
                else:
                    run = paragraph.add_run(cell_content)
                    run.bold = is_bold
def process_non_table_line(doc, stripped_line):
    """
    Render one non-table Markdown line into *doc*.

    Supports headings (deeper levels capped at Heading 3), bullet and
    numbered lists, horizontal rules, and plain paragraphs with **bold**
    spans (via add_bold_text).
    """
    # Headings: count the run of leading '#' characters.
    if stripped_line.startswith('#'):
        level = len(stripped_line) - len(stripped_line.lstrip('#'))
        heading_text = stripped_line[level:].strip()
        if 1 <= level <= 6:
            heading = doc.add_paragraph()
            heading.style = f'Heading {min(level, 3)}'  # cap deep headings at 3
            add_bold_text(heading, heading_text)
            return
    # Bullet list items ("* " or "- ").
    if stripped_line.startswith(('* ', '- ')):
        bullet = doc.add_paragraph(style='List Bullet')
        add_bold_text(bullet, stripped_line[2:].strip())
        return
    # Numbered list items ("1. text").
    if re.match(r'^\d+\.\s', stripped_line):
        numbered = doc.add_paragraph(style='List Number')
        add_bold_text(numbered, re.sub(r'^\d+\.\s', '', stripped_line))
        return
    # Horizontal rules become an empty paragraph holding a line break.
    if stripped_line in ('---', '***'):
        doc.add_paragraph().add_run().add_break()
        return
    # Anything else: a regular paragraph.
    body = doc.add_paragraph()
    add_bold_text(body, stripped_line)
def convert_df_to_text(df):
    """
    Convert a DataFrame to a formatted Markdown table string.

    Args:
        df: pandas DataFrame, or None.

    Returns:
        A Markdown table with a bold header row, a separator row, and one
        pipe-delimited row per DataFrame row; "" for None or empty input.
    """
    if df is None or df.empty:
        return ""
    # Header cells are bolded, e.g. "**Col**".
    header = " | ".join(f"**{str(col)}**" for col in df.columns)
    # Markdown requires at least three dashes per separator cell.
    separator = "|".join("-" * max(len(str(col)), 3) for col in df.columns)
    rows = []
    for _, row in df.iterrows():
        # BUG FIX: the original iterated row.items(), which yields
        # (index, value) tuples and produced cells like "('col', 1)".
        # Iterating the Series directly yields the cell values.
        formatted_row = " | ".join(str(val) for val in row)
        rows.append(f"| {formatted_row} |")
    # Combine all parts into a complete Markdown table.
    return f"| {header} |\n|{separator}|\n" + "\n".join(rows)
def convert_md_to_docx(md_content):
    """
    Convert Markdown content to DOCX bytes using python-docx.

    Handles headings, lists, tables, and bold formatting by delegating to
    process_table and process_non_table_line.

    Args:
        md_content: the full Markdown source as one string.

    Returns:
        The rendered .docx file contents as bytes.
    """
    doc = Document()
    lines = md_content.split('\n')
    in_table = False
    table_rows = []
    for line in lines:
        stripped_line = line.strip()
        # Table rows start and end with '|'; buffer them so process_table
        # receives the whole table at once.
        if stripped_line.startswith('|') and stripped_line.endswith('|'):
            if not in_table:
                in_table = True
                table_rows = []  # start a fresh table
            table_rows.append(stripped_line)
            continue
        # First non-table line after a table: flush the buffered rows.
        if in_table:
            if table_rows:
                process_table(doc, table_rows)
            table_rows = []
            in_table = False
        if stripped_line:
            process_non_table_line(doc, stripped_line)
        else:
            # Blank line -> a single empty paragraph.
            # BUG FIX: the original emitted TWO empty paragraphs for a blank
            # line immediately after a table (once when flushing the table,
            # once in the generic blank-line branch).
            doc.add_paragraph()
    # Flush a table that runs to the end of the document.
    if in_table and table_rows:
        process_table(doc, table_rows)
    # Serialize the document to bytes.
    output = io.BytesIO()
    doc.save(output)
    output.seek(0)
    return output.getvalue()
def determine_mime_type(filename):
    """
    Map a filename extension to upload/conversion MIME types for Google Drive.

    Args:
        filename: name of the file to be uploaded.

    Returns:
        (mime_type, drive_mime): the MIME type of the uploaded bytes, and the
        Google-native target type ('application/vnd.google-apps.*') Drive
        should convert to, or None for drive_mime when no conversion applies.
    """
    # BUG FIX: this f-string had no placeholder; interpolate the filename.
    print(f"Determining MIME type for {filename}...")
    ext = os.path.splitext(filename)[1].lower()
    if ext == '.md':
        # Markdown is pre-converted to DOCX upstream, then Drive converts
        # the DOCX into a Google Doc.
        mime_type = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        drive_mime = 'application/vnd.google-apps.document'
    elif ext == '.txt':
        # Plain text -> Google Doc.
        mime_type = 'text/plain'
        drive_mime = 'application/vnd.google-apps.document'
    elif ext == '.csv':
        # CSV -> Google Sheet.
        mime_type = 'text/csv'
        drive_mime = 'application/vnd.google-apps.spreadsheet'
    else:
        # Unknown extension: raw binary upload, no Drive conversion.
        mime_type = 'application/octet-stream'
        drive_mime = None
    print(f"MIME type determined: {mime_type}, Drive MIME: {drive_mime}")
    return mime_type, drive_mime
def upload_content(service, folder_id, filename, content):
    """
    Upload *content* to a Google Drive folder, converting where appropriate.

    Markdown is converted to DOCX locally; Drive then converts supported MIME
    types into native Google Docs/Sheets (see determine_mime_type).

    Args:
        service: authenticated Drive v3 service.
        folder_id: ID of the destination Drive folder.
        filename: source filename; its extension selects the conversion. The
            uploaded file is named without the extension.
        content: str, bytes, dict, list, or pandas DataFrame.

    Raises:
        ValueError: if the content type is unsupported.
        Exception: any Drive API error is logged and re-raised.
    """
    # BUG FIX: the log f-strings in this function had lost their
    # {filename} placeholders; restored.
    print(f"Initiating upload process for {filename} to Google Drive...")
    mime_type, drive_mime = determine_mime_type(filename)
    print(f"Determined MIME type: {mime_type}, Drive MIME: {drive_mime}")
    if filename.endswith('.md'):
        content = convert_md_to_docx(content)
        if content is None:
            print("Failed to convert Markdown to DOCX.")
            return
    # Drive names the file without its extension; the native Google type
    # (if any) comes from drive_mime.
    file_metadata = {
        'name': os.path.splitext(filename)[0],
        'parents': [folder_id],
    }
    if drive_mime:
        file_metadata['mimeType'] = drive_mime
    print(f"Prepared file metadata: {file_metadata}")
    try:
        # Strings holding JSON are decoded so the dict/list branches apply.
        if isinstance(content, str):
            try:
                parsed_content = json.loads(content)
                if isinstance(parsed_content, (dict, list)):
                    content = parsed_content
            except json.JSONDecodeError:
                pass
        # Build the media body according to the (possibly converted) content.
        if isinstance(content, pd.DataFrame):
            # DataFrames are serialized to CSV.
            csv_content = content.to_csv(index=False)
            media = MediaIoBaseUpload(
                io.BytesIO(csv_content.encode('utf-8')),
                mimetype=mime_type,
                resumable=True
            )
        elif isinstance(content, str):
            # Any remaining string payload is uploaded as UTF-8 text.
            # (The original had two sub-branches here that performed the
            # identical encode; collapsed.)
            media = MediaIoBaseUpload(
                io.BytesIO(content.encode('utf-8')),
                mimetype=mime_type,
                resumable=True
            )
        elif isinstance(content, bytes):
            # Binary payload (e.g. DOCX produced from Markdown).
            media = MediaIoBaseUpload(
                io.BytesIO(content),
                mimetype=mime_type,
                resumable=True
            )
        elif isinstance(content, dict):
            # Dicts are uploaded as pretty-printed JSON.
            json_content = json.dumps(content, indent=2)
            media = MediaIoBaseUpload(
                io.BytesIO(json_content.encode('utf-8')),
                mimetype='application/json',
                resumable=True
            )
        elif isinstance(content, list):
            # Lists become plain text, one item per line.
            list_content = "\n".join(str(item) for item in content)
            media = MediaIoBaseUpload(
                io.BytesIO(list_content.encode('utf-8')),
                mimetype='text/plain',
                resumable=True
            )
        else:
            raise ValueError("Unsupported content type for upload.")
        print(f"Prepared media for upload: {media}")
        file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id'
        ).execute()
        print(f"Successfully uploaded {filename} to Google Drive with ID: {file.get('id')}")
    except Exception as e:
        print(f"An error occurred while uploading {filename}: {e}")
        raise  # BUG FIX: bare raise preserves the original traceback ('raise e' rewrote it)
def upload_to_gdrive(project_name, progress=gr.Progress()):
    """
    Upload all generated quotation artifacts to a new timestamped Drive subfolder.

    Args:
        project_name: base name for the subfolder; defaults to "Final Quotation".
        progress: Gradio progress callback.

    Returns:
        A human-readable status message (folder URL on success, error text otherwise).
    """
    print("Starting upload to Google Drive...")
    service = authenticate_drive_service()
    project = state.quotation_project
    if project is None:
        # BUG FIX: the original only printed and then carried on with a None
        # project, eventually "succeeding" with an empty folder. Fail fast.
        print("Error: quotation_project is not set")
        return "Error: quotation_project is not set."
    parent_folder_id = DRIVE_FOLDER_ID
    if not parent_folder_id:
        return "Drive folder ID is not set."
    if not project_name:
        project_name = "Final Quotation"
    # Subfolder name is timestamped so repeated runs do not collide.
    folder_metadata = {
        'name': f'{project_name}_{datetime.now().strftime("%y%m%d_%H%M%S")}',
        'mimeType': 'application/vnd.google-apps.folder',
        'parents': [parent_folder_id]
    }
    subfolder = service.files().create(body=folder_metadata, fields='id').execute()
    subfolder_id = subfolder.get('id')
    print(f"Created subfolder with ID: {subfolder_id}")
    progress(0.1, "Created subfolder and preparing files to upload.")
    try:
        # Project attribute -> target filename in Drive.
        attributes_to_upload = {
            "generated_prd": "PRD.md",
            "generated_plan_test_components": "plan_test_components.md",
            "generated_page_dev_components": "page_dev_components.md",
            "generated_engage_dev_components": "engage_dev_components.md",
            "generated_intent_list": "intent_list.md",
            "reformatted_dev_components": "reformatted_dev_components.md",
            "generated_mvp_prd": "MVP_prd.md",
            "combined_cost_summary": "cost_summary.md",
            "generated_BD_SOW": "BD_SOW.md",
            "generated_Tech_SOW": "Tech_SOW.md"
        }
        # Upload each attribute; failures are logged per-file, not fatal.
        for attr, filename in attributes_to_upload.items():
            try:
                content = getattr(project, attr, None)
                # BUG FIX: restored the lost {filename} placeholders in the
                # log/progress f-strings throughout this loop.
                print(f"Uploading {attr} with filename {filename}...")
                if content:
                    # A list of dicts is tabular -> upload as DataFrame/CSV.
                    if isinstance(content, list) and all(isinstance(i, dict) for i in content):
                        content = pd.DataFrame(content)
                    # generated_Tech_SOW may arrive as a JSON string; render
                    # it as markdown sections.
                    # BUG FIX: the original dereferenced parsed_result even
                    # when content was not a string (NameError); the JSON
                    # handling now only runs for string content.
                    if attr == "generated_Tech_SOW" and isinstance(content, str):
                        try:
                            parsed_result = json.loads(content)
                        except json.JSONDecodeError as e:
                            print(f"Error parsing JSON for {attr}: {e}")
                            continue  # skip this attribute if parsing fails
                        if isinstance(parsed_result, dict):
                            content = "\n\n".join(
                                f"## {key.replace('_', ' ').title()}\n{value}"
                                for key, value in parsed_result.items()
                            )
                    print(f"Content for {filename}: {content}")
                    upload_content(service, subfolder_id, filename, content)
                    progress(0.1, f"Uploaded {filename}")
                else:
                    print(f"No content found for {attr}")
            except Exception as e:
                print(f"Failed to upload {filename}: {e}")
        # Mandays CSV results: each entry is {'function_name': ..., 'result': ...}.
        try:
            if project.mandays_results:
                for result in project.mandays_results:
                    function_name = result['function_name']
                    result_data = result['result']
                    # Expected shape: {function_name: [ {col: val, ...}, ... ]}.
                    if isinstance(result_data, dict) and function_name in result_data:
                        actual_data = result_data[function_name]
                        df = pd.DataFrame(actual_data)
                        if not df.empty:
                            csv_content = df.to_csv(index=False)
                            upload_content(service, subfolder_id, f"{function_name}.csv", csv_content)
                            progress(0.1, f"Uploaded {function_name}.csv")
                    else:
                        print(f"Unexpected result data format for {function_name}.")
            if project.mvp_mandays_results:
                for result in project.mvp_mandays_results:
                    function_name = result['function_name']
                    result_data = result['result']
                    # MVP results hold one list of records per section.
                    if isinstance(result_data, dict):
                        for section_name, records in result_data.items():
                            if isinstance(records, list):
                                df = pd.DataFrame(records)
                                if not df.empty:
                                    csv_content = df.to_csv(index=False)
                                    upload_content(service, subfolder_id, f"{function_name}_{section_name}.csv", csv_content)
                                    progress(0.1, f"Uploaded {function_name}_{section_name}.csv")
                            else:
                                print(f"Unexpected data format for {section_name} in {function_name}.")
                    else:
                        print(f"Unexpected result data format for {function_name}.")
        except Exception as e:
            print(f"Failed to upload mandays results: {e}")
        folder_url = f"https://drive.google.com/drive/folders/{subfolder_id}"
        progress(1.0, "Upload complete")
        return f"All files uploaded successfully. Folder URL: {folder_url}"
    except Exception as e:
        print(f"An error occurred: {e}")
        return f"Failed to upload files. Error: {e}"
def upload_combined_content(service, subfolder_id, combined_cost_summary, generated_plan_test_components, reformatted_dev_components, generated_intent_list):
    """
    Merge the quotation sections into one Markdown document and upload it.

    Each section gets a top-level heading, and the result is uploaded as
    quotation_document.md via upload_content.
    """
    sections = (
        ("# Final Cost Summary", combined_cost_summary),
        ("# Final Planning and Testing Component", generated_plan_test_components),
        ("# Final Development Component", reformatted_dev_components),
        ("# Final Intent List", generated_intent_list),
    )
    # Leading "" and trailing "" reproduce the surrounding blank lines of the
    # original template exactly.
    pieces = [""]
    for heading, body in sections:
        pieces.append(heading)
        pieces.append(f"{body}")
    pieces.append("")
    combined_content = "\n".join(pieces)
    # Upload the combined content as a Markdown file.
    upload_content(service, subfolder_id, "quotation_document.md", combined_content)