# QuotationChatbot_v5 / google_drive.py
# ICAS03
# - fix Tech SOW again
# 11c3743
import os
import io
import markdown
import pandas as pd
from datetime import datetime
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseUpload
from google.oauth2 import service_account
from googleapiclient.errors import HttpError
from docx import Document
import re
from datetime import datetime
import gradio as gr
from state import state
import json
# Path to your Service Account key file
SERVICE_ACCOUNT_FILE = 'gdrive_service_account.json'
# Define the scopes. For full Drive access, use 'https://www.googleapis.com/auth/drive'
# For access limited to files created by the app, use 'https://www.googleapis.com/auth/drive.file'
SCOPES = ['https://www.googleapis.com/auth/drive']
# ID of the Google Drive folder shared with the Service Account
DRIVE_FOLDER_ID = '1ksgImxpdYor73BkYnp60oAbRiE1nHAyz'
def authenticate_drive_service():
    """Build and return an authenticated Google Drive v3 service.

    Credentials are loaded from SERVICE_ACCOUNT_FILE with the scopes
    configured in SCOPES.
    """
    print("Authenticating Drive service...")
    creds = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE,
        scopes=SCOPES,
    )
    # To impersonate a user (optional, requires domain-wide delegation):
    # creds = creds.with_subject('user@yourdomain.com')
    drive_service = build('drive', 'v3', credentials=creds)
    print("Drive service authenticated.")
    return drive_service
def add_bold_text(paragraph, text):
    """
    Append *text* to *paragraph*, rendering ``**bold**`` Markdown spans.

    Segments wrapped in ``**`` are added as bold runs with the markers
    stripped; all other segments are added as plain runs.
    """
    for segment in re.split(r'(\*\*.+?\*\*)', text):
        if segment.startswith('**') and segment.endswith('**'):
            # Strip the ** markers and mark the run bold.
            paragraph.add_run(segment[2:-2]).bold = True
        else:
            paragraph.add_run(segment)
def process_table(doc, table_rows):
    """
    Append a Word table built from Markdown table rows to *doc*.

    Parameters
    ----------
    doc : docx.Document
        Document the table is appended to.
    table_rows : list[str]
        Raw Markdown rows, each like ``| a | b |``.  An optional separator
        row (only dashes/colons/spaces) after the header is skipped.
    """
    # Guard against an empty table (the original guard was commented out,
    # which let cleaned_rows[0] raise IndexError).
    if not table_rows:
        return

    # Parse every row into (text, is_bold) cell tuples.
    cleaned_rows = []
    for row in table_rows:
        # Remove leading/trailing pipes and split into cells.
        cells = [cell.strip() for cell in row.strip().strip('|').split('|')]
        processed_cells = []
        for cell in cells:
            # A cell wrapped entirely in ** is rendered fully bold.
            if cell.startswith('**') and cell.endswith('**') and len(cell) >= 4:
                processed_cells.append((cell[2:-2].strip(), True))
            else:
                processed_cells.append((cell, False))
        cleaned_rows.append(processed_cells)

    # A separator row contains only '-', ':' and spaces (e.g. ``---|:--:``).
    is_separator = (
        len(cleaned_rows) > 1
        and all(cell[0].replace('-', '').replace(':', '').replace(' ', '') == ''
                for cell in cleaned_rows[1])
    )
    header_row = cleaned_rows[0]
    # With a proper separator, data starts on row 3; otherwise on row 2.
    data_rows = cleaned_rows[2:] if is_separator else cleaned_rows[1:]

    num_cols = len(header_row)
    table = doc.add_table(rows=1, cols=num_cols)
    table.style = 'Table Grid'

    # Header cells are always bold.  (No need to clear runs: new cells start
    # empty, and paragraph.runs returns a fresh list, so clearing it was a
    # no-op anyway.)
    header_cells = table.rows[0].cells
    for i, (cell_content, _is_bold) in enumerate(header_row):
        if i < len(header_cells):
            run = header_cells[i].paragraphs[0].add_run(cell_content)
            run.bold = True

    # Data rows; ``<br>`` inside a cell becomes a soft line break.
    for row_data in data_rows:
        row_cells = table.add_row().cells
        for i, (cell_content, is_bold) in enumerate(row_data):
            if i >= len(row_cells):
                continue
            paragraph = row_cells[i].paragraphs[0]
            if '<br>' in cell_content:
                parts = cell_content.split('<br>')
                for j, part in enumerate(parts):
                    run = paragraph.add_run(part.strip())
                    run.bold = is_bold
                    # Soft break between parts, but not after the last one.
                    if j < len(parts) - 1:
                        run.add_break()
            else:
                run = paragraph.add_run(cell_content)
                run.bold = is_bold
def process_non_table_line(doc, stripped_line):
    """
    Render one non-table Markdown line into *doc*.

    Supports ATX headings, bullet and numbered lists, horizontal rules
    and plain paragraphs; inline **bold** is delegated to add_bold_text.
    """
    # Headings: count the run of leading '#' characters.
    if stripped_line.startswith('#'):
        level = len(stripped_line) - len(stripped_line.lstrip('#'))
        heading_text = stripped_line[level:].strip()
        if 1 <= level <= 6:
            # Word only styles down to 'Heading 3'; clamp deeper levels.
            heading = doc.add_paragraph()
            heading.style = f'Heading {min(level, 3)}'
            add_bold_text(heading, heading_text)
            return

    # Unordered list items.
    if stripped_line.startswith(('* ', '- ')):
        bullet = doc.add_paragraph(style='List Bullet')
        add_bold_text(bullet, stripped_line[2:].strip())
        return

    # Ordered list items, e.g. "3. do the thing".
    if re.match(r'^\d+\.\s', stripped_line):
        numbered = doc.add_paragraph(style='List Number')
        add_bold_text(numbered, re.sub(r'^\d+\.\s', '', stripped_line))
        return

    # Horizontal rules are rendered as an empty paragraph with a break.
    if stripped_line in ('---', '***'):
        doc.add_paragraph().add_run().add_break()
        return

    # Everything else is a plain paragraph.
    body = doc.add_paragraph()
    add_bold_text(body, stripped_line)
def convert_df_to_text(df):
    """
    Render a DataFrame as a Markdown table string.

    Column headers are wrapped in ``**`` so downstream Markdown-to-DOCX
    conversion makes them bold.

    Parameters
    ----------
    df : pandas.DataFrame | None

    Returns
    -------
    str
        Markdown table, or "" when *df* is None or empty.
    """
    if df is None or df.empty:
        return ""

    # Header row with bold markers around each column name.
    header = " | ".join(f"**{str(col)}**" for col in df.columns)

    # Markdown requires at least three dashes per column; str(col) also
    # avoids TypeError for non-string column labels.
    separator = "|".join("-" * max(len(str(col)), 3) for col in df.columns)

    # One pipe-delimited line per data row.  Iterate the row *values* —
    # the original used row.items(), which stringified (column, value)
    # tuples into the cells.
    rows = [
        "| " + " | ".join(str(val) for val in row) + " |"
        for row in df.itertuples(index=False, name=None)
    ]

    return f"| {header} |\n|{separator}|\n" + "\n".join(rows)
def convert_md_to_docx(md_content):
    """
    Convert Markdown text to DOCX using python-docx.

    Handles headings, lists, pipe-delimited tables and inline bold.

    Parameters
    ----------
    md_content : str

    Returns
    -------
    bytes
        The serialized .docx file content.
    """
    doc = Document()
    in_table = False
    table_rows = []

    for line in md_content.split('\n'):
        stripped_line = line.strip()

        # Accumulate table rows (lines with pipes at both ends).
        if stripped_line.startswith('|') and stripped_line.endswith('|'):
            if not in_table:
                in_table = True
                table_rows = []  # starting a new table
            table_rows.append(stripped_line)
            continue

        # First non-table line after a table: flush the accumulated rows.
        if in_table:
            if table_rows:
                process_table(doc, table_rows)
            table_rows = []
            in_table = False
            # Fall through so this line is handled once below; previously
            # an empty line here produced TWO blank paragraphs (one in
            # this branch and one in the common path).

        if stripped_line:
            process_non_table_line(doc, stripped_line)
        else:
            # Preserve blank source lines as empty paragraphs.
            doc.add_paragraph()

    # Flush a table that runs to the end of the document.
    if in_table and table_rows:
        process_table(doc, table_rows)

    # Serialize the document to bytes.
    output = io.BytesIO()
    doc.save(output)
    output.seek(0)
    return output.getvalue()
def determine_mime_type(filename):
    """
    Map a filename extension to (upload MIME type, Drive conversion type).

    Parameters
    ----------
    filename : str

    Returns
    -------
    tuple[str, str | None]
        ``mime_type`` is the type the bytes are uploaded as; ``drive_mime``
        is the Google-native type Drive should convert the file to, or
        None for a raw binary upload without conversion.
    """
    # Fixed: the f-string printed a literal "(unknown)" placeholder
    # instead of the filename.
    print(f"Determining MIME type for {filename}...")
    ext = os.path.splitext(filename)[1].lower()
    if ext == '.md':
        # Markdown is uploaded as DOCX and converted to a Google Doc.
        mime_type = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        drive_mime = 'application/vnd.google-apps.document'
    elif ext == '.txt':
        # Plain text converts to a Google Doc.
        mime_type = 'text/plain'
        drive_mime = 'application/vnd.google-apps.document'
    elif ext == '.csv':
        # CSV converts to a Google Sheet.
        mime_type = 'text/csv'
        drive_mime = 'application/vnd.google-apps.spreadsheet'
    else:
        # Unknown extension: upload raw bytes, no conversion.
        mime_type = 'application/octet-stream'
        drive_mime = None
    print(f"MIME type determined: {mime_type}, Drive MIME: {drive_mime}")
    return mime_type, drive_mime
def upload_content(service, folder_id, filename, content):
    """
    Upload *content* to Google Drive under *folder_id*.

    Markdown content is converted to DOCX first; DataFrames are serialized
    to CSV, dicts to JSON, and lists to newline-joined text.  Depending on
    the extension, Drive may convert the file to a native Doc/Sheet.

    Parameters
    ----------
    service :
        Authenticated Drive v3 service (from authenticate_drive_service).
    folder_id : str
        Destination Drive folder ID.
    filename : str
        Name with extension; drives MIME selection.  The extension is
        stripped from the stored Drive file name.
    content : str | bytes | dict | list | pandas.DataFrame

    Raises
    ------
    ValueError
        If *content* is of an unsupported type.
    """
    # Fixed: prints used a literal "(unknown)" instead of the filename.
    print(f"Initiating upload process for {filename} to Google Drive...")
    mime_type, drive_mime = determine_mime_type(filename)
    print(f"Determined MIME type: {mime_type}, Drive MIME: {drive_mime}")

    if filename.endswith('.md'):
        content = convert_md_to_docx(content)
        if content is None:  # defensive: conversion produced nothing
            print("Failed to convert Markdown to DOCX.")
            return

    file_metadata = {
        'name': os.path.splitext(filename)[0],
        'parents': [folder_id],
    }
    if drive_mime:
        file_metadata['mimeType'] = drive_mime
    print(f"Prepared file metadata: {file_metadata}")

    try:
        # Strings containing JSON are parsed so they take the dict/list path.
        if isinstance(content, str):
            try:
                parsed_content = json.loads(content)
                if isinstance(parsed_content, (dict, list)):
                    content = parsed_content
            except json.JSONDecodeError:
                pass

        # Prepare the media body according to the (possibly converted) type.
        if isinstance(content, pd.DataFrame):
            payload = content.to_csv(index=False).encode('utf-8')
            media = MediaIoBaseUpload(io.BytesIO(payload), mimetype=mime_type, resumable=True)
        elif isinstance(content, str):
            # All string content is uploaded as UTF-8 bytes (the original
            # text/plain vs. other split did exactly the same thing).
            media = MediaIoBaseUpload(io.BytesIO(content.encode('utf-8')),
                                      mimetype=mime_type, resumable=True)
        elif isinstance(content, bytes):
            # Already-binary content (e.g. converted DOCX).
            media = MediaIoBaseUpload(io.BytesIO(content), mimetype=mime_type, resumable=True)
        elif isinstance(content, dict):
            json_content = json.dumps(content, indent=2)
            media = MediaIoBaseUpload(io.BytesIO(json_content.encode('utf-8')),
                                      mimetype='application/json', resumable=True)
        elif isinstance(content, list):
            list_content = "\n".join(str(item) for item in content)
            media = MediaIoBaseUpload(io.BytesIO(list_content.encode('utf-8')),
                                      mimetype='text/plain', resumable=True)
        else:
            raise ValueError("Unsupported content type for upload.")

        print(f"Prepared media for upload: {media}")
        file = service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id',
        ).execute()
        print(f"Successfully uploaded {filename} to Google Drive with ID: {file.get('id')}")
    except Exception as e:
        print(f"An error occurred while uploading {filename}: {e}")
        # Bare raise preserves the original traceback (vs. `raise e`).
        raise
def upload_to_gdrive(project_name, progress=gr.Progress()):
    """
    Upload all generated quotation artifacts of the current project to a
    new timestamped subfolder in Google Drive.

    Parameters
    ----------
    project_name : str
        Base name for the Drive subfolder; defaults to "Final Quotation".
    progress : gr.Progress
        Gradio progress reporter.

    Returns
    -------
    str
        Status message with the folder URL on success, or an error message.
    """
    print("Starting upload to Google Drive...")
    service = authenticate_drive_service()

    project = state.quotation_project
    if project is None:
        # Fixed: previously this only printed and then crashed later on
        # attribute access.
        print("Error: quotation_project is not set")
        return "Quotation project is not set."

    parent_folder_id = DRIVE_FOLDER_ID
    if not parent_folder_id:
        return "Drive folder ID is not set."

    if not project_name:
        project_name = "Final Quotation"

    # Create a timestamped subfolder for this upload batch.
    folder_metadata = {
        'name': f'{project_name}_{datetime.now().strftime("%y%m%d_%H%M%S")}',
        'mimeType': 'application/vnd.google-apps.folder',
        'parents': [parent_folder_id]
    }
    subfolder = service.files().create(body=folder_metadata, fields='id').execute()
    subfolder_id = subfolder.get('id')
    print(f"Created subfolder with ID: {subfolder_id}")
    progress(0.1, "Created subfolder and preparing files to upload.")

    try:
        # Project attributes to upload, mapped to their target file names.
        attributes_to_upload = {
            "generated_prd": "PRD.md",
            "generated_plan_test_components": "plan_test_components.md",
            "generated_page_dev_components": "page_dev_components.md",
            "generated_engage_dev_components": "engage_dev_components.md",
            "generated_intent_list": "intent_list.md",
            "reformatted_dev_components": "reformatted_dev_components.md",
            "generated_mvp_prd": "MVP_prd.md",
            "combined_cost_summary": "cost_summary.md",
            "generated_BD_SOW": "BD_SOW.md",
            "generated_Tech_SOW": "Tech_SOW.md"
        }

        for attr, filename in attributes_to_upload.items():
            try:
                content = getattr(project, attr, None)
                print(f"Uploading {attr} with filename {filename}...")
                if not content:
                    print(f"No content found for {attr}")
                    continue

                # A list of dicts uploads best as a DataFrame (CSV).
                if isinstance(content, list) and all(isinstance(i, dict) for i in content):
                    content = pd.DataFrame(content)

                # The Tech SOW may arrive as a JSON string; render it as
                # Markdown sections.  Fixed: parsed_result was previously
                # referenced even when content was not a string (NameError).
                if attr == "generated_Tech_SOW" and isinstance(content, str):
                    try:
                        parsed_result = json.loads(content)
                    except json.JSONDecodeError as e:
                        print(f"Error parsing JSON for {attr}: {e}")
                        continue  # skip this attribute if parsing fails
                    if isinstance(parsed_result, dict):
                        content = "\n\n".join(
                            f"## {key.replace('_', ' ').title()}\n{value}"
                            for key, value in parsed_result.items()
                        )

                print(f"Content for {filename}: {content}")
                upload_content(service, subfolder_id, filename, content)
                progress(0.1, f"Uploaded {filename}")
            except Exception as e:
                print(f"Failed to upload {filename}: {e}")

        # Upload per-function mandays estimates as CSV files.
        try:
            if project.mandays_results:
                for result in project.mandays_results:
                    function_name = result['function_name']
                    result_data = result['result']
                    # Expected shape: {function_name: [<record dicts>]}
                    if isinstance(result_data, dict) and function_name in result_data:
                        df = pd.DataFrame(result_data[function_name])
                        if not df.empty:
                            csv_content = df.to_csv(index=False)
                            upload_content(service, subfolder_id, f"{function_name}.csv", csv_content)
                            progress(0.1, f"Uploaded {function_name}.csv")
                    else:
                        print(f"Unexpected result data format for {function_name}.")

            if project.mvp_mandays_results:
                for result in project.mvp_mandays_results:
                    function_name = result['function_name']
                    result_data = result['result']
                    # Expected shape: {section_name: [<record dicts>], ...}
                    if isinstance(result_data, dict):
                        for section_name, records in result_data.items():
                            if isinstance(records, list):
                                df = pd.DataFrame(records)
                                if not df.empty:
                                    csv_content = df.to_csv(index=False)
                                    upload_content(service, subfolder_id,
                                                   f"{function_name}_{section_name}.csv", csv_content)
                                    progress(0.1, f"Uploaded {function_name}_{section_name}.csv")
                            else:
                                print(f"Unexpected data format for {section_name} in {function_name}.")
                    else:
                        print(f"Unexpected result data format for {function_name}.")
        except Exception as e:
            print(f"Failed to upload mandays results: {e}")

        folder_url = f"https://drive.google.com/drive/folders/{subfolder_id}"
        progress(1.0, "Upload complete")
        return f"All files uploaded successfully. Folder URL: {folder_url}"
    except Exception as e:
        print(f"An error occurred: {e}")
        return f"Failed to upload files. Error: {e}"
def upload_combined_content(service, subfolder_id, combined_cost_summary, generated_plan_test_components, reformatted_dev_components, generated_intent_list):
    """
    Assemble the quotation sections into a single Markdown document and
    upload it to Drive as ``quotation_document.md``.
    """
    # Each section is rendered as "\n# <title>\n<body>".
    sections = (
        ("Final Cost Summary", combined_cost_summary),
        ("Final Planning and Testing Component", generated_plan_test_components),
        ("Final Development Component", reformatted_dev_components),
        ("Final Intent List", generated_intent_list),
    )
    combined_content = "".join(
        f"\n# {title}\n{body}" for title, body in sections
    ) + "\n"
    upload_content(service, subfolder_id, "quotation_document.md", combined_content)