Spaces:
Sleeping
Sleeping
| import logging | |
| import re | |
| from typing import List | |
| from functools import lru_cache | |
| from src.similarity_model import ( | |
| compare_two_ideas, | |
| find_similar_projects | |
| ) | |
| from src.recommendation_engine.config import ( | |
| IDEA_DUPLICATE_THRESHOLD, | |
| FEATURE_DUPLICATE_THRESHOLD | |
| ) | |
| logger = logging.getLogger(__name__) | |
| GENERIC_PATTERNS = [ | |
| "dashboard", | |
| "platform", | |
| "system", | |
| "application", | |
| "website", | |
| "ai module", | |
| "analytics module", | |
| "smart system", | |
| "management system" | |
| ] | |
| def normalize(text: str) -> str: | |
| text = str(text).lower().strip() | |
| text = re.sub(r"[^a-z0-9\s]", " ", text) | |
| text = re.sub(r"\s+", " ", text).strip() | |
| return text | |
| def is_generic(text: str) -> bool: | |
| low = normalize(text) | |
| for pattern in GENERIC_PATTERNS: | |
| if pattern in low: | |
| return True | |
| return False | |
| def token_overlap_score(a: str, b: str) -> float: | |
| a_tokens = set(normalize(a).split()) | |
| b_tokens = set(normalize(b).split()) | |
| if not a_tokens or not b_tokens: | |
| return 0.0 | |
| overlap = len(a_tokens & b_tokens) | |
| union = len(a_tokens | b_tokens) | |
| return overlap / union | |
| def is_feature_novel( | |
| feature: str, | |
| existing_features: List[str] | |
| ) -> bool: | |
| feature = normalize(feature) | |
| if not feature: | |
| return False | |
| if is_generic(feature): | |
| logger.debug(f"[GENERIC FEATURE] {feature}") | |
| return False | |
| existing_norm = [ | |
| normalize(f) | |
| for f in existing_features | |
| ] | |
| if feature in existing_norm: | |
| logger.debug(f"[EXACT DUP] {feature}") | |
| return False | |
| for old in existing_norm: | |
| if not old: | |
| continue | |
| semantic_score = compare_two_ideas( | |
| feature, | |
| old | |
| ) | |
| overlap_score = token_overlap_score( | |
| feature, | |
| old | |
| ) | |
| final_score = max( | |
| semantic_score, | |
| overlap_score | |
| ) | |
| logger.debug( | |
| f"[COMPARE] {feature} ~ {old} " | |
| f"(semantic={semantic_score:.2f}, " | |
| f"overlap={overlap_score:.2f})" | |
| ) | |
| if final_score >= ( | |
| FEATURE_DUPLICATE_THRESHOLD + 0.08 | |
| ): | |
| logger.debug( | |
| f"[FEATURE DUPLICATE] " | |
| f"{feature} ~ {old}" | |
| ) | |
| return False | |
| return True | |
| def filter_duplicate_features( | |
| generated_features: List[str], | |
| existing_features: List[str] | |
| ) -> List[str]: | |
| final = [] | |
| seen = set() | |
| for feat in generated_features: | |
| clean = str(feat).strip() | |
| norm = normalize(clean) | |
| if not clean: | |
| continue | |
| if norm in seen: | |
| continue | |
| if not is_feature_novel( | |
| norm, | |
| existing_features + final | |
| ): | |
| continue | |
| seen.add(norm) | |
| final.append(clean) | |
| return final | |
| def _cached_db_check(idea: str) -> float: | |
| try: | |
| results = find_similar_projects( | |
| title=idea, | |
| description=idea, | |
| top_k=3 | |
| ) | |
| if ( | |
| hasattr(results, "iloc") | |
| and len(results) > 0 | |
| ): | |
| scores = [] | |
| for _, row in results.iterrows(): | |
| score = float( | |
| row.get("hybrid_score", 0) | |
| ) | |
| scores.append(score) | |
| if scores: | |
| return max(scores) | |
| except Exception as e: | |
| logger.warning(f"[DB ERROR] {e}") | |
| return 0.0 | |
| def is_idea_novel(idea_title: str) -> bool: | |
| idea_title = normalize(idea_title) | |
| if not idea_title: | |
| return False | |
| if is_generic(idea_title): | |
| logger.info( | |
| f"[GENERIC IDEA REJECTED] " | |
| f"{idea_title}" | |
| ) | |
| return False | |
| score = _cached_db_check(idea_title) | |
| logger.info(f"[DB CHECK] {idea_title}") | |
| logger.info(f"[SIMILARITY SCORE] {score:.4f}") | |
| return score < IDEA_DUPLICATE_THRESHOLD | |
| def score_feature_novelty( | |
| feature: str, | |
| existing_features: List[str] | |
| ) -> float: | |
| feature = normalize(feature) | |
| if not existing_features: | |
| return 1.0 | |
| scores = [] | |
| for old in existing_features: | |
| old_norm = normalize(old) | |
| semantic = compare_two_ideas( | |
| feature, | |
| old_norm | |
| ) | |
| overlap = token_overlap_score( | |
| feature, | |
| old_norm | |
| ) | |
| scores.append( | |
| max(semantic, overlap) | |
| ) | |
| if not scores: | |
| return 1.0 | |
| return round( | |
| 1.0 - max(scores), | |
| 4 | |
| ) | |