| import os |
| from datetime import datetime |
| import json |
| from huggingface_hub import HfApi |
| import gradio as gr |
| import csv |
| import pandas as pd |
| import io |
| from typing import TypedDict, List |
| from climateqa.constants import DOCUMENT_METADATA_DEFAULT_VALUES |
| from langchain_core.documents import Document |
|
|
def serialize_docs(docs: list[Document]) -> list:
    """Convert document objects to a simplified format compatible with Hugging Face datasets.

    Each document's ``page_content`` is copied as-is. Its metadata is
    normalised against ``DOCUMENT_METADATA_DEFAULT_VALUES``:

    * every field listed there appears in the output;
    * a missing field takes the default value;
    * a present value is coerced to the type of the default, falling back to
      the default itself when the coercion raises;
    * metadata keys not listed in ``DOCUMENT_METADATA_DEFAULT_VALUES`` are dropped.

    Args:
        docs (list): Document objects, each exposing ``page_content`` and a
            ``metadata`` mapping.

    Returns:
        list: One ``{"page_content": ..., "metadata": {...}}`` dict per
        document, or a single ``"No documents found"`` placeholder entry when
        ``docs`` is empty.
    """
    new_docs = []
    for doc in docs:
        new_doc = {
            "page_content": doc.page_content,
            "metadata": {},
        }

        for field, default_value in DOCUMENT_METADATA_DEFAULT_VALUES.items():
            new_value = doc.metadata.get(field, default_value)
            try:
                # Coerce to the default's type so every record shares one schema.
                new_doc["metadata"][field] = type(default_value)(new_value)
            except Exception:
                # Best-effort: an unconvertible value is replaced by the default.
                # Catching Exception (not a bare ``except``) lets
                # KeyboardInterrupt / SystemExit propagate.
                new_doc["metadata"][field] = default_value

        new_docs.append(new_doc)

    if not new_docs:
        # Copy the defaults so callers cannot mutate the shared module-level dict.
        new_docs = [
            {
                "page_content": "No documents found",
                "metadata": dict(DOCUMENT_METADATA_DEFAULT_VALUES),
            }
        ]
    return new_docs
|
|
| |
|
|
def log_on_azure(file, logs, share_client):
    """Serialise a log record to JSON and upload it through an Azure share client.

    Args:
        file (str): Name of the file to create via ``share_client``.
        logs (dict): JSON-serialisable log record.
        share_client: Azure storage client exposing ``get_file_client(name)``;
            the returned client's ``upload_file`` receives the JSON string.
    """
    serialized = json.dumps(logs)
    share_client.get_file_client(file).upload_file(serialized)
|
|
|
|
def log_interaction_to_azure(history, output_query, sources, docs, share_client, user_id):
    """Log a chat interaction to Azure storage (this function does not log to Hugging Face).

    When the ``GRADIO_ENV`` environment variable is not ``"local"``, builds a
    JSON-serialisable record of the exchange and uploads it as
    ``<timestamp>.json`` via ``log_on_azure``. In a local environment it does
    nothing.

    Args:
        history (list): Chat message history. ``history[1]`` is read with
            ``["content"]`` (stored as the prompt) and ``history[-1]`` with
            ``.content`` (stored as the answer).
        output_query (str): Processed query, stored under ``"question"``.
        sources (list): Knowledge base sources used.
        docs (list): Retrieved documents, serialised with ``serialize_docs``.
        share_client: Azure share client instance.
        user_id: User identifier, stored as ``str(user_id)``.

    Raises:
        gr.Error: If building or uploading the record fails; the original
            exception is chained as ``__cause__``.
    """
    try:
        if os.getenv("GRADIO_ENV") != "local":
            timestamp = str(datetime.now().timestamp())
            # NOTE(review): history[1] is subscripted while history[-1] uses an
            # attribute; this assumes the caller mixes dict messages and message
            # objects -- confirm against the caller.
            prompt = history[1]["content"]
            logs = {
                "user_id": str(user_id),
                "prompt": prompt,
                "query": prompt,
                "question": output_query,
                "sources": sources,
                "docs": serialize_docs(docs),
                "answer": history[-1].content,
                "time": timestamp,
            }

            log_on_azure(f"{timestamp}.json", logs, share_client)
    except Exception as e:
        print(f"Error logging on Azure Blob Storage: {e}")
        error_msg = f"ClimateQ&A Error: {str(e)[:100]} - The error has been noted, try another question and if the error remains, you can contact us :)"
        raise gr.Error(error_msg) from e
| |
def log_drias_interaction_to_azure(query, sql_query, data, share_client, user_id):
    """Log a Drias data interaction to Azure storage (this function does not log to Hugging Face).

    When the ``GRADIO_ENV`` environment variable is not ``"local"``, uploads a
    JSON record as ``drias_<timestamp>.json`` via ``log_on_azure`` and prints
    it. Otherwise it only prints a notice.

    Args:
        query (str): User query.
        sql_query (str): SQL query used.
        data: Retrieved data. Accepted for interface compatibility but NOT
            included in the log record.
        share_client: Azure share client instance.
        user_id: User identifier, stored as ``str(user_id)``.

    Raises:
        gr.Error: If the upload fails; the original exception is chained as
            ``__cause__``.
    """
    try:
        if os.getenv("GRADIO_ENV") != "local":
            timestamp = str(datetime.now().timestamp())
            logs = {
                "user_id": str(user_id),
                "query": query,
                "sql_query": sql_query,
                "time": timestamp,
            }
            log_on_azure(f"drias_{timestamp}.json", logs, share_client)
            print(f"Logged Drias interaction to Azure Blob Storage: {logs}")
        else:
            # Only GRADIO_ENV is checked above; share_client / user_id are not.
            print("Did not log Drias interaction to Azure because GRADIO_ENV is local")
    except Exception as e:
        print(f"Error logging Drias interaction on Azure Blob Storage: {e}")
        error_msg = f"Drias Error: {str(e)[:100]} - The error has been noted, try another question and if the error remains, you can contact us :)"
        raise gr.Error(error_msg) from e
| |
| |
|
|
def log_on_huggingface(log_filename, logs, log_type="chat"):
    """Upload a log record as a JSON file to a Hugging Face dataset repository.

    The token and target repository are read from environment variables that
    depend on ``log_type``:

    * ``"chat"``: token ``HF_LOGS_TOKEN``, repo ``HF_DATASET_REPO``
      (default ``"Ekimetrics/climateqa_logs"``)
    * ``"drias"``: token ``HF_LOGS_DRIAS_TOKEN``, repo ``HF_DATASET_REPO_DRIAS``
      (default ``"Ekimetrics/climateqa_logs_talk_to_data"``)

    Uploading is best-effort. A missing token or a failed upload is reported
    with ``print`` and does not raise.

    Args:
        log_filename (str): Path of the file inside the dataset repository.
        logs (dict): JSON-serialisable log record. The caller's dict is not
            modified; the uploaded copy gets an extra ``"timestamp"`` key
            formatted as ``%Y%m%d_%H%M%S_%f``.
        log_type (str): Either ``"chat"`` or ``"drias"``.

    Returns:
        bool: True if the upload succeeded, False if it was skipped (missing
        token) or failed.

    Raises:
        ValueError: If ``log_type`` is not ``"chat"`` or ``"drias"``.
    """
    # log_type -> (token env var, repo env var, default repo id)
    targets = {
        "chat": ("HF_LOGS_TOKEN", "HF_DATASET_REPO", "Ekimetrics/climateqa_logs"),
        "drias": (
            "HF_LOGS_DRIAS_TOKEN",
            "HF_DATASET_REPO_DRIAS",
            "Ekimetrics/climateqa_logs_talk_to_data",
        ),
    }
    # Validate outside the try block: inside it, this ValueError would be
    # caught by the best-effort handler below and only printed.
    if log_type not in targets:
        raise ValueError(f"Invalid log type: {log_type}")
    token_var, repo_var, default_repo = targets[log_type]

    try:
        hf_token = os.getenv(token_var)
        if not hf_token:
            print(f"{token_var} not found in environment variables")
            return False
        repo_id = os.getenv(repo_var, default_repo)

        # Work on a copy so the caller's dict is not mutated.
        payload = dict(logs)
        payload["timestamp"] = datetime.now().strftime("%Y%m%d_%H%M%S_%f")

        api = HfApi(token=hf_token)
        api.upload_file(
            path_or_fileobj=json.dumps(payload).encode("utf-8"),
            path_in_repo=log_filename,
            repo_id=repo_id,
            repo_type="dataset",
        )
    except Exception as e:
        print(f"Error logging to Hugging Face: {e}")
        return False
    return True
|
|
| |
def log_interaction_to_huggingface(history, output_query, sources, docs, share_client, user_id):
    """Log a chat interaction to Hugging Face.

    When the ``GRADIO_ENV`` environment variable is not ``"local"``, builds a
    record of the exchange and passes it to ``log_on_huggingface`` as
    ``chat/<timestamp>.json`` with ``log_type="chat"``. Otherwise it only
    prints a notice.

    Args:
        history (list): Chat message history. ``history[1]`` is read with
            ``["content"]`` (stored as the prompt) and ``history[-1]`` with
            ``.content`` (stored as the answer).
        output_query (str): Processed query, stored under ``"question"``.
        sources (list): Knowledge base sources used.
        docs (list): Retrieved documents, serialised with ``serialize_docs``.
        share_client: Azure share client instance (unused in this function).
        user_id: User identifier, stored as ``str(user_id)``.

    Raises:
        gr.Error: If anything in the logging path raises; the original
            exception is chained as ``__cause__``.
    """
    try:
        if os.getenv("GRADIO_ENV") != "local":
            timestamp = str(datetime.now().timestamp())
            # NOTE(review): history[1] is subscripted while history[-1] uses an
            # attribute; this assumes the caller mixes dict messages and message
            # objects -- confirm against the caller.
            prompt = history[1]["content"]
            logs = {
                "user_id": str(user_id),
                "prompt": prompt,
                "query": prompt,
                "question": output_query,
                "sources": sources,
                "docs": serialize_docs(docs),
                "answer": history[-1].content,
                "time": timestamp,
            }

            log_on_huggingface(f"chat/{timestamp}.json", logs, log_type="chat")
            # NOTE(review): log_on_huggingface prints and swallows its own upload
            # errors, so this is printed even when the upload itself failed.
            print("Logged interaction to Hugging Face")
        else:
            print("Did not log to Hugging Face because GRADIO_ENV is local")
    except Exception as e:
        print(f"Error logging to Hugging Face: {e}")
        error_msg = f"ClimateQ&A Error: {str(e)[:100]}"
        raise gr.Error(error_msg) from e
|
|
def log_drias_interaction_to_huggingface(query, sql_query, user_id):
    """Log a Drias data interaction to Hugging Face.

    When the ``GRADIO_ENV`` environment variable is not ``"local"``, passes a
    record to ``log_on_huggingface`` as ``drias/drias_<timestamp>.json`` with
    ``log_type="drias"`` and prints it. Otherwise it only prints a notice.

    Args:
        query (str): User query.
        sql_query (str): SQL query used.
        user_id: User identifier, stored as ``str(user_id)``.

    Raises:
        gr.Error: If anything in the logging path raises; the original
            exception is chained as ``__cause__``.
    """
    try:
        if os.getenv("GRADIO_ENV") != "local":
            timestamp = str(datetime.now().timestamp())
            logs = {
                "user_id": str(user_id),
                "query": query,
                "sql_query": sql_query,
                "time": timestamp,
            }
            log_on_huggingface(f"drias/drias_{timestamp}.json", logs, log_type="drias")
            print(f"Logged Drias interaction to Hugging Face: {logs}")
        else:
            # This function has no share_client and does not check user_id;
            # only GRADIO_ENV gates logging.
            print("Did not log Drias interaction to Hugging Face because GRADIO_ENV is local")
    except Exception as e:
        print(f"Error logging Drias interaction to Hugging Face: {e}")
        error_msg = f"Drias Error: {str(e)[:100]} - The error has been noted, try another question and if the error remains, you can contact us :)"
        raise gr.Error(error_msg) from e
|
|
def log_interaction(history, output_query, sources, docs, share_client, user_id):
    """Log a chat interaction to Hugging Face, falling back to Azure if that raises.

    Calls ``log_interaction_to_huggingface``. If it raises, the same record is
    rebuilt and uploaded as ``<timestamp>.json`` via ``log_on_azure`` (only
    when ``GRADIO_ENV`` is not ``"local"``). Errors in the fallback are printed
    and never propagated, so this function does not raise.

    NOTE(review): ``log_on_huggingface`` prints and swallows its own upload
    errors, so the fallback is only reached when
    ``log_interaction_to_huggingface`` raises while building the record
    (``history`` indexing, ``serialize_docs``, ...). The fallback rebuilds the
    record the same way, so it is likely to fail again. Confirm this is the
    intended trigger.

    Args:
        history (list): Chat message history. ``history[1]`` is read with
            ``["content"]`` and ``history[-1]`` with ``.content``.
        output_query (str): Processed query.
        sources (list): Knowledge base sources used.
        docs (list): Retrieved documents.
        share_client: Azure share client instance (used by the fallback).
        user_id: User identifier.
    """
    try:
        log_interaction_to_huggingface(history, output_query, sources, docs, share_client, user_id)
    except Exception as e:
        print(f"Failed to log to Hugging Face, falling back to Azure: {e}")
        try:
            if os.getenv("GRADIO_ENV") != "local":
                timestamp = str(datetime.now().timestamp())
                prompt = history[1]["content"]
                logs = {
                    "user_id": str(user_id),
                    "prompt": prompt,
                    "query": prompt,
                    "question": output_query,
                    "sources": sources,
                    "docs": serialize_docs(docs),
                    "answer": history[-1].content,
                    "time": timestamp,
                }

                log_on_azure(f"{timestamp}.json", logs, share_client)
                print("Successfully logged to Azure as fallback")
        except Exception as azure_error:
            # Logging is best-effort here: report the failure but do not raise.
            print(f"Error in Azure fallback logging: {azure_error}")
            error_msg = f"ClimateQ&A Logging Error: {str(azure_error)[:100]}"
            print(error_msg)
|
|
|
|
|
|