| import os |
| import datetime |
| import requests |
| import re |
| import pandas as pd |
| import gradio as gr |
| import threading |
| import uuid |
| import queue |
| import time |
| from transformers import AutoTokenizer |
| from mistralai import Mistral |
| from huggingface_hub import InferenceClient |
| import smtplib |
| import ssl |
| from email.mime.text import MIMEText |
| from email.mime.multipart import MIMEMultipart |
| from google_auth_oauthlib.flow import InstalledAppFlow |
| from googleapiclient.discovery import build |
| import base64 |
| from google.oauth2.credentials import Credentials |
| from google.auth.transport.requests import Request |
| import openai |
| from openai.error import RateLimitError |
|
|
| |
| |
| |
# Module-level state shared across Gradio callbacks: written by load_file /
# submit_query_async and read by the background job workers.
sheet_data = None  # cached spreadsheet contents as a plain string (None until loaded)
file_name = None  # uploaded file object from gr.File (not just a path string)
sheet = None  # sheet name within the uploaded workbook
slider_max_tokens = None  # last value of the "Max Tokens" slider (None before first submit)
|
|
def debug_print(message: str):
    """Write *message* to stdout, prefixed with an ISO-8601 timestamp."""
    stamp = datetime.datetime.now().isoformat()
    print(f"[{stamp}] {message}", flush=True)
|
|
def initialize_tokenizer():
    """Load the GPT-2 tokenizer used for token counting.

    Returns the tokenizer, or None when loading fails (e.g. no network /
    missing model files) so callers can fall back to whitespace counting.
    """
    try:
        tokenizer = AutoTokenizer.from_pretrained("gpt2")
    except Exception as exc:
        debug_print("Failed to initialize tokenizer: " + str(exc))
        return None
    return tokenizer
|
|
# Shared tokenizer instance; None if transformers failed to load the model.
global_tokenizer = initialize_tokenizer()
|
|
def count_tokens(text: str) -> int:
    """Approximate the number of tokens in *text*.

    Uses the shared GPT-2 tokenizer when it loaded successfully; otherwise
    (or if encoding raises) falls back to a simple whitespace-split count.
    """
    if not global_tokenizer:
        return len(text.split())
    try:
        return len(global_tokenizer.encode(text))
    except Exception:
        return len(text.split())
|
|
def get_model_pricing(model_name: str):
    """Return per-token pricing for *model_name* in USD and RON.

    The result is a dict of the form
    ``{"USD": {"input": ..., "output": ...}, "RON": {...}}``.

    Lookup is exact first; if that fails, the longest pricing key contained
    in *model_name* is used, so decorated names (e.g. a flag-prefixed
    "GPT-4o mini") still resolve — mirroring the substring matching used by
    get_model_max_tokens.  Unknown models are priced at zero.
    """
    model_pricing = {
        "GPT-3.5": {"USD": {"input": 0.0000005, "output": 0.0000015}, "RON": {"input": 0.0000023, "output": 0.0000069}},
        "GPT-4o": {"USD": {"input": 0.0000025, "output": 0.00001}, "RON": {"input": 0.0000115, "output": 0.000046}},
        "GPT-4o mini": {"USD": {"input": 0.00000015, "output": 0.0000006}, "RON": {"input": 0.0000007, "output": 0.0000028}},
        "o1-mini": {"USD": {"input": 0.0000011, "output": 0.0000044}, "RON": {"input": 0.0000051, "output": 0.0000204}},
        "o3-mini": {"USD": {"input": 0.0000011, "output": 0.0000044}, "RON": {"input": 0.0000051, "output": 0.0000204}},
        "Meta-Llama-3": {"USD": {"input": 0.00, "output": 0.00}, "RON": {"input": 0.00, "output": 0.00}},
        "Mistral-API": {"USD": {"input": 0.00, "output": 0.00}, "RON": {"input": 0.00, "output": 0.00}}
    }
    if model_name in model_pricing:
        return model_pricing[model_name]
    # Longest-substring fallback: "GPT-4o mini" must win over "GPT-4o".
    matches = [key for key in model_pricing if key in model_name]
    if matches:
        return model_pricing[max(matches, key=len)]
    return {"USD": {"input": 0.00, "output": 0.00}, "RON": {"input": 0.00, "output": 0.00}}
|
|
def get_model_max_tokens(model_name: str) -> int:
    """Return the context window (in tokens) for the selected model.

    The first known model whose key appears as a substring of *model_name*
    wins (insertion order); 4096 is the conservative default for unknowns.
    """
    model_token_limits = {
        "GPT-3.5": 16385,
        "GPT-4o": 128000,
        "GPT-4o mini": 128000,
        "Meta-Llama-3": 4096,
        "Mistral-API": 128000,
        "o1-mini": 128000,
        "o3-mini": 128000
    }
    return next(
        (limit for key, limit in model_token_limits.items() if key in model_name),
        4096,
    )
|
|
|
|
def generate_response(prompt: str, model_name: str, sheet_data: str = "") -> str:
    """Generate a completion for *prompt* with the selected backend.

    Parameters
    ----------
    prompt : the user prompt.
    model_name : UI model label, possibly prefixed with a flag emoji
        (e.g. "<flag> GPT-4o mini").
    sheet_data : optional spreadsheet text appended to the prompt.

    Returns the model reply prefixed with a "[Model: ...]" tag; on any
    failure returns an "[Error]" string instead of raising, so background
    jobs always complete.
    """
    global slider_max_tokens

    full_prompt = f"{prompt}\n\nSheet Data:\n{sheet_data}" if sheet_data else prompt
    max_context_tokens = get_model_max_tokens(model_name)
    # slider_max_tokens is None until the first submit; fall back to the
    # slider's default (1200) instead of crashing in min().
    requested_tokens = slider_max_tokens if slider_max_tokens is not None else 1200
    max_tokens = min(requested_tokens, max_context_tokens)

    # Strip a single leading word (the flag prefix) but keep multi-word model
    # names intact: "<flag> GPT-4o mini" -> "GPT-4o mini".  The previous
    # split()[1] truncated this to "GPT-4o" and selected the wrong pricing.
    parts = model_name.split(maxsplit=1)
    base_model_name = parts[1] if len(parts) > 1 else model_name

    try:
        if "Mistral" in model_name:
            mistral_api_key = os.getenv("MISTRAL_API_KEY")
            if not mistral_api_key:
                raise ValueError("MISTRAL_API_KEY environment variable not set.")
            mistral_client = Mistral(api_key=mistral_api_key)
            response = mistral_client.chat.complete(
                model="mistral-small-latest",
                messages=[{"role": "user", "content": full_prompt}],
                temperature=0.7,
                top_p=0.95
            )
            return f"[Model: {model_name}]" + response.choices[0].message.content

        elif "Meta-Llama" in model_name:
            hf_api_token = os.getenv("HF_API_TOKEN")
            if not hf_api_token:
                raise ValueError("HF_API_TOKEN environment variable not set.")
            client = InferenceClient(token=hf_api_token)
            response = client.text_generation(
                full_prompt,
                model="meta-llama/Meta-Llama-3-8B-Instruct",
                temperature=0.7,
                top_p=0.95,
                max_new_tokens=max_tokens
            )
            return f"[Model: {model_name}]" + response

        elif any(model in model_name for model in ["GPT-3.5", "GPT-4o", "o1-mini", "o3-mini"]):
            model_map = {
                "GPT-3.5": "gpt-3.5-turbo",
                "GPT-4o": "gpt-4o",
                "GPT-4o mini": "gpt-4o-mini",
                "o1-mini": "gpt-4o-mini",
                "o3-mini": "gpt-4o-mini"
            }
            # Prefer the longest matching key so "GPT-4o mini" resolves to
            # gpt-4o-mini instead of the first (shorter) "GPT-4o" match.
            model = next(
                (model_map[key] for key in sorted(model_map, key=len, reverse=True) if key in model_name),
                None
            )

            if not model:
                raise ValueError(f"Unsupported OpenAI model: {model_name}")

            # NOTE(review): "OPEN_API_KEY" looks like a typo for
            # OPENAI_API_KEY — left unchanged because the deployment may
            # genuinely use this variable name; confirm before renaming.
            openai.api_key = os.getenv("OPEN_API_KEY")

            response = openai.ChatCompletion.create(
                model=model,
                messages=[{"role": "user", "content": full_prompt}],
                temperature=0.7,
                max_tokens=max_tokens
            )

            input_tokens = count_tokens(full_prompt)
            output_tokens = count_tokens(response["choices"][0]["message"]["content"])

            # Rough cost estimate (both currencies) shown in the status tag.
            pricing = get_model_pricing(base_model_name)
            per_token_pricing = (
                f" (${input_tokens * pricing['USD']['input']:.3f}/in, "
                f"${output_tokens * pricing['USD']['output']:.3f}/out | "
                f"{input_tokens * pricing['RON']['input']:.3f} RON/in, "
                f"{output_tokens * pricing['RON']['output']:.3f} RON/out)"
            )

            return f"[Model: {model_name}{per_token_pricing}]" + response["choices"][0]["message"]["content"]

    except Exception as e:
        debug_print(f"Error generating response: {str(e)}")
        return f"[Model: {model_name}][Error] {str(e)}"
|
|
| |
| |
def process_query(prompt: str, model_name: str):
    """Run *prompt* against *model_name* and return display-ready results.

    Returns a (response, "Input tokens: N", "Output tokens: N") triple,
    matching what check_job_status expects from a completed query job.
    """
    global sheet_data

    # Lazily load the spreadsheet contents on first use.
    if sheet_data is None:
        sheet_data = get_sheet_data()

    combined_prompt = f"{prompt}\n\nSheet Data:\n{sheet_data}"
    debug_print(f"Processing query with model {model_name}: {combined_prompt}")

    response = generate_response(prompt, model_name, sheet_data)

    input_tokens = count_tokens(prompt + "\n\n" + sheet_data)
    output_tokens = count_tokens(response)

    return response, f"Input tokens: {input_tokens}", f"Output tokens: {output_tokens}"
|
|
| |
| |
| |
# Background-job bookkeeping: job_id -> dict of status/type/query/timestamps.
jobs = {}
# Completed (job_id, result) pairs handed from worker threads to the UI.
results_queue = queue.Queue()
# Most recently submitted job id, used by the auto-refresh poller.
last_job_id = None
|
|
| |
| |
| |
|
|
def get_job_list():
    """Render all known jobs as a Markdown list, newest first.

    Status is shown as color-coded inline HTML; query jobs get a 30-char
    preview of their prompt.  (Emoji below restored from mojibake that had
    broken a string literal in the original file.)
    """
    if not jobs:
        return "No jobs found. Submit a query or load files to create jobs."

    job_list_md = "### 📋 Submitted Jobs\n\n"

    # Newest submissions first.
    sorted_jobs = sorted(
        jobs.items(),
        key=lambda item: item[1].get("start_time", 0),
        reverse=True
    )

    for job_id, job_info in sorted_jobs:
        status = job_info.get("status", "unknown")
        job_type = job_info.get("type", "unknown")
        query = job_info.get("query", "")
        start_time = job_info.get("start_time", 0)
        time_str = datetime.datetime.fromtimestamp(start_time).strftime("%Y-%m-%d %H:%M:%S")

        query_preview = query[:30] + "..." if query and len(query) > 30 else query or "N/A"

        if status == "processing":
            status_formatted = f"<span style='color: red'>⏳ {status}</span>"
        elif status == "completed":
            status_formatted = f"<span style='color: green'>✅ {status}</span>"
        else:
            status_formatted = f"<span style='color: orange'>❓ {status}</span>"

        if job_type == "query":
            job_list_md += f"- [{job_id}](javascript:void) - {time_str} - {status_formatted} - Query: {query_preview}\n"
        else:
            job_list_md += f"- [{job_id}](javascript:void) - {time_str} - {status_formatted} - File Load Job\n"

    return job_list_md
|
|
def get_sheet_data():
    """Return the cached sheet text, loading it from the uploaded file if needed.

    Reads the module-level ``file_name`` (a gr.File object) and ``sheet``
    globals set by load_file.  Returns an explanatory message string when no
    file has been loaded or reading fails.
    """
    global sheet_data
    global file_name
    global sheet

    if sheet_data is not None:
        return sheet_data

    file = file_name
    sheet_name = sheet
    debug_print(f"file name: {file} sheet name: {sheet_name}")  # was a bare print

    # Guard: without an uploaded file, file.name below would raise
    # AttributeError on None.
    if file is None:
        return "No file loaded. Please upload a file and load a sheet first."

    try:
        df = pd.read_excel(file.name, sheet_name=sheet_name)
        sheet_data = df.to_string(index=False)
        return sheet_data
    except Exception as e:
        return f"Error reading sheet: {str(e)}"
|
|
| |
|
|
def process_in_background(job_id, func, args):
    """Run ``func(*args)`` on a worker thread and publish its result.

    The (job_id, result) pair is pushed onto the shared results queue.
    Exceptions are caught and converted into an error result so a failing
    job is marked completed instead of remaining "processing" forever
    (previously an exception silently killed the thread and stranded the
    job).
    """
    try:
        result = func(*args)
    except Exception as exc:
        debug_print(f"Job {job_id} failed in background: {exc}")
        # Shape mirrors process_query's (response, input-tokens, output-tokens)
        # triple so check_job_status can render it without special-casing.
        result = (f"[Error] {exc}", "Input tokens: n/a", "Output tokens: n/a")
    results_queue.put((job_id, result))
    debug_print(f"Job {job_id} finished processing in background.")
| |
|
|
def submit_query_async(query, model_choice, max_tokens_slider):
    """Submit *query* as a background job and return immediately.

    Stores the slider value for generate_response, spawns a worker thread
    running process_query, records the job, and returns the 7-tuple of UI
    updates expected by the submit-button wiring.
    """
    global last_job_id
    global sheet_data
    global slider_max_tokens
    slider_max_tokens = max_tokens_slider

    if not query:
        return ("Please enter a non-empty query", "", "Input tokens: 0", "Output tokens: 0", "", "", get_job_list())

    job_id = str(uuid.uuid4())
    debug_print(f"Starting async job {job_id} for query: {query}")

    if sheet_data is None:
        sheet_data = get_sheet_data()

    # NOTE: the sheet data is deliberately NOT appended to the query here —
    # process_query/generate_response already append it before calling the
    # model.  Appending it here as well (as the original did) sent the
    # spreadsheet to the model twice and inflated token usage.

    threading.Thread(
        target=process_in_background,
        args=(job_id, process_query, [query, model_choice or "Mistral-API"])
    ).start()

    jobs[job_id] = {
        "status": "processing",
        "type": "query",
        "start_time": time.time(),
        "query": query,
        "model": model_choice or "Mistral-API"
    }

    last_job_id = job_id

    return (
        f"🚀 Query submitted and processing in the background (Job ID: {job_id}).\n\n"
        f"Use the 'Check Job Status' section to view results.",
        f"Job ID: {job_id}",
        # Input-token estimate still includes the sheet data the model will see.
        f"Input tokens: {count_tokens(query + chr(10) * 2 + 'Sheet Data:' + chr(10) + str(sheet_data))}",
        "Output tokens: pending",
        job_id,
        query,
        get_job_list()
    )
|
|
def job_selected(job_id):
    """Return (job_id, stored query) for a selected job, or a not-found note."""
    job = jobs.get(job_id)
    if job is None:
        return job_id, "Job not found"
    return job_id, job.get("query", "No query for this job")
|
|
def refresh_job_list():
    """Re-render the job list Markdown (thin wrapper for the refresh button)."""
    return get_job_list()
|
|
def sync_model_dropdown(value):
    """Identity passthrough used to mirror one dropdown's value into another."""
    return value
|
|
def check_job_status(job_id):
    """Report the state of job *job_id* for the status panel.

    First drains the results queue so any finished background work is
    recorded in ``jobs``.  Returns a 5-tuple:
    (status HTML, job-id text, input-token text, output-token text, query).
    (The completed-branch emoji below is restored from mojibake that had
    broken a string literal in the original file.)
    """
    if not job_id:
        html_response = "<div style='font-family: monospace;'><p>Please enter a job ID.</p></div>"
        return html_response, "", "", "", ""

    # Drain finished results from worker threads into the jobs dict.
    try:
        while not results_queue.empty():
            completed_id, result = results_queue.get_nowait()
            if completed_id in jobs:
                jobs[completed_id]["status"] = "completed"
                jobs[completed_id]["result"] = result
                jobs[completed_id]["end_time"] = time.time()
                debug_print(f"Job {completed_id} completed and stored in jobs dictionary")
    except queue.Empty:
        # get_nowait can still race with empty(); treat as "nothing to drain".
        pass

    if job_id not in jobs:
        html_response = "<div style='font-family: monospace;'><p>Job not found. Please check the ID and try again.</p></div>"
        return html_response, "", "", "", ""

    job = jobs[job_id]
    job_query = job.get("query", "No query available for this job")

    if job["status"] == "processing":
        elapsed_time = time.time() - job["start_time"]
        html_response = (
            f"<div style='font-family: monospace;'>"
            f"<p><strong>⏳ Query is still being processed</strong> (elapsed: {elapsed_time:.1f}s). Please check again shortly.</p>"
            f"</div>"
        )
        return (
            html_response,
            f"Job ID: {job_id}",
            f"Input tokens: {count_tokens(job.get('query', ''))}",
            "Output tokens: pending",
            job_query
        )

    if job["status"] == "completed":
        result = job["result"]
        processing_time = job["end_time"] - job["start_time"]
        html_response = (
            f"<div style='font-family: monospace;'>"
            f"<p><strong>✅ Response:</strong> {result[0]}</p>"
            f"<p>Processing time: {processing_time:.1f}s</p>"
            f"</div>"
        )
        return (
            html_response,
            f"Job ID: {job_id}",
            result[1],
            result[2],
            job_query
        )

    # Any other status (should not normally happen).
    html_response = f"<div style='font-family: monospace;'><p>Job status: {job['status']}</p></div>"
    return html_response, "", "", "", job_query
|
|
def cleanup_old_jobs():
    """Evict stale jobs: completed ones older than 24h, stuck ones older than 48h.

    Returns a (summary, "", "") triple for the UI outputs it feeds.
    """
    now = time.time()
    expired = [
        job_id
        for job_id, job in jobs.items()
        if (job["status"] == "completed" and now - job.get("end_time", 0) > 86400)
        or (job["status"] == "processing" and now - job.get("start_time", 0) > 172800)
    ]

    for job_id in expired:
        del jobs[job_id]

    debug_print(f"Cleaned up {len(expired)} old jobs. {len(jobs)} jobs remaining.")
    return f"Cleaned up {len(expired)} old jobs", "", ""
|
|
| |
def run_query(max_value):
    """Return [[i, i*i] for i in 1..max_value] (demo rows for the UI)."""
    rows = []
    for i in range(1, max_value + 1):
        rows.append([i, i * i])
    return rows
|
|
| |
def periodic_update(is_checked):
    """Poll job state for the auto-refresh timer.

    Returns the 7 values wired to the auto-refresh outputs (job list, status
    HTML, plain-text status, job id, input tokens, output tokens, query);
    all empty strings when the checkbox is off.
    """
    interval = 3 if is_checked else None
    debug_print(f"Auto-refresh checkbox is {'checked' if is_checked else 'unchecked'}, every={interval}")

    if not is_checked:
        return "", "", "", "", "", "", ""

    global last_job_id
    job_list_md = refresh_job_list()
    if last_job_id:
        job_status = check_job_status(last_job_id)
    else:
        job_status = ("No job ID available", "", "", "", "")

    # Strip markup from the HTML status for the plain-text status box.
    from bs4 import BeautifulSoup
    html_content = job_status[0]
    plain_text = BeautifulSoup(html_content, "html.parser").get_text() if html_content else ""

    return (job_list_md, job_status[0], plain_text,
            job_status[1], job_status[2], job_status[3], job_status[4])
| |
| |
# Gmail API OAuth scope: permission to send mail on the user's behalf only.
SCOPES = ["https://www.googleapis.com/auth/gmail.send"]
|
|
| from google_auth_oauthlib.flow import InstalledAppFlow |
| from google.oauth2.credentials import Credentials |
| from google.auth.transport.requests import Request |
| import os |
| import json |
|
|
|
|
def get_gmail_credentials():
    """Obtain Gmail API credentials for sending mail.

    Returns a pair:
      (credentials, None)  when valid (possibly refreshed) credentials exist;
      (None, auth_url)     when the user must complete the OAuth consent flow.

    Raises ValueError when the OAuth client id/secret environment variables
    are missing.
    """
    global oauth_flow

    creds = None

    client_id = os.environ.get("HF_GOOGLE_CLIENT_ID")
    client_secret = os.environ.get("HF_GOOGLE_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise ValueError("Missing Gmail OAuth credentials in environment variables.")

    redirect_uri = "https://huggingface.co/spaces/alx-d/scout/oauth2callback"

    # Cached user token from a previous consent.  `token_path` was an
    # undefined name in the original (NameError on every call).
    token_path = "token.json"  # TODO confirm the deployment's token location
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path)

    # Refresh expired credentials, or start a fresh consent flow.
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            client_config = {
                "web": {
                    "client_id": client_id,
                    "project_id": "your_project_id",
                    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
                    "token_uri": "https://oauth2.googleapis.com/token",
                    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
                    "client_secret": client_secret,
                    "redirect_uris": [redirect_uri]
                }
            }

            # Was `Flow.from_client_config`, but only InstalledAppFlow is
            # imported in this module — `Flow` was an undefined name.
            oauth_flow = InstalledAppFlow.from_client_config(
                client_config, SCOPES, redirect_uri=redirect_uri
            )
            auth_url, _ = oauth_flow.authorization_url(
                prompt='consent',
                access_type='offline',
                include_granted_scopes='true'
            )

            return None, auth_url

    return creds, None
|
|
| |
def send_email(email_address, content, is_formatted=True):
    """Send *content* to *email_address* via the Gmail API.

    Parameters
    ----------
    email_address : recipient address (minimally validated).
    content : message body.
    is_formatted : send as HTML when True, plain text otherwise.

    Returns a human-readable status string; never raises.
    """
    if not email_address or "@" not in email_address:
        return "Please enter a valid email address"

    try:
        # get_gmail_credentials returns a (creds, auth_url) pair; the
        # original passed the whole tuple to build(), which always failed.
        creds, auth_url = get_gmail_credentials()
        if creds is None:
            return f"Authorization required. Please visit: {auth_url}"

        service = build("gmail", "v1", credentials=creds)

        msg = MIMEMultipart()
        msg["to"] = email_address
        msg["subject"] = "Scouting AI Report"
        msg.attach(MIMEText(content, "html" if is_formatted else "plain"))

        # Gmail API expects the RFC 2822 message base64url-encoded.
        encoded_msg = base64.urlsafe_b64encode(msg.as_bytes()).decode()
        send_message = {"raw": encoded_msg}

        service.users().messages().send(userId="me", body=send_message).execute()
        return "Email sent successfully via Gmail API!"

    except Exception as e:
        return f"Failed to send email: {str(e)}"
|
|
| |
def copy_to_clipboard(content):
    """Copy *content* to the system clipboard and return a status message."""
    # pyperclip is imported locally — presumably to avoid a hard dependency
    # at module import time; verify it is installed in the deployment.
    import pyperclip
    pyperclip.copy(content)
    return "Copied to clipboard!"
|
|
|
|
| |
def copy_plain_text(html_content):
    """Strip HTML from *html_content* and copy the plain text to the clipboard.

    Returns a status string; reports an error if BeautifulSoup is missing.
    """
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        return "Error: BeautifulSoup is required to convert HTML to plain text. Please install it."

    text = BeautifulSoup(html_content, "html.parser").get_text()

    import pyperclip
    pyperclip.copy(text)

    return "Copied to clipboard!"
|
|
|
|
| |
# Default query shown in the prompt box: asks the model for an HTML scouting
# report built from the loaded player statistics.  (Curly quotes around
# "Key Points to Emphasize" were mojibake in the original; restored to
# plain quotes.)
default_prompt = """
You are a scout who has played against this player, and you are analyzing the following statistics.
Create a scouting report for the head coach, detailing:

1) The player's strengths, along with a strategy to counter those strengths.
2) The player's weaknesses, and how we can exploit those weaknesses based on the stats.

Present the report in a way that is easy to read, combining each strength with its corresponding counter-strategy, and each weakness with an exploitation plan.

At the end of the report, include a "Key Points to Emphasize" section.

Use HTML formatting for the output, and apply a dark color palette (e.g., dark green, dark red, dark gray) for different sections to enhance visual readability.
"""
|
|
| |
| |
| |
|
|
# ---------------------------------------------------------------------------
# Gradio UI.  Row 1: file loading (left) + job list / status checker (right).
# Row 2: query submission (left) + job-status panes (right, defined below).
# NOTE(review): several UI strings below contain mojibake (garbled emoji)
# carried over from the original file; left byte-identical here.
# ---------------------------------------------------------------------------
with gr.Blocks() as app:

    gr.Markdown("## π Scouting AI App")
    gr.Markdown("Welcome to the Scouting AI App! Upload your files, submit queries, and check job statuses easily. Game on! π")

    with gr.Row():

        # Left column: upload an .xlsm workbook and choose the sheet to read.
        with gr.Column(scale=1):
            gr.Markdown("### π Load File Section")
            gr.Markdown("Upload your **.xlsm** file below, specify the sheet name, and click *Load Sheet* to process your file.")
            file_input = gr.File(label="Upload .xlsm File")
            sheet_input_file = gr.Textbox(label="Sheet Name")
            load_button_file = gr.Button("Load Sheet")
            sheet_output_file = gr.Textbox(label="Sheet Info", interactive=False)

        # Right column: background-job list plus a manual status checker.
        with gr.Column(scale=1):
            gr.Markdown("### π Job Information")
            gr.Markdown("View all submitted jobs, refresh the list, and check the status of individual jobs.")

            # Markdown pane re-rendered by the refresh/submit callbacks.
            job_list_display = gr.Markdown(
                get_job_list(),
                elem_id="job-list-display",
                elem_classes=["scrollable-job-list"]
            )

            # Inline CSS so the job list scrolls instead of growing unbounded.
            gr.HTML("""
            <style>
                .scrollable-job-list {
                    height: 220px;
                    overflow-y: auto;
                    border: 1px solid #ccc;
                    padding: 10px;
                    margin-bottom: 10px;
                }
            </style>
            """)

            refresh_button = gr.Button("Refresh Job List")

            gr.Markdown("#### π Check Job Status")
            job_id_input = gr.Textbox(label="Enter Job ID")
            check_status_button = gr.Button("Check Job Status")

    with gr.Row():

        # Left column: model picker, token budget slider and the prompt box.
        with gr.Column(scale=1):
            gr.Markdown("### π Submit Query")
            gr.Markdown("Enter your prompt below and choose a model. Your query will be processed in the background.")

            model_dropdown = gr.Dropdown(
                choices=[
                    "πΊπΈ GPT-3.5",
                    "πΊπΈ GPT-4o",
                    "πΊπΈ GPT-4o mini",
                    "πΊπΈ o1-mini",
                    "πΊπΈ o3-mini",
                    "πΊπΈ Remote Meta-Llama-3",
                    "πͺπΊ Mistral-API",
                ],
                value="πΊπΈ GPT-4o mini",
                label="Select Model"
            )
            # Cap on generated tokens; stored in the slider_max_tokens global
            # by submit_query_async.
            max_tokens_slider = gr.Slider(minimum=200, maximum=4096, value=1200, label="π’ Max Tokens", step=50)

            prompt_input = gr.Textbox(label="Enter your prompt", value=default_prompt, lines=6)
            with gr.Row():
                auto_refresh_checkbox = gr.Checkbox(
                    label="Enable Auto Refresh",
                    value=False
                )
                submit_button = gr.Button("Submit Query ")

            status_text = gr.Textbox(label="Response Text ", visible=True)

            response_output = gr.Textbox(label="Response", interactive=False)
            token_info = gr.Textbox(label="Token Info", interactive=False)
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
        # Right column of row 2: panes populated by check_job_status and
        # periodic_update (status HTML, job id, token counts, stored query).
        with gr.Column(scale=1):

            status_output = gr.HTML(label="Job Status", interactive=False)

            job_id_display = gr.Textbox(label="Job ID", interactive=False)
            input_tokens_display = gr.Textbox(label="Input Tokens", interactive=False)
            output_tokens_display = gr.Textbox(label="Output Tokens", interactive=False)
            job_query_display = gr.Textbox(label="Job Query", interactive=False)
| |
| |
| |
| |
| |
| |
| def load_file(file, sheet_name): |
| global sheet_data |
| global file_name |
| file_name = file |
| sheet = sheet_name |
| |
| if file is None or sheet_name.strip() == "": |
| return "Please upload a file and enter a valid sheet name." |
| |
| try: |
| df = pd.read_excel(file.name, sheet_name=sheet_name) |
| sheet_data = df.to_string(index=False) |
| return sheet_data |
| except Exception as e: |
| return f"Error reading sheet: {str(e)}" |
| |
    # --- Event wiring -----------------------------------------------------
    # Load button: read the chosen sheet and show its contents.
    load_button_file.click(
        fn=load_file,
        inputs=[file_input, sheet_input_file],
        outputs=sheet_output_file
    )

    # Submit button: start a background query job and update the job panes.
    submit_button.click(
        fn=submit_query_async,
        inputs=[prompt_input, model_dropdown, max_tokens_slider],
        outputs=[
            response_output, token_info,
            input_tokens_display, output_tokens_display,
            job_id_input, job_query_display, job_list_display
        ]
    )

    # Manual status check for the job id typed into job_id_input.
    check_status_button.click(
        fn=check_job_status,
        inputs=[job_id_input],
        outputs=[status_output, job_id_display, input_tokens_display,
                 output_tokens_display, job_query_display]
    )

    # Re-render the job list on demand.
    refresh_button.click(
        fn=refresh_job_list,
        inputs=[],
        outputs=job_list_display
    )

    # Poll periodic_update every 3 s while the checkbox is checked.
    auto_refresh_checkbox.change(
        fn=periodic_update,
        inputs=[auto_refresh_checkbox],
        outputs=[job_list_display, status_output, status_text, job_id_display, input_tokens_display, output_tokens_display, job_query_display],
        every=3
    )
| |
|
|
| |
def show_copy_text(text):
    """Reveal a textbox and fill it with *text* (Gradio update helper)."""
    return gr.update(value=text, visible=True)
|
|
| |
| |
|
|
| |
| |
| |
| |
| |
|
|
|
|
if __name__ == "__main__":
    debug_print("Launching Gradio UI...")
    # queue() is required for the `every=`-based auto-refresh polling.
    app.queue().launch(share=False)
|
|