Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| from simple_salesforce import Salesforce | |
| import os | |
| import base64 | |
| import datetime | |
| from dotenv import load_dotenv | |
| from fpdf import FPDF | |
| import shutil | |
| import html | |
| import matplotlib.pyplot as plt | |
| import io | |
| # Load environment variables | |
| load_dotenv() | |
| required_env_vars = ['SF_USERNAME', 'SF_PASSWORD', 'SF_SECURITY_TOKEN'] | |
| missing_vars = [var for var in required_env_vars if not os.getenv(var)] | |
| if missing_vars: | |
| raise EnvironmentError(f"Missing required environment variables: {missing_vars}") | |
| # Load model and tokenizer | |
| model_name = "distilgpt2" | |
| tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True) | |
| model = AutoModelForCausalLM.from_pretrained(model_name, low_cpu_mem_usage=True) | |
| if tokenizer.pad_token is None: | |
| tokenizer.pad_token = tokenizer.eos_token | |
| tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids(tokenizer.pad_token) | |
| model.config.pad_token_id = tokenizer.pad_token_id | |
| # Prompt template | |
| PROMPT_TEMPLATE = """You are an AI assistant for construction supervisors. Given the role, project, milestones, and a reflection log, generate: | |
| 1. A Daily Checklist with clear and concise tasks based on the role and milestones. | |
| Split the checklist into day-by-day tasks for a specified time period (e.g., one week). | |
| 2. Focus Suggestions based on concerns or keywords in the reflection log. Provide at least 2 suggestions. | |
| Inputs: | |
| Role: {role} | |
| Project ID: {project_id} | |
| Milestones: {milestones} | |
| Reflection Log: {reflection} | |
| Output Format: | |
| Checklist (Day-by-Day): | |
| - Day 1: | |
| - Task 1 | |
| - Task 2 | |
| - Day 2: | |
| - Task 1 | |
| - Task 2 | |
| ... | |
| Suggestions: | |
| - | |
| """ | |
| def clean_text_for_pdf(text): | |
| return html.unescape(text).encode('latin-1', 'replace').decode('latin-1') | |
| def save_report_as_pdf(role, supervisor_name, project_id, checklist, suggestions): | |
| now = datetime.datetime.now().strftime("%Y%m%d%H%M%S") | |
| filename = f"report_{supervisor_name}_{project_id}_{now}.pdf" | |
| file_path = f"./reports/{filename}" | |
| os.makedirs("reports", exist_ok=True) | |
| pdf = FPDF() | |
| pdf.add_page() | |
| pdf.set_font("Arial", 'B', 14) | |
| pdf.cell(200, 10, txt="Supervisor Daily Report", ln=True, align="C") | |
| pdf.set_font("Arial", size=12) | |
| pdf.cell(200, 10, txt=clean_text_for_pdf(f"Role: {role}"), ln=True) | |
| pdf.cell(200, 10, txt=clean_text_for_pdf(f"Supervisor: {supervisor_name}"), ln=True) | |
| pdf.cell(200, 10, txt=clean_text_for_pdf(f"Project ID: {project_id}"), ln=True) | |
| pdf.ln(5) | |
| pdf.set_font("Arial", 'B', 12) | |
| pdf.cell(200, 10, txt="Daily Checklist", ln=True) | |
| pdf.set_font("Arial", size=12) | |
| for line in checklist.split("\n"): | |
| pdf.multi_cell(0, 10, clean_text_for_pdf(line)) | |
| pdf.ln(5) | |
| pdf.set_font("Arial", 'B', 12) | |
| pdf.cell(200, 10, txt="Focus Suggestions", ln=True) | |
| pdf.set_font("Arial", size=12) | |
| for line in suggestions.split("\n"): | |
| pdf.multi_cell(0, 10, clean_text_for_pdf(line)) | |
| pdf.output(file_path) | |
| temp_pdf_path = "/tmp/" + os.path.basename(file_path) | |
| shutil.copy(file_path, temp_pdf_path) | |
| return temp_pdf_path, filename | |
| def upload_pdf_to_salesforce_and_update_link(supervisor_name, project_id, pdf_path, pdf_name, checklist, suggestions): | |
| try: | |
| sf = Salesforce( | |
| username=os.getenv('SF_USERNAME'), | |
| password=os.getenv('SF_PASSWORD'), | |
| security_token=os.getenv('SF_SECURITY_TOKEN'), | |
| domain=os.getenv('SF_DOMAIN', 'login') | |
| ) | |
| with open(pdf_path, "rb") as f: | |
| encoded = base64.b64encode(f.read()).decode() | |
| content = sf.ContentVersion.create({ | |
| 'Title': pdf_name, | |
| 'PathOnClient': pdf_name, | |
| 'VersionData': encoded | |
| }) | |
| content_id = content['id'] | |
| download_url = f"https://{sf.sf_instance}/sfc/servlet.shepherd/version/download/{content_id}" | |
| query = sf.query(f"SELECT Id FROM Supervisor__c WHERE Name = '{supervisor_name}' LIMIT 1") | |
| if query['totalSize'] == 0: | |
| return "" | |
| supervisor_id = query['records'][0]['Id'] | |
| project_query = sf.query(f"SELECT Id FROM Project__c WHERE Name = '{project_id}' LIMIT 1") | |
| if project_query['totalSize'] == 0: | |
| return "" | |
| project_id_sf = project_query['records'][0]['Id'] | |
| sf.Supervisor_AI_Coaching__c.create({ | |
| 'Project_ID__c': project_id_sf, | |
| 'Supervisor_ID__c': supervisor_id, | |
| 'Daily_Checklist__c': checklist, | |
| 'Suggested_Tips__c': suggestions, | |
| 'Download_Link__c': download_url | |
| }) | |
| return download_url | |
| except Exception as e: | |
| return "" | |
| def get_roles_from_salesforce(): | |
| try: | |
| sf = Salesforce( | |
| username=os.getenv('SF_USERNAME'), | |
| password=os.getenv('SF_PASSWORD'), | |
| security_token=os.getenv('SF_SECURITY_TOKEN'), | |
| domain=os.getenv('SF_DOMAIN', 'login') | |
| ) | |
| result = sf.query("SELECT Role__c FROM Supervisor__c WHERE Role__c != NULL") | |
| return list(set(record['Role__c'] for record in result['records'])) | |
| except Exception as e: | |
| return [] | |
| def get_supervisor_name_by_role(role): | |
| try: | |
| sf = Salesforce( | |
| username=os.getenv('SF_USERNAME'), | |
| password=os.getenv('SF_PASSWORD'), | |
| security_token=os.getenv('SF_SECURITY_TOKEN'), | |
| domain=os.getenv('SF_DOMAIN', 'login') | |
| ) | |
| result = sf.query(f"SELECT Name FROM Supervisor__c WHERE Role__c = '{role}'") | |
| return [record['Name'] for record in result['records']] | |
| except Exception as e: | |
| return [] | |
| def get_projects_for_supervisor(supervisor_name): | |
| try: | |
| sf = Salesforce( | |
| username=os.getenv('SF_USERNAME'), | |
| password=os.getenv('SF_PASSWORD'), | |
| security_token=os.getenv('SF_SECURITY_TOKEN'), | |
| domain=os.getenv('SF_DOMAIN', 'login') | |
| ) | |
| result = sf.query(f"SELECT Id FROM Supervisor__c WHERE Name = '{supervisor_name}' LIMIT 1") | |
| if result['totalSize'] == 0: | |
| return "" | |
| supervisor_id = result['records'][0]['Id'] | |
| project_result = sf.query(f"SELECT Name FROM Project__c WHERE Supervisor_ID__c = '{supervisor_id}' LIMIT 1") | |
| return project_result['records'][0]['Name'] if project_result['totalSize'] > 0 else "" | |
| except Exception as e: | |
| return "" | |
| def get_dashboard_data_from_salesforce(supervisor_name, project_id): | |
| try: | |
| sf = Salesforce( | |
| username=os.getenv('SF_USERNAME'), | |
| password=os.getenv('SF_PASSWORD'), | |
| security_token=os.getenv('SF_SECURITY_TOKEN'), | |
| domain=os.getenv('SF_DOMAIN', 'login') | |
| ) | |
| query = f""" | |
| SELECT Daily_Checklist__c, Suggested_Tips__c, Download_Link__c, CreatedDate | |
| FROM Supervisor_AI_Coaching__c | |
| WHERE Supervisor_ID__r.Name = '{supervisor_name}' AND Project_ID__r.Name = '{project_id}' | |
| ORDER BY CreatedDate DESC | |
| LIMIT 1 | |
| """ | |
| result = sf.query(query) | |
| if result['totalSize'] == 0: | |
| return "No dashboard data found.", "", "", "" | |
| record = result['records'][0] | |
| return ( | |
| record.get('Daily_Checklist__c', 'N/A'), | |
| record.get('Suggested_Tips__c', 'N/A'), | |
| record.get('Download_Link__c', ''), | |
| record.get('CreatedDate', 'Unknown') | |
| ) | |
| except Exception as e: | |
| return f"Error loading dashboard: {e}", "", "", "" | |
| # Function to compare start and end date | |
| def compare_dates(start_date, end_date): | |
| current_date = datetime.datetime.now() | |
| if current_date >= end_date: | |
| return "Project Completed" | |
| elif current_date < start_date: | |
| return "Not Started" | |
| else: | |
| return "In Progress" | |
| # Render bar chart (updated) | |
| def render_bar_chart(completed, total): | |
| fig, ax = plt.subplots(figsize=(4, 3)) | |
| ax.bar(["Completed", "Remaining"], [completed, max(0, total - completed)], color=["green", "gray"]) | |
| ax.set_title("Checklist Progress") | |
| buf = io.BytesIO() | |
| plt.tight_layout() | |
| fig.savefig(buf, format="png") | |
| plt.close(fig) | |
| buf.seek(0) | |
| encoded = base64.b64encode(buf.read()).decode("utf-8") | |
| return f"<img src='data:image/png;base64,{encoded}'/>" | |
| # Updated show_dashboard_html function | |
| def show_dashboard_html(sup_name, proj_id): | |
| checklist, suggestions, link, date = get_dashboard_data_from_salesforce(sup_name, proj_id) | |
| # Parse date from Salesforce (assuming 'CreatedDate' is the project end date) | |
| end_date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f+0000") # Adjust format if needed | |
| start_date = datetime.datetime.now() # Example: assume project starts today | |
| # Compare the start and end dates | |
| status = compare_dates(start_date, end_date) | |
| # Progress calculation (dummy logic here) | |
| completed_tasks = checklist.count("- ") # Count bullet points | |
| total_tasks = 5 # You can replace this with actual task count from the checklist | |
| chart_html = render_bar_chart(completed_tasks, total_tasks) | |
| # Create the final HTML content | |
| link_html = f'<a href="{link}" target="_blank">Download PDF</a>' if link else "No report available" | |
| html_card = f""" | |
| <div style='border:1px solid #ccc; border-radius:10px; padding:20px; background-color:#f9f9f9'> | |
| <h3>π <u>Dashboard Summary</u></h3> | |
| <p><b>Supervisor:</b> {sup_name}</p> | |
| <p><b>Project ID:</b> {proj_id}</p> | |
| <p><b>ποΈ Status:</b> {status}</p> | |
| <hr> | |
| <h4>β <u>Daily Checklist</u></h4> | |
| <pre style='background:#eef; padding:10px; border-radius:5px;'>{checklist}</pre> | |
| <h4>π‘ <u>Focus Suggestions</u></h4> | |
| <pre style='background:#ffe; padding:10px; border-radius:5px;'>{suggestions}</pre> | |
| <p>{link_html}</p> | |
| <hr> | |
| <h4>π <u>Progress Chart</u></h4> | |
| {chart_html} | |
| </div> | |
| """ | |
| return html_card | |
| # Main Gradio Interface | |
| def create_interface(): | |
| roles = get_roles_from_salesforce() | |
| with gr.Blocks(theme="soft", css=".footer { display: none; }") as demo: | |
| gr.Markdown("## π§ AI-Powered Supervisor Assistant") | |
| with gr.Tab("βοΈ Generate Checklist"): | |
| with gr.Row(): | |
| role = gr.Dropdown(choices=roles, label="Role") | |
| supervisor_name = gr.Dropdown(choices=[], label="Supervisor Name") | |
| project_id = gr.Textbox(label="Project ID", interactive=False) | |
| milestones = gr.Textbox(label="Milestones (comma-separated KPIs)") | |
| reflection = gr.Textbox(label="Reflection Log", lines=4) | |
| with gr.Row(): | |
| generate = gr.Button("Generate") | |
| clear = gr.Button("Clear") | |
| refresh = gr.Button("π Refresh Roles") | |
| checklist_output = gr.Textbox(label="β Daily Checklist") | |
| suggestions_output = gr.Textbox(label="π‘ Focus Suggestions") | |
| download_button = gr.File(label="β¬ Download Report") | |
| pdf_link = gr.HTML() | |
| role.change(fn=lambda r: gr.update(choices=get_supervisor_name_by_role(r)), inputs=role, outputs=supervisor_name) | |
| supervisor_name.change(fn=get_projects_for_supervisor, inputs=supervisor_name, outputs=project_id) | |
| def handle_generate(role, supervisor_name, project_id, milestones, reflection): | |
| checklist, suggestions, pdf_path, pdf_name = generate_outputs(role, supervisor_name, project_id, milestones, reflection) | |
| return checklist, suggestions, pdf_path, pdf_name | |
| generate.click(fn=handle_generate, | |
| inputs=[role, supervisor_name, project_id, milestones, reflection], | |
| outputs=[checklist_output, suggestions_output, download_button, pdf_link]) | |
| clear.click(fn=lambda: ("", "", "", "", ""), | |
| inputs=None, | |
| outputs=[role, supervisor_name, project_id, milestones, reflection]) | |
| refresh.click(fn=lambda: gr.update(choices=get_roles_from_salesforce()), outputs=role) | |
| with gr.Tab("π Supervisor Dashboard"): | |
| dash_supervisor = gr.Textbox(label="Supervisor Name", placeholder="e.g., SUP-056") | |
| dash_project = gr.Textbox(label="Project ID", placeholder="e.g., PROJ-078") | |
| load_dash = gr.Button("π₯ Load Dashboard") | |
| dash_output = gr.HTML() | |
| def show_dashboard_html(sup_name, proj_id): | |
| checklist, suggestions, link, date = get_dashboard_data_from_salesforce(sup_name, proj_id) | |
| link_html = f'<a href="{link}" target="_blank">Download PDF</a>' if link else "No report available" | |
| completed_tasks = checklist.count("- ") | |
| total_tasks = 5 | |
| chart_html = render_bar_chart(completed_tasks, total_tasks) | |
| html_card = f""" | |
| <div style='border:1px solid #ccc; border-radius:10px; padding:20px; background-color:#f9f9f9'> | |
| <h3>π <u>Dashboard Summary</u></h3> | |
| <p><b>Supervisor:</b> {sup_name}</p> | |
| <p><b>Project ID:</b> {proj_id}</p> | |
| <p><b>ποΈ Last Updated:</b> {date}</p> | |
| <hr> | |
| <h4>β <u>Daily Checklist</u></h4> | |
| <pre style='background:#eef; padding:10px; border-radius:5px;'>{checklist}</pre> | |
| <h4>π‘ <u>Focus Suggestions</u></h4> | |
| <pre style='background:#ffe; padding:10px; border-radius:5px;'>{suggestions}</pre> | |
| <p>{link_html}</p> | |
| <hr> | |
| <h4>π <u>Progress Chart</u></h4> | |
| {chart_html} | |
| </div> | |
| """ | |
| return html_card | |
| load_dash.click(fn=show_dashboard_html, inputs=[dash_supervisor, dash_project], outputs=dash_output) | |
| return demo | |
| if __name__ == "__main__": | |
| app = create_interface() | |
| app.launch() | |