# "Spaces: Sleeping" — Hugging Face Spaces status banner captured by the page scrape; not part of the source code.
| import os | |
| from pydrive.auth import GoogleAuth | |
| from pydrive.drive import GoogleDrive | |
| # from google.colab import auth | |
| from oauth2client.client import GoogleCredentials | |
| from openai import OpenAI | |
| import time | |
| from datetime import datetime | |
| from SEL_COACH import assistant_config as active_config | |
# Assistant configuration is pulled from the active config module so that
# swapping assistants (e.g. SEL_COACH vs. another profile) is a one-line
# change of the import above.
ASSISTANT_NAME = active_config.ASSISTANT_NAME
ASSISTANT_DESCRIPTION = active_config.ASSISTANT_DESCRIPTION
ASSISTANT_INSTRUCTION = active_config.ASSISTANT_INSTRUCTION
ASSISTANT_MODEL = active_config.ASSISTANT_MODEL
RESPONSE_FORMAT = active_config.RESPONSE_FORMAT

# OpenAI client and target assistant id both come from environment variables.
# NOTE(review): assistant_id will be None if the 'assistant_id' env var is
# unset, which makes test() fail on retrieve — confirm deployment env.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
assistant_id = os.getenv('assistant_id')
def create_assistant():
    """Create a new OpenAI assistant from the active config and return its id.

    File-search tooling is intentionally not attached here; vector stores
    can be wired in later via an assistant update.
    """
    settings = {
        "name": ASSISTANT_NAME,
        "description": ASSISTANT_DESCRIPTION,
        "instructions": ASSISTANT_INSTRUCTION,
        "model": ASSISTANT_MODEL,
        "response_format": RESPONSE_FORMAT,
    }
    created = client.beta.assistants.create(**settings)
    return created.id
| # ============== | |
def test():
    """Smoke-test the configured assistant.

    Sends one user message on a fresh thread, polls the run to a terminal
    state, then dumps token usage, run steps (retrieval/tool context), and
    the resulting messages to stdout. Uses the module-level `client` and
    `assistant_id`.
    """
    assistant = client.beta.assistants.retrieve(assistant_id)
    thread = client.beta.threads.create()
    chat_id = thread.id  # keep the thread ID for all follow-up calls
    # thread = client.beta.threads.retrieve(chat_id)
    # integrated_message = "點選此按鈕開始設計教案"
    # integrated_message = "請按照你擁有的資料給我一個中文教案範例"
    integrated_message = "我要尋求班級經營建議"  # test message for SEL ("I'd like classroom-management advice")
    client.beta.threads.messages.create(
        thread_id=chat_id,
        role="user",
        content=integrated_message,
    )
    # Get the complete response instead of streaming.
    run = client.beta.threads.runs.create(
        thread_id=chat_id,
        assistant_id=assistant.id
    )
    # Poll until the run reaches a terminal state.
    while run.status != "completed":
        run = client.beta.threads.runs.retrieve(thread_id=chat_id, run_id=run.id)
        if run.status in ["failed", "cancelled", "expired", "completed"]:
            print(f"\nRun ended with status: {run.status}")
            # Token usage is printed for every terminal state, not just failures.
            if hasattr(run, 'usage'):
                print("\nToken Usage:")
                print(f" Prompt tokens: {run.usage.prompt_tokens}")
                print(f" Completion tokens: {run.usage.completion_tokens}")
                print(f" Total tokens: {run.usage.total_tokens}")
                if hasattr(run.usage, 'prompt_token_details'):
                    print(f" Cached tokens: {run.usage.prompt_token_details.get('cached_tokens', 0)}")
            # Error details only for unsuccessful terminal states.
            if run.status in ["failed", "cancelled", "expired"]:
                if hasattr(run, 'last_error'):
                    print("\nError details:")
                    print(f" Code: {run.last_error.code}")
                    print(f" Message: {run.last_error.message}")
                print("\nFull run object:")
                print("=====")
                print(run)
                print("=====")
            break
        time.sleep(1)  # poll every second
    # View the run steps, which show context (retrieval) usage.
    run_steps = client.beta.threads.runs.steps.list(
        thread_id=chat_id,
        run_id=run.id
    )
    print("\nRun Steps:")
    for step in run_steps.data:
        print(f"Step ID: {step.id}")
        print(f"Step Type: {step.type}")
        print(f"Step Status: {step.status}")
        # Details for retrieval steps (context access).
        # NOTE(review): newer API versions surface retrieval as a
        # "file_search" tool call rather than a "retrieval" step type —
        # confirm against the installed SDK.
        if step.type == "retrieval":
            print("Context retrieved:")
            if hasattr(step, 'retrieval_details'):
                for doc in step.retrieval_details.documents:
                    print(f"- Document: {doc}")
        # Details for tool-call steps.
        if step.type == "tool_calls":
            for tool_call in step.step_details.tool_calls:
                print(f"Tool: {tool_call.type}")
                if tool_call.type == "retrieval":
                    print(f"Retrieved content: {tool_call.retrieval.content}")
                if tool_call.type == "code_interpreter":
                    print(f"Input: {tool_call.code_interpreter.input}")
                    print(f"Output: {tool_call.code_interpreter.outputs}")
        print("---")
    # Retrieve the thread's messages and summarise them by role.
    messages = client.beta.threads.messages.list(thread_id=chat_id)
    print(f"Total messages: {len(messages.data)}")
    print("\nMessages by role:")
    role_counts = {}
    for msg in messages.data:
        role_counts[msg.role] = role_counts.get(msg.role, 0) + 1
    print(role_counts)
    # Print the assistant's response(s).
    for message in messages.data:
        if message.role == "assistant":
            print(f"Assistant: {message.content[0].text.value}")
    # Alternative streaming approach:
    # with client.beta.threads.runs.stream(
    #     thread_id=chat_id,
    #     assistant_id=assistant.id,
    #     timeout=60
    # ) as stream:
    #     for chunk in stream:
    #         print(chunk)
def list_assistants():
    """List all assistants in the OpenAI account with detailed information.

    Prints id, name, model, creation time, tools, and any file_search
    vector store ids for each assistant.

    Returns:
        list: The raw assistant objects from the API.
    """
    assistants = client.beta.assistants.list()
    print(f"Found {len(assistants.data)} assistants:")
    for assistant in assistants.data:
        print(f"\nID: {assistant.id}")
        print(f"Name: {assistant.name}")
        print(f"Model: {assistant.model}")
        # created_at is an integer epoch timestamp; render it readably.
        created_time = datetime.fromtimestamp(assistant.created_at).strftime("%Y-%m-%d %H:%M:%S")
        print(f"Created at: {created_time}")
        # Print tools and tool_resources if available.
        if hasattr(assistant, 'tools') and assistant.tools:
            print(f"Tools: {assistant.tools}")
        if hasattr(assistant, 'tool_resources') and assistant.tool_resources:
            print(f"Tool Resources: {assistant.tool_resources}")
            # NOTE(review): the dict-style access below assumes tool_resources
            # behaves like a mapping; recent openai SDK versions return a typed
            # object here (attribute access) — confirm against the installed SDK.
            if 'file_search' in assistant.tool_resources and 'vector_store_ids' in assistant.tool_resources['file_search']:
                vector_store_ids = assistant.tool_resources['file_search']['vector_store_ids']
                print(f"Vector Store IDs: {vector_store_ids}")
                # Try to get content from the vector stores.
                try:
                    for vs_id in vector_store_ids:
                        print(f"\nAttempting to fetch content from vector store: {vs_id}")
                        # Retrieving actual content would need additional API
                        # calls, e.g.:
                        # vector_store = client.beta.vector_stores.retrieve(vs_id)
                        # print(f"Vector Store Content: {vector_store}")
                except Exception as e:
                    print(f"Error accessing vector store content: {e}")
        # # Try to access files if available
        # try:
        #     files = client.beta.assistants.files.list(assistant_id=assistant.id)
        #     if hasattr(files, 'data') and files.data:
        #         print(f"\nFiles attached to assistant:")
        #         for file in files.data:
        #             print(f" - File ID: {file.id}")
        # except Exception as e:
        #     print(f"Error accessing files: {e}")
    return assistants.data
def remove_vector_store_from_assistant(assistant_id, vector_store_id_to_remove):
    """
    Remove a specific vector store from an assistant's tool resources.

    Args:
        assistant_id (str): The ID of the assistant to update
        vector_store_id_to_remove (str): The vector store ID to remove

    Returns:
        The updated assistant object, or the unchanged assistant if there
        was nothing to remove or the update call failed.
    """
    # First, retrieve the current assistant configuration.
    assistant = client.beta.assistants.retrieve(assistant_id)
    # Guard clauses: nothing to do unless file_search vector stores exist.
    # NOTE(review): the dict-style access below assumes tool_resources is
    # mapping-like; newer openai SDKs expose a typed object (attribute
    # access) — confirm against the installed SDK version.
    if not hasattr(assistant, 'tool_resources') or not assistant.tool_resources:
        print(f"Assistant {assistant_id} has no tool resources.")
        return assistant
    if 'file_search' not in assistant.tool_resources:
        print(f"Assistant {assistant_id} has no file_search in tool resources.")
        return assistant
    if 'vector_store_ids' not in assistant.tool_resources['file_search']:
        print(f"Assistant {assistant_id} has no vector_store_ids.")
        return assistant
    # Get the current vector store IDs.
    current_vector_store_ids = assistant.tool_resources['file_search']['vector_store_ids']
    # Bail out if the target vector store is not attached.
    if vector_store_id_to_remove not in current_vector_store_ids:
        print(f"Vector store ID {vector_store_id_to_remove} not found in assistant.")
        return assistant
    # Remove the specified vector store ID.
    updated_vector_store_ids = [vs_id for vs_id in current_vector_store_ids
                                if vs_id != vector_store_id_to_remove]
    # NOTE(review): .copy() is shallow — the nested 'file_search' dict is
    # still shared with the retrieved assistant object.
    updated_tool_resources = assistant.tool_resources.copy()
    if not updated_vector_store_ids:
        # No vector stores left: drop file_search entirely, including the
        # file_search tool itself.
        updated_tool_resources.pop('file_search', None)
        updated_tools = [tool for tool in assistant.tools if
                         not (hasattr(tool, 'type') and tool.type == 'file_search')]
    else:
        # Keep file_search but with the pruned vector store ID list.
        updated_tool_resources['file_search']['vector_store_ids'] = updated_vector_store_ids
        updated_tools = assistant.tools  # keep the existing tools
    # Push the updated configuration back to the API.
    try:
        updated_assistant = client.beta.assistants.update(
            assistant_id=assistant_id,
            tools=updated_tools,
            tool_resources=updated_tool_resources
        )
        print(f"Successfully removed vector store {vector_store_id_to_remove} from assistant {assistant_id}")
        return updated_assistant
    except Exception as e:
        print(f"Error updating assistant: {e}")
        return assistant
| # google_drive_folder_id = os.getenv('google_drive_folder_id') | |
| # spent 4m 50s downloading all 175 files | |
def embed_from_drive(folder_id):
    """Download every Google-Docs file in a Drive folder as .docx and return
    opened binary file streams suitable for a vector-store upload.

    NOTE(review): written for a Colab-like environment — files are saved
    under /content/ and the returned streams are never closed here; the
    caller owns them. Historically took ~4m50s for 175 files.
    """
    # auth.authenticate_user()
    gauth = GoogleAuth()
    gauth.credentials = GoogleCredentials.get_application_default()
    drive = GoogleDrive(gauth)
    # List all non-trashed files directly inside the folder.
    file_list = drive.ListFile({'q': f"'{folder_id}' in parents and trashed=false"}).GetList()
    # Download files locally first, since file streams can't be built
    # directly from Google Docs (they must be exported to .docx).
    local_file_paths = []
    for file1 in file_list:
        print('Processing file title: %s, id: %s' % (file1['title'], file1['id']))
        local_path = f"/content/{file1['title']}.docx"
        if 'exportLinks' in file1:
            # This MIME type == .docx; change it here to export another format.
            if 'application/vnd.openxmlformats-officedocument.wordprocessingml.document' in file1['exportLinks']:
                # NOTE(review): export_url is computed but unused — GetContentFile
                # below performs the export directly via the mimetype argument.
                export_url = file1['exportLinks']['application/vnd.openxmlformats-officedocument.wordprocessingml.document']
                print(f"Downloading as Word document: {file1['title']}")
                downloaded_file = drive.CreateFile({'id': file1['id']})
                downloaded_file.GetContentFile(local_path, mimetype='application/vnd.openxmlformats-officedocument.wordprocessingml.document')
                local_file_paths.append(local_path)
            else:
                print(f"No Word export available for: {file1['title']}")
        else:
            print(f"Skipping non-Google Docs file: {file1['title']}")
    for path in local_file_paths:
        print(f"Downloaded file: {path}")
    # Open every downloaded file in binary mode for upload.
    file_streams = [open(path, "rb") for path in local_file_paths]
    return file_streams
| # Embed files (downloaded from drive folder) | |
def get_vector_store_id(vector_store_name, file_streams):
    """Create a named vector store, upload the given file streams into it
    (blocking until the batch finishes), and return the new store's id.
    """
    store = client.vector_stores.create(name=vector_store_name)
    # upload_and_poll blocks until every file in the batch is processed
    # (previously ~51s for 175 files).
    batch = client.vector_stores.file_batches.upload_and_poll(
        vector_store_id=store.id,
        files=file_streams,
    )
    print("file_batch status", batch.status)
    print("file_counts", batch.file_counts)
    return store.id
def get_file_streams_from_folder(folder_path):
    """
    Opens all files in the specified folder and returns a list of file stream objects.

    Args:
        folder_path (str): Path to the folder containing files

    Returns:
        list: List of opened binary file stream objects. The caller owns the
        streams and must close them. Returns [] if the folder itself cannot
        be listed.
    """
    # Narrowed from `except Exception` to OSError: that is the failure mode
    # of listdir/open, and a broad catch would hide real bugs. The listing
    # is also guarded separately so a single bad file no longer aborts the
    # whole scan and leaks the streams already opened.
    try:
        entries = os.listdir(folder_path)
    except OSError as e:
        print(f"Error accessing folder {folder_path}: {e}")
        return []
    file_streams = []
    for filename in entries:
        file_path = os.path.join(folder_path, filename)
        # Skip directories, only process regular files.
        if os.path.isfile(file_path):
            try:
                # Open file in binary mode; an unreadable file is skipped,
                # not fatal.
                file_streams.append(open(file_path, 'rb'))
            except OSError as e:
                print(f"Error opening file {file_path}: {e}")
    return file_streams
def create_vector_store(folder_path, vector_store_name="vector_store"):
    """Create a vector store from every file in a local folder.

    Args:
        folder_path (str): Folder whose files will be uploaded.
        vector_store_name (str): Name for the new vector store (new,
            defaulted parameter — existing callers are unaffected).

    Returns:
        str: The id of the newly created vector store.
    """
    file_streams = get_file_streams_from_folder(folder_path)
    try:
        # BUG FIX: get_vector_store_id takes (vector_store_name, file_streams);
        # the original call passed only file_streams and always raised TypeError.
        vector_store_id = get_vector_store_id(vector_store_name, file_streams)
    finally:
        # Close the streams opened above so file handles don't leak.
        for stream in file_streams:
            stream.close()
    print(f"Vector store ID: {vector_store_id}")
    # Get vector store details.
    print_vector_store_details(vector_store_id)
    return vector_store_id
def print_vector_store_details(vector_store_id):
    """Print key information about a vector store.

    Args:
        vector_store_id (str): Id of the vector store to inspect.
    """
    vector_store = client.vector_stores.retrieve(vector_store_id)
    print("\nVector Store Details:")
    print(f"ID: {vector_store.id}")
    print(f"Name: {vector_store.name}")
    print(f"Created at: {vector_store.created_at}")
    print(f"Status: {vector_store.status}")
    print(f"File count: {vector_store.file_counts}")
    print(f"usage_bytes: {vector_store.usage_bytes}")
    # Optionally print any remaining simple scalar attributes for debugging.
    known = {'id', 'name', 'created_at', 'status', 'file_counts', 'usage_bytes'}
    for attr_name in dir(vector_store):
        if attr_name.startswith('_') or attr_name in known:
            continue
        try:
            attr_value = getattr(vector_store, attr_name)
        except Exception:
            # Narrowed from a bare `except:` — a bare except also swallows
            # KeyboardInterrupt/SystemExit.
            continue
        # Only print if it's not a method or a complex/empty value.
        if not callable(attr_value) and not isinstance(attr_value, (dict, list)) and attr_value is not None:
            print(f"{attr_name}: {attr_value}")
# Update an existing assistant through its ID (please customize preferred inputs)
def update_assistant(assistant_id):
    """Update the file_search tool configuration on an existing assistant.

    As written this enables file_search (capped at 5 results) but clears the
    attached vector store list; uncomment one of the tool_resources lines
    to attach a specific store, or other fields to change name/model/etc.
    """
    assistant = client.beta.assistants.update(
        assistant_id=assistant_id,
        # name="陪你師展魔法-Coach Chat",
        # description=ASSISTANT_DESCRIPTION,
        # instructions=ASSISTANT_INSTRUCTION,
        # model=ASSISTANT_MODEL,
        tools=[{
            "type": "file_search",
            "file_search": {
                "max_num_results": 5  # limit search results
            }
        }],
        # tool_resources={'file_search': {'vector_store_ids': ['vs_67e11690d1548191a21eeb15c317dc61']}},  # SEL
        # tool_resources={'file_search': {'vector_store_ids': ['vs_W1sSCS4uuIxhqN4WSdX4ObI0']}},  # NCSLM_LPD
        tool_resources={'file_search': {'vector_store_ids': []}},
        # response_format=None
    )
def delete_assistant(assistant_id):
    """
    Delete an assistant by its ID.

    Args:
        assistant_id (str): The ID of the assistant to delete

    Returns:
        dict: The deletion response from the API, or None on failure
    """
    # Guard-clause shape: bail out on failure first, then report success.
    try:
        result = client.beta.assistants.delete(assistant_id)
    except Exception as e:
        print(f"Error deleting assistant {assistant_id}: {e}")
        return None
    print(f"Successfully deleted assistant {assistant_id}")
    print(f"Deletion response: {result}")
    return result