import gradio as gr import pandas as pd import json from sentence_transformers import SentenceTransformer, util import torch import requests import re import urllib.parse import itertools # For generating pairs import os import io # Required for Google Drive upload # --- Configuration --- CATEGORY_JSON_PATH = "categories.json" TECHNOLOGY_EXCEL_PATH = "technologies.xlsx" MODEL_NAME = 'all-MiniLM-L6-v2' CATEGORY_SIMILARITY_THRESHOLD = 0.3 # Threshold for *displaying* the best category match MAX_TECHNOLOGIES_TO_SHOW = 8 # Max technologies relevant to the problem (selected across ALL categories) MAX_TECHNOLOGY_PAIRS_TO_SEARCH = 5 # Max pairs (from the relevant tech) to use for solution search MAX_SEARCH_REFERENCES_PER_PAIR = 3 # Max references from the API per pair SEARCH_API_URL = "https://ychkhan-ptt-endpoints.hf.space/search" # --- Global Variables --- # To store pre-computed embeddings and data categories_data = {} category_names = [] category_embeddings = None technologies_df = pd.DataFrame() technology_embeddings = None # Will store pre-computed embeddings for descriptions model = None ###- GOOGLE DRIVE API # Check if running in an environment where Google Credentials are set # Use placeholder credentials if not found, but functionality will fail GOOGLE_CREDENTIALS = os.environ.get("GOOGLE_CREDENTIALS") FOLDER_ID = os.getenv("FOLDER_ID") # Optional: Folder ID for uploads # Only import Google libraries if credentials are potentially available if GOOGLE_CREDENTIALS: try: from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload GOOGLE_API_AVAILABLE = True print("Google API libraries loaded.") except ImportError: print("Warning: Google API libraries not found. Google Drive upload will be disabled.") GOOGLE_API_AVAILABLE = False else: print("Warning: GOOGLE_CREDENTIALS environment variable not set. Google Drive upload will be disabled.") GOOGLE_API_AVAILABLE = False # Define dummy functions or handle calls gracefully if needed elsewhere def create_new_file_in_drive(*args, **kwargs): print("Google Drive upload skipped: Credentials not configured.") return None if GOOGLE_API_AVAILABLE: def create_new_file_in_drive(username, dataframe_to_upload, credentials_json_str, folder_id): """Crée un nouveau fichier CSV dans Google Drive à partir d'un DataFrame Pandas.""" print(f"Attempting to upload results for user: {username}") if not credentials_json_str: print("Error: Google Credentials JSON string is empty.") return None if not folder_id: print("Warning: Google Drive FOLDER_ID not specified. Upload might fail or go to root.") # Decide if you want to default to root or fail # return None # Option: Fail if no folder ID try: creds_dict = json.loads(credentials_json_str) except json.JSONDecodeError as e: print(f"Error decoding Google Credentials JSON: {e}") return None try: # Charger les informations d'identification du compte de service creds = service_account.Credentials.from_service_account_info(creds_dict) # Construire le service API Drive service = build('drive', 'v3', credentials=creds) # Convertir le DataFrame en fichier CSV en mémoire csv_buffer = io.BytesIO() # Ensure UTF-8 encoding, especially with BOM for Excel compatibility if needed dataframe_to_upload.to_csv(csv_buffer, index=False, sep=';', encoding='utf-8-sig') csv_buffer.seek(0) # Créer les métadonnées du fichier filename = f"rating-results-{username}.csv" # Consider adding a timestamp file_metadata = {'name': filename} if folder_id: file_metadata['parents'] = [folder_id] # Télécharger le fichier CSV sur Google Drive media = MediaIoBaseUpload(csv_buffer, mimetype='text/csv', resumable=True) file = service.files().create(body=file_metadata, media_body=media, fields='id, name, webViewLink').execute() print(f"File '{file.get('name')}' created successfully in Google Drive. ID: {file.get('id')}") print(f"Link: {file.get('webViewLink')}") # Optional: print link return file.get('id') except Exception as e: print(f"Error during Google Drive upload: {e}") # Consider more specific error handling (e.g., authentication errors) return None ###- # --- Load Data and Model (Load once at startup) --- def load_data_and_model(): global categories_data, category_names, category_embeddings global technologies_df, technology_embeddings, model print("Loading data and model...") try: # Load Categories with open(CATEGORY_JSON_PATH, 'r', encoding='utf-8') as f: # Specify encoding categories_data = json.load(f)["Category"] category_names = list(categories_data.keys()) category_texts = [f"{name}: {', '.join(keywords)}" for name, keywords in categories_data.items()] print(f"Loaded {len(category_names)} categories.") # Load Technologies technologies_df = pd.read_excel(TECHNOLOGY_EXCEL_PATH) # Clean column names (remove leading/trailing spaces) technologies_df.columns = technologies_df.columns.str.strip() # Ensure required columns exist if 'technology' not in technologies_df.columns or 'description' not in technologies_df.columns: raise ValueError("Missing required columns 'technology' or 'description' in technologies.xlsx") technologies_df['category'] = technologies_df.get('category', '').fillna('').astype(str) # Use .get for optional category technologies_df['description_clean'] = technologies_df['description'].fillna('').astype(str) # Add a unique ID if 'technology' name isn't unique or for easier embedding mapping technologies_df['tech_id'] = technologies_df.index print(f"Loaded {len(technologies_df)} technologies.") # Load Sentence Transformer Model model = SentenceTransformer(MODEL_NAME) print(f"Loaded Sentence Transformer model: {MODEL_NAME}") # Pre-compute category embeddings print("Computing category embeddings...") category_embeddings = model.encode(category_texts, convert_to_tensor=True, show_progress_bar=True) print("Category embeddings computed.") # Pre-compute technology description embeddings print("Computing technology description embeddings...") valid_descriptions = technologies_df['description_clean'].tolist() technology_embeddings = model.encode(valid_descriptions, convert_to_tensor=True, show_progress_bar=True) print(f"Technology description embeddings computed (shape: {technology_embeddings.shape}).") except FileNotFoundError as e: print(f"ERROR: File not found - {e}. Please ensure '{CATEGORY_JSON_PATH}' and '{TECHNOLOGY_EXCEL_PATH}' exist.") raise e except Exception as e: print(f"ERROR loading data or model: {e}") raise e # --- Helper Functions --- def find_best_category(problem_description): """ Finds the most relevant category using pre-computed embeddings. This is now primarily for informational output. """ if not problem_description or not category_names or category_embeddings is None: return None, 0.0 try: problem_embedding = model.encode(problem_description, convert_to_tensor=True) cosine_scores = util.pytorch_cos_sim(problem_embedding, category_embeddings)[0] best_score, best_idx = torch.max(cosine_scores, dim=0) # Return the best category regardless of threshold, but indicate confidence best_category_name = category_names[best_idx.item()] best_category_score = best_score.item() # Decide if the match is confident enough to strongly suggest is_confident = best_category_score >= CATEGORY_SIMILARITY_THRESHOLD return best_category_name, best_category_score, is_confident except Exception as e: print(f"Error during category finding: {e}") return None, 0.0, False # --- MODIFIED FUNCTION --- def find_relevant_technologies(problem_description): """ Calculates similarity between the problem description and ALL technology descriptions using pre-computed embeddings, sorts, and returns the top results. Category is no longer used for filtering here. """ all_tech_data = [] if technologies_df.empty or technology_embeddings is None or not problem_description: print("Warning: Technologies DF, embeddings, or problem description missing.") return pd.DataFrame() try: problem_embedding = model.encode(problem_description, convert_to_tensor=True) # Iterate through ALL technologies for index, row in technologies_df.iterrows(): tech_id = row['tech_id'] # Use the pre-assigned index/id # Ensure tech_id is within the bounds of the embeddings tensor if tech_id >= technology_embeddings.shape[0]: print(f"Warning: tech_id {tech_id} is out of bounds for technology_embeddings (shape: {technology_embeddings.shape}). Skipping.") continue # Retrieve pre-computed embedding using tech_id tech_embedding = technology_embeddings[tech_id] # Calculate similarity score with the problem # Ensure embeddings are compatible (e.g., both are single vectors) if problem_embedding.ndim == 1: problem_embedding_exp = problem_embedding.unsqueeze(0) # Add batch dimension if needed else: problem_embedding_exp = problem_embedding if tech_embedding.ndim == 1: tech_embedding_exp = tech_embedding.unsqueeze(0) else: tech_embedding_exp = tech_embedding similarity_score = util.pytorch_cos_sim(problem_embedding_exp, tech_embedding_exp)[0][0].item() # Store the original row data and the similarity score all_tech_data.append({'data': row.to_dict(), 'similarity_score_problem': similarity_score}) # Sort technologies based on similarity to the problem (descending) all_tech_data.sort(key=lambda item: item['similarity_score_problem'], reverse=True) if not all_tech_data: print("No technologies found or scored.") return pd.DataFrame() # Create DataFrame from the top N results # Extract the 'data' part (which is a dict) for DataFrame creation top_tech_rows = [item['data'] for item in all_tech_data[:MAX_TECHNOLOGIES_TO_SHOW]] # Extract the corresponding scores top_tech_scores = [item['similarity_score_problem'] for item in all_tech_data[:MAX_TECHNOLOGIES_TO_SHOW]] if not top_tech_rows: return pd.DataFrame() relevant_df = pd.DataFrame(top_tech_rows) # Important: Ensure the index aligns if you add the score column later relevant_df = relevant_df.reset_index(drop=True) relevant_df['similarity_score_problem'] = top_tech_scores # Add scores as a new column # print(f"Top relevant technologies DF head:\n{relevant_df.head()}") # Debug print return relevant_df # Return the top N technologies based on problem similarity except Exception as e: print(f"Error during technology finding/scoring: {e}") import traceback traceback.print_exc() # Print full traceback for debugging return pd.DataFrame() def find_top_technology_pairs(relevant_technologies_df): """ Calculates similarity between pairs of the identified relevant technologies (which were selected based on problem similarity) and returns the top pairs. Uses pre-computed embeddings. """ if relevant_technologies_df.empty or len(relevant_technologies_df) < 2 or technology_embeddings is None: # print("Warning: Not enough relevant technologies (<2) or embeddings missing for pairing.") return [] pairs_with_scores = [] # Use tech_id (which should be the original index) to reliably get embeddings # Check if 'tech_id' column exists in the relevant_technologies_df if 'tech_id' not in relevant_technologies_df.columns: print("Error: 'tech_id' column missing in relevant_technologies_df. Cannot proceed with pairing.") return [] tech_ids = relevant_technologies_df['tech_id'].tolist() # Create a mapping from tech_id back to the technology name in the relevant subset for easy lookup tech_id_to_name = pd.Series(relevant_technologies_df['technology'].values, index=relevant_technologies_df['tech_id']).to_dict() # Generate unique pairs of tech_ids from the relevant list for id_a, id_b in itertools.combinations(tech_ids, 2): try: # Retrieve pre-computed embeddings using the original index (tech_id) # Add boundary checks again just in case if id_a >= technology_embeddings.shape[0] or id_b >= technology_embeddings.shape[0]: print(f"Warning: tech_id {id_a} or {id_b} out of bounds for embeddings. Skipping pair.") continue embedding_a = technology_embeddings[id_a] embedding_b = technology_embeddings[id_b] # Ensure embeddings are 1D or correctly shaped for cos_sim if embedding_a.ndim > 1: embedding_a = embedding_a.squeeze() if embedding_b.ndim > 1: embedding_b = embedding_b.squeeze() if embedding_a.ndim == 0 or embedding_b.ndim == 0: # Check if squeeze resulted in 0-dim tensor print(f"Warning: Invalid embedding dimension after squeeze for pair ({id_a}, {id_b}). Skipping.") continue # Calculate inter-technology similarity inter_similarity = util.pytorch_cos_sim(embedding_a, embedding_b)[0][0].item() # Get technology names using the mapping created earlier tech_name_a = tech_id_to_name.get(id_a, f"Unknown Tech (ID:{id_a})") tech_name_b = tech_id_to_name.get(id_b, f"Unknown Tech (ID:{id_b})") # Clean names for display/use clean_tech_name_a = re.sub(r'^- Title\s*:\s*', '', str(tech_name_a)).strip() clean_tech_name_b = re.sub(r'^- Title\s*:\s*', '', str(tech_name_b)).strip() pairs_with_scores.append(((clean_tech_name_a, clean_tech_name_b), inter_similarity)) except IndexError: print(f"Warning: Could not find pre-computed embedding for index {id_a} or {id_b}. Skipping pair.") continue except Exception as e: print(f"Error calculating similarity for pair ({id_a}, {id_b}): {e}") import traceback traceback.print_exc() continue # Sort pairs by inter-similarity score (descending) pairs_with_scores.sort(key=lambda item: item[1], reverse=True) # Return the top K pairs # print(f"Top pairs identified: {pairs_with_scores[:MAX_TECHNOLOGY_PAIRS_TO_SEARCH]}") # Debug print return pairs_with_scores[:MAX_TECHNOLOGY_PAIRS_TO_SEARCH] def search_solutions_for_pairs(problem_description, top_pairs): """ Searches for solutions/patents using pairs of technologies via the API. """ results = {} # Store results keyed by the pair tuple if not top_pairs or not problem_description: # Provide a more informative message if no pairs were generated if not top_pairs: return "No relevant technology pairs were identified (need at least 2 relevant technologies). Cannot search for solutions.\n" else: # problem_description must be missing return "Problem description is missing. Cannot search for solutions.\n" headers = {'accept': 'application/json'} for pair_info in top_pairs: pair_names, pair_score = pair_info tech_a_name, tech_b_name = pair_names if not tech_a_name or not tech_b_name: continue # Skip if names are invalid # Construct query for the API # Focus query on tech combination and context (patent/research) # Keep problem description out of the API query unless the API is designed for it # query = f'"{tech_a_name}" AND "{tech_b_name}" patent OR research paper OR application' # More targeted query: query = f'Combining {tech_a_name} and {tech_b_name} for applications related to {problem_description}' # Use snippet of problem params = { 'query': query, 'max_references': MAX_SEARCH_REFERENCES_PER_PAIR } encoded_params = urllib.parse.urlencode(params, quote_via=urllib.parse.quote) # Ensure proper encoding full_url = f"{SEARCH_API_URL}?{encoded_params}" pair_key = f"{tech_a_name} + {tech_b_name}" # Key for storing results print(f"Calling API for pair ({pair_key}): POST {SEARCH_API_URL} with query: {query}") # Log query separately try: # Using POST as originally indicated, send params in the body (common for longer queries) # If API expects GET, change to requests.get(full_url, headers=headers) response = requests.post(SEARCH_API_URL, headers=headers, params=params, timeout=45) # Increased timeout response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) try: api_response = response.json() except json.JSONDecodeError: err_msg = f"API Error: Invalid JSON response. Status: {response.status_code}, Response text: {response.text[:200]}" print(f"Error decoding JSON response for pair '{pair_key}'. {err_msg}") results[pair_key] = {"score": pair_score, "error": err_msg} continue # Skip to next pair search_results = [] # --- Adapt based on actual API response structure --- if isinstance(api_response, list): search_results = api_response # Assumes list of dicts like {'title': '...', 'url': '...'} elif isinstance(api_response, dict) and 'results' in api_response and isinstance(api_response['results'], list): search_results = api_response['results'] elif isinstance(api_response, dict) and 'references' in api_response and isinstance(api_response['references'], list): # Handle potential alternative key name search_results = api_response['references'] else: print(f"Warning: Unexpected API response format for pair '{pair_key}'. Response: {api_response}") # Attempt to extract links if possible, otherwise mark as no results # This part needs adjustment based on observed API responses search_results = [] # Default to empty if format unknown # --- End adaptation --- valid_links = [] for r in search_results: if isinstance(r, dict): title = r.get('title', 'N/A') url = r.get('url', r.get('link')) # Check for 'url' or 'link' if url and isinstance(url, str) and url.startswith(('http://', 'https://')): valid_links.append({'title': title, 'link': url}) elif url: print(f"Warning: Invalid or missing URL for result '{title}' in pair '{pair_key}': {url}") results[pair_key] = { "score": pair_score, # Store pair score for context "links": valid_links } except requests.exceptions.Timeout: print(f"Error: API call timed out for pair '{pair_key}'") results[pair_key] = {"score": pair_score, "error": "API Timeout"} except requests.exceptions.HTTPError as e: print(f"Error: HTTP Error calling search API for pair '{pair_key}': {e}") results[pair_key] = {"score": pair_score, "error": f"API HTTP Error: {e.response.status_code}"} except requests.exceptions.RequestException as e: print(f"Error calling search API for pair '{pair_key}': {e}") results[pair_key] = {"score": pair_score, "error": f"API Request Error: {e}"} except Exception as e: err_msg = f"Unexpected Error during API call: {e}" print(f"Unexpected error during API call for pair '{pair_key}': {e}") import traceback traceback.print_exc() results[pair_key] = {"score": pair_score, "error": err_msg} # Format results for display output = f"### Potential Solutions & Patents (Found using Top {len(results)} Technology Pairs):\n\n" if not results: output += "No search results could be retrieved from the API for the generated technology pairs." return output # Display results in the order they were searched (already sorted by pair score) for pair_key, search_data in results.items(): pair_score = search_data.get('score', 0.0) output += f"**For Technology Pair: {pair_key}** (Inter-Similarity Score: {pair_score:.3f})\n" # More precision if "error" in search_data: output += f"- *Search failed: {search_data['error']}*\n" elif "links" in search_data: links = search_data["links"] if links: for link_info in links: # Ensure title is a string before replacing title_str = str(link_info.get('title', 'N/A')) # Basic sanitization for Markdown display title_sanitized = title_str.replace('[','(').replace(']',')') output += f"- [{title_sanitized}]({link_info.get('link', '#')})\n" else: output += "- *No specific results found by the API for this technology pair.*\n" else: output += "- *Unknown search result state.*\n" output += "\n" # Add space between pairs return output # --- Main Processing Function --- def process_problem(problem_description): """ Main function called by Gradio interface. Orchestrates the process. """ print(f"\n--- Processing request for: '{problem_description[:100]}...' ---") # Log start if not problem_description: return "Please enter a problem description." # 1. Categorize Problem (Informational) category_name, cat_score, is_confident = find_best_category(problem_description) if category_name: confidence_text = "(Confident Match)" if is_confident else "(Possible Match)" category_output = f"**Best Matching Category:** {category_name} {confidence_text} (Similarity Score: {cat_score:.3f})" else: category_output = "**Could not identify a matching category.**" print(f"Category identified: {category_name} (Score: {cat_score:.3f}, Confident: {is_confident})") # 2. Find Relevant Technologies (relative to problem, across ALL categories) relevant_technologies_df = find_relevant_technologies(problem_description) print(f"Found {len(relevant_technologies_df)} relevant technologies based on problem similarity.") tech_output = "" if not relevant_technologies_df.empty: tech_output += f"### Top {len(relevant_technologies_df)} Most Relevant Technologies (selected based on similarity to your problem):\n\n" for _, row in relevant_technologies_df.iterrows(): tech_name = re.sub(r'^- Title\s*:\s*', '', str(row.get('technology', 'N/A'))).strip() problem_relevance = row.get('similarity_score_problem', 0.0) tech_output += f"- **{tech_name}** (Problem Relevance: {problem_relevance:.3f})\n" original_cats = str(row.get('category', 'Unknown')).strip() if original_cats: tech_output += f" *Original Category listed as: {original_cats}*\n" tech_output += "\n---\n" # Add separator else: tech_output = "Could not identify any relevant technologies based on the problem description.\n\n---\n" # 3. Find Top Technology Pairs (based on inter-similarity among the relevant ones) top_pairs = find_top_technology_pairs(relevant_technologies_df) print(f"Identified {len(top_pairs)} top technology pairs for searching.") pairs_output = "" if top_pairs: pairs_output += f"### Top {len(top_pairs)} Technology Pairs (selected from the relevant technologies above, based on their inter-similarity):\n\n" for pair_names, score in top_pairs: pairs_output += f"- **{pair_names[0]} + {pair_names[1]}** (Inter-Similarity: {score:.3f})\n" pairs_output += "\n---\n" # Note: The "else" case message will be added during final output assembly # 4. Search for Solutions using the Top Pairs solution_output = search_solutions_for_pairs(problem_description, top_pairs) print("API search for solutions completed.") # 5. Combine Outputs for Gradio --- CORRECTED SECTION --- final_output = ( f"## Analysis Results for: \"{problem_description[:150]}...\"\n\n" f"{category_output}\n\n" f"{tech_output}" # Intentionally left blank line above for structure ) # Add the pairs section conditionally - This avoids the backslash issue if top_pairs: final_output += pairs_output # pairs_output already contains formatting and separators else: # Add the "no pairs" message directly here final_output += "No technology pairs identified to search with.\n\n---\n" # Add the solution output final_output += solution_output # --- END OF CORRECTION --- print("--- Processing finished ---") return final_output # --- Create Gradio Interface --- print("Setting up Gradio interface...") # Load data only once when the script starts try: load_data_and_model() interface_enabled = True except Exception as e: print(f"FATAL: Failed to initialize application. Error: {e}") interface_enabled = False # Only create interface if initialization succeeded if interface_enabled: iface = gr.Interface( fn=process_problem, inputs=gr.Textbox(lines=5, label="Enter Technical Problem Description", placeholder="Describe your technical challenge or requirement here... e.g., 'Develop low-latency communication protocols for 6G networks'"), outputs=gr.Markdown(label="Analysis and Potential Solutions"), title="Technical Problem Analyzer v4 (Cross-Category Relevance)", description=( "Enter a technical problem. The app:\n" "1. Identifies the best matching **category** (for informational purposes).\n" "2. Finds the **most relevant technologies** based *directly on your problem description* (across all categories).\n" "3. Identifies **promising pairs** among these relevant technologies based on their similarity to each other.\n" "4. Searches for **patents/research** using these pairs via an external API." ), examples=[ ["How can I establish reliable communication between low-orbit satellites for continuous global monitoring?"], ["Need a system to automatically detect anomalies in sensor data from industrial machinery using machine learning."], ["Develop low-latency communication protocols for 6G networks"], ["Design efficient routing algorithms for large scale mesh networks in smart cities"], ["Create biodegradable packaging material from agricultural waste"], # Example crossing categories potentially ["Develop a method for real-time traffic prediction using heterogeneous data sources"] ], allow_flagging='never', # Add theme for better visuals if desired # theme=gr.themes.Soft() ) else: # Provide a dummy interface indicating failure def error_fn(): return "Application failed to initialize. Please check the logs for errors (e.g., missing files or model issues)." iface = gr.Interface(fn=error_fn, inputs=[], outputs=gr.Markdown(), title="Initialization Failed") # --- Launch the App --- if __name__ == "__main__": print("Launching Gradio app...") # Consider adding share=True for public link if running on appropriate infra # debug=True can be helpful during development iface.launch()