"""Administration utilities for the SEL coaching OpenAI Assistant.

Covers the assistant lifecycle (create / update / delete / list), vector-store
management for file search, and ingestion of source documents either from a
local folder or a Google Drive folder.

Requires the environment variable OPENAI_API_KEY (and, for `test`, the
`assistant_id` variable) to be set before this module is imported.
"""

import os
import time
from datetime import datetime

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
# from google.colab import auth
from oauth2client.client import GoogleCredentials
from openai import OpenAI

from SEL_COACH import assistant_config as active_config

# Assistant configuration is centralised in SEL_COACH.assistant_config so this
# script can be repointed at a different assistant profile in one place.
ASSISTANT_NAME = active_config.ASSISTANT_NAME
ASSISTANT_DESCRIPTION = active_config.ASSISTANT_DESCRIPTION
ASSISTANT_INSTRUCTION = active_config.ASSISTANT_INSTRUCTION
ASSISTANT_MODEL = active_config.ASSISTANT_MODEL
RESPONSE_FORMAT = active_config.RESPONSE_FORMAT

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
assistant_id = os.getenv('assistant_id')


def create_assistant():
    """Create a new assistant from the active config and return its ID."""
    assistant = client.beta.assistants.create(
        name=ASSISTANT_NAME,
        description=ASSISTANT_DESCRIPTION,
        instructions=ASSISTANT_INSTRUCTION,
        model=ASSISTANT_MODEL,
        # tools=[{
        #     "type": "file_search",
        #     "file_search": {
        #         "max_num_results": 5  # Limit search results
        #     }
        # }],
        # tool_resources={'file_search': {'vector_store_ids': [vector_store_id]}},
        response_format=RESPONSE_FORMAT
    )
    # show_json(assistant)
    return assistant.id


# ==============


def test():
    """Smoke-test the configured assistant.

    Sends one hard-coded user message on a fresh thread, polls the run until it
    reaches a terminal state, then prints token usage, run steps (including any
    retrieval / code-interpreter tool calls), message counts by role, and the
    assistant's replies.  Purely diagnostic: everything goes to stdout.
    """
    assistant = client.beta.assistants.retrieve(assistant_id)
    thread = client.beta.threads.create()
    chat_id = thread.id  # Store the thread ID
    # thread = client.beta.threads.retrieve(chat_id)

    # integrated_message = "點選此按鈕開始設計教案"  # Add a test message
    # integrated_message = "請按照你擁有的資料給我一個中文教案範例"  # Add a test message
    integrated_message = "我要尋求班級經營建議"  # Add a test message for SEL

    client.beta.threads.messages.create(
        thread_id=chat_id,
        role="user",
        content=integrated_message,
    )

    # Get the complete response instead of streaming
    run = client.beta.threads.runs.create(
        thread_id=chat_id,
        assistant_id=assistant.id
    )

    # Wait for the run to complete
    while run.status != "completed":
        run = client.beta.threads.runs.retrieve(thread_id=chat_id, run_id=run.id)
        if run.status in ["failed", "cancelled", "expired", "completed"]:
            print(f"\nRun ended with status: {run.status}")

            # Print token usage for all terminal states.  `usage` can be None
            # on unsuccessful runs, so guard the value as well as the attribute
            # (a bare hasattr check would raise AttributeError on None.usage.*).
            if getattr(run, 'usage', None):
                print("\nToken Usage:")
                print(f"  Prompt tokens: {run.usage.prompt_tokens}")
                print(f"  Completion tokens: {run.usage.completion_tokens}")
                print(f"  Total tokens: {run.usage.total_tokens}")
                if hasattr(run.usage, 'prompt_token_details'):
                    print(f"  Cached tokens: {run.usage.prompt_token_details.get('cached_tokens', 0)}")

            # Print error details only for failed runs
            if run.status in ["failed", "cancelled", "expired"]:
                if hasattr(run, 'last_error'):
                    print("\nError details:")
                    print(f"  Code: {run.last_error.code}")
                    print(f"  Message: {run.last_error.message}")
                print("\nFull run object:")
                print("=====")
                print(run)
                print("=====")
            break
        time.sleep(1)  # Poll every second

    # Add this code to view the run steps (which shows context usage)
    run_steps = client.beta.threads.runs.steps.list(
        thread_id=chat_id,
        run_id=run.id
    )

    print("\nRun Steps:")
    for step in run_steps.data:
        print(f"Step ID: {step.id}")
        print(f"Step Type: {step.type}")
        print(f"Step Status: {step.status}")

        # Print details about retrieval steps (context access)
        if step.type == "retrieval":
            print("Context retrieved:")
            if hasattr(step, 'retrieval_details'):
                for doc in step.retrieval_details.documents:
                    print(f"- Document: {doc}")

        # Print details about tool calls
        if step.type == "tool_calls":
            for tool_call in step.step_details.tool_calls:
                print(f"Tool: {tool_call.type}")
                if tool_call.type == "retrieval":
                    print(f"Retrieved content: {tool_call.retrieval.content}")
                if tool_call.type == "code_interpreter":
                    print(f"Input: {tool_call.code_interpreter.input}")
                    print(f"Output: {tool_call.code_interpreter.outputs}")
        print("---")

    # Retrieve the messages
    messages = client.beta.threads.messages.list(thread_id=chat_id)
    print(f"Total messages: {len(messages.data)}")  # Add message count

    print("\nMessages by role:")
    role_counts = {}
    for msg in messages.data:
        role_counts[msg.role] = role_counts.get(msg.role, 0) + 1
    print(role_counts)

    # Print the assistant's response
    for message in messages.data:
        if message.role == "assistant":
            print(f"Assistant: {message.content[0].text.value}")

    # Alternative streaming approach:
    # with client.beta.threads.runs.stream(
    #     thread_id=chat_id,
    #     assistant_id=assistant.id,
    #     timeout=60
    # ) as stream:
    #     for chunk in stream:
    #         print(chunk)


def list_assistants():
    """List all assistants in the OpenAI account with detailed information.

    Prints ID, name, model, creation time, tools, and any attached file-search
    vector stores for every assistant, then returns the raw list.

    Returns:
        list: The assistant objects from the API response.
    """
    assistants = client.beta.assistants.list()
    print(f"Found {len(assistants.data)} assistants:")

    for assistant in assistants.data:
        print(f"\nID: {assistant.id}")
        print(f"Name: {assistant.name}")
        print(f"Model: {assistant.model}")

        # Convert the integer timestamp to a datetime object
        created_time = datetime.fromtimestamp(assistant.created_at).strftime("%Y-%m-%d %H:%M:%S")
        print(f"Created at: {created_time}")

        # Print tools and tool_resources if available
        if hasattr(assistant, 'tools') and assistant.tools:
            print(f"Tools: {assistant.tools}")

        if hasattr(assistant, 'tool_resources') and assistant.tool_resources:
            print(f"Tool Resources: {assistant.tool_resources}")

            # Try to access file search vector stores if available.
            # NOTE(review): tool_resources is an SDK model object in recent
            # openai versions; dict-style `in` / subscript access may need to
            # become attribute access — confirm against the installed SDK.
            if 'file_search' in assistant.tool_resources and 'vector_store_ids' in assistant.tool_resources['file_search']:
                vector_store_ids = assistant.tool_resources['file_search']['vector_store_ids']
                print(f"Vector Store IDs: {vector_store_ids}")

                # Try to get content from vector stores
                try:
                    for vs_id in vector_store_ids:
                        print(f"\nAttempting to fetch content from vector store: {vs_id}")
                        # You would need appropriate API calls here to retrieve content
                        # This functionality depends on OpenAI's API for vector stores
                        # Example (may not be available or work as shown):
                        # vector_store = client.beta.vector_stores.retrieve(vs_id)
                        # print(f"Vector Store Content: {vector_store}")
                except Exception as e:
                    print(f"Error accessing vector store content: {e}")

        # # Try to access files if available
        # try:
        #     files = client.beta.assistants.files.list(assistant_id=assistant.id)
        #     if hasattr(files, 'data') and files.data:
        #         print(f"\nFiles attached to assistant:")
        #         for file in files.data:
        #             print(f"  - File ID: {file.id}")
        #             # Note: Getting actual file content requires additional API calls
        #             # and may not be directly available for all file types
        # except Exception as e:
        #     print(f"Error accessing files: {e}")

    return assistants.data


def remove_vector_store_from_assistant(assistant_id, vector_store_id_to_remove):
    """
    Remove a specific vector store from an assistant's tool resources.

    Args:
        assistant_id (str): The ID of the assistant to update
        vector_store_id_to_remove (str): The vector store ID to remove

    Returns:
        The updated assistant object, or the unmodified assistant if the
        vector store was not attached or the update failed.
    """
    # First, retrieve the current assistant configuration
    assistant = client.beta.assistants.retrieve(assistant_id)

    # Check if the assistant has tool_resources with file_search.
    # NOTE(review): dict-style access on tool_resources assumes a mapping;
    # recent SDK versions return model objects — verify before relying on this.
    if not hasattr(assistant, 'tool_resources') or not assistant.tool_resources:
        print(f"Assistant {assistant_id} has no tool resources.")
        return assistant

    if 'file_search' not in assistant.tool_resources:
        print(f"Assistant {assistant_id} has no file_search in tool resources.")
        return assistant

    if 'vector_store_ids' not in assistant.tool_resources['file_search']:
        print(f"Assistant {assistant_id} has no vector_store_ids.")
        return assistant

    # Get the current vector store IDs
    current_vector_store_ids = assistant.tool_resources['file_search']['vector_store_ids']

    # Check if the vector store to remove exists
    if vector_store_id_to_remove not in current_vector_store_ids:
        print(f"Vector store ID {vector_store_id_to_remove} not found in assistant.")
        return assistant

    # Remove the specified vector store ID
    updated_vector_store_ids = [vs_id for vs_id in current_vector_store_ids if vs_id != vector_store_id_to_remove]

    # Prepare updated tool resources
    updated_tool_resources = assistant.tool_resources.copy()

    # If there are no more vector stores, you might want to remove file_search entirely
    if not updated_vector_store_ids:
        # Option 1: Remove file_search entirely
        updated_tool_resources.pop('file_search', None)
        # Also update the tools to remove the file_search tool if it exists
        updated_tools = [tool for tool in assistant.tools
                         if not (hasattr(tool, 'type') and tool.type == 'file_search')]
    else:
        # Option 2: Keep file_search but with updated vector store IDs
        updated_tool_resources['file_search']['vector_store_ids'] = updated_vector_store_ids
        updated_tools = assistant.tools  # Keep the existing tools

    # Update the assistant
    try:
        updated_assistant = client.beta.assistants.update(
            assistant_id=assistant_id,
            tools=updated_tools,
            tool_resources=updated_tool_resources
        )
        print(f"Successfully removed vector store {vector_store_id_to_remove} from assistant {assistant_id}")
        return updated_assistant
    except Exception as e:
        print(f"Error updating assistant: {e}")
        return assistant


# google_drive_folder_id = os.getenv('google_drive_folder_id')


# spent 4m 50s downloading all 175 files
def embed_from_drive(folder_id):
    """Download all Google Docs in a Drive folder as .docx and open them.

    Args:
        folder_id (str): The Google Drive folder ID to pull documents from.

    Returns:
        list: Open binary file streams for each downloaded .docx file.
              The caller is responsible for closing them.
    """
    # auth.authenticate_user()
    gauth = GoogleAuth()
    gauth.credentials = GoogleCredentials.get_application_default()
    drive = GoogleDrive(gauth)

    # Get all files in '定稿專案' folder: https://drive.google.com/drive/folders/1dlsf5BNjNczzUYKPZvYXd2mLW21QCLUK?usp=drive_link
    file_list = drive.ListFile({'q': f"'{folder_id}' in parents and trashed=false"}).GetList()

    # Download files to local (`/content/`), since file_streams don't receive google docs
    local_file_paths = []
    for file1 in file_list:
        print('Processing file title: %s, id: %s' % (file1['title'], file1['id']))
        local_path = f"/content/{file1['title']}.docx"
        if 'exportLinks' in file1:
            # .docx mimetype; update if a different export format is needed
            if 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' in file1['exportLinks']:
                print(f"Downloading as Word document: {file1['title']}")
                downloaded_file = drive.CreateFile({'id': file1['id']})
                downloaded_file.GetContentFile(local_path, mimetype='application/vnd.openxmlformats-officedocument.wordprocessingml.document')
                local_file_paths.append(local_path)
            else:
                print(f"No Word export available for: {file1['title']}")
        else:
            print(f"Skipping non-Google Docs file: {file1['title']}")

    for path in local_file_paths:
        print(f"Downloaded file: {path}")

    file_streams = [open(path, "rb") for path in local_file_paths]
    return file_streams


# Embed files (downloaded from drive folder)
def get_vector_store_id(vector_store_name, file_streams):
    """Create a vector store, upload the given streams into it, and return its ID.

    Args:
        vector_store_name (str): Display name for the new vector store.
        file_streams (list): Open binary file streams to embed.

    Returns:
        str: The new vector store's ID.
    """
    vector_store = client.vector_stores.create(name=vector_store_name)

    # spent 51s batching all 175 files
    file_batch = client.vector_stores.file_batches.upload_and_poll(
        vector_store_id=vector_store.id,
        files=file_streams
    )
    print("file_batch status", file_batch.status)
    print("file_counts", file_batch.file_counts)
    return vector_store.id


def get_file_streams_from_folder(folder_path):
    """
    Opens all files in the specified folder and returns a list of file stream objects.

    Args:
        folder_path (str): Path to the folder containing files

    Returns:
        list: List of opened file stream objects (caller must close them),
              or an empty list if the folder cannot be read.
    """
    file_streams = []
    try:
        # Get all files in the folder
        for filename in os.listdir(folder_path):
            file_path = os.path.join(folder_path, filename)
            # Skip directories, only process files
            if os.path.isfile(file_path):
                try:
                    # Open file in binary mode
                    file_stream = open(file_path, 'rb')
                    file_streams.append(file_stream)
                except Exception as e:
                    print(f"Error opening file {file_path}: {e}")
        return file_streams
    except Exception as e:
        print(f"Error accessing folder {folder_path}: {e}")
        return []


def create_vector_store(folder_path):
    """Create a vector store from every file in *folder_path* and return its ID."""
    file_streams = get_file_streams_from_folder(folder_path)
    try:
        # BUG FIX: get_vector_store_id requires a name as its first argument;
        # the previous call passed only the streams and raised a TypeError.
        # Use the folder's basename as the store name.
        store_name = os.path.basename(os.path.normpath(folder_path))
        vector_store_id = get_vector_store_id(store_name, file_streams)
    finally:
        # The streams were opened here, so close them here (previously leaked).
        for stream in file_streams:
            stream.close()
    print(f"Vector store ID: {vector_store_id}")
    # Get vector store details
    print_vector_store_details(vector_store_id)
    return vector_store_id


def print_vector_store_details(vector_store_id):
    """Print key information about a vector store."""
    vector_store = client.vector_stores.retrieve(vector_store_id)
    print("\nVector Store Details:")
    print(f"ID: {vector_store.id}")
    print(f"Name: {vector_store.name}")
    print(f"Created at: {vector_store.created_at}")
    print(f"Status: {vector_store.status}")
    print(f"File count: {vector_store.file_counts}")
    print(f"usage_bytes: {vector_store.usage_bytes}")

    # Optionally print additional attributes if available
    for attr_name in dir(vector_store):
        if not attr_name.startswith('_') and attr_name not in ['id', 'name', 'created_at', 'status', 'file_counts', 'usage_bytes']:
            try:
                attr_value = getattr(vector_store, attr_name)
                # Only print if it's not a method or complex object
                if not callable(attr_value) and not isinstance(attr_value, (dict, list)) and attr_value is not None:
                    print(f"{attr_name}: {attr_value}")
            except Exception:
                # Some SDK attributes raise on access; skip them silently.
                pass


# Update existing assistant through ID (please customize preferred inputs)
def update_assistant(assistant_id):
    """Update an existing assistant's tools/resources and return the result.

    Edit the commented-out keyword arguments below to customise which fields
    are updated; currently only the file_search tool configuration is sent.
    """
    assistant = client.beta.assistants.update(
        assistant_id=assistant_id,
        # name="陪你師展魔法-Coach Chat",
        # description=ASSISTANT_DESCRIPTION,
        # instructions=ASSISTANT_INSTRUCTION,
        # model=ASSISTANT_MODEL,
        tools=[{
            "type": "file_search",
            "file_search": {
                "max_num_results": 5  # Limit search results
            }
        }],
        # tool_resources={'file_search': {'vector_store_ids': ['vs_67e11690d1548191a21eeb15c317dc61']}},  # SEL
        # tool_resources={'file_search': {'vector_store_ids': ['vs_W1sSCS4uuIxhqN4WSdX4ObI0']}},  # NCSLM_LPD
        tool_resources={'file_search': {'vector_store_ids': []}},
        # response_format=None
    )
    # Return the updated object so callers can inspect the result
    # (previously it was silently discarded).
    return assistant


def delete_assistant(assistant_id):
    """
    Delete an assistant by its ID

    Args:
        assistant_id (str): The ID of the assistant to delete

    Returns:
        dict: The deletion response from the API, or None on failure
    """
    try:
        response = client.beta.assistants.delete(assistant_id)
        print(f"Successfully deleted assistant {assistant_id}")
        print(f"Deletion response: {response}")
        return response
    except Exception as e:
        print(f"Error deleting assistant {assistant_id}: {e}")
        return None