Spaces:
Sleeping
Sleeping
# Try using the existing logic, but add the ctx/memory that LlamaIndex allows.
# Do autonomous LlamaAgents.
| from llama_index.core.tools import FunctionTool | |
| from llama_index.llms.openai import OpenAI as LlamaOpenAI | |
| from dotenv import load_dotenv | |
| #from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI | |
| #from llama_index.llms.google_genai import GoogleGenAI | |
| from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent, ReActAgent #can also import ReActAgent or FunctionAgent from this | |
| from llama_index.core.tools import FunctionTool | |
| from llama_index.core.workflow import Context | |
| import os | |
| from functools import lru_cache | |
| import asyncio | |
| import requests | |
| from llama_index.core.agent.workflow import ( | |
| AgentInput, | |
| AgentOutput, | |
| ToolCall, | |
| ToolCallResult, | |
| AgentStream, | |
| ) | |
| import openai | |
| import tiktoken | |
| import requests | |
| import json | |
| import gradio as gr | |
| from openai import OpenAI | |
| #from llama_index.llms.google_gemini import GoogleGenAI | |
| #from google.genai import types | |
# Load variables from a local .env file into the environment before any of the
# clients below read their API keys with os.getenv.
load_dotenv()

import google.generativeai as genai

# Gemini SDK is configured globally; main() later builds a GenerativeModel from it.
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))

# Read the OpenAI key once and hand the same value to every OpenAI-backed client.
_openai_api_key = os.getenv("OPENAI_API_KEY")

# LlamaIndex LLM wrappers handed to the agents further down:
# `llm` is the default, `llmHigher` is the one SocialMediaDataAgent uses.
llm = LlamaOpenAI(
    model="gpt-4o-mini",  # or "gpt-3.5-turbo"
    api_key=_openai_api_key,
    streaming=True,
)
llmHigher = LlamaOpenAI(
    model="o3",
    api_key=_openai_api_key,
    streaming=True,
)

# Plain OpenAI SDK access: a client object plus the module-level key on `openai`.
client = OpenAI(api_key=_openai_api_key)
openai.api_key = _openai_api_key

# Disabled alternatives kept for reference:
#   llm = OpenAI(model="gpt-4o-mini")
#   llmGeminiPro = GoogleGenAI(model="gemini-2.5-pro")
#   llmGeminiFlash = GoogleGenAI(model="gemini-2.5-flash")
#   llmGemini = GoogleGenAI(model="gemini-2.5-pro")   # needs the Gemini api_key in .env
#   google_search_tool = types.Tool(google_search=types.GoogleSearch())  # pass as a tool?
# Last token handed out and the monotonic timestamp after which it is refetched.
_chartmetric_token_cache = {"token": None, "expires_at": 0.0}

# How long a fetched token is reused.
# NOTE(review): assumes Chartmetric access tokens stay valid for about an hour;
# confirm against the `expires_in` field of the /api/token response.
_CHARTMETRIC_TOKEN_TTL_SECONDS = 50 * 60


def get_chartmetric_access_token_cached() -> str | None:
    """
    Return a Chartmetric access token, reusing the previous one while it is fresh.

    Every Chartmetric tool in this file calls this helper, so without a cache
    each tool call costs an extra round trip to the token endpoint.

    Returns:
    - str | None: the bearer token, or None when
      get_chartmetric_access_token_with_refresh() could not obtain one.
      Failures are not cached, so the next call retries.
    """
    import time  # local import: `time` is not among this file's top-level imports

    now = time.monotonic()
    cached_token = _chartmetric_token_cache["token"]
    if cached_token is not None and now < _chartmetric_token_cache["expires_at"]:
        return cached_token

    print("🔑 Fetching new Chartmetric token")
    fresh_token = get_chartmetric_access_token_with_refresh()
    if fresh_token is not None:
        _chartmetric_token_cache["token"] = fresh_token
        _chartmetric_token_cache["expires_at"] = now + _CHARTMETRIC_TOKEN_TTL_SECONDS
    return fresh_token
| #@function_tool | |
| def get_chartmetric_access_token_with_refresh() -> str or None: | |
| """ | |
| Retrieves an access token from Chartmetric. You need to use this before you can use any other function involving chartmetric | |
| """ | |
| #current_state = await ctx.get('state') | |
| refresh_token = 'izPNc1uMM7A13dvWGs0Gij3rfMTKV0K24ADFfcHviaOPWxc35ZsNuYqlQNb5BVyG' | |
| endpoint = 'https://api.chartmetric.com/api/token' | |
| headers = { | |
| 'Content-Type': 'application/json' | |
| } | |
| payload = { | |
| 'refreshtoken': refresh_token | |
| } | |
| try: | |
| response = requests.post(endpoint, headers=headers, json=payload) | |
| if not response.ok: | |
| raise Exception(f"Token request failed: {response.status_code} {response.reason}") | |
| data = response.json() | |
| print("Access token retrieved:", data.get('token'),{}) | |
| #if "working_notes" not in current_state: | |
| #current_state["working_notes"] = {} | |
| access_token = data.get('token')# This is your bearer token for future API calls | |
| #current_state["working_notes"]["access_token"] = access_token | |
| #await ctx.set("state", current_state) | |
| return access_token | |
| except Exception as e: | |
| print("Error retrieving Chartmetric access token:", str(e)) | |
| return None | |
| #@function_tool | |
async def find_artist_id_for_artist(ctx: Context, artist_name: str) -> int | None:
    """
    Retrieves artist_id for the artist you want to search on the chartmetric system.

    Parameters:
    - artist_name (str): The artist name to search for.
    Returns:
    - int | None: the `id` of the first artist in the search results, or None
      when nothing matched, no access token was available, or the request failed.
    Notes:
    - The ID is stored in working memory under "artist_id_for_<artist_name>".
    """
    current_state = await ctx.store.get('state')
    print(f"value of current_state on load inside of find_artist_id_for_artist is: {current_state}")
    if current_state is None:
        # No state saved yet: start fresh instead of failing on the membership test below.
        current_state = {}

    access_token = get_chartmetric_access_token_cached()
    if not access_token:
        print("Error retrieving Chartmetric artist_id: no access token available")
        return None

    url = 'https://api.chartmetric.com/api/search'
    # Let requests encode the query so names containing '&', '#', spaces or
    # non-ASCII characters are sent intact.
    params = {"q": artist_name, "type": "artists"}
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    try:
        response = requests.get(url, headers=headers, params=params, timeout=30)
        if not response.ok:
            raise Exception(f"artist_id request failed: {response.status_code} {response.reason}")
        data = response.json()
        print("Raw response data:", data)

        # Safely access first matched artist
        artists = (data.get("obj") or {}).get("artists") or []
        if not artists:
            print(f"No artists found matching '{artist_name}'.")
            return None
        artist_id = artists[0].get('id')
        if artist_id is None:
            print(f"First match for '{artist_name}' has no id.")
            return None

        # Update state and persist it
        working_notes = current_state.setdefault("working_notes", {})
        working_notes[f"artist_id_for_{artist_name}"] = artist_id
        await ctx.store.set("state", current_state)
        print(f"🧠 Updated working_notes in find_artist_id_for_artist: {json.dumps(working_notes, indent=2)}")
        return artist_id
    except Exception as e:
        # Tool boundary: report the failure and let the agent see None.
        print("Error retrieving Chartmetric artist_id:", str(e))
        return None
| #@function_tool | |
async def get_similar_artists(ctx: Context, artist_id: int) -> dict | None:
    """
    Retrieve a list of similar artists from Chartmetric based on a given artist ID.

    Parameters:
    - artist_id (int): The Chartmetric artist ID.
    Returns:
    - The `obj` payload of the related-artists response (the request asks for
      at most 3 artists), or None when the request fails.
    Notes:
    - Results are stored in working memory under "similar_artists".
    """
    current_state = await ctx.store.get('state')
    print(f"value of current_state on load inside of get_similar_artists is: {current_state}")
    if current_state is None:
        # No state saved yet: start fresh instead of failing on the update below.
        current_state = {}

    access_token = get_chartmetric_access_token_cached()
    print("access_token for get_similar_artists api call obtained!")
    url = f"https://api.chartmetric.com/api/artist/{artist_id}/relatedartists?limit=3"
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    try:
        response = requests.get(url, headers=headers, timeout=30)
        if not response.ok:
            raise Exception(f"Related artists request failed: {response.status_code} {response.reason}")
        data = response.json()
        print("data returned from get_similar_artists is:", data)
        similar_artists = data.get('obj', {})

        current_state.setdefault("working_notes", {})["similar_artists"] = similar_artists
        await ctx.store.set('state', current_state)
        return similar_artists
    except Exception as e:
        # Tool boundary: report the failure and let the agent see None.
        print("Error retrieving similar artists:", str(e))
        return None
async def get_youtube_audience_data(ctx: Context, artist_id: str) -> dict:
    """
    Retrieve YouTube audience data for a given artist, using Chartmetric API.

    Parameters:
    - artist_id (str): The Chartmetric artist ID.
    Returns:
    - dict: {"youtube_audience_data for artist <artist_id>": stats}, where stats
      holds top countries, gender/age breakdowns, a trimmed list of notable
      subscribers, subscriber count and engagement figures. Fields missing from
      the response are returned as None. An empty dict is returned when
      Chartmetric answers 404 for this artist.
    Raises:
    - Exception: for any other non-successful HTTP status.
    Notes:
    - Results are saved in working memory.
    """
    current_state = await ctx.store.get('state')
    print(f"value of current_state on load inside of get_youtube_audience_data is: {current_state}")
    if current_state is None:
        # No state saved yet: start fresh instead of failing on the update below.
        current_state = {}

    access_token = get_chartmetric_access_token_cached()
    # The token is a credential, so only the artist id is logged.
    print("🚀 Called get_Youtube with artist_id:", artist_id)
    url = f"https://api.chartmetric.com/api/artist/{artist_id}/youtube-audience-stats"
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code == 404:
        print(f"⚠️ No YouTube data found for artist {artist_id}")
        return {}
    if not response.ok:
        # Previously other failures fell through and surfaced as a confusing
        # JSON/KeyError; fail with the HTTP status like the sibling tools do.
        raise Exception(f"API request failed: {response.status_code} {response.reason}")

    data = response.json()
    print(f"data from get_Youtube is: {data}")
    stats_payload = data.get('obj') or {}
    print("Info from get_youtube_audience_data is:", stats_payload)

    # Keep only the fields of each notable subscriber that the report needs.
    top_followers = [
        {
            "custom_name": subscriber.get("custom_name"),
            "subscribers": subscriber.get("subscribers"),
            "engagements": subscriber.get("engagements"),
        }
        for subscriber in stats_payload.get("notable_subscribers") or []
    ]
    youtube_audience_stats = {
        "top_countries": stats_payload.get("top_countries"),
        "audience_gender_by_age": stats_payload.get("audience_genders_per_age"),
        "audience_genders": stats_payload.get("audience_genders"),
        "top_followers": top_followers,
        "subscribers": stats_payload.get("subscribers"),
        "avg_likes_per_post": stats_payload.get("avg_likes_per_post"),
        # Key spelling ("commments") is kept exactly as the original code reads it.
        "avg_commments_per_post": stats_payload.get("avg_commments_per_post"),
        "engagement_rate": stats_payload.get("engagement_rate"),
    }
    print(f"youtube_audience_stats are: {youtube_audience_stats}")

    notes_key = f"youtube_audience_data for artist {artist_id}"
    current_state.setdefault("working_notes", {})[notes_key] = youtube_audience_stats
    await ctx.store.set('state', current_state)
    return {notes_key: youtube_audience_stats}
async def get_tiktok_audience_data(ctx: Context, artist_id: str) -> dict:
    """
    Retrieve TikTok audience data for a given artist using Chartmetric API.

    Parameters:
    - artist_id (str): The Chartmetric artist ID.
    Returns:
    - dict: {"tiktok_audience_data for artist <artist_id>": stats}, where stats
      holds top countries, gender/age breakdowns, a trimmed list of notable
      followers, follower count and engagement figures. Fields missing from the
      response are returned as None.
    Raises:
    - Exception: when Chartmetric answers with a non-successful HTTP status.
    Notes:
    - Results are saved in working memory.
    """
    current_state = await ctx.store.get('state')
    print(f"value of current_state on load inside of get_tiktok_audience_data is: {current_state}")
    if current_state is None:
        # No state saved yet: start fresh instead of failing on the update below.
        current_state = {}

    access_token = get_chartmetric_access_token_cached()
    # The token is a credential, so only the artist id is logged.
    print("🚀 Called get_tiktok_audience_data with artist_id:", artist_id)
    url = f"https://api.chartmetric.com/api/artist/{artist_id}/tiktok-audience-stats"
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    response = requests.get(url, headers=headers, timeout=30)
    if not response.ok:
        raise Exception(f"API request failed: {response.status_code} {response.reason}")

    stats_payload = response.json().get('obj') or {}

    # Keep only the fields of each notable follower that the report needs.
    top_followers = [
        {
            "username": follower.get("username"),
            "followers": follower.get("followers"),
            "engagement": follower.get("engagements"),
        }
        for follower in stats_payload.get("notable_followers") or []
    ]
    tiktok_audience_stats = {
        "top_countries": stats_payload.get("top_countries"),
        "audience_gender_by_age": stats_payload.get("audience_genders_per_age"),
        "audience_genders": stats_payload.get("audience_genders"),
        "top_followers": top_followers,
        "followers": stats_payload.get("followers"),
        "avg_likes_per_post": stats_payload.get("avg_likes_per_post"),
        # Key spelling ("commments") is kept exactly as the original code reads it.
        "avg_commments_per_post": stats_payload.get("avg_commments_per_post"),
        "engagement_rate": stats_payload.get("engagement_rate"),
    }

    notes_key = f"tiktok_audience_data for artist {artist_id}"
    current_state.setdefault("working_notes", {})[notes_key] = tiktok_audience_stats
    await ctx.store.set('state', current_state)
    return {notes_key: tiktok_audience_stats}
| #choose which parts to return | |
| #@function_tool | |
async def get_instagram_audience_data(ctx: Context, artist_id: str) -> dict:
    """
    Retrieve Instagram audience statistics for a given artist using Chartmetric.

    Parameters:
    - artist_id (str): The Chartmetric artist ID.
    Returns:
    - dict: {"instagram_audience_data for artist <artist_id>": stats}, where
      stats is the unmodified `obj` payload of the response.
    Raises:
    - Exception: when Chartmetric answers with a non-successful HTTP status.
    Notes:
    - Results are saved in working memory.
    """
    current_state = await ctx.store.get('state')
    print(f"value of current_state on load inside of get_instagram_audience_data is: {current_state}")
    if current_state is None:
        # No state saved yet: start fresh instead of failing on the update below.
        current_state = {}

    access_token = get_chartmetric_access_token_cached()
    # The token is a credential, so only the artist id is logged.
    print("🚀 Called get_instagram_audience_data with artist_id:", artist_id)
    url = f"https://api.chartmetric.com/api/artist/{artist_id}/instagram-audience-stats"
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    response = requests.get(url, headers=headers, timeout=30)
    if not response.ok:
        raise Exception(f"API request failed: {response.status_code} {response.reason}")

    instagram_audience_stats = response.json().get('obj', {})

    notes_key = f"instagram_audience_data for artist {artist_id}"
    current_state.setdefault("working_notes", {})[notes_key] = instagram_audience_stats
    await ctx.store.set('state', current_state)
    return {notes_key: instagram_audience_stats}
# Chart identifiers accepted by get_charts (kept as a list so the error message
# renders exactly as before).
_VALID_CHART_TYPES = [
    "spotify_viral_daily", "spotify_viral_weekly", "spotify_top_daily", "spotify_top_weekly",
    "applemusic_top", "applemusic_daily", "applemusic_albums",
    "itunes_top", "itunes_albums", "shazam", "beatport",
    "youtube", "youtube_tracks", "youtube_videos", "youtube_trends", "amazon"
]


async def get_charts(
    ctx: Context,
    artist_id: int,
    chart_type: str,
    since: str = "2025-03-01",
    until: str = "2025-07-04",
) -> dict:
    """
    Retrieve chart data for a given artist using Chartmetric API.

    Parameters:
    - artist_id (int): The Chartmetric artist ID.
    - chart_type: The platform chart and sub-choice. Choose one from:
        [
        "spotify_viral_daily", "spotify_viral_weekly", "spotify_top_daily", "spotify_top_weekly",
        "applemusic_top", "applemusic_daily", "applemusic_albums",
        "itunes_top", "itunes_albums",
        "shazam", "beatport",
        "youtube", "youtube_tracks", "youtube_videos", "youtube_trends",
        "amazon"
        ]
    - since (str): Start date of the window, YYYY-MM-DD. Defaults to the window
      that used to be hard-coded.
    - until (str): End date of the window, YYYY-MM-DD. Defaults to the window
      that used to be hard-coded.
    Returns:
    - dict: {"artist_id": ..., "chart_data": [...]} with one entry per chart row
      containing album name, previous rank, peak rank, peak date and rank.
      An empty dict is returned when the HTTP request fails.
    Raises:
    - ValueError: if chart_type is not one of the supported values.
    Notes:
    - Results are saved in working memory under
      "charts_data for <artist_id>" -> <chart_type>.
    """
    if chart_type not in _VALID_CHART_TYPES:
        raise ValueError(f"Invalid chart_type '{chart_type}'. Must be one of: {_VALID_CHART_TYPES}")

    current_state = await ctx.store.get('state')
    print(f"value of current_state on load inside of get_chart is: {current_state}")
    if current_state is None:
        # No state saved yet: start fresh instead of failing on the update below.
        current_state = {}

    access_token = get_chartmetric_access_token_cached()
    # The token is a credential, so only the artist id is logged.
    print("🚀 Called get_charts with artist_id:", artist_id)

    # Endpoint shape: https://api.chartmetric.com/api/artist/:id/:type/charts
    url = f"https://api.chartmetric.com/api/artist/{artist_id}/{chart_type}/charts"
    params = {"since": since, "until": until}
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    response = requests.get(url, headers=headers, params=params, timeout=30)
    if not response.ok:
        print(f"❌ Request failed with status {response.status_code}: {response.text}")
        return {}
    print("🚀 data call to get_charts successfully made!")

    # Walk obj -> data -> entries, tolerating any level being absent or null.
    payload = response.json().get('obj') or {}
    entries = (payload.get('data') or {}).get('entries') or []

    relevant_details = [
        {
            "album": entry.get("name"),
            "pre-rank": entry.get("pre_rank"),
            "peak": entry.get("peak_rank"),
            "peak_day": entry.get("peak_date"),
            "rank": entry.get("rank"),
        }
        for entry in entries
    ]
    print(f"value of relevant_details is: {relevant_details}")

    working_notes = current_state.setdefault("working_notes", {})
    working_notes.setdefault(f"charts_data for {artist_id}", {})[chart_type] = relevant_details
    await ctx.store.set('state', current_state)
    return {
        "artist_id": artist_id,
        "chart_data": relevant_details
    }
# And the code that allows logging of every step of the memory/thought process.
# Keep the cache of the Chartmetric API token, attached to the function that gets the token, which is inserted into each relevant API call.
| #find_artist_id_for_artist_tool = FunctionTool.from_function(find_artist_id_for_artist) | |
| #get_instagram_audience_stats_tool = FunctionTool.from_function(get_instagram_audience_stats) | |
| #get_similar_artists = FunctionTool.from_function(get_similar_artists) | |
| # Wrap your function | |
| #find_artist_id_for_artist_tool = FunctionTool(fn=find_artist_id_for_artist) | |
| #get_instagram_audience_stats_tool = FunctionTool(fn=get_instagram_audience_stats) | |
| #get_similar_artists_tool = FunctionTool(fn=get_similar_artists) | |
# Entry-point agent: it has no tools of its own and only routes work to the
# specialist agents listed in can_handoff_to.
manager_agent = ReActAgent(
    name="ManagerAgent",
    description="Manager agent decides which other agents to use, and is decision maker",
    system_prompt=(
        "You are the manager agent. You do not collect data yourself. You delegate tasks to other agents.\n\n"
        "Your responsibilities are:\n"
        "- Receive the user’s question\n"
        "- Decide whether StreamingChartAgent or SocialMediaDataAgent or SimilarityAgent (or two or all) should handle the request\n"
        # Each routing rule ends with "\n"; without it the adjacent literals are
        # concatenated into one run-on line in the prompt.
        "+ If the question is about social media audience data (TikTok, Instagram, YouTube), use SocialMediaDataAgent.\n"
        "+ If the question is about chart positions, chart history, or streaming rankings, use StreamingChartAgent.\n"
        "- Wait for their responses and evaluate whether the question has been sufficiently answered\n"
    ),
    llm=llm,
    can_handoff_to=["SocialMediaDataAgent", "SimilarityAgent", "StreamingChartAgent"]
)
# Specialist agent for chart questions. It can resolve an artist name to an ID
# and fetch chart rows, and may hand control to any of the other three agents.
streaming_chart_agent = ReActAgent(
    llm=llm,
    name="StreamingChartAgent",
    description="agent to retrieve streaming chart data for the artist being researched",
    system_prompt="You are a research agent that retrieves streaming chart information about an artist",
    tools=[
        get_charts,
        find_artist_id_for_artist,
    ],
    can_handoff_to=[
        "ManagerAgent",
        "SimilarityAgent",
        "SocialMediaDataAgent",
    ],
)
# Specialist agent for audience data. Try Function agents first; change to
# ReAct agents if needed or if performance is poor.
social_media_data_agent = ReActAgent(
    name="SocialMediaDataAgent",
    description="agent to source data about artists from social media data, using chartmetric api",
    system_prompt=(
        "You are a research agent that uses social media data to analyze artist audiences via Chartmetric.\n"
        "- Always use **both** Instagram and TikTok and Youtube data as your default behavior when analyzing artists.\n"
        "- Do NOT choose one over the other unless explicitly told to focus on one.\n"
        # Tool names must match the functions registered in `tools` below; the
        # Instagram tool is get_instagram_audience_data (not ..._stats).
        "- Always call 'get_instagram_audience_data' AND 'get_tiktok_audience_data' AND 'get_youtube_audience_data' when gathering audience data.\n"
        "- Do NOT assume artist names. Only use 'find_artist_id_for_artist' with real artist names provided by the user.\n"
        "- If the user needs information about similar artists, HAND OFF to the SimilarityAgent — do NOT attempt it yourself.\n"
        "- Your tools are only for Instagram and TikTok and Youtube data.\n"
    ),
    llm=llmHigher,
    tools=[get_instagram_audience_data, find_artist_id_for_artist, get_tiktok_audience_data, get_youtube_audience_data],
    can_handoff_to=["ManagerAgent", "SimilarityAgent", "StreamingChartAgent"]  # allow it to hand off to all other agents
)
# (A second, identical construction of `streaming_chart_agent` used to live
# here. It only rebuilt the agent already defined above with the same
# arguments, so it was removed.)
# Specialist agent for finding comparable artists via Chartmetric.
similarity_agent = ReActAgent(
    name="SimilarityAgent",
    description="agent to find similar artists to the artist being researched, using chartmetric api",
    system_prompt=(
        # Trailing space keeps the two sentences apart once the literals are joined.
        "You are a research agent that looks for similar artists to the artist you are researching, in order to understand how the artist can copy the growth of similar artists who are larger. "
        "you can handoff to SocialMediaDataAgent, in order to find information about the followers of similar artists"
    ),
    llm=llm,
    tools=[get_similar_artists, find_artist_id_for_artist],
    can_handoff_to=["ManagerAgent", "SocialMediaDataAgent", "StreamingChartAgent"]
)
async def main(chosen_artist, prompt):
    """
    Build an artist report with Gemini from previously collected answers.

    Parameters:
    - chosen_artist: Name of the artist the report is about.
    - prompt: The user's instructions for the report.
    Returns:
    - str: The text of the Gemini response.
    Notes:
    - The data source is the file "overall_answersGemini.txt" in the working
      directory; a missing file raises FileNotFoundError.
    - The agents defined in this file are not invoked here.
    """
    with open("overall_answersGemini.txt", "r", encoding="utf-8") as file:
        overall_answers = file.read()

    # Whatever prompt is entered, the artist and the data source are always
    # appended. Explicit separators keep the three parts from running together
    # (previously: "...<prompt>This report is about artist Xyour sole data...").
    final_prompt = (
        f"{prompt}\n\n"
        f"This report is about artist {chosen_artist}.\n\n"
        f"Your sole data source is:\n{overall_answers}"
    )

    def count_tokens(text, model="gpt-4o"):
        # Uses an OpenAI tokenizer, so for Gemini this is only an estimate.
        encoding = tiktoken.encoding_for_model(model)
        return len(encoding.encode(text))

    # Count tokens anyway, for later usage.
    total_tokens = count_tokens(final_prompt)
    print(f"Total tokens of prompt: {total_tokens}")

    model = genai.GenerativeModel("models/gemini-2.5-pro")
    # generate_content is a blocking call; run it in a worker thread so this
    # coroutine does not stall the event loop while the model is generating.
    responseGemini = await asyncio.to_thread(
        model.generate_content,
        f"You are a precise music industry data analyst. Be structured, factual, and preserve all stats given. use: {final_prompt}",
    )
    two_pager_gemini = responseGemini.text
    print(two_pager_gemini)
    return two_pager_gemini
# Gradio front end: two text boxes feed main(chosen_artist, prompt) and the
# returned report is shown as text.
demo = gr.Interface(
    title="artist report generator",
    description="generate report for artist",
    fn=main,
    inputs=[
        "text",  # artist_name
        "text",  # prompt
    ],
    outputs="text",
)

# Start the web server as soon as this module is executed.
demo.launch(share=True)
| #if __name__ == "__main__": | |
| #response = asyncio.run(main()) | |
| #then pass to llm to assemble formal response to formal questions | |
| # FunctionAgent works for LLMs with a function calling API. | |
| # ReActAgent works for any LLM. | |
| #can check logs: | |
| #async for ev in handler.stream_events(): | |
| #if isinstance(ev, ToolCallResult): | |
| #print("") | |
| #print("Called tool: ", ev.tool_name, ev.tool_kwargs, "=>", ev.tool_output) | |
| #elif isinstance(ev, AgentStream): # showing the thought process | |
| #print(ev.delta, end="", flush=True) | |