Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from datetime import datetime, timedelta | |
| from collections import defaultdict, Counter | |
| from llama_index.llms.openai import OpenAI | |
| from composio_llamaindex import ComposioToolSet, App, Action | |
| import os | |
| import json | |
| from dotenv import load_dotenv | |
| # Load environment variables | |
| load_dotenv() | |
class CalendarService:
    """Build a "Calendar Wrapped" summary from Google Calendar via Composio.

    The service starts and inspects a Composio connection to Google Calendar,
    fetches the current year's events, computes meeting statistics and asks an
    OpenAI model for short commentary on them.

    ``initiate_connection``, ``check_connection_status`` and
    ``generate_wrapped`` never raise: they return ``{'success': True,
    'data': ...}`` or ``{'success': False, 'error': <message>}``.
    """

    # Fallback payloads used when an LLM answer is missing or malformed.
    _DEFAULT_SCHEDULE_ANALYSIS = {
        "name": "Unknown",
        "reason": "Analysis unavailable",
    }
    _DEFAULT_METRIC_INSIGHTS = {
        "total_meetings_comment": "",
        "time_spent_comment": "",
        "busiest_times_comment": "",
        "collaborator_comment": "",
        "habits_comment": "",
    }

    def __init__(self):
        """Create the Composio toolset from the ``COMPOSIO_API_KEY`` env var."""
        self.toolset = ComposioToolSet(api_key=os.getenv('COMPOSIO_API_KEY'))
        self.connection_request = None

    @staticmethod
    def _extract_events(response_data):
        """Return the event list nested in a Composio response, or ``[]``.

        Each level may be missing or ``None``; both are treated as empty.
        """
        data = (response_data or {}).get('data') or {}
        event_data = data.get('event_data') or {}
        return event_data.get('event_data') or []

    @staticmethod
    def _parse_event_time(time_data):
        """Parse the ``dateTime`` of an event boundary.

        Returns ``None`` when there is no ``dateTime`` (for example all-day
        events, which carry ``date`` instead). Raises ``ValueError`` for a
        malformed timestamp.
        """
        raw = (time_data or {}).get('dateTime')
        if not raw:
            return None
        # A trailing 'Z' is rewritten so older fromisoformat() accepts it.
        return datetime.fromisoformat(raw.replace('Z', '+00:00'))

    @staticmethod
    def _event_participants(event):
        """Return the distinct participant emails of one event, in order.

        Declined attendees, entries flagged ``self`` (the calendar owner) and
        entries without an email are skipped. The organizer is included once,
        even when also listed as an attendee.
        """
        emails = {}  # dict used as an insertion-ordered set
        for attendee in event.get('attendees') or []:
            if attendee.get('responseStatus') == 'declined':
                continue
            if attendee.get('self'):
                continue
            email = attendee.get('email')
            if email:
                emails[email] = None
        organizer = event.get('organizer') or {}
        organizer_email = organizer.get('email')
        if organizer_email and not organizer.get('self'):
            emails[organizer_email] = None
        return list(emails)

    def analyze_calendar_events(self, response_data):
        """Analyze calendar events and return statistics about meetings.

        Only timed events that start in the current year are counted. Events
        without a ``dateTime`` are skipped silently; events that cannot be
        parsed are reported through ``st.error`` and skipped.

        Args:
            response_data: Composio response whose events live under
                ``data -> event_data -> event_data``.

        Returns:
            A dict with ``total_meetings_this_year``, ``total_time_spent``,
            ``busiest_month``, ``busiest_day``, ``most_frequent_participant``,
            ``average_meeting_duration``, ``most_common_meeting_time``,
            ``monthly_breakdown`` and ``daily_breakdown``. With no meetings
            the durations are ``"0:00:00"``, the labels ``"N/A"`` and the
            breakdowns empty.
        """
        current_year = datetime.now().year
        participants = []
        meeting_times = []
        total_duration = timedelta()
        monthly_meetings = defaultdict(int)
        daily_meetings = defaultdict(int)
        total_meetings = 0

        for event in self._extract_events(response_data):
            # Do everything that can fail first, so a bad event never leaves
            # the running totals half-updated.
            try:
                start = self._parse_event_time(event.get('start'))
                end = self._parse_event_time(event.get('end'))
                if start is None or end is None:
                    continue
                if start.year != current_year:
                    continue
                duration = end - start
                people = self._event_participants(event)
            except (ValueError, TypeError, AttributeError) as e:
                st.error(f"Error processing event: {e}")
                continue

            total_meetings += 1
            total_duration += duration
            monthly_meetings[start.strftime('%B')] += 1
            daily_meetings[start.strftime('%A')] += 1
            meeting_times.append(start.strftime('%H:%M'))
            participants.extend(people)

        stats = {
            "total_meetings_this_year": total_meetings,
            "total_time_spent": "0:00:00",
            "busiest_month": "N/A",
            "busiest_day": "N/A",
            "most_frequent_participant": "N/A",
            "average_meeting_duration": "0:00:00",
            "most_common_meeting_time": "N/A",
            "monthly_breakdown": {},
            "daily_breakdown": {},
        }
        if total_meetings:
            stats.update({
                "total_time_spent": str(total_duration),
                "busiest_month": max(monthly_meetings, key=monthly_meetings.get),
                "busiest_day": max(daily_meetings, key=daily_meetings.get),
                "average_meeting_duration": str(total_duration / total_meetings),
                "most_common_meeting_time": Counter(meeting_times).most_common(1)[0][0],
                "monthly_breakdown": dict(monthly_meetings),
                "daily_breakdown": dict(daily_meetings),
            })
            if participants:
                stats["most_frequent_participant"] = (
                    Counter(participants).most_common(1)[0][0]
                )
        return stats

    def initiate_connection(self, entity_id: str, redirect_url: str = "https://calendar-wrapped-eight.vercel.app/") -> dict:
        """Start a Google Calendar connection for ``entity_id``.

        Args:
            entity_id: Composio entity to connect.
            redirect_url: URL forwarded to Composio for the post-auth
                redirect; an empty value is sent as ``None``.

        Returns:
            On success, ``data`` holds the authentication ``redirect_url``
            and a ``message``; on failure, ``error`` holds the exception text.
        """
        try:
            self.connection_request = self.toolset.initiate_connection(
                entity_id=entity_id,
                app=App.GOOGLECALENDAR,
                # Previously accepted but never forwarded.
                redirect_url=redirect_url or None,
            )
            return {
                'success': True,
                'data': {
                    'redirect_url': self.connection_request.redirectUrl,
                    'message': "Please authenticate using the provided link."
                }
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def check_connection_status(self, entity_id: str) -> dict:
        """Look up the status of the entity's Google Calendar connection.

        Returns:
            On success, ``data`` holds ``status`` and a ``message``; on
            failure, ``error`` holds the exception text.
        """
        try:
            entity = self.toolset.get_entity(id=entity_id)
            connection = entity.get_connection(app=App.GOOGLECALENDAR)
            status = connection.status
            return {
                'success': True,
                'data': {
                    'status': status,
                    'message': f"Connection status: {status}"
                }
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    @staticmethod
    def _parse_llm_json(text):
        """Parse a JSON object out of an LLM completion.

        Accepts bare JSON as well as JSON wrapped in Markdown fences or
        surrounding prose, by falling back to the outermost ``{...}`` span.

        Raises:
            ValueError: if no JSON object can be decoded.
        """
        raw = (text or "").strip()
        try:
            parsed = json.loads(raw)
        except ValueError:
            first, last = raw.find("{"), raw.rfind("}")
            if first == -1 or last <= first:
                raise
            parsed = json.loads(raw[first:last + 1])
        if not isinstance(parsed, dict):
            raise ValueError("LLM response is not a JSON object")
        return parsed

    def _ask_llm_for_json(self, llm, prompt, fallback):
        """Run ``prompt`` and return the parsed answer merged over ``fallback``.

        Any failure is reported through ``st.error`` and a copy of
        ``fallback`` is returned, so the expected keys are always present.
        """
        try:
            parsed = self._parse_llm_json(llm.complete(prompt).text)
        except Exception as e:
            st.error(f"Error processing LLM responses: {e}")
            return dict(fallback)
        return {**fallback, **parsed}

    @staticmethod
    def _billionaire_prompt(stats):
        """Build the prompt comparing the schedule to a tech billionaire's."""
        return (
            "Based on these calendar stats, which tech billionaire's schedule does this most resemble and why?\n"
            "Stats:\n"
            f"- {stats['total_meetings_this_year']} total meetings\n"
            f"- {stats['total_time_spent']} total time in meetings\n"
            f"- Most active on {stats['busiest_day']}s\n"
            f"- Busiest month is {stats['busiest_month']}\n"
            f"- Average meeting duration: {stats['average_meeting_duration']}\n"
            "Suggest a different billionaire each time, dont say elon.\n"
            'Return as JSON with format: {"name": "billionaire name", "reason": "explanation"}\n'
        )

    @staticmethod
    def _stats_prompt(stats):
        """Build the prompt asking for a one-sentence comment per metric."""
        return (
            "Analyze these calendar stats and write a brief, insightful one-sentence comment for each metric:\n"
            f"- Total meetings: {stats['total_meetings_this_year']}\n"
            f"- Total time in meetings: {stats['total_time_spent']}\n"
            f"- Busiest month: {stats['busiest_month']}\n"
            f"- Busiest day: {stats['busiest_day']}\n"
            f"- Average meeting duration: {stats['average_meeting_duration']}\n"
            f"- Most common meeting time: {stats['most_common_meeting_time']}\n"
            f"- Most frequent participant: {stats['most_frequent_participant']}\n"
            'Return as JSON with format: {"total_meetings_comment": "", "time_spent_comment": "", '
            '"busiest_times_comment": "", "collaborator_comment": "", "habits_comment": ""}\n'
        )

    def generate_wrapped(self, entity_id: str) -> dict:
        """Fetch this year's events and build the full "wrapped" payload.

        Returns:
            On success, ``data`` holds the statistics from
            ``analyze_calendar_events`` plus ``schedule_analysis`` and
            ``metric_insights`` from the LLM (defaults when the LLM fails).
            On failure, ``error`` holds the reason.
        """
        try:
            current_year = datetime.now().year
            request_params = {
                "calendar_id": "primary",
                "timeMin": f"{current_year},1,1,0,0,0",
                "timeMax": f"{current_year},12,31,23,59,59",
                "single_events": True,
                "max_results": 2500,
                "order_by": "startTime"
            }
            events_response = self.toolset.execute_action(
                action=Action.GOOGLECALENDAR_FIND_EVENT,
                params=request_params,
                entity_id=entity_id
            )

            # Accept the misspelled key this code was written against as well
            # as the correctly spelled one.
            succeeded = (
                events_response.get("successfull")
                or events_response.get("successful")
            )
            if not succeeded:
                return {
                    'success': False,
                    'error': events_response.get("error") or "Failed to fetch calendar events"
                }

            stats = self.analyze_calendar_events(events_response)
            llm = OpenAI(model='gpt-4', api_key=os.getenv('OPENAI_API_KEY'))
            # Each answer falls back on its own, so one bad completion does
            # not discard the other.
            stats["schedule_analysis"] = self._ask_llm_for_json(
                llm,
                self._billionaire_prompt(stats),
                self._DEFAULT_SCHEDULE_ANALYSIS,
            )
            stats["metric_insights"] = self._ask_llm_for_json(
                llm,
                self._stats_prompt(stats),
                self._DEFAULT_METRIC_INSIGHTS,
            )
            return {
                'success': True,
                'data': stats
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
def _render_connect_tab(service):
    """Render the "Connect" tab: start a connection and show the auth link."""
    st.header("Initialize Connection")
    entity_id = st.text_input("Entity ID")
    redirect_url = st.text_input("Redirect URL", value="https://calendar-wrapped-eight.vercel.app/")
    if not st.button("Initialize Connection"):
        return
    if not entity_id:
        st.warning("Please enter an Entity ID")
        return
    result = service.initiate_connection(entity_id, redirect_url)
    if result['success']:
        st.success(result['data']['message'])
        st.markdown(f"[Click here to authenticate]({result['data']['redirect_url']})")
    else:
        st.error(f"Error: {result['error']}")


def _render_status_tab(service):
    """Render the "Check Status" tab: report the entity's connection status."""
    st.header("Check Connection Status")
    status_entity_id = st.text_input("Entity ID", key="status_entity_id")
    if not st.button("Check Status"):
        return
    if not status_entity_id:
        st.warning("Please enter an Entity ID")
        return
    result = service.check_connection_status(status_entity_id)
    if result['success']:
        st.success(result['data']['message'])
    else:
        st.error(f"Error: {result['error']}")


def _render_wrapped_results(data):
    """Display the metrics, LLM commentary and breakdown charts in ``data``."""
    # Headline metrics.
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Total Meetings", data['total_meetings_this_year'])
    with col2:
        st.metric("Total Time in Meetings", data['total_time_spent'])
    with col3:
        st.metric("Average Meeting Duration", data['average_meeting_duration'])

    # The analysis comes from LLM output, so read it defensively: missing
    # keys or a non-object answer must not crash the page.
    st.subheader("Schedule Analysis")
    analysis = data.get('schedule_analysis')
    if isinstance(analysis, dict) and analysis:
        st.write(f"Your schedule most resembles: **{analysis.get('name', 'Unknown')}**")
        st.write(f"*{analysis.get('reason', 'Analysis unavailable')}*")

    st.subheader("Insights")
    insights = data.get('metric_insights')
    if isinstance(insights, dict):
        for comment in insights.values():
            if comment:  # Only display non-empty insights
                st.write(f"- {comment}")

    # Per-month and per-weekday meeting counts.
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Monthly Breakdown")
        st.bar_chart(data['monthly_breakdown'])
    with col2:
        st.subheader("Daily Breakdown")
        st.bar_chart(data['daily_breakdown'])


def _render_wrapped_tab(service):
    """Render the "Generate Wrapped" tab: fetch, analyze and show the summary."""
    st.header("Generate Calendar Wrapped")
    wrapped_entity_id = st.text_input("Entity ID", key="wrapped_entity_id")
    if not st.button("Generate Wrapped"):
        return
    if not wrapped_entity_id:
        st.warning("Please enter an Entity ID")
        return
    with st.spinner("Generating your Calendar Wrapped..."):
        result = service.generate_wrapped(wrapped_entity_id)
    if result['success']:
        _render_wrapped_results(result['data'])
    else:
        st.error(f"Error: {result['error']}")


def main():
    """Streamlit entry point: configure the page and render the three tabs."""
    st.set_page_config(page_title="Calendar Wrapped", layout="wide")
    st.title("Calendar Wrapped")
    service = CalendarService()
    tab1, tab2, tab3 = st.tabs(["Connect", "Check Status", "Generate Wrapped"])
    with tab1:
        _render_connect_tab(service)
    with tab2:
        _render_status_tab(service)
    with tab3:
        _render_wrapped_tab(service)


if __name__ == "__main__":
    main()