Spaces:
Build error
Build error
| import os | |
| os.environ['TZ'] = 'Asia/Kolkata' | |
| import time | |
| import functools | |
| from langchain.schema import HumanMessage | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain.agents import AgentExecutor, create_tool_calling_agent | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.runnables.history import RunnableWithMessageHistory | |
| from langchain_core.tools import tool | |
| import gradio as gr | |
| from io import BytesIO | |
| import json | |
| from datetime import date, datetime | |
| import random | |
| import pandas as pd | |
| from typing import List | |
| from langchain_core.chat_history import BaseChatMessageHistory | |
| from langchain_core.messages import BaseMessage | |
| from langchain_core.pydantic_v1 import BaseModel, Field | |
| from PIL import Image | |
| import base64 | |
| from gradio import ChatMessage | |
| os.environ['TZ'] = 'Asia/Kolkata' | |
| print(time.strftime('%X %x %Z')) | |
| # ---------------------- Sticky Pad Persistence Functions ---------------------- # | |
| def load_sticky_pad(username: str) -> str: | |
| """Load sticky pad content from the Sticky Notes directory.""" | |
| directory = "Sticky Notes" | |
| if not os.path.exists(directory): | |
| os.makedirs(directory) | |
| filepath = os.path.join(directory, f"{username}-sticky.txt") | |
| if os.path.exists(filepath): | |
| with open(filepath, "r", encoding="utf-8") as f: | |
| return f.read() | |
| return "" | |
| def save_sticky_pad(username: str, content: str) -> None: | |
| """Save sticky pad content to the Sticky Notes directory.""" | |
| directory = "Sticky Notes" | |
| if not os.path.exists(directory): | |
| os.makedirs(directory) | |
| filepath = os.path.join(directory, f"{username}-sticky.txt") | |
| with open(filepath, "w", encoding="utf-8") as f: | |
| f.write(content) | |
| # ---------------------------------------------------------------------------- # | |
| def load_schedules(): | |
| with open('schedules.json', 'r') as f: | |
| return json.load(f) | |
| def save_schedules(schedules): | |
| with open('schedules.json', 'w') as f: | |
| json.dump(schedules, f, indent=4) | |
| def cache_with_timeout(timeout: int): | |
| def decorator(func): | |
| cache = {} | |
| def wrapper(*args): | |
| if args in cache: | |
| result, timestamp = cache[args] | |
| if time.time() - timestamp < timeout: | |
| return result | |
| result = func(*args) | |
| cache[args] = (result, time.time()) | |
| return result | |
| return wrapper | |
| return decorator | |
| class InMemoryHistory(BaseChatMessageHistory, BaseModel): | |
| messages: List[BaseMessage] = Field(default_factory=list) | |
| def add_messages(self, messages: List[BaseMessage]) -> None: | |
| self.messages.extend(messages) | |
| def clear(self) -> None: | |
| self.messages = [] | |
| def encode_image_to_base64(image_path): | |
| image = Image.open(image_path) | |
| buffered = BytesIO() | |
| image.save(buffered, format=image.format) | |
| img_bytes = buffered.getvalue() | |
| return base64.b64encode(img_bytes).decode("utf-8") | |
| def parse(history): | |
| p = '\n' | |
| if history == []: | |
| return "No Chat till now" | |
| for i in history: | |
| try: | |
| p += i['role'] + ': ' + i['content'] + '\n' | |
| except: | |
| pass | |
| return p | |
| def decode_image(encoded_image): | |
| image_data = base64.b64decode(encoded_image) | |
| image = Image.open(BytesIO(image_data)) | |
| global i | |
| filename = f"temp{i}.jpeg" | |
| i += 1 | |
| image.save(filename) | |
| return filename | |
| i = 0 # Global counter for image filenames | |
| # Detailed persona descriptions | |
| PERSONA_MAP = { | |
| "Creative Muse": "A warm, imaginative persona that encourages creative exploration and inspiring writing.", | |
| "Literary Critic": "A refined and insightful persona offering deep literary analysis and constructive feedback.", | |
| "Storyteller": "A narrative-driven persona that weaves engaging, immersive stories with a touch of humor.", | |
| "Academic Advisor": "A formal and knowledgeable persona that provides scholarly insights and structured guidance." | |
| } | |
| # A list of inspirational prompt examples (25 examples) | |
| INSPIRATIONAL_PROMPTS = [ | |
| "Write about a hidden garden that only appears at dusk.", | |
| "Imagine a world where dreams dictate reality.", | |
| "Describe a city that floats in the sky.", | |
| "Craft a story around a mysterious, timeless letter.", | |
| "Write a poem about the sound of rain on a tin roof.", | |
| "Imagine a dialogue between two ancient trees.", | |
| "Describe a sunset as if seen through an artist’s eyes.", | |
| "Craft a narrative about a forgotten melody.", | |
| "Write about an unexpected friendship in an unlikely place.", | |
| "Imagine a secret door in an ordinary room.", | |
| "Describe the journey of a single, determined raindrop.", | |
| "Write a story that begins with a chance encounter.", | |
| "Imagine a future where art and science merge seamlessly.", | |
| "Craft a tale inspired by the patterns of the stars.", | |
| "Describe the magic hidden in everyday moments.", | |
| "Write a narrative about a silent revolution of ideas.", | |
| "Imagine a world where time flows backward.", | |
| "Craft a poem celebrating the beauty of imperfections.", | |
| "Describe a character who finds solace in solitude.", | |
| "Write about a moment when everything suddenly made sense.", | |
| "Imagine a landscape painted by the emotions of its inhabitants.", | |
| "Craft a narrative that blurs the line between reality and fantasy.", | |
| "Describe a long-forgotten legend in a modern setting.", | |
| "Write a story inspired by the interplay of light and shadow.", | |
| "Imagine a conversation with your future self." | |
| ] | |
| class User: | |
| def __init__(self, username: str, title: str, persona: str): | |
| self.username = username | |
| self.session_title = title | |
| self.persona = persona | |
| self.persona_description = PERSONA_MAP.get(persona, "A creative writing assistant.") | |
| # Store the last assistant response for the sticky pad. | |
| self.last_response = "" | |
| # Two Gemini 1.5 pro instances with tailored parameters: | |
| self.llm_main = ChatGoogleGenerativeAI( | |
| model="gemini-2.0-flash", | |
| temperature=1, | |
| max_tokens=1500, | |
| timeout=None, | |
| max_retries=5, | |
| google_api_key=os.getenv('API_KEY') | |
| ) | |
| self.llm_inquiry = ChatGoogleGenerativeAI( | |
| model="gemini-2.0-flash", | |
| temperature=0.8, | |
| max_tokens=1024, | |
| timeout=None, | |
| max_retries=5, | |
| google_api_key=os.getenv('API_KEY') | |
| ) | |
| self.template = f""" | |
| You are a Vision Enabled Creative Writing Assistant named 'CreativeMind' with the persona: {self.persona} - {self.persona_description}. | |
| Your role is to provide human-like, engaging, and insightful creative writing advice, literary analysis, and writing prompts. | |
| You utilize multiple instances of Carefully Prompt Engineered LLMs: one for general conversation and tool orchestration, and a dedicated one for creative inquiries. | |
| Call the necessary TOOLS as required. | |
| REMEMBER USER CAN SEE ONLY YOUR RESPONSE AND THEY CAN'T SEE THE TOOLS OUTPUT. | |
| "ALWAYS call the `add_reminder` tool for EVERY reminder request." | |
| You can: | |
| 1. Schedule a review session (tool provided) | |
| 2. Answer creative writing inquiries and provide literary analysis (tool provided) | |
| 3. Provide inspiring writing prompts and ideas. | |
| 4. Set reminders for creative tasks using the reminder tool. | |
| Engage warmly, include emojis, and provide detailed explanations. | |
| Current Date (For scheduling ONLY, if no date is mentioned assume Today): {date.today()} | |
| Name of the User: {self.username} | |
| """ | |
| self.prompt = ChatPromptTemplate.from_messages( | |
| [ | |
| ("system", self.template), | |
| ("placeholder", "{chat_history}"), | |
| ("placeholder", "{input}"), | |
| ("placeholder", "{agent_scratchpad}"), | |
| ] | |
| ) | |
| self.store = {} | |
| def creative_inquiry(question: str) -> str: | |
| """Answers creative writing queries and generates inspiring writing prompts using the dedicated Gemini 1.5 pro instance.""" | |
| p=self.llm_inquiry.invoke(self.PI_prompt.format(question), config=self.config) | |
| return p.content | |
| def add_reminder(time: str, name: str) -> str: | |
| """ | |
| Adds a reminder for the given time and name. | |
| Parameters: | |
| - time: in format '%Y-%m-%d %H:%M'. | |
| - name: Name of the creative task or event. | |
| """ | |
| reminders = self.load_reminders() | |
| reminder_entry = {"time": time, "name": name} | |
| reminders.append(reminder_entry) | |
| self.save_reminders(reminders) | |
| return f"Reminder set for '{name}' at {time}." | |
| def schedule_review(query: str) -> str: | |
| """Schedules a creative review session. | |
| Parameter: A single string in the format `%Y-%m-%d %H:%M` ONLY""" | |
| schedules = load_schedules() | |
| query = query.replace("`", '') | |
| combined_time_str = datetime.strptime(query, "%Y-%m-%d %H:%M") | |
| if schedules.get(str(combined_time_str), "") == "": | |
| schedules[str(combined_time_str)] = self.username | |
| save_schedules(schedules) | |
| return f"Review session scheduled successfully for {self.username} at {combined_time_str}." | |
| else: | |
| return "The preferred time slot is unavailable. Please choose another time." | |
| self.PI_prompt = '''Context: | |
| You are a creative writing assistant. When given a literary query or a request for a writing prompt, provide thoughtful, inspiring, and creative responses. | |
| Example Query: | |
| "Can you suggest a writing prompt involving a mysterious lighthouse?" | |
| AI-powered Response: | |
| "Imagine a weather-beaten lighthouse standing alone on a rocky shore, its beacon a relic of forgotten times. Write about a stormy night when the light flickers mysteriously, revealing secrets hidden beneath the crashing waves." | |
| User Query: | |
| {} | |
| Note: Focus on creativity, literary flair, and thoughtful insights. | |
| ''' | |
| # Create the agent using the main instance. | |
| self.agent = create_tool_calling_agent(self.llm_main, [schedule_review, creative_inquiry, add_reminder], self.prompt) | |
| self.agent_executor = RunnableWithMessageHistory( | |
| AgentExecutor(agent=self.agent, tools=[schedule_review, creative_inquiry, add_reminder], verbose=True), | |
| self.get_by_session_id, | |
| input_messages_key="input", | |
| history_messages_key="chat_history", | |
| ) | |
| self.config = {"configurable": {"session_id": self.username + "-" + self.session_title}} | |
| def get_by_session_id(self, session_id: str) -> BaseChatMessageHistory: | |
| if session_id not in self.store: | |
| self.store[session_id] = InMemoryHistory() | |
| return self.store[session_id] | |
| def load_reminders(self): | |
| try: | |
| with open(f'reminders/{self.username}-reminders.json', 'r') as f: | |
| return json.load(f) | |
| except FileNotFoundError: | |
| return [] | |
| def save_reminders(self, reminders): | |
| with open(f'reminders/{self.username}-reminders.json', 'w') as f: | |
| json.dump(reminders, f, indent=4) | |
| def save_conversation_history(self, history_data): | |
| if not os.path.exists(f'conv/{self.username}-conversation_history.json'): | |
| with open(f'conv/{self.username}-conversation_history.json', 'w') as f: | |
| json.dump({}, f) | |
| with open(f'conv/{self.username}-conversation_history.json', 'w') as f: | |
| json.dump(history_data, f, indent=4) | |
| def save_conversation(self, title, user_input, ai_response, images=None): | |
| history_data = self.load_conversation_history() | |
| conversation_entry = [{"role": "user", "content": user_input}] | |
| if images: | |
| for img in images: | |
| encoded_image = encode_image_to_base64(img) | |
| conversation_entry.append({"role": "user", "content": encoded_image, "type": "image"}) | |
| conversation_entry.append({"role": "assistant", "content": ai_response, "persona": self.persona}) | |
| if title in history_data: | |
| history_data[title].extend(conversation_entry) | |
| else: | |
| history_data[title] = conversation_entry | |
| self.save_conversation_history(history_data) | |
| def load_conversation_history(self): | |
| if not os.path.exists(f'conv/{self.username}-conversation_history.json'): | |
| with open(f'conv/{self.username}-conversation_history.json', 'w') as f: | |
| json.dump({}, f) | |
| with open(f'conv/{self.username}-conversation_history.json', 'r') as f: | |
| return json.load(f) | |
| def update_conversation_history(self, session_id, message_data): | |
| conversation_history = self.load_conversation_history() | |
| if session_id not in conversation_history: | |
| conversation_history[session_id] = [] | |
| conversation_history[session_id].append(message_data) | |
| with open(f'conv/{self.username}-conversation_history.json', 'w') as f: | |
| json.dump(conversation_history, f, indent=4) | |
| def system_message_reminder(self): | |
| reminders = self.load_reminders() | |
| current_time = datetime.now().strftime('%Y-%m-%d %H:%M') | |
| for reminder in reminders: | |
| if reminder['time'] == current_time: | |
| print("TIME UP!!") | |
| message = HumanMessage(content=[{"type": "text", "text": f"System: {reminder['time']} reached! Time for your creative task: {reminder['name']} 🎨"}]) | |
| result = self.agent_executor.invoke({"input": [message]}, config=self.config) | |
| reminders.remove(reminder) | |
| self.save_reminders(reminders) | |
| response = result['output'] | |
| gr.Info(response, duration=30) | |
| def load_selected_conversation(self, title): | |
| history_data = self.load_conversation_history() | |
| print(f"Title type: {type(title)}, Title: {title}, {history_data}") | |
| return history_data.get(title, []) | |
| def save_ai_response(self, response): | |
| if not os.path.exists("responses"): | |
| os.makedirs("responses") | |
| timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") | |
| filename = f"responses/{self.username}-{self.session_title}-{timestamp}.txt" | |
| with open(filename, "w",encoding="utf-8") as f: | |
| f.write(response) | |
| def chatbot_response(self, history, query): | |
| extra_text = "" | |
| if query.get('files'): | |
| image_data = [] | |
| for x in range(len(query["files"])): | |
| image = encode_image_to_base64(query['files'][x]) | |
| image_data += [HumanMessage( | |
| content=[{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image}"}}] | |
| )] | |
| image_data += [HumanMessage( | |
| content=[{"type": "text", "text": "Invoke the necessary tools for the query: " + query['text'] + extra_text}] | |
| )] | |
| result = self.agent_executor.invoke({"input": image_data}, config=self.config) | |
| self.save_conversation(self.session_title, query['text'], result['output'], images=query['files']) | |
| else: | |
| message = HumanMessage( | |
| content=[{"type": "text", "text": "Invoke the necessary tools for the query: " + query['text'] + extra_text}] | |
| ) | |
| result = self.agent_executor.invoke({"input": [message]}, config=self.config) | |
| self.save_conversation(self.session_title, query['text'], result['output']) | |
| response = result['output'] | |
| self.last_response = response # Store last response for the sticky pad. | |
| self.save_ai_response(response) | |
| return response | |
| # ------------------ Modified Sticky Pad Function ------------------ # | |
| def add_to_sticky(state, sticky_text): | |
| if state and hasattr(state[0], "last_response"): | |
| new_text = sticky_text + "\n" + state[0].last_response | |
| else: | |
| new_text = sticky_text | |
| # Save the updated sticky pad content to file | |
| save_sticky_pad(state[0].username, new_text) | |
| return new_text | |
| # ------------------------------------------------------------------ # | |
| # Function to update inspirational prompt; updates every 10 seconds. | |
| def update_inspiration(): | |
| prompt = random.choice(INSPIRATIONAL_PROMPTS) | |
| return prompt | |
| # Gradio Interface | |
| with gr.Blocks(theme=gr.themes.Soft(secondary_hue="green"),fill_width=True,fill_height=True, | |
| css="footer {visibility: hidden;} #login {display: flex; flex-direction: column; align-items: center; justify-content: center; padding: 20px; border: 1px solid #ccc; border-radius: 5px; background-color: #f0f0f0; width: 300px; margin: 15vh auto;}.message-row.svelte-1x5p6hu img{margin:0px !important;}.avatar-container.svelte-1x5p6hu:not(.thumbnail-item) img{padding: 0px !important;}") as app: | |
| # Login block with persona selection. | |
| with gr.Column(visible=True, min_width=400, elem_id="login") as input_block: | |
| gr.Markdown("# Login Page") | |
| with gr.Row(): | |
| name_input = gr.Textbox(label="Name") | |
| with gr.Row(): | |
| session_title_input = gr.Textbox(label="Session Title") | |
| with gr.Row(): | |
| persona_dropdown = gr.Dropdown(choices=list(PERSONA_MAP.keys()), | |
| label="Select Persona", | |
| value="Creative Muse", | |
| interactive=True) | |
| with gr.Row(): | |
| submit_button = gr.Button("Submit") | |
| with gr.Column(visible=False) as output_container: | |
| gr.Markdown("# 🌟 CreativeMind ✍️") | |
| gr.Markdown("### Your *Personalized* Creative Writing Companion") | |
| state = gr.State([]) | |
| rem = gr.Timer(15) | |
| rem.tick(lambda state: state[0].system_message_reminder() if state else None, inputs=state, outputs=None, trigger_mode='once') | |
| history_dropdown = gr.Dropdown() | |
| # Arrange the main conversation area in two columns: | |
| with gr.Row(): | |
| # Left Column: Chatbot and Query Input | |
| with gr.Column(scale=3): | |
| chatbot = gr.Chatbot(type="messages", avatar_images=("user.jpeg", "CreativeBuddy.jpg"), bubble_full_width=True) | |
| query_input = gr.MultimodalTextbox(interactive=True, | |
| placeholder="Enter message or upload file...", show_label=False) | |
| query_input.submit(lambda state, chat, prompt: chatbot_interface(state, chat, prompt), | |
| inputs=[state, chatbot, query_input], | |
| outputs=[chatbot, query_input]) | |
| # Right Column: Sticky Pad (top) and Inspirational Prompt (bottom) | |
| with gr.Column(scale=1): | |
| sticky_pad = gr.Textbox(label="Sticky Pad (Your Saved Inspirations)", lines=10, interactive=True, value="") | |
| add_sticky_btn = gr.Button("Add Last Response to Sticky Pad") | |
| inspiration_label = gr.Label(value="Your inspirational prompt will appear here...", show_label=True) | |
| insp_timer = gr.Timer(10) | |
| insp_timer.tick(fn=update_inspiration, outputs=inspiration_label) | |
| add_sticky_btn.click(fn=add_to_sticky, inputs=[state, sticky_pad], outputs=sticky_pad) | |
| def update_chatbot_with_history(state, chatbot, selected_title): | |
| print(selected_title) | |
| conversation = state[0].load_selected_conversation(selected_title) | |
| chatbot_list = [] | |
| for message in conversation: | |
| if message.get("type") == "image": | |
| f = decode_image(message['content']) | |
| chatbot_list.append(ChatMessage(role=message['role'], content={"path": f, "mime_type": "image/png"})) | |
| else: | |
| tooltip_text = message.get("persona", "") if message.get("role") == "assistant" else "" | |
| chatbot_list.append(ChatMessage(role=message['role'], content=message['content'])) | |
| return chatbot_list | |
| history_dropdown.change(fn=update_chatbot_with_history, | |
| inputs=[state, chatbot, history_dropdown], | |
| outputs=chatbot) | |
| def chatbot_interface(state, messages, prompt): | |
| response = state[0].chatbot_response(messages, prompt) | |
| for x in prompt["files"]: | |
| messages.append(ChatMessage(role="user", content={"path": x, "mime_type": "image/png"})) | |
| if prompt["text"] is not None: | |
| messages.append(ChatMessage(role="user", content=prompt['text'])) | |
| messages.append(ChatMessage(role="assistant", content=response)) | |
| return messages, gr.MultimodalTextbox(value=None, interactive=True) | |
| # ----------------- Modified Login Function ----------------- # | |
| submit_button.click( | |
| fn=lambda name, title, persona, chatbot, state: ( | |
| gr.Dropdown(choices=[title] + list(User(name, title, persona).load_conversation_history().keys()), | |
| label="Select Conversation to Load", allow_custom_value=True, value=title, interactive=True), | |
| state + [User(name, title, persona)], | |
| gr.update(visible=False, elem_id=""), | |
| gr.update(visible=True), | |
| load_sticky_pad(name) # Load saved sticky pad content for the user | |
| ), | |
| inputs=[name_input, session_title_input, persona_dropdown, chatbot, state], | |
| outputs=[history_dropdown, state, input_block, output_container, sticky_pad] | |
| ) | |
| app.launch() | |