Derfel2025's picture
fix count_tokens argument mistake
cf0838b
#try using existing logic, but add ctx/memory that llamindex allows
#do autonomous llamagents
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI as LlamaOpenAI
from dotenv import load_dotenv
#from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI
#from llama_index.llms.google_genai import GoogleGenAI
from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent, ReActAgent #can also import ReActAgent or FunctionAgent from this
from llama_index.core.tools import FunctionTool
from llama_index.core.workflow import Context
import os
from functools import lru_cache
import asyncio
import requests
from llama_index.core.agent.workflow import (
AgentInput,
AgentOutput,
ToolCall,
ToolCallResult,
AgentStream,
)
import openai
import tiktoken
import requests
import json
import gradio as gr
from openai import OpenAI
#from llama_index.llms.google_gemini import GoogleGenAI
#from google.genai import types
load_dotenv()
#llm = OpenAI(model="gpt-4o-mini")
import google.generativeai as genai
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
#llmGeminiPro = GoogleGenAI(model="gemini-2.5-pro")
#print("llmGeminiPro loaded!")
#llmGeminiFlash = GoogleGenAI(model="gemini-2.5-flash")
#print("llmGeminiFlash loaded!")
llm = LlamaOpenAI(
model="gpt-4o-mini", # or "gpt-3.5-turbo"
api_key=os.getenv('OPENAI_API_KEY'), # You can also set this via the OPENAI_API_KEY environment variable
streaming=True
)
llmHigher = LlamaOpenAI(
model="o3",
api_key=os.getenv('OPENAI_API_KEY'),
streaming=True
)
client = OpenAI(
api_key=os.getenv('OPENAI_API_KEY'),
)
openai.api_key = os.getenv("OPENAI_API_KEY")
#use gemini
#set api_key in .env for gemini
#llmGemini = GoogleGenAI(model="gemini-2.5-pro")
#can use search as AI
#google_search_tool = types.Tool(
#google_search=types.GoogleSearch()
#)#should be able to pass as tool?
@lru_cache(maxsize=1)
def get_chartmetric_access_token_cached() -> str | None:
print("🔑 Fetching new Chartmetric token")
return get_chartmetric_access_token_with_refresh()
#@function_tool
def get_chartmetric_access_token_with_refresh() -> str or None:
"""
Retrieves an access token from Chartmetric. You need to use this before you can use any other function involving chartmetric
"""
#current_state = await ctx.get('state')
refresh_token = 'izPNc1uMM7A13dvWGs0Gij3rfMTKV0K24ADFfcHviaOPWxc35ZsNuYqlQNb5BVyG'
endpoint = 'https://api.chartmetric.com/api/token'
headers = {
'Content-Type': 'application/json'
}
payload = {
'refreshtoken': refresh_token
}
try:
response = requests.post(endpoint, headers=headers, json=payload)
if not response.ok:
raise Exception(f"Token request failed: {response.status_code} {response.reason}")
data = response.json()
print("Access token retrieved:", data.get('token'),{})
#if "working_notes" not in current_state:
#current_state["working_notes"] = {}
access_token = data.get('token')# This is your bearer token for future API calls
#current_state["working_notes"]["access_token"] = access_token
#await ctx.set("state", current_state)
return access_token
except Exception as e:
print("Error retrieving Chartmetric access token:", str(e))
return None
#@function_tool
async def find_artist_id_for_artist(ctx: Context, artist_name: str) -> int:
"""
Retrieves artist_id for the artist you want to search on the chartmetric system .
"""
current_state = await ctx.store.get('state')
print(f"value of current_state on load inside of find_artist_id_for_artist is: {current_state}")
access_token = get_chartmetric_access_token_cached()
url = f'https://api.chartmetric.com/api/search?q={artist_name}&type=artists'
headers = {
"Authorization": f"Bearer {access_token}"
}
try:
response = requests.get(url, headers=headers)
if not response.ok:
raise Exception(f"artist_id request failed: {response.status_code} {response.reason}")
data = response.json()
print("Raw response data:", data)
# Safely access first matched artist
artists = data.get("obj", {}).get("artists", [])
if not artists:
print(f"No artists found matching '{artist_name}'.")
return None
artist_id = artists[0].get('id',{})
# Update state and persist it
if "working_notes" not in current_state:
current_state["working_notes"] = {}
current_state["working_notes"][f"artist_id_for_{artist_name}"] = artist_id
await ctx.store.set("state", current_state) # 🟢 Save the updated state
print(f"🧠 Updated working_notes in find_artist_id_for_artist: {json.dumps(current_state['working_notes'], indent=2)}")
return artist_id
except Exception as e:
print("Error retrieving Chartmetric artist_id:", str(e))
return None
#@function_tool
async def get_similar_artists(ctx: Context, artist_id: int) -> dict:
"""
Retrieve a list of similar artists from Chartmetric based on a given artist ID.
Parameters:
- artist_id (int): The Chartmetric artist ID.
Returns:
- dict: A dictionary of similar artists (up to 5).
Notes:
- Results are stored in working memory under "similar_artists".
"""
current_state = await ctx.store.get('state')
print(f"value of current_state on load inside of get_similar_artists is: {current_state}")
access_token = get_chartmetric_access_token_cached() # Assuming this is defined elsewhere
print("access_token for get_similar_artists api call obatined!")
url = f"https://api.chartmetric.com/api/artist/{artist_id}/relatedartists?limit=3"
headers = {
"Authorization": f"Bearer {access_token}"
}
try:
response = requests.get(url, headers=headers)
if not response.ok:
raise Exception(f"Related artists request failed: {response.status_code} {response.reason}")
data = response.json()
print("data returned from get_similar_artists is:", data)
similar_artists = data.get('obj', {})
if "working_notes" not in current_state:
current_state["working_notes"] = {}
current_state["working_notes"]["similar_artists"] = similar_artists
await ctx.store.set('state', current_state)
return similar_artists
except Exception as e:
print("Error retrieving similar artists:", str(e))
return None
async def get_youtube_audience_data(ctx: Context, artist_id: str) -> dict:
"""
Retrieve Youtube audience data for a given artist, using Chartmetric API.
Parameters:
- artist_id (int): The Chartmetric artist ID.
Returns:
- dict: A dictionary of similar artists (up to 5).
Notes:
- Results are saved in working memory.
"""
current_state = await ctx.store.get('state')
print(f"value of current_state on load inside of get_youtube_audience_data is: {current_state}")
access_token = get_chartmetric_access_token_cached()
print("🚀 Called get_Youtube with artist_id:", artist_id)
print("🚀 Called get_Youtube with access_token:", access_token)
url = f"https://api.chartmetric.com/api/artist/{artist_id}/youtube-audience-stats"
headers = {
"Authorization": f"Bearer {access_token}"
}
response = requests.get(url, headers=headers)
if not response.ok:
if response.status_code == 404:
print(f"⚠️ No YouTube data found for artist {artist_id}")
return {}
data = response.json()
print(f"data from get_Youtube is: {data}")
dataObj = data.get('obj',{})
print("Info from get_tiktok_audience_data is:", dataObj)
compressed_notable_followers = []
for follower in dataObj["notable_subscribers"]:
#pprint(f"follower in dataObj is: {follower}")
new_data = {}
new_data["custom_name"] = follower.get("custom_name", {})
new_data["subscribers"] = follower["subscribers"]
new_data["engagements"] = follower["engagements"]
compressed_notable_followers.append(new_data)
dict_to_return = {"top_countries": dataObj["top_countries"], "audience_gender_by_age": dataObj["audience_genders_per_age"], "audience_genders": dataObj["audience_genders"], "top_followers": compressed_notable_followers,
"subscribers": dataObj["subscribers"], "avg_likes_per_post": dataObj["avg_likes_per_post"], "avg_commments_per_post": dataObj["avg_commments_per_post"],
"engagement_rate": dataObj["engagement_rate"]
}
if "working_notes" not in current_state:
current_state["working_notes"] = {}
youtube_audience_stats = dict_to_return
print(f"youtube_audience_stats are: {youtube_audience_stats}")
current_state["working_notes"][f"youtube_audience_data for artist {artist_id}"] = youtube_audience_stats
await ctx.store.set('state', current_state)
return { f"youtube_audience_data for artist {artist_id}": youtube_audience_stats}
async def get_tiktok_audience_data(ctx: Context, artist_id: str) -> dict:
"""
Retrieve TikTok audience data for a given artist using Chartmetric API.
Parameters:
- artist_id (str): The Chartmetric artist ID.
Returns:
- dict: TikTok audience breakdown.
Notes:
- Results are saved in working memory.
"""
current_state = await ctx.store.get('state')
print(f"value of current_state on load inside of get_tiktok_audience_data is: {current_state}")
access_token = get_chartmetric_access_token_cached()
print("🚀 Called get_tiktok_audience_data with artist_id:", artist_id)
print("🚀 Called get_tiktok_audience_data with access_token:", access_token)
url = f"https://api.chartmetric.com/api/artist/{artist_id}/tiktok-audience-stats"
headers = {
"Authorization": f"Bearer {access_token}"
}
response = requests.get(url, headers=headers)
if not response.ok:
raise Exception(f"API request failed: {response.status_code} {response.reason}")
data = response.json()
#print(f"data from get_tiktok_audience_data is: {data}")
dataObj = data.get('obj',{})
#print("Info from get_tiktok_audience_data is:", dataObj)
compressed_notable_followers = []
for follower in dataObj.get("notable_followers", []):
#print(f"follower in dataObj is: {follower}")
new_data = {}
new_data["username"] = follower["username"]
new_data["followers"] = follower["followers"]
new_data["engagement"] = follower["engagements"]
compressed_notable_followers.append(new_data)
dict_to_return = {"top_countries": dataObj["top_countries"], "audience_gender_by_age": dataObj["audience_genders_per_age"], "audience_genders": dataObj["audience_genders"], "top_followers": compressed_notable_followers,
"followers": dataObj["followers"], "avg_likes_per_post": dataObj["avg_likes_per_post"], "avg_commments_per_post": dataObj["avg_commments_per_post"],
"engagement_rate": dataObj["engagement_rate"]
}
if "working_notes" not in current_state:
current_state["working_notes"] = {}
tiktok_audience_stats = dict_to_return
#print(f"tiktok_audience_data are: {tiktok_audience_stats}")
current_state["working_notes"][f"tiktok_audience_data for artist {artist_id}"] = tiktok_audience_stats
await ctx.store.set('state', current_state)
return { f"tiktok_audience_data for artist {artist_id}": tiktok_audience_stats}
#choose which parts to return
#@function_tool
async def get_instagram_audience_data(ctx: Context, artist_id: str) -> dict:
"""
Retrieve Instagram audience statistics for a given artist using Chartmetric.
Parameters:
- artist_id (str): The Chartmetric artist ID.
Returns:
- dict: Instagram audience breakdown.
Notes:
- Results are saved in working memory.
"""
#perhaps just have it get access_token inside here
#access_token = get_chartmetric_access_token_with_refresh()
current_state = await ctx.store.get('state')
print(f"value of current_state on load inside of get_instagram_audience_stats is: {current_state}")
access_token = get_chartmetric_access_token_cached()
print("🚀 Called get_instagram_audience_stats with artist_id:", artist_id)
print("🚀 Called get_instagram_audience_stats with access_token:", access_token)
url = f"https://api.chartmetric.com/api/artist/{artist_id}/instagram-audience-stats"
headers = {
"Authorization": f"Bearer {access_token}"
}
response = requests.get(url, headers=headers)
if not response.ok:
raise Exception(f"API request failed: {response.status_code} {response.reason}")
data = response.json()
#print(f"data from api call is: {data}")
#print("Info from platform Instagram is:", data.get("obj"))
if "working_notes" not in current_state:
current_state["working_notes"] = {}
instagram_audience_stats = data.get('obj', {})
current_state["working_notes"][f"instagram_audience_data for artist {artist_id}"] = instagram_audience_stats
await ctx.store.set('state', current_state)
return { f"instagram_audience_data for artist {artist_id}": instagram_audience_stats}
async def get_charts(ctx: Context, artist_id: int, chart_type: str) -> dict:
"""
Retrieve chart data for a given artist using Chartmetric API.
Parameters:
- artist_id (str): The Chartmetric artist ID.
- chart_type: The platform chart and sub-choice. Choose one from:
[
"spotify_viral_daily", "spotify_viral_weekly", "spotify_top_daily", "spotify_top_weekly",
"applemusic_top", "applemusic_daily", "applemusic_albums",
"itunes_top", "itunes_albums",
"shazam", "beatport",
"youtube", "youtube_tracks", "youtube_videos", "youtube_trends",
"amazon"
]
Returns:
- dict: Chart entries containing album name, rank, and peak info.
Notes:
- Results are saved in working memory.
"""
valid_chart_types = [
"spotify_viral_daily", "spotify_viral_weekly", "spotify_top_daily", "spotify_top_weekly",
"applemusic_top", "applemusic_daily", "applemusic_albums",
"itunes_top", "itunes_albums", "shazam", "beatport",
"youtube", "youtube_tracks", "youtube_videos", "youtube_trends", "amazon"
]
if chart_type not in valid_chart_types:
raise ValueError(f"Invalid chart_type '{chart_type}'. Must be one of: {valid_chart_types}")
current_state = await ctx.store.get('state')
print(f"value of current_state on load inside of get_chart is: {current_state}")
#https://api.chartmetric.com/api/artist/:id/:type/charts
access_token = get_chartmetric_access_token_cached()
print("🚀 Called get_charts with artist_id:", artist_id)
print("🚀 Called get_charts with access_token:", access_token)
##shoukd make dates of the chart dynamic later
##need to give chart options in function description clearly
url = f"https://api.chartmetric.com/api/artist/{artist_id}/{chart_type}/charts?since=2025-03-01&until=2025-07-04"
headers = {
"Authorization": f"Bearer {access_token}"
}
response = requests.get(url, headers=headers)
if not response.ok:
print(f"❌ Request failed with status {response.status_code}: {response.text}")
return {}
data = response.json()
#print(f"data from get_charts is: {data}")
print("🚀 data call to get_charts successfully made!")
dataObj = data.get('obj',{})
#print(f"dataObj is {dataObj}")
dataObjEntries = dataObj.get('data',{})
dataObjEntries2 = dataObjEntries.get('entries',{})
#print(f"dataObjEntries2 is {dataObjEntries2}")
relevant_details = []
for entry in dataObjEntries2:
print(f"entry is: {entry}")
stuffToSave = { "album": entry["name"], "pre-rank": entry["pre_rank"], "peak": entry["peak_rank"], "peak_day": entry["peak_date"], "rank": entry["rank"] }
print(f"stuff to save is: {stuffToSave}")
relevant_details.append(stuffToSave)
print(f"value of relevant_dtails is: {relevant_details}")
if "working_notes" not in current_state:
current_state["working_notes"] = {}
if f"charts_data for {artist_id}" not in current_state["working_notes"]:
current_state["working_notes"][f"charts_data for {artist_id}"] = {}
current_state["working_notes"][f"charts_data for {artist_id}"][chart_type] = relevant_details
await ctx.store.set('state', current_state)
return {
"artist_id": artist_id,
"chart_data": relevant_details
}
#and that code which allows logging of every step of the memory/thought process
#keep teh cahce of chartmetric api, attached to function that gets api_key, which is inserted into each relevant api
#find_artist_id_for_artist_tool = FunctionTool.from_function(find_artist_id_for_artist)
#get_instagram_audience_stats_tool = FunctionTool.from_function(get_instagram_audience_stats)
#get_similar_artists = FunctionTool.from_function(get_similar_artists)
# Wrap your function
#find_artist_id_for_artist_tool = FunctionTool(fn=find_artist_id_for_artist)
#get_instagram_audience_stats_tool = FunctionTool(fn=get_instagram_audience_stats)
#get_similar_artists_tool = FunctionTool(fn=get_similar_artists)
manager_agent = ReActAgent(
name="ManagerAgent",
description="Manager agent decides which other agents to use, and is decision maker",
system_prompt=(
"You are the manager agent. You do not collect data yourself. You delegate tasks to other agents.\n\n"
"Your responsibilities are:\n"
"- Receive the user’s question\n"
"- Decide whether StreamingChartAgent or SocialMediaDataAgent or SimilarityAgent (or two or all) should handle the request\n"
"+ If the question is about social media audience data (TikTok, Instagram, YouTube), use SocialMediaDataAgent."
"+ If the question is about chart positions, chart history, or streaming rankings, use StreamingChartAgent."
"- Wait for their responses and evaluate whether the question has been sufficiently answered\n"
),
llm=llm,
can_handoff_to=["SocialMediaDataAgent", "SimilarityAgent", "StreamingChartAgent"]
)
streaming_chart_agent = ReActAgent(
name="StreamingChartAgent",
description="agent to retrieve streaming chart data for the artist being researched",
system_prompt=("You are a research agent that retrieves streaming chart information about an artist"),
llm=llm,
tools=[get_charts, find_artist_id_for_artist],
can_handoff_to=["ManagerAgent", "SimilarityAgent", "SocialMediaDataAgent"]
)
social_media_data_agent = ReActAgent(#try with Function Agents first, change to ReAct agents if needed/performance is poor.
name="SocialMediaDataAgent",
description="agent to source data about artists from social media data, using chartmetric api",
system_prompt=(
"You are a research agent that uses social media data to analyze artist audiences via Chartmetric.\n"
"- Always use **both** Instagram and TikTok and Youtube data as your default behavior when analyzing artists.\n"
"- Do NOT choose one over the other unless explicitly told to focus on one.\n"
"- Always call 'get_instagram_audience_stats' AND 'get_tiktok_audience_data' AND 'get_youtube_audience_data' when gathering audience data.\n"
"- Do NOT assume artist names. Only use 'find_artist_id_for_artist' with real artist names provided by the user.\n"
"- If the user needs information about similar artists, HAND OFF to the SimilarityAgent — do NOT attempt it yourself.\n"
"- Your tools are only for Instagram and TikTok and Youtube data.\n"
)
,
llm=llmHigher,
tools=[get_instagram_audience_data, find_artist_id_for_artist, get_tiktok_audience_data, get_youtube_audience_data],
can_handoff_to=["ManagerAgent", "SimilarityAgent", "StreamingChartAgent"]#allow it to handoff to all other agents
)
streaming_chart_agent = ReActAgent(
name="StreamingChartAgent",
description="agent to retrieve streaming chart data for the artist being researched",
system_prompt=("You are a research agent that retrieves streaming chart information about an artist"),
llm=llm,
tools=[get_charts, find_artist_id_for_artist],
can_handoff_to=["ManagerAgent", "SimilarityAgent", "SocialMediaDataAgent"]
)
similarity_agent = ReActAgent(
name="SimilarityAgent",
description="agent to find similar artists to the artist being research, using chartmetric api",
system_prompt=("You are a research agent that looks for similar artists to the artist you are researching, in order to understand how the artist can copy the growth of similar artists who are larger."
"you can handoff to SocialMediaDataAgent, in order to find information about the followers of similar artists"
),
llm=llm,
tools=[get_similar_artists, find_artist_id_for_artist],
can_handoff_to=["ManagerAgent", "SocialMediaDataAgent", "StreamingChartAgent"]
)
async def main(chosen_artist, prompt):
#response = await workflow.run(user_msg="What is Bertie Blackman's Chartmetric artist ID?"
#, ctx=ctx) python llamaOaAgent.py
#chosen_artist = "Kenan Doğulu"
with open(r"overall_answersGemini.txt", "r", encoding="utf-8") as file:
contents = file.read()
overall_answers = contents
final_prompt = prompt + f"This report is about artist {chosen_artist}" + f"your sole data source is: {overall_answers}"
##that should ensure that whatever prompt is entered, the correct artist and data source is still passed down.
def count_tokens(text, model="gpt-4o"):
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
#count tokens anyway, for later usage:
total_tokens = count_tokens(final_prompt)
print(f"Total tokens of prompt: {total_tokens}")
model = genai.GenerativeModel("models/gemini-2.5-pro")
# Generate content
responseGemini = model.generate_content(f"You are a precise music industry data analyst. Be structured, factual, and preserve all stats given. use: {final_prompt}")
two_pager_gemini = responseGemini.text
print(responseGemini.text)
return two_pager_gemini
demo = gr.Interface(
fn=main,
inputs=["text", "text"], #one for artist_name, other for prompt
outputs="text",
title="artist report generator",
description="generate report for artist"
)
demo.launch(share=True)
#if __name__ == "__main__":
#response = asyncio.run(main())
#then pass to llm to assemble formal response to formal questions
# FunctionAgent works for LLMs with a function calling API.
# ReActAgent works for any LLM.
#can check logs:
#async for ev in handler.stream_events():
#if isinstance(ev, ToolCallResult):
#print("")
#print("Called tool: ", ev.tool_name, ev.tool_kwargs, "=>", ev.tool_output)
#elif isinstance(ev, AgentStream): # showing the thought process
#print(ev.delta, end="", flush=True)