Hugging Face Spaces app source (captured while the Space reported a runtime error).
| import gradio as gr | |
| import pandas as pd | |
| import json | |
| from sentence_transformers import SentenceTransformer, util | |
| import torch | |
| import requests | |
| import re | |
| import urllib.parse | |
| import itertools # For generating pairs | |
| import os | |
| import io # Required for Google Drive upload | |
# --- Configuration ---
CATEGORY_JSON_PATH = "categories.json"       # JSON file shaped {"Category": {name: [keyword, ...]}}
TECHNOLOGY_EXCEL_PATH = "technologies.xlsx"  # Excel sheet; must have 'technology' and 'description' columns
MODEL_NAME = 'all-MiniLM-L6-v2'              # SentenceTransformer checkpoint used for every embedding
CATEGORY_SIMILARITY_THRESHOLD = 0.3  # Threshold for flagging the best category match as "confident" (display only)
MAX_TECHNOLOGIES_TO_SHOW = 8  # Max technologies relevant to the problem (selected across ALL categories)
MAX_TECHNOLOGY_PAIRS_TO_SEARCH = 5  # Max pairs (from the relevant tech) to use for solution search
MAX_SEARCH_REFERENCES_PER_PAIR = 3  # Max references requested from the API per pair
SEARCH_API_URL = "https://ychkhan-ptt-endpoints.hf.space/search"  # External patent/paper search endpoint
# --- Global Variables ---
# Populated once at startup by load_data_and_model(); read-only afterwards.
categories_data = {}               # category name -> list of keywords
category_names = []                # ordering matches rows of category_embeddings
category_embeddings = None         # tensor of category-text embeddings, or None until loaded
technologies_df = pd.DataFrame()   # technology table loaded from the Excel file
technology_embeddings = None       # tensor of description embeddings; row i corresponds to tech_id i
model = None                       # shared SentenceTransformer instance
###- GOOGLE DRIVE API
# Google Drive upload is optional: it requires both the GOOGLE_CREDENTIALS env
# var (service-account JSON string) and the google-api client libraries.
GOOGLE_CREDENTIALS = os.environ.get("GOOGLE_CREDENTIALS")  # service-account JSON string, or None
FOLDER_ID = os.getenv("FOLDER_ID")  # Optional: Folder ID for uploads

# Only import Google libraries if credentials are potentially available
if GOOGLE_CREDENTIALS:
    try:
        from google.oauth2 import service_account
        from googleapiclient.discovery import build
        from googleapiclient.http import MediaIoBaseDownload, MediaIoBaseUpload
        GOOGLE_API_AVAILABLE = True
        print("Google API libraries loaded.")
    except ImportError:
        print("Warning: Google API libraries not found. Google Drive upload will be disabled.")
        GOOGLE_API_AVAILABLE = False
else:
    print("Warning: GOOGLE_CREDENTIALS environment variable not set. Google Drive upload will be disabled.")
    GOOGLE_API_AVAILABLE = False

# FIX: define the no-op fallback for *every* unavailable path. Previously the
# fallback lived only in the `else` branch, so when GOOGLE_CREDENTIALS was set
# but the import failed, create_new_file_in_drive was never defined and any
# later call raised NameError. The real implementation (defined below under
# `if GOOGLE_API_AVAILABLE:`) replaces this when the API is usable.
if not GOOGLE_API_AVAILABLE:
    def create_new_file_in_drive(*args, **kwargs):
        """No-op fallback: Drive upload is disabled; always returns None."""
        print("Google Drive upload skipped: Credentials not configured.")
        return None
if GOOGLE_API_AVAILABLE:
    def create_new_file_in_drive(username, dataframe_to_upload, credentials_json_str, folder_id):
        """Create a new CSV file in Google Drive from a pandas DataFrame.

        Args:
            username: Used only to build the output filename.
            dataframe_to_upload: DataFrame serialized as ';'-separated CSV.
            credentials_json_str: Service-account credentials as a JSON string.
            folder_id: Target Drive folder ID; if falsy the file goes to the root.

        Returns:
            The created Drive file ID (str) on success, or None on any failure.
        """
        print(f"Attempting to upload results for user: {username}")
        if not credentials_json_str:
            print("Error: Google Credentials JSON string is empty.")
            return None
        if not folder_id:
            # Missing folder ID is tolerated: upload proceeds to the Drive root.
            print("Warning: Google Drive FOLDER_ID not specified. Upload might fail or go to root.")
            # Decide if you want to default to root or fail
            # return None # Option: Fail if no folder ID
        try:
            creds_dict = json.loads(credentials_json_str)
        except json.JSONDecodeError as e:
            print(f"Error decoding Google Credentials JSON: {e}")
            return None
        try:
            # Load the service-account credentials
            creds = service_account.Credentials.from_service_account_info(creds_dict)
            # Build the Drive API client
            service = build('drive', 'v3', credentials=creds)
            # Serialize the DataFrame to an in-memory CSV buffer
            csv_buffer = io.BytesIO()
            # utf-8-sig adds a BOM so Excel detects the encoding correctly
            dataframe_to_upload.to_csv(csv_buffer, index=False, sep=';', encoding='utf-8-sig')
            csv_buffer.seek(0)
            # File metadata: name plus optional parent folder
            filename = f"rating-results-{username}.csv"  # Consider adding a timestamp
            file_metadata = {'name': filename}
            if folder_id:
                file_metadata['parents'] = [folder_id]
            # Upload the CSV to Google Drive
            media = MediaIoBaseUpload(csv_buffer, mimetype='text/csv', resumable=True)
            file = service.files().create(body=file_metadata, media_body=media, fields='id, name, webViewLink').execute()
            print(f"File '{file.get('name')}' created successfully in Google Drive. ID: {file.get('id')}")
            print(f"Link: {file.get('webViewLink')}")  # Optional: print link
            return file.get('id')
        except Exception as e:
            # Broad catch is deliberate best-effort: an upload failure must not
            # crash the app. Consider narrowing (e.g. auth errors) later.
            print(f"Error during Google Drive upload: {e}")
            return None
###-
# --- Load Data and Model (Load once at startup) ---
def load_data_and_model():
    """Load categories, the technology table, and the embedding model; pre-compute embeddings.

    Populates the module-level globals (categories_data, category_names,
    category_embeddings, technologies_df, technology_embeddings, model).

    Raises:
        FileNotFoundError: if either input file is missing.
        ValueError: if required Excel columns are absent.
        Exception: any other load/model failure (re-raised so the caller can
            disable the interface).
    """
    global categories_data, category_names, category_embeddings
    global technologies_df, technology_embeddings, model
    print("Loading data and model...")
    try:
        # Load Categories
        with open(CATEGORY_JSON_PATH, 'r', encoding='utf-8') as f:
            categories_data = json.load(f)["Category"]
        category_names = list(categories_data.keys())
        # One text per category ("name: kw1, kw2, ...") for embedding below
        category_texts = [f"{name}: {', '.join(keywords)}" for name, keywords in categories_data.items()]
        print(f"Loaded {len(category_names)} categories.")
        # Load Technologies
        technologies_df = pd.read_excel(TECHNOLOGY_EXCEL_PATH)
        technologies_df.columns = technologies_df.columns.str.strip()  # tolerate padded headers
        if 'technology' not in technologies_df.columns or 'description' not in technologies_df.columns:
            raise ValueError("Missing required columns 'technology' or 'description' in technologies.xlsx")
        # BUG FIX: df.get('category', '') returns the *scalar* '' when the
        # column is absent, and ''.fillna(...) then raises AttributeError.
        # Handle the optional column explicitly instead.
        if 'category' in technologies_df.columns:
            technologies_df['category'] = technologies_df['category'].fillna('').astype(str)
        else:
            technologies_df['category'] = ''
        technologies_df['description_clean'] = technologies_df['description'].fillna('').astype(str)
        # Stable per-row id; also the row index into technology_embeddings
        technologies_df['tech_id'] = technologies_df.index
        print(f"Loaded {len(technologies_df)} technologies.")
        # Load Sentence Transformer Model
        model = SentenceTransformer(MODEL_NAME)
        print(f"Loaded Sentence Transformer model: {MODEL_NAME}")
        # Pre-compute category embeddings (row order matches category_names)
        print("Computing category embeddings...")
        category_embeddings = model.encode(category_texts, convert_to_tensor=True, show_progress_bar=True)
        print("Category embeddings computed.")
        # Pre-compute technology description embeddings (row i <-> tech_id i)
        print("Computing technology description embeddings...")
        valid_descriptions = technologies_df['description_clean'].tolist()
        technology_embeddings = model.encode(valid_descriptions, convert_to_tensor=True, show_progress_bar=True)
        print(f"Technology description embeddings computed (shape: {technology_embeddings.shape}).")
    except FileNotFoundError as e:
        print(f"ERROR: File not found - {e}. Please ensure '{CATEGORY_JSON_PATH}' and '{TECHNOLOGY_EXCEL_PATH}' exist.")
        raise e
    except Exception as e:
        print(f"ERROR loading data or model: {e}")
        raise e
# --- Helper Functions ---
def find_best_category(problem_description):
    """
    Find the most relevant category for the problem using the pre-computed
    category embeddings. Informational only; it does not filter technologies.

    Returns:
        A 3-tuple (best_category_name or None, best_score, is_confident).
        is_confident is True when best_score >= CATEGORY_SIMILARITY_THRESHOLD.
    """
    # BUG FIX: this guard used to return a 2-tuple (None, 0.0) while every
    # other path returns 3 values, crashing the 3-way unpack in process_problem.
    if not problem_description or not category_names or category_embeddings is None:
        return None, 0.0, False
    try:
        problem_embedding = model.encode(problem_description, convert_to_tensor=True)
        cosine_scores = util.pytorch_cos_sim(problem_embedding, category_embeddings)[0]
        best_score, best_idx = torch.max(cosine_scores, dim=0)
        # Return the best category regardless of threshold; the confidence flag
        # tells the caller whether the match is strong enough to suggest.
        best_category_name = category_names[best_idx.item()]
        best_category_score = best_score.item()
        is_confident = best_category_score >= CATEGORY_SIMILARITY_THRESHOLD
        return best_category_name, best_category_score, is_confident
    except Exception as e:
        print(f"Error during category finding: {e}")
        return None, 0.0, False
# --- MODIFIED FUNCTION ---
def find_relevant_technologies(problem_description):
    """
    Score ALL technology descriptions against the problem description and
    return the top MAX_TECHNOLOGIES_TO_SHOW rows as a DataFrame with an added
    'similarity_score_problem' column. Category is not used for filtering.

    Returns:
        A DataFrame sorted by problem similarity (descending); empty when
        inputs/embeddings are missing or on error.
    """
    if technologies_df.empty or technology_embeddings is None or not problem_description:
        print("Warning: Technologies DF, embeddings, or problem description missing.")
        return pd.DataFrame()
    try:
        problem_embedding = model.encode(problem_description, convert_to_tensor=True)
        if problem_embedding.ndim == 1:
            problem_embedding = problem_embedding.unsqueeze(0)  # cos_sim expects a batch dim
        # PERF FIX: one vectorized cosine-similarity pass against the whole
        # embedding matrix, replacing the former per-row cos_sim calls inside
        # the Python loop. Scores are identical.
        similarity_row = util.pytorch_cos_sim(problem_embedding, technology_embeddings)[0]
        scored_rows = []
        for _, row in technologies_df.iterrows():
            tech_id = row['tech_id']  # row index into technology_embeddings
            # Guard against df rows without a matching embedding row
            if tech_id >= technology_embeddings.shape[0]:
                print(f"Warning: tech_id {tech_id} is out of bounds for technology_embeddings (shape: {technology_embeddings.shape}). Skipping.")
                continue
            scored_rows.append((row.to_dict(), similarity_row[tech_id].item()))
        if not scored_rows:
            print("No technologies found or scored.")
            return pd.DataFrame()
        # Sort by similarity to the problem (descending) and keep the top N
        scored_rows.sort(key=lambda item: item[1], reverse=True)
        top = scored_rows[:MAX_TECHNOLOGIES_TO_SHOW]
        relevant_df = pd.DataFrame([data for data, _ in top]).reset_index(drop=True)
        relevant_df['similarity_score_problem'] = [score for _, score in top]
        return relevant_df
    except Exception as e:
        print(f"Error during technology finding/scoring: {e}")
        import traceback
        traceback.print_exc()  # Full traceback for debugging
        return pd.DataFrame()
def find_top_technology_pairs(relevant_technologies_df):
    """
    Rank every unique pair among the given relevant technologies by the cosine
    similarity of their pre-computed description embeddings.

    Returns:
        Up to MAX_TECHNOLOGY_PAIRS_TO_SEARCH entries of the form
        ((name_a, name_b), inter_similarity), best-scoring first. Empty list
        when fewer than two technologies (or no embeddings) are available.
    """
    # Pairing needs at least two technologies and the embedding matrix.
    if relevant_technologies_df.empty or len(relevant_technologies_df) < 2 or technology_embeddings is None:
        return []
    # 'tech_id' is the key back into the embedding matrix; without it we cannot pair.
    if 'tech_id' not in relevant_technologies_df.columns:
        print("Error: 'tech_id' column missing in relevant_technologies_df. Cannot proceed with pairing.")
        return []
    ids = relevant_technologies_df['tech_id'].tolist()
    # tech_id -> display name, for labelling pairs afterwards.
    name_of = dict(zip(relevant_technologies_df['tech_id'], relevant_technologies_df['technology']))
    scored_pairs = []
    row_limit = technology_embeddings.shape[0]
    for first_id, second_id in itertools.combinations(ids, 2):
        try:
            # Defensive bounds check before indexing the embedding matrix.
            if first_id >= row_limit or second_id >= row_limit:
                print(f"Warning: tech_id {first_id} or {second_id} out of bounds for embeddings. Skipping pair.")
                continue
            vec_a = technology_embeddings[first_id]
            vec_b = technology_embeddings[second_id]
            # Normalize shapes: cos_sim wants 1-D (or batched) vectors.
            if vec_a.ndim > 1:
                vec_a = vec_a.squeeze()
            if vec_b.ndim > 1:
                vec_b = vec_b.squeeze()
            if vec_a.ndim == 0 or vec_b.ndim == 0:
                print(f"Warning: Invalid embedding dimension after squeeze for pair ({first_id}, {second_id}). Skipping.")
                continue
            score = util.pytorch_cos_sim(vec_a, vec_b)[0][0].item()
            label_a = name_of.get(first_id, f"Unknown Tech (ID:{first_id})")
            label_b = name_of.get(second_id, f"Unknown Tech (ID:{second_id})")
            # Strip the "- Title :" scrape prefix from display names.
            clean_a = re.sub(r'^- Title\s*:\s*', '', str(label_a)).strip()
            clean_b = re.sub(r'^- Title\s*:\s*', '', str(label_b)).strip()
            scored_pairs.append(((clean_a, clean_b), score))
        except IndexError:
            print(f"Warning: Could not find pre-computed embedding for index {first_id} or {second_id}. Skipping pair.")
            continue
        except Exception as e:
            print(f"Error calculating similarity for pair ({first_id}, {second_id}): {e}")
            import traceback
            traceback.print_exc()
            continue
    # Best pairs first, truncated to the configured maximum.
    scored_pairs.sort(key=lambda entry: entry[1], reverse=True)
    return scored_pairs[:MAX_TECHNOLOGY_PAIRS_TO_SEARCH]
def _extract_search_results(api_response, pair_key):
    """Pull the list of result dicts out of the API response, whose shape varies."""
    if isinstance(api_response, list):
        return api_response  # Assumes list of dicts like {'title': ..., 'url': ...}
    if isinstance(api_response, dict) and 'results' in api_response and isinstance(api_response['results'], list):
        return api_response['results']
    if isinstance(api_response, dict) and 'references' in api_response and isinstance(api_response['references'], list):
        # Handle potential alternative key name
        return api_response['references']
    print(f"Warning: Unexpected API response format for pair '{pair_key}'. Response: {api_response}")
    return []


def _collect_valid_links(search_results, pair_key):
    """Keep only results carrying an http(s) URL, normalized to {'title','link'}."""
    valid_links = []
    for r in search_results:
        if isinstance(r, dict):
            title = r.get('title', 'N/A')
            url = r.get('url', r.get('link'))  # Check for 'url' or 'link'
            if url and isinstance(url, str) and url.startswith(('http://', 'https://')):
                valid_links.append({'title': title, 'link': url})
            elif url:
                print(f"Warning: Invalid or missing URL for result '{title}' in pair '{pair_key}': {url}")
    return valid_links


def _format_pair_results(results):
    """Render the per-pair results dict as a Markdown section."""
    output = f"### Potential Solutions & Patents (Found using Top {len(results)} Technology Pairs):\n\n"
    if not results:
        output += "No search results could be retrieved from the API for the generated technology pairs."
        return output
    # Results iterate in insertion order, which already follows pair score.
    for pair_key, search_data in results.items():
        pair_score = search_data.get('score', 0.0)
        output += f"**For Technology Pair: {pair_key}** (Inter-Similarity Score: {pair_score:.3f})\n"
        if "error" in search_data:
            output += f"- *Search failed: {search_data['error']}*\n"
        elif "links" in search_data:
            links = search_data["links"]
            if links:
                for link_info in links:
                    title_str = str(link_info.get('title', 'N/A'))
                    # Basic sanitization so titles cannot break the Markdown link
                    title_sanitized = title_str.replace('[', '(').replace(']', ')')
                    output += f"- [{title_sanitized}]({link_info.get('link', '#')})\n"
            else:
                output += "- *No specific results found by the API for this technology pair.*\n"
        else:
            output += "- *Unknown search result state.*\n"
        output += "\n"  # Space between pairs
    return output


def search_solutions_for_pairs(problem_description, top_pairs):
    """
    Search for solutions/patents for each technology pair via the external API
    and return a Markdown report.

    FIX: removed dead locals (encoded_params/full_url were built but never
    used) and decomposed response parsing and formatting into helpers.
    """
    results = {}  # pair_key -> {"score": ..., "links": [...]} or {"score": ..., "error": ...}
    if not top_pairs or not problem_description:
        if not top_pairs:
            return "No relevant technology pairs were identified (need at least 2 relevant technologies). Cannot search for solutions.\n"
        # problem_description must be missing
        return "Problem description is missing. Cannot search for solutions.\n"
    headers = {'accept': 'application/json'}
    for pair_names, pair_score in top_pairs:
        tech_a_name, tech_b_name = pair_names
        if not tech_a_name or not tech_b_name:
            continue  # Skip if names are invalid
        # Query combines both technologies with the problem context
        query = f'Combining {tech_a_name} and {tech_b_name} for applications related to {problem_description}'
        params = {
            'query': query,
            'max_references': MAX_SEARCH_REFERENCES_PER_PAIR
        }
        pair_key = f"{tech_a_name} + {tech_b_name}"  # Key for storing results
        print(f"Calling API for pair ({pair_key}): POST {SEARCH_API_URL} with query: {query}")
        try:
            # POST with query-string params, as the endpoint expects
            response = requests.post(SEARCH_API_URL, headers=headers, params=params, timeout=45)
            response.raise_for_status()  # Raise HTTPError on 4xx/5xx
            try:
                api_response = response.json()
            except json.JSONDecodeError:
                err_msg = f"API Error: Invalid JSON response. Status: {response.status_code}, Response text: {response.text[:200]}"
                print(f"Error decoding JSON response for pair '{pair_key}'. {err_msg}")
                results[pair_key] = {"score": pair_score, "error": err_msg}
                continue  # Skip to next pair
            search_results = _extract_search_results(api_response, pair_key)
            results[pair_key] = {
                "score": pair_score,  # Pair score kept for context
                "links": _collect_valid_links(search_results, pair_key)
            }
        except requests.exceptions.Timeout:
            print(f"Error: API call timed out for pair '{pair_key}'")
            results[pair_key] = {"score": pair_score, "error": "API Timeout"}
        except requests.exceptions.HTTPError as e:
            print(f"Error: HTTP Error calling search API for pair '{pair_key}': {e}")
            results[pair_key] = {"score": pair_score, "error": f"API HTTP Error: {e.response.status_code}"}
        except requests.exceptions.RequestException as e:
            print(f"Error calling search API for pair '{pair_key}': {e}")
            results[pair_key] = {"score": pair_score, "error": f"API Request Error: {e}"}
        except Exception as e:
            err_msg = f"Unexpected Error during API call: {e}"
            print(f"Unexpected error during API call for pair '{pair_key}': {e}")
            import traceback
            traceback.print_exc()
            results[pair_key] = {"score": pair_score, "error": err_msg}
    return _format_pair_results(results)
# --- Main Processing Function ---
def process_problem(problem_description):
    """
    Gradio entry point: run the full analysis pipeline on one problem
    description and return a single Markdown report.

    Pipeline: (1) categorize the problem (informational only), (2) rank all
    technologies by similarity to the problem, (3) pair the top technologies by
    their inter-similarity, (4) query the external search API with those pairs.
    """
    print(f"\n--- Processing request for: '{problem_description[:100]}...' ---")  # Log start
    if not problem_description:
        return "Please enter a problem description."

    # Step 1: best-matching category (shown for context only).
    category_name, cat_score, is_confident = find_best_category(problem_description)
    if category_name:
        confidence_text = "(Confident Match)" if is_confident else "(Possible Match)"
        category_output = f"**Best Matching Category:** {category_name} {confidence_text} (Similarity Score: {cat_score:.3f})"
    else:
        category_output = "**Could not identify a matching category.**"
    print(f"Category identified: {category_name} (Score: {cat_score:.3f}, Confident: {is_confident})")

    # Step 2: rank technologies directly against the problem (all categories).
    relevant_technologies_df = find_relevant_technologies(problem_description)
    print(f"Found {len(relevant_technologies_df)} relevant technologies based on problem similarity.")
    if relevant_technologies_df.empty:
        tech_output = "Could not identify any relevant technologies based on the problem description.\n\n---\n"
    else:
        tech_parts = [f"### Top {len(relevant_technologies_df)} Most Relevant Technologies (selected based on similarity to your problem):\n\n"]
        for _, row in relevant_technologies_df.iterrows():
            tech_name = re.sub(r'^- Title\s*:\s*', '', str(row.get('technology', 'N/A'))).strip()
            problem_relevance = row.get('similarity_score_problem', 0.0)
            tech_parts.append(f"- **{tech_name}** (Problem Relevance: {problem_relevance:.3f})\n")
            original_cats = str(row.get('category', 'Unknown')).strip()
            if original_cats:
                tech_parts.append(f" *Original Category listed as: {original_cats}*\n")
        tech_parts.append("\n---\n")  # Section separator
        tech_output = "".join(tech_parts)

    # Step 3: pair the relevant technologies by their mutual similarity.
    top_pairs = find_top_technology_pairs(relevant_technologies_df)
    print(f"Identified {len(top_pairs)} top technology pairs for searching.")
    if top_pairs:
        pair_parts = [f"### Top {len(top_pairs)} Technology Pairs (selected from the relevant technologies above, based on their inter-similarity):\n\n"]
        for pair_names, score in top_pairs:
            pair_parts.append(f"- **{pair_names[0]} + {pair_names[1]}** (Inter-Similarity: {score:.3f})\n")
        pair_parts.append("\n---\n")
        pairs_output = "".join(pair_parts)
    else:
        pairs_output = "No technology pairs identified to search with.\n\n---\n"

    # Step 4: query the external API with the top pairs.
    solution_output = search_solutions_for_pairs(problem_description, top_pairs)
    print("API search for solutions completed.")

    # Assemble the final Markdown report.
    final_output = (
        f"## Analysis Results for: \"{problem_description[:150]}...\"\n\n"
        f"{category_output}\n\n"
        f"{tech_output}"
        f"{pairs_output}"
        f"{solution_output}"
    )
    print("--- Processing finished ---")
    return final_output
# --- Create Gradio Interface ---
print("Setting up Gradio interface...")
# Load data only once when the script starts; a failure swaps in an error UI
try:
    load_data_and_model()
    interface_enabled = True
except Exception as e:
    print(f"FATAL: Failed to initialize application. Error: {e}")
    interface_enabled = False
# Only create the real interface if initialization succeeded
if interface_enabled:
    iface = gr.Interface(
        fn=process_problem,
        inputs=gr.Textbox(lines=5, label="Enter Technical Problem Description", placeholder="Describe your technical challenge or requirement here... e.g., 'Develop low-latency communication protocols for 6G networks'"),
        outputs=gr.Markdown(label="Analysis and Potential Solutions"),
        title="Technical Problem Analyzer v4 (Cross-Category Relevance)",
        description=(
            "Enter a technical problem. The app:\n"
            "1. Identifies the best matching **category** (for informational purposes).\n"
            "2. Finds the **most relevant technologies** based *directly on your problem description* (across all categories).\n"
            "3. Identifies **promising pairs** among these relevant technologies based on their similarity to each other.\n"
            "4. Searches for **patents/research** using these pairs via an external API."
        ),
        examples=[
            ["How can I establish reliable communication between low-orbit satellites for continuous global monitoring?"],
            ["Need a system to automatically detect anomalies in sensor data from industrial machinery using machine learning."],
            ["Develop low-latency communication protocols for 6G networks"],
            ["Design efficient routing algorithms for large scale mesh networks in smart cities"],
            ["Create biodegradable packaging material from agricultural waste"],  # Example crossing categories potentially
            ["Develop a method for real-time traffic prediction using heterogeneous data sources"]
        ],
        allow_flagging='never',
        # Add theme for better visuals if desired
        # theme=gr.themes.Soft()
    )
else:
    # Fallback interface that only reports the initialization failure
    def error_fn():
        return "Application failed to initialize. Please check the logs for errors (e.g., missing files or model issues)."
    iface = gr.Interface(fn=error_fn, inputs=[], outputs=gr.Markdown(), title="Initialization Failed")
# --- Launch the App ---
if __name__ == "__main__":
    print("Launching Gradio app...")
    # Consider adding share=True for public link if running on appropriate infra
    # debug=True can be helpful during development
    iface.launch()