File size: 21,900 Bytes
272c4a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8313194
272c4a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
495a51d
272c4a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
495a51d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
import os
os.environ['TZ'] = 'Asia/Kolkata'
import time
import functools
from langchain.schema import HumanMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.tools import tool
import gradio as gr
from io import BytesIO
import json
from datetime import date, datetime
import random
import pandas as pd
from typing import List
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.messages import BaseMessage
from langchain_core.pydantic_v1 import BaseModel, Field
from PIL import Image
import base64
from gradio import ChatMessage

os.environ['TZ'] = 'Asia/Kolkata'

print(time.strftime('%X %x %Z'))

# ---------------------- Sticky Pad Persistence Functions ---------------------- #
_STICKY_DIR = "Sticky Notes"

def _sticky_pad_path(username: str) -> str:
    """Return the sticky-pad file path for *username*, creating the directory if needed."""
    # exist_ok avoids the check-then-create race when two sessions hit this at once.
    os.makedirs(_STICKY_DIR, exist_ok=True)
    # basename() drops directory components so a name such as "../../x"
    # cannot read or write outside the Sticky Notes directory.
    safe_name = os.path.basename(username)
    return os.path.join(_STICKY_DIR, f"{safe_name}-sticky.txt")

def load_sticky_pad(username: str) -> str:
    """Load sticky pad content from the Sticky Notes directory.

    Args:
        username: Name whose ``<username>-sticky.txt`` file is read.

    Returns:
        The file's text, or an empty string when no pad has been saved yet.
    """
    try:
        with open(_sticky_pad_path(username), "r", encoding="utf-8") as f:
            return f.read()
    except FileNotFoundError:
        return ""

def save_sticky_pad(username: str, content: str) -> None:
    """Save sticky pad content to the Sticky Notes directory.

    Args:
        username: Name whose ``<username>-sticky.txt`` file is overwritten.
        content: Full text to store; replaces any previous content.
    """
    with open(_sticky_pad_path(username), "w", encoding="utf-8") as f:
        f.write(content)
# ---------------------------------------------------------------------------- #

def load_schedules():
    """Load the booked review slots from ``schedules.json``.

    Returns:
        The parsed JSON content, or an empty dict when the file does not
        exist yet (e.g. before the first booking), so callers can use
        ``.get`` / item assignment without a first-run crash.
    """
    try:
        with open('schedules.json', 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        return {}

def save_schedules(schedules):
    """Overwrite ``schedules.json`` with *schedules*, pretty-printed with a 4-space indent."""
    with open('schedules.json', 'w') as out:
        out.write(json.dumps(schedules, indent=4))

def cache_with_timeout(timeout: int):
    """Return a decorator that memoises a function's results for *timeout* seconds.

    Results are keyed on the call's positional and keyword arguments, which
    must therefore be hashable. A cached value is reused while it is younger
    than *timeout*; after that the function is called again and the entry is
    refreshed. Entries are never evicted, so the cache grows with the number
    of distinct argument combinations.

    Args:
        timeout: Maximum age, in seconds, of a cached result.
    """
    def decorator(func):
        cache = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Sort keyword items so f(a=1, b=2) and f(b=2, a=1) share an entry.
            key = (args, tuple(sorted(kwargs.items())))
            entry = cache.get(key)
            if entry is not None:
                result, stored_at = entry
                # monotonic() is immune to wall-clock adjustments.
                if time.monotonic() - stored_at < timeout:
                    return result
            result = func(*args, **kwargs)
            cache[key] = (result, time.monotonic())
            return result
        return wrapper
    return decorator

class InMemoryHistory(BaseChatMessageHistory, BaseModel):
    """Chat history that keeps its messages in a list on the instance."""
    # Each instance gets its own empty list (default_factory), not a shared one.
    messages: List[BaseMessage] = Field(default_factory=list)
    def add_messages(self, messages: List[BaseMessage]) -> None:
        """Append *messages* to the end of the stored list, in order."""
        self.messages.extend(messages)
    def clear(self) -> None:
        """Drop all stored messages by rebinding ``messages`` to a new empty list."""
        self.messages = []

def encode_image_to_base64(image_path):
    """Re-encode the image at *image_path* in its own format and return it as base64 text.

    Args:
        image_path: Path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        The encoded image bytes as a UTF-8 base64 string.
    """
    buffered = BytesIO()
    # The context manager closes the file handle that Image.open() keeps open.
    with Image.open(image_path) as image:
        # format is None when PIL cannot attribute one; fall back to PNG
        # rather than failing inside save().
        image.save(buffered, format=image.format or "PNG")
    return base64.b64encode(buffered.getvalue()).decode("utf-8")

def parse(history):
    """Render a chat history as ``role: content`` lines.

    Args:
        history: Sequence of mappings with string ``'role'`` and ``'content'``
            entries. Entries that lack either key, or whose values are not
            strings (e.g. image payloads), are skipped.

    Returns:
        ``"No Chat till now"`` for an empty or missing history; otherwise a
        string that starts with a newline and has one ``role: content`` line
        per usable entry.
    """
    if not history:
        return "No Chat till now"
    lines = []
    for entry in history:
        try:
            role, content = entry['role'], entry['content']
        except (KeyError, TypeError, IndexError):
            # Malformed entry (missing key or not subscriptable): skip it.
            continue
        if isinstance(role, str) and isinstance(content, str):
            lines.append(f"{role}: {content}\n")
    return '\n' + ''.join(lines)

def decode_image(encoded_image):
    """Decode a base64 image and write it to a numbered ``temp<N>.jpeg`` file.

    Args:
        encoded_image: Base64 text (or bytes) of an encoded image.

    Returns:
        The name of the file written in the current working directory. The
        module-level counter ``i`` supplies ``<N>`` and is incremented on
        every call.
    """
    global i
    image_data = base64.b64decode(encoded_image)
    image = Image.open(BytesIO(image_data))
    # JPEG cannot store alpha or palette modes (e.g. RGBA/P from a PNG);
    # saving them unconverted raises OSError.
    if image.mode not in ("RGB", "L", "CMYK"):
        image = image.convert("RGB")
    filename = f"temp{i}.jpeg"
    i += 1
    image.save(filename)
    return filename

i = 0  # Global counter for image filenames; decode_image() reads it for temp{i}.jpeg and increments it.

# Detailed persona descriptions, keyed by the persona names offered in the login dropdown.
# User.__init__ looks the chosen name up here and falls back to a generic description.
PERSONA_MAP = {
    "Creative Muse": "A warm, imaginative persona that encourages creative exploration and inspiring writing.",
    "Literary Critic": "A refined and insightful persona offering deep literary analysis and constructive feedback.",
    "Storyteller": "A narrative-driven persona that weaves engaging, immersive stories with a touch of humor.",
    "Academic Advisor": "A formal and knowledgeable persona that provides scholarly insights and structured guidance."
}

# A list of inspirational prompt examples (25 examples).
# update_inspiration() picks one of these at random for the inspiration label.
INSPIRATIONAL_PROMPTS = [
    "Write about a hidden garden that only appears at dusk.",
    "Imagine a world where dreams dictate reality.",
    "Describe a city that floats in the sky.",
    "Craft a story around a mysterious, timeless letter.",
    "Write a poem about the sound of rain on a tin roof.",
    "Imagine a dialogue between two ancient trees.",
    "Describe a sunset as if seen through an artist’s eyes.",
    "Craft a narrative about a forgotten melody.",
    "Write about an unexpected friendship in an unlikely place.",
    "Imagine a secret door in an ordinary room.",
    "Describe the journey of a single, determined raindrop.",
    "Write a story that begins with a chance encounter.",
    "Imagine a future where art and science merge seamlessly.",
    "Craft a tale inspired by the patterns of the stars.",
    "Describe the magic hidden in everyday moments.",
    "Write a narrative about a silent revolution of ideas.",
    "Imagine a world where time flows backward.",
    "Craft a poem celebrating the beauty of imperfections.",
    "Describe a character who finds solace in solitude.",
    "Write about a moment when everything suddenly made sense.",
    "Imagine a landscape painted by the emotions of its inhabitants.",
    "Craft a narrative that blurs the line between reality and fantasy.",
    "Describe a long-forgotten legend in a modern setting.",
    "Write a story inspired by the interplay of light and shadow.",
    "Imagine a conversation with your future self."
]

class User:
    def __init__(self, username: str, title: str, persona: str):
        self.username = username
        self.session_title = title
        self.persona = persona
        self.persona_description = PERSONA_MAP.get(persona, "A creative writing assistant.")
        # Store the last assistant response for the sticky pad.
        self.last_response = ""
        
        # Two Gemini 1.5 pro instances with tailored parameters:
        self.llm_main = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash",
            temperature=1,
            max_tokens=1500,
            timeout=None,
            max_retries=5,
            google_api_key=os.getenv('API_KEY')
        )
        self.llm_inquiry = ChatGoogleGenerativeAI(
            model="gemini-2.0-flash",
            temperature=0.8,
            max_tokens=1024,
            timeout=None,
            max_retries=5,
            google_api_key=os.getenv('API_KEY')
        )
        
        self.template = f"""
You are a Vision Enabled Creative Writing Assistant named 'CreativeMind' with the persona: {self.persona} - {self.persona_description}.
Your role is to provide human-like, engaging, and insightful creative writing advice, literary analysis, and writing prompts.
You utilize multiple instances of Carefully Prompt Engineered LLMs: one for general conversation and tool orchestration, and a dedicated one for creative inquiries.
Call the necessary TOOLS as required.
REMEMBER USER CAN SEE ONLY YOUR RESPONSE AND THEY CAN'T SEE THE TOOLS OUTPUT.
"ALWAYS call the `add_reminder` tool for EVERY reminder request."
You can:
  1. Schedule a review session (tool provided)
  2. Answer creative writing inquiries and provide literary analysis (tool provided)
  3. Provide inspiring writing prompts and ideas.
  4. Set reminders for creative tasks using the reminder tool.

Engage warmly, include emojis, and provide detailed explanations.
Current Date (For scheduling ONLY, if no date is mentioned assume Today): {date.today()}
Name of the User: {self.username}
        """
        self.prompt = ChatPromptTemplate.from_messages(
            [
                ("system", self.template),
                ("placeholder", "{chat_history}"),
                ("placeholder", "{input}"),
                ("placeholder", "{agent_scratchpad}"),
            ]
        )
        self.store = {}

        @tool
        def creative_inquiry(question: str) -> str:
            """Answers creative writing queries and generates inspiring writing prompts using the dedicated Gemini 1.5 pro instance."""
            p=self.llm_inquiry.invoke(self.PI_prompt.format(question), config=self.config)
            return p.content

        @tool
        def add_reminder(time: str, name: str) -> str:
            """
            Adds a reminder for the given time and name.
            Parameters:
                - time: in format '%Y-%m-%d %H:%M'.
                - name: Name of the creative task or event.
            """
            reminders = self.load_reminders()
            reminder_entry = {"time": time, "name": name}
            reminders.append(reminder_entry)
            self.save_reminders(reminders)
            return f"Reminder set for '{name}' at {time}."

        @tool
        def schedule_review(query: str) -> str:
            """Schedules a creative review session.
            Parameter: A single string in the format `%Y-%m-%d %H:%M` ONLY"""
            schedules = load_schedules()
            query = query.replace("`", '')
            combined_time_str = datetime.strptime(query, "%Y-%m-%d %H:%M")
            if schedules.get(str(combined_time_str), "") == "":
                schedules[str(combined_time_str)] = self.username
                save_schedules(schedules)
                return f"Review session scheduled successfully for {self.username} at {combined_time_str}."
            else:
                return "The preferred time slot is unavailable. Please choose another time."

        self.PI_prompt = '''Context:

You are a creative writing assistant. When given a literary query or a request for a writing prompt, provide thoughtful, inspiring, and creative responses.

Example Query:
"Can you suggest a writing prompt involving a mysterious lighthouse?"

AI-powered Response:
"Imagine a weather-beaten lighthouse standing alone on a rocky shore, its beacon a relic of forgotten times. Write about a stormy night when the light flickers mysteriously, revealing secrets hidden beneath the crashing waves."

User Query:
{}

Note: Focus on creativity, literary flair, and thoughtful insights.
'''
        # Create the agent using the main instance.
        self.agent = create_tool_calling_agent(self.llm_main, [schedule_review, creative_inquiry, add_reminder], self.prompt)
        self.agent_executor = RunnableWithMessageHistory(
            AgentExecutor(agent=self.agent, tools=[schedule_review, creative_inquiry, add_reminder], verbose=True),
            self.get_by_session_id,
            input_messages_key="input",
            history_messages_key="chat_history",
        )
        self.config = {"configurable": {"session_id": self.username + "-" + self.session_title}}

    def get_by_session_id(self, session_id: str) -> BaseChatMessageHistory:
        if session_id not in self.store:
            self.store[session_id] = InMemoryHistory()
        return self.store[session_id]

    def load_reminders(self):
        try:
            with open(f'reminders/{self.username}-reminders.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return []

    def save_reminders(self, reminders):
        with open(f'reminders/{self.username}-reminders.json', 'w') as f:
            json.dump(reminders, f, indent=4)

    def save_conversation_history(self, history_data):
        if not os.path.exists(f'conv/{self.username}-conversation_history.json'):
            with open(f'conv/{self.username}-conversation_history.json', 'w') as f:
                json.dump({}, f)
        with open(f'conv/{self.username}-conversation_history.json', 'w') as f:
            json.dump(history_data, f, indent=4)

    def save_conversation(self, title, user_input, ai_response, images=None):
        history_data = self.load_conversation_history()
        conversation_entry = [{"role": "user", "content": user_input}]
        if images:
            for img in images:
                encoded_image = encode_image_to_base64(img)
                conversation_entry.append({"role": "user", "content": encoded_image, "type": "image"})
        conversation_entry.append({"role": "assistant", "content": ai_response, "persona": self.persona})
        if title in history_data:
            history_data[title].extend(conversation_entry)
        else:
            history_data[title] = conversation_entry
        self.save_conversation_history(history_data)

    def load_conversation_history(self):
        if not os.path.exists(f'conv/{self.username}-conversation_history.json'):
            with open(f'conv/{self.username}-conversation_history.json', 'w') as f:
                json.dump({}, f)
        with open(f'conv/{self.username}-conversation_history.json', 'r') as f:
            return json.load(f)

    def update_conversation_history(self, session_id, message_data):
        conversation_history = self.load_conversation_history()
        if session_id not in conversation_history:
            conversation_history[session_id] = []
        conversation_history[session_id].append(message_data)
        with open(f'conv/{self.username}-conversation_history.json', 'w') as f:
            json.dump(conversation_history, f, indent=4)

    def system_message_reminder(self):
        """Fire every reminder whose time equals the current minute.

        For each matching reminder the agent is invoked with a system-style
        message, the reminder is removed from the stored list, the list is
        saved, and the agent's reply is shown via ``gr.Info``.
        """
        reminders = self.load_reminders()
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M')
        # Iterate over a snapshot: removing from the list being iterated would
        # skip the entry that follows each removed one.
        for reminder in list(reminders):
            if reminder['time'] != current_time:
                continue
            print("TIME UP!!")
            message = HumanMessage(content=[{"type": "text", "text": f"System: {reminder['time']} reached! Time for your creative task: {reminder['name']} 🎨"}])
            result = self.agent_executor.invoke({"input": [message]}, config=self.config)
            # Persist the removal only after the agent call succeeded, so a
            # failed call leaves the reminder in place for the next tick.
            reminders.remove(reminder)
            self.save_reminders(reminders)
            response = result['output']
            gr.Info(response, duration=30)

    def load_selected_conversation(self, title):
        history_data = self.load_conversation_history()
        print(f"Title type: {type(title)}, Title: {title}, {history_data}")
        return history_data.get(title, [])

    def save_ai_response(self, response):
        if not os.path.exists("responses"):
            os.makedirs("responses")
        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        filename = f"responses/{self.username}-{self.session_title}-{timestamp}.txt"
        with open(filename, "w",encoding="utf-8") as f:
            f.write(response)

    def chatbot_response(self, history, query):
        """Send the user's text (and any attached images) to the agent and record the reply.

        Args:
            history: Current chat messages; not used here (the agent keeps its
                own history per session id).
            query: Mapping with a ``'text'`` entry and an optional ``'files'``
                list of image paths.

        Returns:
            The agent's reply text. It is also appended to the saved
            conversation, kept in ``last_response`` and written to a response file.
        """
        # A missing/None text would otherwise fail on string concatenation.
        text = query.get('text') or ""
        files = query.get('files') or []

        # NOTE(review): the data URL is always labelled image/jpeg although
        # encode_image_to_base64 keeps the file's own format — confirm the
        # model tolerates the mismatch.
        messages = [
            HumanMessage(
                content=[{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encode_image_to_base64(path)}"}}]
            )
            for path in files
        ]
        messages.append(HumanMessage(
            content=[{"type": "text", "text": "Invoke the necessary tools for the query: " + text}]
        ))

        result = self.agent_executor.invoke({"input": messages}, config=self.config)
        response = result['output']
        self.save_conversation(self.session_title, text, response, images=files or None)
        self.last_response = response  # Store last response for the sticky pad.
        self.save_ai_response(response)
        return response

# ------------------ Modified Sticky Pad Function ------------------ #
def add_to_sticky(state, sticky_text):
    """Append the logged-in user's last assistant response to the sticky pad and save it.

    Args:
        state: Session state list; its first element is the logged-in user object.
        sticky_text: Current content of the sticky pad textbox.

    Returns:
        The new sticky pad text. When nobody is logged in (empty state) the
        text is returned unchanged and nothing is written.
    """
    if not state:
        # No user yet: there is no username to save under.
        return sticky_text
    user = state[0]
    last_response = getattr(user, "last_response", "")
    if last_response:
        # Avoid a leading blank line when the pad is still empty.
        new_text = f"{sticky_text}\n{last_response}" if sticky_text else last_response
    else:
        # Nothing to add; don't accumulate empty lines on repeated clicks.
        new_text = sticky_text
    # Save the updated sticky pad content to file
    save_sticky_pad(user.username, new_text)
    return new_text
# ------------------------------------------------------------------ #

# Function to update inspirational prompt; updates every 10 seconds.
def update_inspiration():
    """Return one randomly chosen entry of INSPIRATIONAL_PROMPTS."""
    return random.choice(INSPIRATIONAL_PROMPTS)

# Gradio Interface

with gr.Blocks(theme=gr.themes.Soft(secondary_hue="green"),fill_width=True,fill_height=True,
                css="footer {visibility: hidden;} #login {display: flex; flex-direction: column; align-items: center; justify-content: center; padding: 20px; border: 1px solid #ccc; border-radius: 5px; background-color: #f0f0f0; width: 300px; margin: 15vh auto;}.message-row.svelte-1x5p6hu img{margin:0px !important;}.avatar-container.svelte-1x5p6hu:not(.thumbnail-item) img{padding: 0px !important;}") as app:
    # Login block with persona selection. Visible at start; the submit handler hides it.
    with gr.Column(visible=True, min_width=400, elem_id="login") as input_block:
        gr.Markdown("# Login Page")
        with gr.Row():
            name_input = gr.Textbox(label="Name")
        with gr.Row():
            session_title_input = gr.Textbox(label="Session Title")
        with gr.Row():
            persona_dropdown = gr.Dropdown(choices=list(PERSONA_MAP.keys()),
                                           label="Select Persona",
                                           value="Creative Muse",
                                           interactive=True)
        with gr.Row():
            submit_button = gr.Button("Submit")
    
    # Main application area. Hidden until the submit handler makes it visible.
    with gr.Column(visible=False) as output_container:
        gr.Markdown("#  🌟 CreativeMind ✍️")
        gr.Markdown("### Your *Personalized* Creative Writing Companion")
        # Starts empty; the submit handler appends the User object, read elsewhere as state[0].
        state = gr.State([])
        # Every 15 s check for due reminders; does nothing while state is still empty.
        rem = gr.Timer(15)
        rem.tick(lambda state: state[0].system_message_reminder() if state else None, inputs=state, outputs=None, trigger_mode='once')
        # Placeholder; the submit handler fills in its choices and label.
        history_dropdown = gr.Dropdown()
        
        # Arrange the main conversation area in two columns:
        with gr.Row():
            # Left Column: Chatbot and Query Input
            with gr.Column(scale=3):
                chatbot = gr.Chatbot(type="messages", avatar_images=("user.jpeg", "CreativeBuddy.jpg"), bubble_full_width=True)
                query_input = gr.MultimodalTextbox(interactive=True,
                                                  placeholder="Enter message or upload file...", show_label=False)
                # The lambda defers the lookup of chatbot_interface, which is defined further down.
                query_input.submit(lambda state, chat, prompt: chatbot_interface(state, chat, prompt),
                                   inputs=[state, chatbot, query_input],
                                   outputs=[chatbot, query_input])
            # Right Column: Sticky Pad (top) and Inspirational Prompt (bottom)
            with gr.Column(scale=1):
                sticky_pad = gr.Textbox(label="Sticky Pad (Your Saved Inspirations)", lines=10, interactive=True, value="")
                add_sticky_btn = gr.Button("Add Last Response to Sticky Pad")
                inspiration_label = gr.Label(value="Your inspirational prompt will appear here...", show_label=True)
                # Replace the label with a new random prompt every 10 s.
                insp_timer = gr.Timer(10)
                insp_timer.tick(fn=update_inspiration, outputs=inspiration_label)
                add_sticky_btn.click(fn=add_to_sticky, inputs=[state, sticky_pad], outputs=sticky_pad)
                
        def update_chatbot_with_history(state, chatbot, selected_title):
            """Rebuild the chatbot message list from the saved conversation *selected_title*.

            Args:
                state: Session state list; ``state[0]`` is the logged-in user object.
                chatbot: Current chatbot value (unused; required by the event wiring).
                selected_title: Conversation title chosen in the history dropdown.

            Returns:
                A list of ``ChatMessage`` objects; empty when nobody is logged in
                or nothing is stored under the title.
            """
            print(selected_title)
            if not state:
                return []
            conversation = state[0].load_selected_conversation(selected_title)
            chatbot_list = []
            for message in conversation:
                if message.get("type") == "image":
                    # Stored images are base64 text; decode_image() writes them
                    # back out as a .jpeg file, so advertise that MIME type.
                    image_path = decode_image(message['content'])
                    content = {"path": image_path, "mime_type": "image/jpeg"}
                else:
                    content = message['content']
                chatbot_list.append(ChatMessage(role=message['role'], content=content))
            return chatbot_list

        history_dropdown.change(fn=update_chatbot_with_history,
                                inputs=[state, chatbot, history_dropdown],
                                outputs=chatbot)
    
        def chatbot_interface(state, messages, prompt):
            """Handle a submitted prompt: query the agent and extend the chat display.

            Args:
                state: Session state list; ``state[0]`` is the logged-in user object.
                messages: Current chatbot message list.
                prompt: MultimodalTextbox value with ``'text'`` and ``'files'`` entries.

            Returns:
                ``(messages, textbox)``: the updated message list and a cleared,
                interactive input box. When nobody is logged in or the prompt is
                empty, the messages are returned unchanged and the agent is not called.
            """
            cleared_box = gr.MultimodalTextbox(value=None, interactive=True)
            messages = messages if messages is not None else []
            text = prompt.get("text")
            files = prompt.get("files") or []
            if not state or (not text and not files):
                return messages, cleared_box
            response = state[0].chatbot_response(messages, prompt)
            for x in files:
                messages.append(ChatMessage(role="user", content={"path": x, "mime_type": "image/png"}))
            # Skip the text bubble for file-only prompts instead of adding an empty one.
            if text:
                messages.append(ChatMessage(role="user", content=text))
            messages.append(ChatMessage(role="assistant", content=response))
            return messages, cleared_box
    
        # ----------------- Modified Login Function ----------------- #
        def _login(name, title, persona, chatbot, state):
            """Create the session's User and switch the UI from the login form to the app.

            Returns, in output order: the populated history dropdown, the state
            with the new user appended, the hidden login block, the visible
            main container, and the user's saved sticky pad text.
            """
            # Build the User once: each instance creates two LLM clients and an
            # agent, and the same object must be the one stored in state.
            user = User(name, title, persona)
            # The current title is listed first; don't repeat it if it was saved before.
            past_titles = [t for t in user.load_conversation_history() if t != title]
            return (
                gr.Dropdown(choices=[title] + past_titles,
                            label="Select Conversation to Load", allow_custom_value=True, value=title, interactive=True),
                state + [user],
                gr.update(visible=False, elem_id=""),
                gr.update(visible=True),
                load_sticky_pad(name),  # Load saved sticky pad content for the user
            )

        submit_button.click(
            fn=_login,
            inputs=[name_input, session_title_input, persona_dropdown, chatbot, state],
            outputs=[history_dropdown, state, input_block, output_container, sticky_pad]
        )

# Start the Gradio server for the Blocks app defined above (runs on import of this module).
app.launch()