heymenn's picture
Update app.py
d455ce2 verified
raw
history blame
28.8 kB
import gradio as gr
import pandas as pd
import json
from sentence_transformers import SentenceTransformer, util
import torch
import requests
import re
import urllib.parse
import itertools # For generating pairs
import os
import io # Required for Google Drive upload
# --- Configuration ---
CATEGORY_JSON_PATH = "categories.json"
TECHNOLOGY_EXCEL_PATH = "technologies.xlsx"
MODEL_NAME = 'all-MiniLM-L6-v2'
CATEGORY_SIMILARITY_THRESHOLD = 0.3 # Threshold for *displaying* the best category match
MAX_TECHNOLOGIES_TO_SHOW = 8 # Max technologies relevant to the problem (selected across ALL categories)
MAX_TECHNOLOGY_PAIRS_TO_SEARCH = 5 # Max pairs (from the relevant tech) to use for solution search
MAX_SEARCH_REFERENCES_PER_PAIR = 3 # Max references from the API per pair
SEARCH_API_URL = "https://ychkhan-ptt-endpoints.hf.space/search"
# --- Global Variables ---
# To store pre-computed embeddings and data
categories_data = {}
category_names = []
category_embeddings = None
technologies_df = pd.DataFrame()
technology_embeddings = None # Will store pre-computed embeddings for descriptions
model = None
###- GOOGLE DRIVE API
# Check if running in an environment where Google Credentials are set
# Use placeholder credentials if not found, but functionality will fail
GOOGLE_CREDENTIALS = os.environ.get("GOOGLE_CREDENTIALS")
FOLDER_ID = os.getenv("FOLDER_ID") # Optional: Folder ID for uploads
# Only import Google libraries if credentials are potentially available
if GOOGLE_CREDENTIALS:
try:
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload
GOOGLE_API_AVAILABLE = True
print("Google API libraries loaded.")
except ImportError:
print("Warning: Google API libraries not found. Google Drive upload will be disabled.")
GOOGLE_API_AVAILABLE = False
else:
print("Warning: GOOGLE_CREDENTIALS environment variable not set. Google Drive upload will be disabled.")
GOOGLE_API_AVAILABLE = False
# Define dummy functions or handle calls gracefully if needed elsewhere
def create_new_file_in_drive(*args, **kwargs):
print("Google Drive upload skipped: Credentials not configured.")
return None
if GOOGLE_API_AVAILABLE:
def create_new_file_in_drive(username, dataframe_to_upload, credentials_json_str, folder_id):
"""Crée un nouveau fichier CSV dans Google Drive à partir d'un DataFrame Pandas."""
print(f"Attempting to upload results for user: {username}")
if not credentials_json_str:
print("Error: Google Credentials JSON string is empty.")
return None
if not folder_id:
print("Warning: Google Drive FOLDER_ID not specified. Upload might fail or go to root.")
# Decide if you want to default to root or fail
# return None # Option: Fail if no folder ID
try:
creds_dict = json.loads(credentials_json_str)
except json.JSONDecodeError as e:
print(f"Error decoding Google Credentials JSON: {e}")
return None
try:
# Charger les informations d'identification du compte de service
creds = service_account.Credentials.from_service_account_info(creds_dict)
# Construire le service API Drive
service = build('drive', 'v3', credentials=creds)
# Convertir le DataFrame en fichier CSV en mémoire
csv_buffer = io.BytesIO()
# Ensure UTF-8 encoding, especially with BOM for Excel compatibility if needed
dataframe_to_upload.to_csv(csv_buffer, index=False, sep=';', encoding='utf-8-sig')
csv_buffer.seek(0)
# Créer les métadonnées du fichier
filename = f"rating-results-{username}.csv" # Consider adding a timestamp
file_metadata = {'name': filename}
if folder_id:
file_metadata['parents'] = [folder_id]
# Télécharger le fichier CSV sur Google Drive
media = MediaIoBaseUpload(csv_buffer, mimetype='text/csv', resumable=True)
file = service.files().create(body=file_metadata, media_body=media, fields='id, name, webViewLink').execute()
print(f"File '{file.get('name')}' created successfully in Google Drive. ID: {file.get('id')}")
print(f"Link: {file.get('webViewLink')}") # Optional: print link
return file.get('id')
except Exception as e:
print(f"Error during Google Drive upload: {e}")
# Consider more specific error handling (e.g., authentication errors)
return None
###-
# --- Load Data and Model (Load once at startup) ---
def load_data_and_model():
global categories_data, category_names, category_embeddings
global technologies_df, technology_embeddings, model
print("Loading data and model...")
try:
# Load Categories
with open(CATEGORY_JSON_PATH, 'r', encoding='utf-8') as f: # Specify encoding
categories_data = json.load(f)["Category"]
category_names = list(categories_data.keys())
category_texts = [f"{name}: {', '.join(keywords)}" for name, keywords in categories_data.items()]
print(f"Loaded {len(category_names)} categories.")
# Load Technologies
technologies_df = pd.read_excel(TECHNOLOGY_EXCEL_PATH)
# Clean column names (remove leading/trailing spaces)
technologies_df.columns = technologies_df.columns.str.strip()
# Ensure required columns exist
if 'technology' not in technologies_df.columns or 'description' not in technologies_df.columns:
raise ValueError("Missing required columns 'technology' or 'description' in technologies.xlsx")
technologies_df['category'] = technologies_df.get('category', '').fillna('').astype(str) # Use .get for optional category
technologies_df['description_clean'] = technologies_df['description'].fillna('').astype(str)
# Add a unique ID if 'technology' name isn't unique or for easier embedding mapping
technologies_df['tech_id'] = technologies_df.index
print(f"Loaded {len(technologies_df)} technologies.")
# Load Sentence Transformer Model
model = SentenceTransformer(MODEL_NAME)
print(f"Loaded Sentence Transformer model: {MODEL_NAME}")
# Pre-compute category embeddings
print("Computing category embeddings...")
category_embeddings = model.encode(category_texts, convert_to_tensor=True, show_progress_bar=True)
print("Category embeddings computed.")
# Pre-compute technology description embeddings
print("Computing technology description embeddings...")
valid_descriptions = technologies_df['description_clean'].tolist()
technology_embeddings = model.encode(valid_descriptions, convert_to_tensor=True, show_progress_bar=True)
print(f"Technology description embeddings computed (shape: {technology_embeddings.shape}).")
except FileNotFoundError as e:
print(f"ERROR: File not found - {e}. Please ensure '{CATEGORY_JSON_PATH}' and '{TECHNOLOGY_EXCEL_PATH}' exist.")
raise e
except Exception as e:
print(f"ERROR loading data or model: {e}")
raise e
# --- Helper Functions ---
def find_best_category(problem_description):
"""
Finds the most relevant category using pre-computed embeddings.
This is now primarily for informational output.
"""
if not problem_description or not category_names or category_embeddings is None:
return None, 0.0
try:
problem_embedding = model.encode(problem_description, convert_to_tensor=True)
cosine_scores = util.pytorch_cos_sim(problem_embedding, category_embeddings)[0]
best_score, best_idx = torch.max(cosine_scores, dim=0)
# Return the best category regardless of threshold, but indicate confidence
best_category_name = category_names[best_idx.item()]
best_category_score = best_score.item()
# Decide if the match is confident enough to strongly suggest
is_confident = best_category_score >= CATEGORY_SIMILARITY_THRESHOLD
return best_category_name, best_category_score, is_confident
except Exception as e:
print(f"Error during category finding: {e}")
return None, 0.0, False
# --- MODIFIED FUNCTION ---
def find_relevant_technologies(problem_description):
"""
Calculates similarity between the problem description and ALL technology
descriptions using pre-computed embeddings, sorts, and returns the top results.
Category is no longer used for filtering here.
"""
all_tech_data = []
if technologies_df.empty or technology_embeddings is None or not problem_description:
print("Warning: Technologies DF, embeddings, or problem description missing.")
return pd.DataFrame()
try:
problem_embedding = model.encode(problem_description, convert_to_tensor=True)
# Iterate through ALL technologies
for index, row in technologies_df.iterrows():
tech_id = row['tech_id'] # Use the pre-assigned index/id
# Ensure tech_id is within the bounds of the embeddings tensor
if tech_id >= technology_embeddings.shape[0]:
print(f"Warning: tech_id {tech_id} is out of bounds for technology_embeddings (shape: {technology_embeddings.shape}). Skipping.")
continue
# Retrieve pre-computed embedding using tech_id
tech_embedding = technology_embeddings[tech_id]
# Calculate similarity score with the problem
# Ensure embeddings are compatible (e.g., both are single vectors)
if problem_embedding.ndim == 1:
problem_embedding_exp = problem_embedding.unsqueeze(0) # Add batch dimension if needed
else:
problem_embedding_exp = problem_embedding
if tech_embedding.ndim == 1:
tech_embedding_exp = tech_embedding.unsqueeze(0)
else:
tech_embedding_exp = tech_embedding
similarity_score = util.pytorch_cos_sim(problem_embedding_exp, tech_embedding_exp)[0][0].item()
# Store the original row data and the similarity score
all_tech_data.append({'data': row.to_dict(), 'similarity_score_problem': similarity_score})
# Sort technologies based on similarity to the problem (descending)
all_tech_data.sort(key=lambda item: item['similarity_score_problem'], reverse=True)
if not all_tech_data:
print("No technologies found or scored.")
return pd.DataFrame()
# Create DataFrame from the top N results
# Extract the 'data' part (which is a dict) for DataFrame creation
top_tech_rows = [item['data'] for item in all_tech_data[:MAX_TECHNOLOGIES_TO_SHOW]]
# Extract the corresponding scores
top_tech_scores = [item['similarity_score_problem'] for item in all_tech_data[:MAX_TECHNOLOGIES_TO_SHOW]]
if not top_tech_rows:
return pd.DataFrame()
relevant_df = pd.DataFrame(top_tech_rows)
# Important: Ensure the index aligns if you add the score column later
relevant_df = relevant_df.reset_index(drop=True)
relevant_df['similarity_score_problem'] = top_tech_scores # Add scores as a new column
# print(f"Top relevant technologies DF head:\n{relevant_df.head()}") # Debug print
return relevant_df # Return the top N technologies based on problem similarity
except Exception as e:
print(f"Error during technology finding/scoring: {e}")
import traceback
traceback.print_exc() # Print full traceback for debugging
return pd.DataFrame()
def find_top_technology_pairs(relevant_technologies_df):
"""
Calculates similarity between pairs of the identified relevant technologies
(which were selected based on problem similarity) and returns the top pairs.
Uses pre-computed embeddings.
"""
if relevant_technologies_df.empty or len(relevant_technologies_df) < 2 or technology_embeddings is None:
# print("Warning: Not enough relevant technologies (<2) or embeddings missing for pairing.")
return []
pairs_with_scores = []
# Use tech_id (which should be the original index) to reliably get embeddings
# Check if 'tech_id' column exists in the relevant_technologies_df
if 'tech_id' not in relevant_technologies_df.columns:
print("Error: 'tech_id' column missing in relevant_technologies_df. Cannot proceed with pairing.")
return []
tech_ids = relevant_technologies_df['tech_id'].tolist()
# Create a mapping from tech_id back to the technology name in the relevant subset for easy lookup
tech_id_to_name = pd.Series(relevant_technologies_df['technology'].values, index=relevant_technologies_df['tech_id']).to_dict()
# Generate unique pairs of tech_ids from the relevant list
for id_a, id_b in itertools.combinations(tech_ids, 2):
try:
# Retrieve pre-computed embeddings using the original index (tech_id)
# Add boundary checks again just in case
if id_a >= technology_embeddings.shape[0] or id_b >= technology_embeddings.shape[0]:
print(f"Warning: tech_id {id_a} or {id_b} out of bounds for embeddings. Skipping pair.")
continue
embedding_a = technology_embeddings[id_a]
embedding_b = technology_embeddings[id_b]
# Ensure embeddings are 1D or correctly shaped for cos_sim
if embedding_a.ndim > 1: embedding_a = embedding_a.squeeze()
if embedding_b.ndim > 1: embedding_b = embedding_b.squeeze()
if embedding_a.ndim == 0 or embedding_b.ndim == 0: # Check if squeeze resulted in 0-dim tensor
print(f"Warning: Invalid embedding dimension after squeeze for pair ({id_a}, {id_b}). Skipping.")
continue
# Calculate inter-technology similarity
inter_similarity = util.pytorch_cos_sim(embedding_a, embedding_b)[0][0].item()
# Get technology names using the mapping created earlier
tech_name_a = tech_id_to_name.get(id_a, f"Unknown Tech (ID:{id_a})")
tech_name_b = tech_id_to_name.get(id_b, f"Unknown Tech (ID:{id_b})")
# Clean names for display/use
clean_tech_name_a = re.sub(r'^- Title\s*:\s*', '', str(tech_name_a)).strip()
clean_tech_name_b = re.sub(r'^- Title\s*:\s*', '', str(tech_name_b)).strip()
pairs_with_scores.append(((clean_tech_name_a, clean_tech_name_b), inter_similarity))
except IndexError:
print(f"Warning: Could not find pre-computed embedding for index {id_a} or {id_b}. Skipping pair.")
continue
except Exception as e:
print(f"Error calculating similarity for pair ({id_a}, {id_b}): {e}")
import traceback
traceback.print_exc()
continue
# Sort pairs by inter-similarity score (descending)
pairs_with_scores.sort(key=lambda item: item[1], reverse=True)
# Return the top K pairs
# print(f"Top pairs identified: {pairs_with_scores[:MAX_TECHNOLOGY_PAIRS_TO_SEARCH]}") # Debug print
return pairs_with_scores[:MAX_TECHNOLOGY_PAIRS_TO_SEARCH]
def search_solutions_for_pairs(problem_description, top_pairs):
"""
Searches for solutions/patents using pairs of technologies via the API.
"""
results = {} # Store results keyed by the pair tuple
if not top_pairs or not problem_description:
# Provide a more informative message if no pairs were generated
if not top_pairs:
return "No relevant technology pairs were identified (need at least 2 relevant technologies). Cannot search for solutions.\n"
else: # problem_description must be missing
return "Problem description is missing. Cannot search for solutions.\n"
headers = {'accept': 'application/json'}
for pair_info in top_pairs:
pair_names, pair_score = pair_info
tech_a_name, tech_b_name = pair_names
if not tech_a_name or not tech_b_name: continue # Skip if names are invalid
# Construct query for the API
# Focus query on tech combination and context (patent/research)
# Keep problem description out of the API query unless the API is designed for it
# query = f'"{tech_a_name}" AND "{tech_b_name}" patent OR research paper OR application'
# More targeted query:
query = f'Combining {tech_a_name} and {tech_b_name} for applications related to {problem_description}' # Use snippet of problem
params = {
'query': query,
'max_references': MAX_SEARCH_REFERENCES_PER_PAIR
}
encoded_params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote) # Ensure proper encoding
full_url = f"{SEARCH_API_URL}?{encoded_params}"
pair_key = f"{tech_a_name} + {tech_b_name}" # Key for storing results
print(f"Calling API for pair ({pair_key}): POST {SEARCH_API_URL} with query: {query}") # Log query separately
try:
# Using POST as originally indicated, send params in the body (common for longer queries)
# If API expects GET, change to requests.get(full_url, headers=headers)
response = requests.post(SEARCH_API_URL, headers=headers, params=params, timeout=45) # Increased timeout
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
try:
api_response = response.json()
except json.JSONDecodeError:
err_msg = f"API Error: Invalid JSON response. Status: {response.status_code}, Response text: {response.text[:200]}"
print(f"Error decoding JSON response for pair '{pair_key}'. {err_msg}")
results[pair_key] = {"score": pair_score, "error": err_msg}
continue # Skip to next pair
search_results = []
# --- Adapt based on actual API response structure ---
if isinstance(api_response, list):
search_results = api_response # Assumes list of dicts like {'title': '...', 'url': '...'}
elif isinstance(api_response, dict) and 'results' in api_response and isinstance(api_response['results'], list):
search_results = api_response['results']
elif isinstance(api_response, dict) and 'references' in api_response and isinstance(api_response['references'], list):
# Handle potential alternative key name
search_results = api_response['references']
else:
print(f"Warning: Unexpected API response format for pair '{pair_key}'. Response: {api_response}")
# Attempt to extract links if possible, otherwise mark as no results
# This part needs adjustment based on observed API responses
search_results = [] # Default to empty if format unknown
# --- End adaptation ---
valid_links = []
for r in search_results:
if isinstance(r, dict):
title = r.get('title', 'N/A')
url = r.get('url', r.get('link')) # Check for 'url' or 'link'
if url and isinstance(url, str) and url.startswith(('http://', 'https://')):
valid_links.append({'title': title, 'link': url})
elif url:
print(f"Warning: Invalid or missing URL for result '{title}' in pair '{pair_key}': {url}")
results[pair_key] = {
"score": pair_score, # Store pair score for context
"links": valid_links
}
except requests.exceptions.Timeout:
print(f"Error: API call timed out for pair '{pair_key}'")
results[pair_key] = {"score": pair_score, "error": "API Timeout"}
except requests.exceptions.HTTPError as e:
print(f"Error: HTTP Error calling search API for pair '{pair_key}': {e}")
results[pair_key] = {"score": pair_score, "error": f"API HTTP Error: {e.response.status_code}"}
except requests.exceptions.RequestException as e:
print(f"Error calling search API for pair '{pair_key}': {e}")
results[pair_key] = {"score": pair_score, "error": f"API Request Error: {e}"}
except Exception as e:
err_msg = f"Unexpected Error during API call: {e}"
print(f"Unexpected error during API call for pair '{pair_key}': {e}")
import traceback
traceback.print_exc()
results[pair_key] = {"score": pair_score, "error": err_msg}
# Format results for display
output = f"### Potential Solutions & Patents (Found using Top {len(results)} Technology Pairs):\n\n"
if not results:
output += "No search results could be retrieved from the API for the generated technology pairs."
return output
# Display results in the order they were searched (already sorted by pair score)
for pair_key, search_data in results.items():
pair_score = search_data.get('score', 0.0)
output += f"**For Technology Pair: {pair_key}** (Inter-Similarity Score: {pair_score:.3f})\n" # More precision
if "error" in search_data:
output += f"- *Search failed: {search_data['error']}*\n"
elif "links" in search_data:
links = search_data["links"]
if links:
for link_info in links:
# Ensure title is a string before replacing
title_str = str(link_info.get('title', 'N/A'))
# Basic sanitization for Markdown display
title_sanitized = title_str.replace('[','(').replace(']',')')
output += f"- [{title_sanitized}]({link_info.get('link', '#')})\n"
else:
output += "- *No specific results found by the API for this technology pair.*\n"
else:
output += "- *Unknown search result state.*\n"
output += "\n" # Add space between pairs
return output
# --- Main Processing Function ---
def process_problem(problem_description):
"""
Main function called by Gradio interface. Orchestrates the process.
"""
print(f"\n--- Processing request for: '{problem_description[:100]}...' ---") # Log start
if not problem_description:
return "Please enter a problem description."
# 1. Categorize Problem (Informational)
category_name, cat_score, is_confident = find_best_category(problem_description)
if category_name:
confidence_text = "(Confident Match)" if is_confident else "(Possible Match)"
category_output = f"**Best Matching Category:** {category_name} {confidence_text} (Similarity Score: {cat_score:.3f})"
else:
category_output = "**Could not identify a matching category.**"
print(f"Category identified: {category_name} (Score: {cat_score:.3f}, Confident: {is_confident})")
# 2. Find Relevant Technologies (relative to problem, across ALL categories)
relevant_technologies_df = find_relevant_technologies(problem_description)
print(f"Found {len(relevant_technologies_df)} relevant technologies based on problem similarity.")
tech_output = ""
if not relevant_technologies_df.empty:
tech_output += f"### Top {len(relevant_technologies_df)} Most Relevant Technologies (selected based on similarity to your problem):\n\n"
for _, row in relevant_technologies_df.iterrows():
tech_name = re.sub(r'^- Title\s*:\s*', '', str(row.get('technology', 'N/A'))).strip()
problem_relevance = row.get('similarity_score_problem', 0.0)
tech_output += f"- **{tech_name}** (Problem Relevance: {problem_relevance:.3f})\n"
original_cats = str(row.get('category', 'Unknown')).strip()
if original_cats:
tech_output += f" *Original Category listed as: {original_cats}*\n"
tech_output += "\n---\n" # Add separator
else:
tech_output = "Could not identify any relevant technologies based on the problem description.\n\n---\n"
# 3. Find Top Technology Pairs (based on inter-similarity among the relevant ones)
top_pairs = find_top_technology_pairs(relevant_technologies_df)
print(f"Identified {len(top_pairs)} top technology pairs for searching.")
pairs_output = ""
if top_pairs:
pairs_output += f"### Top {len(top_pairs)} Technology Pairs (selected from the relevant technologies above, based on their inter-similarity):\n\n"
for pair_names, score in top_pairs:
pairs_output += f"- **{pair_names[0]} + {pair_names[1]}** (Inter-Similarity: {score:.3f})\n"
pairs_output += "\n---\n"
# Note: The "else" case message will be added during final output assembly
# 4. Search for Solutions using the Top Pairs
solution_output = search_solutions_for_pairs(problem_description, top_pairs)
print("API search for solutions completed.")
# 5. Combine Outputs for Gradio --- CORRECTED SECTION ---
final_output = (
f"## Analysis Results for: \"{problem_description[:150]}...\"\n\n"
f"{category_output}\n\n"
f"{tech_output}"
# Intentionally left blank line above for structure
)
# Add the pairs section conditionally - This avoids the backslash issue
if top_pairs:
final_output += pairs_output # pairs_output already contains formatting and separators
else:
# Add the "no pairs" message directly here
final_output += "No technology pairs identified to search with.\n\n---\n"
# Add the solution output
final_output += solution_output
# --- END OF CORRECTION ---
print("--- Processing finished ---")
return final_output
# --- Create Gradio Interface ---
print("Setting up Gradio interface...")
# Load data only once when the script starts
try:
load_data_and_model()
interface_enabled = True
except Exception as e:
print(f"FATAL: Failed to initialize application. Error: {e}")
interface_enabled = False
# Only create interface if initialization succeeded
if interface_enabled:
iface = gr.Interface(
fn=process_problem,
inputs=gr.Textbox(lines=5, label="Enter Technical Problem Description", placeholder="Describe your technical challenge or requirement here... e.g., 'Develop low-latency communication protocols for 6G networks'"),
outputs=gr.Markdown(label="Analysis and Potential Solutions"),
title="Technical Problem Analyzer v4 (Cross-Category Relevance)",
description=(
"Enter a technical problem. The app:\n"
"1. Identifies the best matching **category** (for informational purposes).\n"
"2. Finds the **most relevant technologies** based *directly on your problem description* (across all categories).\n"
"3. Identifies **promising pairs** among these relevant technologies based on their similarity to each other.\n"
"4. Searches for **patents/research** using these pairs via an external API."
),
examples=[
["How can I establish reliable communication between low-orbit satellites for continuous global monitoring?"],
["Need a system to automatically detect anomalies in sensor data from industrial machinery using machine learning."],
["Develop low-latency communication protocols for 6G networks"],
["Design efficient routing algorithms for large scale mesh networks in smart cities"],
["Create biodegradable packaging material from agricultural waste"], # Example crossing categories potentially
["Develop a method for real-time traffic prediction using heterogeneous data sources"]
],
allow_flagging='never',
# Add theme for better visuals if desired
# theme=gr.themes.Soft()
)
else:
# Provide a dummy interface indicating failure
def error_fn():
return "Application failed to initialize. Please check the logs for errors (e.g., missing files or model issues)."
iface = gr.Interface(fn=error_fn, inputs=[], outputs=gr.Markdown(), title="Initialization Failed")
# --- Launch the App ---
if __name__ == "__main__":
print("Launching Gradio app...")
# Consider adding share=True for public link if running on appropriate infra
# debug=True can be helpful during development
iface.launch()