File size: 21,620 Bytes
b47e518
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4e22b17
 
b47e518
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9cda0fc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b47e518
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b154db
b47e518
 
9b154db
b47e518
 
 
 
 
 
1e67e77
 
 
 
b47e518
 
 
 
1e67e77
b47e518
1e67e77
 
b47e518
1e67e77
 
 
 
b47e518
 
 
1e67e77
b47e518
1e67e77
b47e518
1e67e77
b47e518
1e67e77
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b47e518
 
 
 
cdfbca8
b47e518
 
 
 
 
 
 
 
 
 
 
 
cdfbca8
b47e518
 
 
cdfbca8
b47e518
cdfbca8
b47e518
 
 
 
cdfbca8
 
b47e518
b2d7435
 
cdfbca8
b47e518
b2d7435
b47e518
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b154db
b47e518
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cdfbca8
b47e518
 
 
 
 
 
 
 
 
 
 
 
 
cdfbca8
b47e518
 
 
cdfbca8
b47e518
cdfbca8
b47e518
 
 
 
cdfbca8
b47e518
 
 
9b154db
b47e518
 
 
 
 
9b154db
b47e518
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99e4baa
b47e518
 
9b154db
b47e518
 
 
 
 
 
 
 
 
 
cdfbca8
b47e518
 
 
cdfbca8
b47e518
cdfbca8
b47e518
 
 
 
cdfbca8
b47e518
 
 
 
 
 
 
 
 
 
 
 
 
cdfbca8
b47e518
 
cdfbca8
 
 
b47e518
 
 
 
 
 
cdfbca8
b47e518
 
 
cdfbca8
b47e518
cdfbca8
b47e518
 
 
cdfbca8
b47e518
 
cdfbca8
 
b47e518
 
cdfbca8
b47e518
 
 
 
 
9b154db
b47e518
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
import re
import time
import tempfile
import requests
import json
from google import genai
from google.genai import types
import google.generativeai as genai
import io
import base64
import numpy as np
import cv2
import logging
import uuid
import subprocess
from pathlib import Path
import wikipedia  # using the PyPI wikipedia package
import urllib.parse
import pandas as pd
from PyPDF2 import PdfReader
import plotly.graph_objects as go
import matplotlib.pyplot as plt
from langchain_google_genai import ChatGoogleGenerativeAI
# For PandasAI using a single dataframe
from pandasai import SmartDataframe
from pandasai.responses.response_parser import ResponseParser
#from langchain_community.chat_models.sambanova import ChatSambaNovaCloud
from pandasai.exceptions import InvalidOutputValueMismatch
import base64
import os
import uuid
import matplotlib
import matplotlib.pyplot as plt
from io import BytesIO
import dataframe_image as dfi
import uuid
from supadata import Supadata, SupadataError
from PIL import ImageFont, ImageDraw, Image
import seaborn as sns
from flask import jsonify


# -----------------------
# Configuration and Logging
# -----------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Unique per-process id used to namespace chart files written by PandasAI.
guid = uuid.uuid4()
new_filename = f"{guid}"
# NOTE(review): absolute path — assumes the runtime exposes /exports/charts; confirm it exists.
user_defined_path = os.path.join("/exports/charts", new_filename)

class FlaskResponse(ResponseParser):
    """PandasAI response parser that renders results for a Flask API.

    DataFrames become HTML tables, plots become base64-encoded PNG data
    URIs, and everything else is stringified.
    """

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        """Render a DataFrame result as an HTML table string."""
        return result["value"].to_html()

    def format_plot(self, result):
        """Render a plot result as a 'data:image/png;base64,...' URI.

        Handles both in-memory matplotlib figures and chart image files
        saved to disk by PandasAI; falls back to str() for anything else.
        """
        val = result["value"]
        # Matplotlib figures expose savefig(); serialize directly from memory.
        if hasattr(val, "savefig"):
            try:
                buf = io.BytesIO()
                val.savefig(buf, format="png")
                buf.seek(0)
                image_base64 = base64.b64encode(buf.read()).decode("utf-8")
                return f"data:image/png;base64,{image_base64}"
            except Exception as e:
                print("Error processing figure:", e)
                return str(val)
        # A string naming an existing file is assumed to be a saved chart.
        # (Original wrapped val in a redundant single-arg os.path.join — a no-op.)
        if isinstance(val, str) and os.path.isfile(val):
            print("My image path:", val)
            with open(val, "rb") as file:
                data = file.read()
            base64_data = base64.b64encode(data).decode("utf-8")
            return f"data:image/png;base64,{base64_data}"
        # Fallback: return as a string.
        return str(val)

    def format_other(self, result):
        """Return any non-dataframe, non-plot value as a plain string."""
        return str(result["value"])

# PandasAI LLM: Gemini exposed through the LangChain wrapper.
llm1 = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-thinking-exp",
    temperature=0,
    max_tokens=None,
    timeout=1000,
    max_retries=2
)

# Initialize the Supadata client (used for YouTube transcript retrieval).
SUPADATA = os.getenv('SUPADATA')
supadata = Supadata(api_key=f"{SUPADATA}")
# -----------------------
# Utility Constants
# -----------------------
MAX_CHARACTERS = 200000  # Approximate token limit: 50,000 tokens ~ 200,000 characters

# NOTE(review): may be None if the env var is unset; configure_gemini will then fail.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")

def configure_gemini(api_key):
    """Configure the google-generativeai SDK and return a Gemini model handle.

    Raises:
        Exception: re-raises any SDK failure after logging it.
    """
    try:
        genai.configure(api_key=api_key)
        model_handle = genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    except Exception as e:
        logger.error(f"Error configuring Gemini: {str(e)}")
        raise
    return model_handle

# Initialize Gemini model for story generation (module-level singleton reused
# by all generate_story_* helpers below).
model = configure_gemini(GOOGLE_API_KEY)

# -----------------------
# File Upload Helpers
# -----------------------
def get_pdf_text(pdf_file):
    """Extract text from a PDF file and enforce token limit."""
    reader = PdfReader(pdf_file)
    chunks = []
    for page in reader.pages:
        extracted = page.extract_text()
        if extracted:
            chunks.append(extracted + "\n")
    # Truncate to the approximate token budget; a no-op for shorter texts.
    return "".join(chunks)[:MAX_CHARACTERS]

def get_df(uploaded_file, ext):
    """
    Read an uploaded file into a pandas DataFrame for csv/xlsx/xls extensions.

    Args:
        uploaded_file: The uploaded file object.
        ext (str): The extension of the uploaded file.

    Returns:
        pandas.DataFrame: The parsed DataFrame, or None on read failure or
        an unsupported extension.
    """
    if ext not in ("csv", "xlsx", "xls"):
        print(f"Unsupported file extension: {ext}. Please upload a csv, xlsx, or xls file.")
        return None
    reader = pd.read_csv if ext == "csv" else pd.read_excel
    try:
        return reader(uploaded_file)
    except Exception as e:
        print(f"Error reading file: {e}")
        return None

# -----------------------
# Audio Transcription
# -----------------------

def transcribe_audio(audio_file):
    """
    Transcribe audio using DeepGram's API (model: nova-3).
    Expects a WAV audio file.

    Returns:
        str | None: The transcript text, or None when the API key is missing
        or the request/parsing fails.
    """
    deepgram_api_key = os.getenv("DeepGram")
    if not deepgram_api_key:
        # Fixed: original called st.error() but streamlit is never imported
        # in this module (NameError). The message also named the wrong env var.
        logger.error("DeepGram API Key is missing. Please set the 'DeepGram' environment variable.")
        return None
    headers_transcribe = {
        "Authorization": f"Token {deepgram_api_key}",
        "Content-Type": "audio/wav"
    }
    url = "https://api.deepgram.com/v1/listen?model=nova-3"
    try:
        audio_bytes = audio_file.read()
        response = requests.post(url, headers=headers_transcribe, data=audio_bytes)
        if response.status_code == 200:
            data = response.json()
            # Fixed: Deepgram does not return a top-level "text" key; the
            # transcript is nested under results -> channels -> alternatives.
            transcription = (
                data.get("results", {})
                .get("channels", [{}])[0]
                .get("alternatives", [{}])[0]
                .get("transcript", "")
            )
            return transcription
        else:
            print(f"Deepgram transcription error: {response.status_code}")
            return None
    except Exception as e:
        print(f"Error during transcription: {e}")
        return None

# -----------------------
# PandasAI Response for DataFrame (using SmartDataframe and ChatSambaNovaCloud)
# -----------------------
def generateResponse(prompt, df):
    """
    Answer a natural-language question about *df* via PandasAI.

    Returns either a base64-encoded PNG string like 'data:image/png;base64,...'
    if the answer is a chart, an HTML table for DataFrame answers, or a
    fallback string for anything else.
    """
    pandas_agent = SmartDataframe(
        df,
        config={
            # Fixed: original referenced undefined name `llm` (NameError);
            # the module-level LLM is `llm1`.
            "llm": llm1,
            "response_parser": FlaskResponse,  # You can still use it for internal logic
            "custom_whitelisted_dependencies": [
                "os", "io", "sys", "chr", "glob", "b64decoder",
                "collections", "geopy", "geopandas", "wordcloud", "builtins"
            ],
            "security": "none",
            "save_charts_path": user_defined_path,
            "save_charts": False,
            "enable_cache": False,
        }
    )

    answer = pandas_agent.chat(prompt)

    # Convert 'answer' into a base64 string or fallback
    if isinstance(answer, pd.DataFrame):
        return answer.to_html()

    elif hasattr(answer, "savefig"):  # e.g. a Matplotlib figure
        try:
            buf = io.BytesIO()
            answer.savefig(buf, format="png")
            buf.seek(0)
            image_base64 = base64.b64encode(buf.read()).decode("utf-8")
            return f"data:image/png;base64,{image_base64}"
        except Exception as e:
            print("Error processing figure:", e)
            return None

    elif isinstance(answer, str):
        # Could be a file path or just a textual answer
        if os.path.isfile(answer):
            with open(answer, "rb") as f:
                data = f.read()
            b64 = base64.b64encode(data).decode("utf-8")
            return f"data:image/png;base64,{b64}"
        else:
            return answer
    else:
        # fallback
        return str(answer)

# -----------------------
# DataFrame-Based Story Generation (for CSV/Excel files)
# -----------------------
# -----------------------
def generate_story_from_dataframe(df, story_type):
    """
    Generate a data-based story from a CSV/Excel file.
    The dataframe is serialized to JSON and embedded in a prompt that asks the
    model for exactly 5 sections, each with a brief analysis and a chart
    description inside <>. The output is normalized (padded or trimmed) to
    exactly 5 [break]-separated sections.
    """
    df_json = json.dumps(df.to_dict())
    # (intro, outro) pairs per story type; the dataset JSON goes between them.
    templates = {
        "free_form": (
            "You are a professional storyteller. ",
            ", create an engaging and concise story. ",
        ),
        "children": (
            "You are a professional storyteller writing stories for children. ",
            ", create a fun, factual, and concise story appropriate for children. ",
        ),
        "education": (
            "You are a professional storyteller writing educational content. ",
            ", create an informative, engaging, and concise educational story. Include interesting facts while keeping it engaging. ",
        ),
        "business": (
            "You are a professional storyteller specializing in business narratives. ",
            ", create a professional, concise business story with practical insights. ",
        ),
        "entertainment": (
            "You are a professional storyteller writing creative entertaining stories. ",
            ", create an engaging and concise entertaining story. Include interesting facts while keeping it engaging. ",
        ),
    }
    intro, outro = templates.get(story_type, templates["free_form"])
    story_prompt = intro + "Using the following dataset in JSON format: " + df_json + outro
    full_prompt = story_prompt + (
        "Write a story for a narrator meaning no labels of pages or sections the story should just flow. Divide your story into exactly 5 very short and concise sections separated by [break]. "
        "Aim for a maximum of 3 sentences per section to ensure a quicker narration. "
        "For each section, provide a brief narrative analysis and include, within angle brackets <>, a clear and plain-text description of a chart visualization that would represent the data. "
        "Limit the descriptions by specifying only charts. "
        "Ensure that your response contains only natural language descriptions examples: 'bar chart of', 'pie chart of' , 'histogram of', 'scatterplot of', 'boxplot of' , 'heatmap of etc' and nothing else."
    )

    try:
        response = model.generate_content(full_prompt)
        if not response or not response.text:
            return None

        # Normalize to exactly 5 non-empty sections: pad with placeholders
        # when short, trim when long.
        sections = [seg.strip() for seg in response.text.split("[break]") if seg.strip()]
        sections = (sections + ["(Placeholder section)"] * 5)[:5]
        return "[break]".join(sections)

    except Exception as e:
        print(f"Error generating story from dataframe: {e}")
        return None


# -----------------------
# Existing Story Generation Functions (Text, Wikipedia, Bible, Youtube(new))
# -----------------------
def generate_story_from_text(prompt_text, story_type):
    """Generate a 5-section narrated story from a free-text prompt via Gemini."""
    endings = {
        "free_form": ", create an engaging and concise story. ",
        "children": ", create a fun and concise story. ",
        "education": ", create an educational and engaging story. ",
        "business": ", create a professional business story. ",
        "entertainment": ", create an entertaining and concise story. ",
    }
    # Only the children variant uses a different persona line.
    persona = (
        "You are a professional storyteller for children. "
        if story_type == "children"
        else "You are a professional storyteller. "
    )
    story_prompt = persona + "Based on the prompt: " + prompt_text + endings.get(story_type, endings["free_form"])
    response = model.generate_content(
        story_prompt +
        "Write a short story for a narrator meaning no labels of pages or sections the story should just flow and narrated in 2 minutes or less. Divide your story into exactly 5 very short and concise sections separated by [break]. Aim for a maximum of 3 sentences per section. For each section, include a brief image description inside <>."
    )
    return response.text if response else None

def generate_story_from_wiki(wiki_url, story_type):
    """Fetch a Wikipedia page summary and turn it into a 5-section story."""
    try:
        # The last URL path segment is treated as the page title.
        page_title = wiki_url.rstrip("/").split("/")[-1]
        wikipedia.set_lang("en")
        wiki_text = wikipedia.page(page_title).summary
        endings = {
            "free_form": ", create an engaging and concise story. ",
            "children": ", create a fun and concise story. ",
            "education": ", create an educational and engaging story. ",
            "business": ", create a professional business story. ",
            "entertainment": ", create an entertaining and concise story. ",
        }
        persona = (
            "You are a professional storyteller for children. "
            if story_type == "children"
            else "You are a professional storyteller. "
        )
        story_prompt = persona + "Using the following Wikipedia info: " + wiki_text + endings.get(story_type, endings["free_form"])
        response = model.generate_content(
            story_prompt +
            "Write a short story for a narrator meaning no labels of pages or sections the story should just flow and narrated in 2 minutes or less. Divide your story into exactly 5 very short and concise sections separated by [break]. Aim for a maximum of 3 sentences per section. For each section, include a brief image description inside <>."
        )
        return response.text if response else None
    except Exception as e:
        print(f"Error generating story from Wikipedia: {e}")
        return None

def fetch_bible_text(reference):
    """Fetch ASV Bible text for a reference like 'Genesis 1:1-5' or 'Psalms 23'.

    Uses the wldeh/bible-api CDN. Returns the joined verse text, or None when
    the reference does not parse or a whole-chapter fetch fails. Per-verse
    fetch errors are embedded in the returned string as bracketed markers.
    """
    # Parse "<book> <chapter>[:<verse_start>[-<verse_end>]]".
    m = re.match(r"(?P<book>[1-3]?\s*\w+(?:\s+\w+)*)\s+(?P<chapter>\d+)(?::(?P<verse_start>\d+)(?:-(?P<verse_end>\d+))?)?", reference)
    if not m:
        print("Bible reference format invalid. Use format like 'Genesis 1:1-5' or 'Psalms 23'.")
        return None
    # The API expects lowercase book names with spaces removed (e.g. "1corinthians").
    book = m.group("book").strip().lower().replace(" ", "")
    chapter = m.group("chapter")
    verse_start = m.group("verse_start")
    verse_end = m.group("verse_end")
    if verse_start:
        # Verse (or verse-range) request: fetch each verse individually.
        if verse_end is None:
            verse_range = [verse_start]
        else:
            verse_range = [str(v) for v in range(int(verse_start), int(verse_end) + 1)]
        verses_text = []
        for verse in verse_range:
            url = f"https://cdn.jsdelivr.net/gh/wldeh/bible-api/bibles/en-asv/books/{book}/chapters/{chapter}/verses/{verse}.json"
            try:
                response = requests.get(url)
                if response.status_code == 200:
                    data = response.json()
                    verses_text.append(data.get("text", ""))
                else:
                    verses_text.append(f"[Error fetching verse {verse}]")
            except Exception as e:
                verses_text.append(f"[Exception fetching verse {verse}: {e}]")
        return " ".join(verses_text)
    else:
        # Whole-chapter request; the payload shape varies, so handle both
        # list-of-verses and {"verses": [...]} forms.
        url = f"https://cdn.jsdelivr.net/gh/wldeh/bible-api/bibles/en-asv/books/{book}/chapters/{chapter}.json"
        try:
            response = requests.get(url)
            if response.status_code == 200:
                data = response.json()
                if isinstance(data, list):
                    verses = [verse.get("text", "") for verse in data]
                    return " ".join(verses)
                elif isinstance(data, dict) and "verses" in data:
                    verses = [verse.get("text", "") for verse in data["verses"]]
                    return " ".join(verses)
                else:
                    return str(data)
            else:
                print("Error fetching chapter text.")
                return None
        except Exception as e:
            print(f"Exception fetching chapter: {e}")
            return None

def generate_story_from_bible(reference, story_type):
    """Generate a 5-section narrated story from a Bible passage reference."""
    bible_text = fetch_bible_text(reference)
    if bible_text is None:
        return None
    endings = {
        "free_form": ", create an engaging and concise story. ",
        "children": ", create a fun and concise story. ",
        "education": ", create an educational and engaging story. ",
        "business": ", create a professional business story. ",
        "entertainment": ", create an entertaining and concise story. ",
    }
    persona = (
        "You are a professional storyteller for children. "
        if story_type == "children"
        else "You are a professional storyteller. "
    )
    story_prompt = persona + "Using the following Bible text: " + bible_text + endings.get(story_type, endings["free_form"])
    response = model.generate_content(
        story_prompt +
        "Write a short story for a narrator meaning no labels of pages or sections the story should just flow and narrated in 2 minutes or less. Divide your story into exactly 5 very short and concise sections separated by [break]. Aim for a maximum of 3 sentences per section. For each section, include a brief image description inside <>."
    )
    return response.text if response else None


def generate_story_from_youtube(youtube_url, story_type):
    """Build a 5-section story from a YouTube video's transcript (via Supadata)."""
    try:
        # Accept both watch?v=... and youtu.be/... URL shapes.
        if "v=" in youtube_url:
            video_id = youtube_url.split("v=")[1].split("&")[0]
        elif "youtu.be/" in youtube_url:
            video_id = youtube_url.split("youtu.be/")[1].split("?")[0]
        else:
            raise ValueError("Invalid YouTube URL provided.")

        # Fetch the transcript as a single text blob.
        transcript_text = supadata.youtube.transcript(
            video_id=video_id,
            text=True
        ).content

        endings = {
            "free_form": ", create an engaging and concise story. ",
            "children": ", create a fun and concise story. ",
            "education": ", create an educational and engaging story. ",
            "business": ", create a professional business story. ",
            "entertainment": ", create an entertaining and concise story. ",
        }
        persona = (
            "You are a professional storyteller for children. "
            if story_type == "children"
            else "You are a professional storyteller. "
        )
        story_prompt = persona + "Using the following YouTube transcript: " + transcript_text + endings.get(story_type, endings["free_form"])

        full_prompt = story_prompt + (
            "Write a short story for a narrator meaning no labels of pages or sections the story should just flow and narrated in 2 minutes or less. Divide your story into exactly 5 very short and concise sections separated by [break]. "
            "Aim for a maximum of 3 sentences per section. "
            "For each section, include an image description inside <>."
        )

        response = model.generate_content(full_prompt)
        return response.text if response else None

    except Exception as e:
        print(f"Error generating story from YouTube transcript: {e}")
        return None

# -----------------------
# Extract Image Prompts and Story Sections
# -----------------------
def extract_image_prompts_and_story(story_text):
    """Split a [break]-separated story into page texts and image prompts.

    Returns (pages, image_prompts) as parallel lists. A section's <...> span
    becomes its image prompt and is stripped from the page text; sections
    without one contribute a 100-char snippet and a generic prompt.
    """
    pages, image_prompts = [], []
    for section in re.split(r"\[break\]", story_text):
        if not section.strip():
            continue
        match = re.search(r"<(.*?)>", section)
        if match:
            image_prompts.append(match.group(1).strip())
            pages.append(re.sub(r"<(.*?)>", "", section).strip())
        else:
            snippet = section.strip()[:100]
            pages.append(snippet)
            image_prompts.append(f"A concise illustration of {snippet}")
    return pages, image_prompts