Spaces:
Sleeping
Sleeping
| import re | |
| import time | |
| import tempfile | |
| import requests | |
| import json | |
| from google import genai | |
| from google.genai import types | |
| import google.generativeai as genai | |
| import io | |
| import base64 | |
| import numpy as np | |
| import cv2 | |
| import logging | |
| import uuid | |
| import subprocess | |
| from pathlib import Path | |
| import wikipedia # using the PyPI wikipedia package | |
| import urllib.parse | |
| import pandas as pd | |
| from PyPDF2 import PdfReader | |
| import plotly.graph_objects as go | |
| import matplotlib.pyplot as plt | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| # For PandasAI using a single dataframe | |
| from pandasai import SmartDataframe | |
| from pandasai.responses.response_parser import ResponseParser | |
| #from langchain_community.chat_models.sambanova import ChatSambaNovaCloud | |
| from pandasai.exceptions import InvalidOutputValueMismatch | |
| import base64 | |
| import os | |
| import uuid | |
| import matplotlib | |
| import matplotlib.pyplot as plt | |
| from io import BytesIO | |
| import dataframe_image as dfi | |
| import uuid | |
| from supadata import Supadata, SupadataError | |
| from PIL import ImageFont, ImageDraw, Image | |
| import seaborn as sns | |
| from flask import jsonify | |
# -----------------------
# Configuration and Logging
# -----------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# One random id per process run; used to build a unique chart-export
# directory so concurrent runs do not clobber each other's saved charts.
guid = uuid.uuid4()
new_filename = f"{guid}"
# Absolute path handed to PandasAI as "save_charts_path" below.
user_defined_path = os.path.join("/exports/charts", new_filename)
class FlaskResponse(ResponseParser):
    """PandasAI response parser that renders answers for a Flask frontend.

    DataFrames become HTML tables; plots become base64 data URIs; anything
    else is stringified.
    """

    def __init__(self, context):
        super().__init__(context)

    def format_dataframe(self, result):
        # DataFrame answers are shipped as ready-to-embed HTML.
        return result["value"].to_html()

    def format_plot(self, result):
        """Return the plot as a 'data:image/png;base64,...' URI.

        Handles a live matplotlib figure or a path to a saved image file;
        anything else falls back to str().
        """
        val = result["value"]
        # Case 1: a matplotlib figure (anything exposing .savefig).
        if hasattr(val, "savefig"):
            try:
                buffer = io.BytesIO()
                val.savefig(buffer, format="png")
                buffer.seek(0)
                encoded = base64.b64encode(buffer.read()).decode("utf-8")
                return f"data:image/png;base64,{encoded}"
            except Exception as e:
                print("Error processing figure:", e)
                return str(val)
        # Case 2: a string pointing at an image file on disk.
        if isinstance(val, str) and os.path.isfile(val):
            print("My image path:", val)
            with open(val, "rb") as file:
                encoded = base64.b64encode(file.read()).decode("utf-8")
            return f"data:image/png;base64,{encoded}"
        # Fallback: return as a string.
        return str(val)

    def format_other(self, result):
        # Non-plot, non-dataframe answers pass through as plain text.
        return str(result["value"])
# Pandasai gemini
# LangChain wrapper around Gemini, used as the LLM behind SmartDataframe.
llm1 = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash-thinking-exp",
    temperature=0,      # deterministic answers for data questions
    max_tokens=None,    # no explicit output cap
    timeout=1000,
    max_retries=2
)
# Initialize the Supadata client (YouTube transcript retrieval).
# NOTE(review): assumes the SUPADATA env var holds the API key — if unset,
# the client is constructed with the literal string "None"; confirm intended.
SUPADATA = os.getenv('SUPADATA')
supadata = Supadata(api_key=f"{SUPADATA}")
# -----------------------
# Utility Constants
# -----------------------
MAX_CHARACTERS = 200000  # Approximate token limit: 50,000 tokens ~ 200,000 characters
# Gemini API key read from the environment; consumed by configure_gemini below.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
def configure_gemini(api_key):
    """Configure the google-generativeai SDK and return a model handle.

    Args:
        api_key: Gemini API key (may be None if the env var is missing).

    Returns:
        A GenerativeModel for 'gemini-2.0-flash-thinking-exp'.

    Raises:
        Exception: any SDK error is logged, then re-raised to the caller.
    """
    try:
        genai.configure(api_key=api_key)
        return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
    except Exception as e:
        logger.error(f"Error configuring Gemini: {str(e)}")
        raise
# Initialize Gemini model for story generation.
# NOTE(review): runs at import time — a bad/missing GOOGLE_API_KEY aborts
# module import via the re-raise in configure_gemini; confirm intended.
model = configure_gemini(GOOGLE_API_KEY)
| # ----------------------- | |
| # File Upload Helpers | |
| # ----------------------- | |
def get_pdf_text(pdf_file):
    """Extract text from a PDF file and enforce the character limit.

    Args:
        pdf_file: file-like object (or path) accepted by PyPDF2's PdfReader.

    Returns:
        str: concatenated page text (pages separated by newlines), truncated
        to MAX_CHARACTERS (~50k tokens) to keep prompts within model limits.
    """
    # Accumulate page chunks and join once — avoids the quadratic cost of
    # repeated string concatenation on large PDFs.
    chunks = []
    total = 0
    for page in PdfReader(pdf_file).pages:
        page_text = page.extract_text()
        if page_text:
            chunks.append(page_text + "\n")
            total += len(page_text) + 1
            if total > MAX_CHARACTERS:
                # Already past the cap — no point extracting further pages.
                break
    # Truncation is a no-op when total <= MAX_CHARACTERS.
    return "".join(chunks)[:MAX_CHARACTERS]
def get_df(uploaded_file, ext):
    """
    Read an uploaded file into a pandas DataFrame for csv/xlsx/xls inputs.

    Args:
        uploaded_file: The uploaded file object.
        ext (str): Extension of the uploaded file (without the dot).

    Returns:
        pandas.DataFrame: parsed data on success, otherwise None (an error
        message is printed for unsupported extensions or read failures).
    """
    # Dispatch table: extension -> pandas reader.
    readers = {
        "csv": pd.read_csv,
        "xlsx": pd.read_excel,
        "xls": pd.read_excel,
    }
    reader = readers.get(ext)
    if reader is None:
        print(f"Unsupported file extension: {ext}. Please upload a csv, xlsx, or xls file.")
        return None
    try:
        return reader(uploaded_file)
    except Exception as e:
        print(f"Error reading file: {e}")
        return None
| # ----------------------- | |
| # Audio Transcription | |
| # ----------------------- | |
def transcribe_audio(audio_file):
    """
    Transcribe audio using Deepgram's API (model: nova-3).

    Args:
        audio_file: file-like object containing WAV audio.

    Returns:
        str | None: the transcript text, or None when the API key is missing,
        the request fails, or Deepgram returns a non-200 status.
    """
    deepgram_api_key = os.getenv("DeepGram")
    if not deepgram_api_key:
        # fix: original called st.error() — a Streamlit leftover that is an
        # undefined name in this Flask module (NameError at runtime).
        print("DeepGram API Key is missing. Please set the 'DeepGram' environment variable.")
        return None
    headers_transcribe = {
        "Authorization": f"Token {deepgram_api_key}",
        "Content-Type": "audio/wav"
    }
    url = "https://api.deepgram.com/v1/listen?model=nova-3"
    try:
        audio_bytes = audio_file.read()
        response = requests.post(url, headers=headers_transcribe, data=audio_bytes)
        if response.status_code == 200:
            data = response.json()
            # fix: Deepgram's response has no top-level "text" key — the
            # transcript lives at results.channels[0].alternatives[0].transcript.
            transcription = (
                data.get("results", {})
                .get("channels", [{}])[0]
                .get("alternatives", [{}])[0]
                .get("transcript", "")
            )
            return transcription
        print(f"Deepgram transcription error: {response.status_code}")
        return None
    except Exception as e:
        print(f"Error during transcription: {e}")
        return None
| # ----------------------- | |
| # PandasAI Response for DataFrame (using SmartDataframe and ChatSambaNovaCloud) | |
| # ----------------------- | |
def generateResponse(prompt, df):
    """
    Answer *prompt* against *df* via PandasAI's SmartDataframe.

    Returns:
        str | None: an HTML table for DataFrame answers, a
        'data:image/png;base64,...' URI for chart answers (figure object or
        saved-file path), the raw text for string answers, str(answer) for
        anything else, or None when figure encoding fails.
    """
    pandas_agent = SmartDataframe(
        df,
        config={
            # fix: this module defines `llm1`; the original referenced an
            # undefined name `llm`, raising NameError on every call.
            "llm": llm1,
            "response_parser": FlaskResponse,  # used for internal formatting
            "custom_whitelisted_dependencies": [
                "os", "io", "sys", "chr", "glob", "b64decoder",
                "collections", "geopy", "geopandas", "wordcloud", "builtins"
            ],
            "security": "none",
            "save_charts_path": user_defined_path,
            "save_charts": False,
            "enable_cache": False,
        }
    )
    answer = pandas_agent.chat(prompt)
    # Normalize 'answer' into HTML / base64 image / plain text.
    if isinstance(answer, pd.DataFrame):
        return answer.to_html()
    elif hasattr(answer, "savefig"):  # e.g. a Matplotlib figure
        try:
            buf = io.BytesIO()
            answer.savefig(buf, format="png")
            buf.seek(0)
            image_base64 = base64.b64encode(buf.read()).decode("utf-8")
            return f"data:image/png;base64,{image_base64}"
        except Exception as e:
            print("Error processing figure:", e)
            return None
    elif isinstance(answer, str):
        # Could be a saved-chart file path or just a textual answer.
        if os.path.isfile(answer):
            with open(answer, "rb") as f:
                data = f.read()
            b64 = base64.b64encode(data).decode("utf-8")
            return f"data:image/png;base64,{b64}"
        else:
            return answer
    else:
        # Fallback: stringify anything else (numbers, lists, ...).
        return str(answer)
| # ----------------------- | |
| # DataFrame-Based Story Generation (for CSV/Excel files) | |
| # ----------------------- | |
| # ----------------------- | |
def generate_story_from_dataframe(df, story_type):
    """
    Generate a data-based story from a CSV/Excel DataFrame.

    The dataframe is serialized to JSON and embedded in a prompt asking the
    model for exactly 5 short sections separated by [break]; each section
    carries a chart description inside <> angle brackets. The result is
    normalized (padded/trimmed) to exactly 5 sections.

    Returns:
        str | None: the 5-section story, or None on an empty response / error.
    """
    df_json = json.dumps(df.to_dict())
    # (intro, outro) template pairs keyed by story type; df_json sits between.
    templates = {
        "free_form": ("You are a professional storyteller. Using the following dataset in JSON format: ",
                      ", create an engaging and concise story. "),
        "children": ("You are a professional storyteller writing stories for children. Using the following dataset in JSON format: ",
                     ", create a fun, factual, and concise story appropriate for children. "),
        "education": ("You are a professional storyteller writing educational content. Using the following dataset in JSON format: ",
                      ", create an informative, engaging, and concise educational story. Include interesting facts while keeping it engaging. "),
        "business": ("You are a professional storyteller specializing in business narratives. Using the following dataset in JSON format: ",
                     ", create a professional, concise business story with practical insights. "),
        "entertainment": ("You are a professional storyteller writing creative entertaining stories. Using the following dataset in JSON format: ",
                          ", create an engaging and concise entertaining story. Include interesting facts while keeping it engaging. "),
    }
    intro, outro = templates.get(story_type, templates["free_form"])
    full_prompt = (
        intro + df_json + outro +
        "Write a story for a narrator meaning no labels of pages or sections the story should just flow. Divide your story into exactly 5 very short and concise sections separated by [break]. " +
        "Aim for a maximum of 3 sentences per section to ensure a quicker narration. " +
        "For each section, provide a brief narrative analysis and include, within angle brackets <>, a clear and plain-text description of a chart visualization that would represent the data. " +
        "Limit the descriptions by specifying only charts. " +
        "Ensure that your response contains only natural language descriptions examples: 'bar chart of', 'pie chart of' , 'histogram of', 'scatterplot of', 'boxplot of' , 'heatmap of etc' and nothing else."
    )
    try:
        response = model.generate_content(full_prompt)
        if not response or not response.text:
            return None
        # Drop empty segments, then pad/trim to exactly five sections.
        sections = [seg.strip() for seg in response.text.split("[break]") if seg.strip()]
        sections = (sections + ["(Placeholder section)"] * 5)[:5]
        return "[break]".join(sections)
    except Exception as e:
        print(f"Error generating story from dataframe: {e}")
        return None
| # ----------------------- | |
| # Existing Story Generation Functions (Text, Wikipedia, Bible, Youtube(new)) | |
| # ----------------------- | |
def generate_story_from_text(prompt_text, story_type):
    """Generate a 5-section narrated story from a free-text prompt.

    Returns the raw model text (sections separated by [break], image
    descriptions inside <>), or None if the model returned nothing.
    """
    # (intro, outro) pairs; the user's prompt_text goes between them.
    templates = {
        "free_form": ("You are a professional storyteller. Based on the prompt: ",
                      ", create an engaging and concise story. "),
        "children": ("You are a professional storyteller for children. Based on the prompt: ",
                     ", create a fun and concise story. "),
        "education": ("You are a professional storyteller. Based on the prompt: ",
                      ", create an educational and engaging story. "),
        "business": ("You are a professional storyteller. Based on the prompt: ",
                     ", create a professional business story. "),
        "entertainment": ("You are a professional storyteller. Based on the prompt: ",
                          ", create an entertaining and concise story. "),
    }
    intro, outro = templates.get(story_type, templates["free_form"])
    response = model.generate_content(
        intro + prompt_text + outro +
        "Write a short story for a narrator meaning no labels of pages or sections the story should just flow and narrated in 2 minutes or less. Divide your story into exactly 5 very short and concise sections separated by [break]. Aim for a maximum of 3 sentences per section. For each section, include a brief image description inside <>."
    )
    return response.text if response else None
def generate_story_from_wiki(wiki_url, story_type):
    """Generate a 5-section story from a Wikipedia article summary.

    The page title is taken from the last path segment of *wiki_url*; the
    article summary is fetched via the `wikipedia` package. Returns the raw
    model text or None on any error.
    """
    try:
        page_title = wiki_url.rstrip("/").split("/")[-1]
        wikipedia.set_lang("en")
        wiki_text = wikipedia.page(page_title).summary
        # (intro, outro) pairs; the article summary goes between them.
        templates = {
            "free_form": ("You are a professional storyteller. Using the following Wikipedia info: ",
                          ", create an engaging and concise story. "),
            "children": ("You are a professional storyteller for children. Using the following Wikipedia info: ",
                         ", create a fun and concise story. "),
            "education": ("You are a professional storyteller. Using the following Wikipedia info: ",
                          ", create an educational and engaging story. "),
            "business": ("You are a professional storyteller. Using the following Wikipedia info: ",
                         ", create a professional business story. "),
            "entertainment": ("You are a professional storyteller. Using the following Wikipedia info: ",
                              ", create an entertaining and concise story. "),
        }
        intro, outro = templates.get(story_type, templates["free_form"])
        response = model.generate_content(
            intro + wiki_text + outro +
            "Write a short story for a narrator meaning no labels of pages or sections the story should just flow and narrated in 2 minutes or less. Divide your story into exactly 5 very short and concise sections separated by [break]. Aim for a maximum of 3 sentences per section. For each section, include a brief image description inside <>."
        )
        return response.text if response else None
    except Exception as e:
        print(f"Error generating story from Wikipedia: {e}")
        return None
def fetch_bible_text(reference):
    """Fetch ASV Bible text for a reference like 'Genesis 1:1-5' or 'Psalms 23'.

    With a verse (range), fetches each verse individually and joins them;
    failed verses are replaced with bracketed placeholder markers. Without a
    verse, fetches the whole chapter. Returns None for an unparsable
    reference or a failed chapter fetch.
    """
    m = re.match(r"(?P<book>[1-3]?\s*\w+(?:\s+\w+)*)\s+(?P<chapter>\d+)(?::(?P<verse_start>\d+)(?:-(?P<verse_end>\d+))?)?", reference)
    if not m:
        print("Bible reference format invalid. Use format like 'Genesis 1:1-5' or 'Psalms 23'.")
        return None
    # Normalize book name for the API path, e.g. "1 John" -> "1john".
    book = m.group("book").strip().lower().replace(" ", "")
    chapter = m.group("chapter")
    verse_start = m.group("verse_start")
    verse_end = m.group("verse_end")
    base = f"https://cdn.jsdelivr.net/gh/wldeh/bible-api/bibles/en-asv/books/{book}/chapters/{chapter}"
    if verse_start:
        # Single verse == a range of one; collapses the duplicated branch.
        end = int(verse_end) if verse_end is not None else int(verse_start)
        verses_text = []
        for verse in range(int(verse_start), end + 1):
            url = f"{base}/verses/{verse}.json"
            try:
                # timeout added: never hang a request thread on a stalled CDN
                response = requests.get(url, timeout=15)
                if response.status_code == 200:
                    verses_text.append(response.json().get("text", ""))
                else:
                    verses_text.append(f"[Error fetching verse {verse}]")
            except Exception as e:
                verses_text.append(f"[Exception fetching verse {verse}: {e}]")
        return " ".join(verses_text)
    url = f"{base}.json"
    try:
        response = requests.get(url, timeout=15)
        if response.status_code != 200:
            print("Error fetching chapter text.")
            return None
        data = response.json()
        # The chapter endpoint may return a list of verses or a dict with a
        # "verses" key; handle both, otherwise stringify whatever came back.
        if isinstance(data, list):
            return " ".join(verse.get("text", "") for verse in data)
        if isinstance(data, dict) and "verses" in data:
            return " ".join(verse.get("text", "") for verse in data["verses"])
        return str(data)
    except Exception as e:
        print(f"Exception fetching chapter: {e}")
        return None
def generate_story_from_bible(reference, story_type):
    """Generate a 5-section story from a Bible passage.

    Fetches the passage text via fetch_bible_text(); returns the raw model
    text, or None when the reference cannot be resolved.
    """
    bible_text = fetch_bible_text(reference)
    if bible_text is None:
        return None
    # (intro, outro) pairs; the passage text goes between them.
    templates = {
        "free_form": ("You are a professional storyteller. Using the following Bible text: ",
                      ", create an engaging and concise story. "),
        "children": ("You are a professional storyteller for children. Using the following Bible text: ",
                     ", create a fun and concise story. "),
        "education": ("You are a professional storyteller. Using the following Bible text: ",
                      ", create an educational and engaging story. "),
        "business": ("You are a professional storyteller. Using the following Bible text: ",
                     ", create a professional business story. "),
        "entertainment": ("You are a professional storyteller. Using the following Bible text: ",
                          ", create an entertaining and concise story. "),
    }
    intro, outro = templates.get(story_type, templates["free_form"])
    response = model.generate_content(
        intro + bible_text + outro +
        "Write a short story for a narrator meaning no labels of pages or sections the story should just flow and narrated in 2 minutes or less. Divide your story into exactly 5 very short and concise sections separated by [break]. Aim for a maximum of 3 sentences per section. For each section, include a brief image description inside <>."
    )
    return response.text if response else None
def generate_story_from_youtube(youtube_url, story_type):
    """Generate a 5-section story from a YouTube video transcript.

    The transcript is fetched through the Supadata client. Returns the raw
    model text, or None on any error (invalid URL, transcript failure, ...).
    """
    try:
        # Extract the video id. Generalized: in addition to watch ("v=") and
        # youtu.be links, now also accepts /shorts/ and /embed/ URLs.
        if "v=" in youtube_url:
            video_id = youtube_url.split("v=")[1].split("&")[0]
        elif "youtu.be/" in youtube_url:
            video_id = youtube_url.split("youtu.be/")[1].split("?")[0]
        elif "/shorts/" in youtube_url:
            video_id = youtube_url.split("/shorts/")[1].split("?")[0]
        elif "/embed/" in youtube_url:
            video_id = youtube_url.split("/embed/")[1].split("?")[0]
        else:
            raise ValueError("Invalid YouTube URL provided.")
        # Retrieve the transcript as plain text (text=True collapses chunks).
        transcript_res = supadata.youtube.transcript(
            video_id=video_id,
            text=True
        )
        transcript_text = transcript_res.content
        # Story prompts by type, mirroring the Wikipedia/Bible functions.
        prompts = {
            "free_form": "You are a professional storyteller. Using the following YouTube transcript: " + transcript_text +
                         ", create an engaging and concise story. ",
            "children": "You are a professional storyteller for children. Using the following YouTube transcript: " + transcript_text +
                        ", create a fun and concise story. ",
            "education": "You are a professional storyteller. Using the following YouTube transcript: " + transcript_text +
                         ", create an educational and engaging story. ",
            "business": "You are a professional storyteller. Using the following YouTube transcript: " + transcript_text +
                        ", create a professional business story. ",
            "entertainment": "You are a professional storyteller. Using the following YouTube transcript: " + transcript_text +
                             ", create an entertaining and concise story. "
        }
        story_prompt = prompts.get(story_type, prompts["free_form"])
        full_prompt = story_prompt + (
            "Write a short story for a narrator meaning no labels of pages or sections the story should just flow and narrated in 2 minutes or less. Divide your story into exactly 5 very short and concise sections separated by [break]. "
            "Aim for a maximum of 3 sentences per section. "
            "For each section, include an image description inside <>."
        )
        response = model.generate_content(full_prompt)
        return response.text if response else None
    except Exception as e:
        print(f"Error generating story from YouTube transcript: {e}")
        return None
| # ----------------------- | |
| # Extract Image Prompts and Story Sections | |
| # ----------------------- | |
def extract_image_prompts_and_story(story_text):
    """Split a generated story into page texts and image prompts.

    Sections are delimited by [break]; each section's <...> span becomes its
    image prompt and is stripped from the page text. Sections without a
    bracketed description get a truncated snippet plus a generic prompt.

    Returns:
        tuple[list[str], list[str]]: (pages, image_prompts), equal length.
    """
    pages = []
    image_prompts = []
    for section in re.split(r"\[break\]", story_text):
        section = section.strip()
        if not section:
            continue
        bracket = re.search(r"<(.*?)>", section)
        if bracket:
            image_prompts.append(bracket.group(1).strip())
            pages.append(re.sub(r"<(.*?)>", "", section).strip())
        else:
            # No explicit description: use a short snippet of the text itself.
            snippet = section[:100]
            pages.append(snippet)
            image_prompts.append(f"A concise illustration of {snippet}")
    return pages, image_prompts