cz-base / assistant_util.py
chih.yikuan
feat: add SEL_COACH configuration and update app logic for new assistant
b19d5b4
import os
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
# from google.colab import auth
from oauth2client.client import GoogleCredentials
from openai import OpenAI
import time
from datetime import datetime
from SEL_COACH import assistant_config as active_config
# Settings of the assistant currently in use, re-exported from the
# SEL_COACH configuration module imported above as `active_config`.
ASSISTANT_NAME = active_config.ASSISTANT_NAME
ASSISTANT_DESCRIPTION = active_config.ASSISTANT_DESCRIPTION
ASSISTANT_INSTRUCTION = active_config.ASSISTANT_INSTRUCTION
ASSISTANT_MODEL = active_config.ASSISTANT_MODEL
RESPONSE_FORMAT = active_config.RESPONSE_FORMAT
# Shared OpenAI client used by every helper in this module; the API key is
# read from the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# ID of an existing assistant, read from the `assistant_id` environment
# variable (None when the variable is unset).
assistant_id = os.getenv('assistant_id')
def create_assistant():
    """Create a new assistant from the module-level configuration.

    The name, description, instructions, model and response format are
    taken from the ``ASSISTANT_*`` / ``RESPONSE_FORMAT`` module constants.
    No tools or tool resources are attached at creation time.

    Returns:
        The ID of the newly created assistant.
    """
    creation_settings = {
        "name": ASSISTANT_NAME,
        "description": ASSISTANT_DESCRIPTION,
        "instructions": ASSISTANT_INSTRUCTION,
        "model": ASSISTANT_MODEL,
        "response_format": RESPONSE_FORMAT,
    }
    created = client.beta.assistants.create(**creation_settings)
    return created.id
# ==============
def test():
    """Run one end-to-end smoke test against the configured assistant.

    Creates a fresh thread, posts a single SEL test prompt, starts a run with
    the assistant identified by the module-level ``assistant_id`` and polls
    once per second until the run reaches a terminal status. Token usage,
    error details (for unsuccessful runs), the run steps and the resulting
    messages are printed. Nothing is returned.
    """
    # Every status after which the run will not change any more.
    terminal_statuses = ("completed", "failed", "cancelled", "expired", "incomplete")
    # Terminal statuses for which error details and the raw run are printed.
    unsuccessful_statuses = ("failed", "cancelled", "expired", "incomplete")

    assistant = client.beta.assistants.retrieve(assistant_id)
    thread = client.beta.threads.create()
    chat_id = thread.id  # The thread ID is reused for every call below

    # Other prompts used during testing, kept for reference:
    # integrated_message = "點選此按鈕開始設計教案"  (lesson-plan start button)
    # integrated_message = "請按照你擁有的資料給我一個中文教案範例"  (ask for a sample lesson plan)
    integrated_message = "我要尋求班級經營建議"  # Test message for SEL
    client.beta.threads.messages.create(
        thread_id=chat_id,
        role="user",
        content=integrated_message,
    )

    # Get the complete response instead of streaming
    run = client.beta.threads.runs.create(
        thread_id=chat_id,
        assistant_id=assistant.id,
    )

    # Wait for the run to finish
    while run.status != "completed":
        run = client.beta.threads.runs.retrieve(thread_id=chat_id, run_id=run.id)
        if run.status in terminal_statuses:
            print(f"\nRun ended with status: {run.status}")

            # The attribute always exists on the run object but may be None,
            # so test the value rather than using hasattr().
            usage = getattr(run, 'usage', None)
            if usage is not None:
                print("\nToken Usage:")
                print(f"  Prompt tokens: {usage.prompt_tokens}")
                print(f"  Completion tokens: {usage.completion_tokens}")
                print(f"  Total tokens: {usage.total_tokens}")
                details = getattr(usage, 'prompt_token_details', None)
                if details is not None:
                    if isinstance(details, dict):
                        cached_tokens = details.get('cached_tokens', 0)
                    else:
                        cached_tokens = getattr(details, 'cached_tokens', 0)
                    print(f"  Cached tokens: {cached_tokens}")

            # Print error details only for unsuccessful runs
            if run.status in unsuccessful_statuses:
                last_error = getattr(run, 'last_error', None)
                if last_error is not None:
                    print("\nError details:")
                    print(f"  Code: {last_error.code}")
                    print(f"  Message: {last_error.message}")
                print("\nFull run object:")
                print("=====")
                print(run)
                print("=====")
            break
        time.sleep(1)  # Poll every second

    # View the run steps (which show context usage)
    run_steps = client.beta.threads.runs.steps.list(
        thread_id=chat_id,
        run_id=run.id,
    )
    print("\nRun Steps:")
    for step in run_steps.data:
        print(f"Step ID: {step.id}")
        print(f"Step Type: {step.type}")
        print(f"Step Status: {step.status}")
        # Print details about retrieval steps (context access)
        if step.type == "retrieval":
            print("Context retrieved:")
            if hasattr(step, 'retrieval_details'):
                for doc in step.retrieval_details.documents:
                    print(f"- Document: {doc}")
        # Print details about tool calls
        if step.type == "tool_calls":
            for tool_call in step.step_details.tool_calls:
                print(f"Tool: {tool_call.type}")
                if tool_call.type == "retrieval":
                    print(f"Retrieved content: {tool_call.retrieval.content}")
                if tool_call.type == "code_interpreter":
                    print(f"Input: {tool_call.code_interpreter.input}")
                    print(f"Output: {tool_call.code_interpreter.outputs}")
        print("---")

    # Retrieve the messages
    messages = client.beta.threads.messages.list(thread_id=chat_id)
    print(f"Total messages: {len(messages.data)}")
    print("\nMessages by role:")
    role_counts = {}
    for msg in messages.data:
        role_counts[msg.role] = role_counts.get(msg.role, 0) + 1
    print(role_counts)

    # Print the assistant's response. Only text parts are printed so that an
    # empty message or a non-text part does not raise.
    for message in messages.data:
        if message.role != "assistant":
            continue
        for part in message.content:
            if getattr(part, 'type', None) == "text":
                print(f"Assistant: {part.text.value}")

    # Alternative streaming approach:
    # with client.beta.threads.runs.stream(
    #     thread_id=chat_id,
    #     assistant_id=assistant.id,
    #     timeout=60
    # ) as stream:
    #     for chunk in stream:
    #         print(chunk)
def _resource_field(container, field):
    """Read *field* from a dict or an attribute-style object; None when absent."""
    if container is None:
        return None
    if isinstance(container, dict):
        return container.get(field)
    return getattr(container, field, None)


def list_assistants():
    """Print details of the assistants returned by the list call.

    For each assistant the ID, name, model and creation time are printed,
    followed by its tools and tool resources when present. Vector store IDs
    attached through ``file_search`` are listed as well; fetching the
    content of those stores is not implemented.

    Returns:
        ``assistants.data`` - the assistant objects of the page returned by
        ``client.beta.assistants.list()``.
    """
    assistants = client.beta.assistants.list()
    print(f"Found {len(assistants.data)} assistants:")
    for assistant in assistants.data:
        print(f"\nID: {assistant.id}")
        print(f"Name: {assistant.name}")
        print(f"Model: {assistant.model}")
        # created_at is an integer timestamp; convert it for display
        created_time = datetime.fromtimestamp(assistant.created_at).strftime("%Y-%m-%d %H:%M:%S")
        print(f"Created at: {created_time}")

        tools = getattr(assistant, 'tools', None)
        if tools:
            print(f"Tools: {tools}")

        tool_resources = getattr(assistant, 'tool_resources', None)
        if not tool_resources:
            continue
        print(f"Tool Resources: {tool_resources}")

        # tool_resources is normally an SDK model object rather than a dict,
        # so membership tests and subscripting do not work on it; read the
        # nested fields in a way that supports both shapes.
        file_search = _resource_field(tool_resources, 'file_search')
        vector_store_ids = _resource_field(file_search, 'vector_store_ids')
        if vector_store_ids is None:
            continue
        print(f"Vector Store IDs: {vector_store_ids}")
        for vs_id in vector_store_ids:
            # Placeholder: only announces the store; no content is fetched.
            print(f"\nAttempting to fetch content from vector store: {vs_id}")
    return assistants.data
def _tool_resources_to_dict(tool_resources):
    """Return an independent plain-dict copy of an assistant's tool resources."""
    import copy

    if tool_resources is None:
        return {}
    if isinstance(tool_resources, dict):
        return copy.deepcopy(tool_resources)
    # SDK response objects are pydantic models; dump them to plain data so
    # they can be inspected and edited like a dict.
    if hasattr(tool_resources, 'model_dump'):
        return tool_resources.model_dump(exclude_none=True)
    if hasattr(tool_resources, 'dict'):
        return tool_resources.dict(exclude_none=True)
    return copy.deepcopy(dict(tool_resources))


def remove_vector_store_from_assistant(assistant_id, vector_store_id_to_remove):
    """Remove a specific vector store from an assistant's tool resources.

    When the removed store was the last one, the ``file_search`` entry is
    dropped from the tool resources and the ``file_search`` tool is dropped
    from the assistant's tools; otherwise only the ID list is shortened.

    Args:
        assistant_id (str): The ID of the assistant to update.
        vector_store_id_to_remove (str): The vector store ID to remove.

    Returns:
        The updated assistant object, or the unmodified retrieved assistant
        when there was nothing to remove or the update call failed.
    """
    # First, retrieve the current assistant configuration
    assistant = client.beta.assistants.retrieve(assistant_id)

    # Work on a detached plain-dict copy: the retrieved object is not a dict,
    # and editing a shallow copy would also change the object returned to
    # the caller on the early-exit paths.
    updated_tool_resources = _tool_resources_to_dict(
        getattr(assistant, 'tool_resources', None)
    )
    if not updated_tool_resources:
        print(f"Assistant {assistant_id} has no tool resources.")
        return assistant
    file_search = updated_tool_resources.get('file_search')
    if file_search is None:
        print(f"Assistant {assistant_id} has no file_search in tool resources.")
        return assistant
    current_vector_store_ids = file_search.get('vector_store_ids')
    if current_vector_store_ids is None:
        print(f"Assistant {assistant_id} has no vector_store_ids.")
        return assistant

    # Check if the vector store to remove exists
    if vector_store_id_to_remove not in current_vector_store_ids:
        print(f"Vector store ID {vector_store_id_to_remove} not found in assistant.")
        return assistant

    updated_vector_store_ids = [
        vs_id for vs_id in current_vector_store_ids
        if vs_id != vector_store_id_to_remove
    ]

    if not updated_vector_store_ids:
        # No stores left: remove file_search entirely, including the tool
        updated_tool_resources.pop('file_search', None)
        updated_tools = [
            tool for tool in assistant.tools
            if getattr(tool, 'type', None) != 'file_search'
        ]
    else:
        # Keep file_search but with the shortened vector store ID list
        file_search['vector_store_ids'] = updated_vector_store_ids
        updated_tools = assistant.tools  # Keep the existing tools

    # Update the assistant
    try:
        updated_assistant = client.beta.assistants.update(
            assistant_id=assistant_id,
            tools=updated_tools,
            tool_resources=updated_tool_resources,
        )
        print(f"Successfully removed vector store {vector_store_id_to_remove} from assistant {assistant_id}")
        return updated_assistant
    except Exception as e:
        print(f"Error updating assistant: {e}")
        return assistant
# google_drive_folder_id = os.getenv('google_drive_folder_id')
# spent 4m 50s downloading all 175 files
def embed_from_drive(folder_id, download_dir="/content"):
    """Export the Google Docs in a Drive folder as .docx and open them.

    Each file in the folder that offers a Word export is downloaded to
    ``download_dir`` as ``<title>.docx``; files without export links or
    without a Word export are skipped with a message. The local path is
    derived from the title only, so files sharing a title overwrite one
    another.

    Args:
        folder_id (str): ID of the Google Drive folder to read.
        download_dir (str): Local directory for the exported files. Defaults
            to ``/content``, the path the original code hard-coded.

    Returns:
        list: Binary file objects opened for reading, one per downloaded
        file. The caller is responsible for closing them.
    """
    # MIME type of .docx; change this to export a different format
    docx_mime = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'

    # auth.authenticate_user()
    gauth = GoogleAuth()
    gauth.credentials = GoogleCredentials.get_application_default()
    drive = GoogleDrive(gauth)

    # Originally used with the '定稿專案' (finalized projects) folder:
    # https://drive.google.com/drive/folders/1dlsf5BNjNczzUYKPZvYXd2mLW21QCLUK?usp=drive_link
    file_list = drive.ListFile({'q': f"'{folder_id}' in parents and trashed=false"}).GetList()

    # Files are downloaded locally first, since file streams don't receive
    # Google Docs directly.
    os.makedirs(download_dir, exist_ok=True)
    local_file_paths = []
    for file1 in file_list:
        title = file1['title']
        print(f"Processing file title: {title}, id: {file1['id']}")
        if 'exportLinks' not in file1:
            print(f"Skipping non-Google Docs file: {title}")
            continue
        if docx_mime not in file1['exportLinks']:
            print(f"No Word export available for: {title}")
            continue

        # A path separator in the title would point into a sub-directory
        # that does not exist, so replace it in the local file name.
        safe_title = title.replace('/', '_').replace(os.sep, '_')
        local_path = os.path.join(download_dir, f"{safe_title}.docx")
        print(f"Downloading as Word document: {title}")
        downloaded_file = drive.CreateFile({'id': file1['id']})
        downloaded_file.GetContentFile(local_path, mimetype=docx_mime)
        local_file_paths.append(local_path)

    for path in local_file_paths:
        print(f"Downloaded file: {path}")

    file_streams = []
    try:
        for path in local_file_paths:
            file_streams.append(open(path, "rb"))
    except OSError:
        # Do not leak the streams opened before the failure
        for stream in file_streams:
            stream.close()
        raise
    return file_streams
# Embed files (downloaded from drive folder)
def get_vector_store_id(vector_store_name, file_streams):
    """Create a vector store and upload the given files into it.

    Args:
        vector_store_name (str): Name given to the new vector store.
        file_streams (list): Opened binary file objects to upload.

    Returns:
        The ID of the newly created vector store.
    """
    store = client.vector_stores.create(name=vector_store_name)
    store_id = store.id
    # Uploads all files as one batch and blocks until the batch is done
    # (a previous run took 51s for 175 files).
    batch = client.vector_stores.file_batches.upload_and_poll(
        vector_store_id=store_id,
        files=file_streams,
    )
    for label, value in (
        ("file_batch status", batch.status),
        ("file_counts", batch.file_counts),
    ):
        print(label, value)
    return store_id
def get_file_streams_from_folder(folder_path):
    """Open every regular file directly inside a folder for binary reading.

    Sub-directories are skipped. A file that cannot be opened is reported
    with a message and left out; if the folder itself cannot be read, a
    message is printed and an empty list is returned.

    Args:
        folder_path (str): Path to the folder containing files.

    Returns:
        list: Opened binary file objects (the caller must close them).
    """
    opened = []
    try:
        candidates = [
            os.path.join(folder_path, entry) for entry in os.listdir(folder_path)
        ]
        for file_path in candidates:
            # Only regular files are opened; directories are ignored
            if not os.path.isfile(file_path):
                continue
            try:
                opened.append(open(file_path, 'rb'))
            except Exception as e:
                print(f"Error opening file {file_path}: {e}")
    except Exception as e:
        print(f"Error accessing folder {folder_path}: {e}")
        return []
    return opened
def create_vector_store(folder_path, vector_store_name=None):
    """Create a vector store from all files in a local folder.

    Args:
        folder_path (str): Folder whose files are uploaded.
        vector_store_name (str | None): Name for the new vector store. When
            omitted, the folder's own name is used.

    Returns:
        The ID of the newly created vector store.

    Raises:
        ValueError: If no file in ``folder_path`` could be opened.
    """
    if vector_store_name is None:
        # normpath drops a trailing separator so basename is not empty
        vector_store_name = os.path.basename(os.path.normpath(folder_path))

    file_streams = get_file_streams_from_folder(folder_path)
    if not file_streams:
        raise ValueError(f"No readable files found in folder: {folder_path}")

    try:
        # get_vector_store_id requires the store name as its first argument
        vector_store_id = get_vector_store_id(vector_store_name, file_streams)
    finally:
        # The streams were opened only for this upload; close them afterwards
        for stream in file_streams:
            stream.close()

    print(f"Vector store ID: {vector_store_id}")
    # Get vector store details
    print_vector_store_details(vector_store_id)
    return vector_store_id
def print_vector_store_details(vector_store_id):
    """Print key information about a vector store.

    The ID, name, creation time, status, file counts and usage are printed
    first, followed by every other public, non-callable attribute whose
    value is neither None nor a dict or list.

    Args:
        vector_store_id (str): ID of the vector store to retrieve.
    """
    vector_store = client.vector_stores.retrieve(vector_store_id)
    print("\nVector Store Details:")
    print(f"ID: {vector_store.id}")
    print(f"Name: {vector_store.name}")
    print(f"Created at: {vector_store.created_at}")
    print(f"Status: {vector_store.status}")
    print(f"File count: {vector_store.file_counts}")
    print(f"usage_bytes: {vector_store.usage_bytes}")

    # Attributes printed explicitly above are not repeated below
    already_printed = {'id', 'name', 'created_at', 'status', 'file_counts', 'usage_bytes'}
    for attr_name in dir(vector_store):
        if attr_name.startswith('_') or attr_name in already_printed:
            continue
        try:
            attr_value = getattr(vector_store, attr_name)
            # Only print if it's not a method or complex object
            if not callable(attr_value) and not isinstance(attr_value, (dict, list)) and attr_value is not None:
                print(f"{attr_name}: {attr_value}")
        except Exception:
            # Best-effort listing: skip attributes that cannot be read, but
            # let KeyboardInterrupt / SystemExit propagate.
            continue
# Update an existing assistant by its ID (please customize the preferred inputs)
def update_assistant(assistant_id, vector_store_ids=None, max_num_results=5):
    """Enable file search on an existing assistant and set its vector stores.

    Only ``tools`` and ``tool_resources`` are sent; the assistant's name,
    description, instructions, model and response format are not part of
    the update request.

    Args:
        assistant_id (str): The ID of the assistant to update.
        vector_store_ids (list | None): Vector store IDs to attach for file
            search. Defaults to an empty list.
        max_num_results (int): Limit on file search results. Defaults to 5.

    Returns:
        The updated assistant object returned by the API.
    """
    # Vector store IDs used previously:
    #   'vs_67e11690d1548191a21eeb15c317dc61'  # SEL
    #   'vs_W1sSCS4uuIxhqN4WSdX4ObI0'          # NCSLM_LPD
    assistant = client.beta.assistants.update(
        assistant_id=assistant_id,
        tools=[{
            "type": "file_search",
            "file_search": {
                "max_num_results": max_num_results,  # Limit search results
            },
        }],
        tool_resources={'file_search': {'vector_store_ids': list(vector_store_ids or [])}},
    )
    return assistant
def delete_assistant(assistant_id):
    """Delete an assistant by its ID.

    Args:
        assistant_id (str): The ID of the assistant to delete.

    Returns:
        The deletion response from the API, or None when the call failed
        (the error is printed rather than raised).
    """
    outcome = None
    try:
        deletion = client.beta.assistants.delete(assistant_id)
        print(f"Successfully deleted assistant {assistant_id}")
        print(f"Deletion response: {deletion}")
        outcome = deletion
    except Exception as e:
        print(f"Error deleting assistant {assistant_id}: {e}")
    return outcome