Spaces:
Sleeping
Sleeping
| """ | |
| AI NLP Processing Pipeline Template | |
| 1. Text Preprocessing & Cleaning (text_preprocessor.py) | |
| 2. Emotion Analysis | |
| 3. Keyword & Topic Extraction | |
| 4. Entity Recognition | |
| 5. Embedding Generation | |
| 6. Store in MongoDB + FAISS | |
| """ | |
| from typing import Dict, List, Optional | |
| import logging | |
| import json | |
| import os | |
| from urllib import error, request | |
| from backend.connection import get_collection | |
| from backend.crud import update_memory_with_nlp | |
| from backend.text_preprocessor import TextPreprocessor, preprocess_unprocessed_memories | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # ---------------------------------------------------------------------- | |
| # Helper functions for Hugging Face API calls | |
| # ---------------------------------------------------------------------- | |
def _get_hf_timeout_seconds() -> int:
    """Return the timeout, in whole seconds, for Hugging Face HTTP requests.

    The value is read from the ``HF_INFERENCE_TIMEOUT_SECONDS`` environment
    variable and defaults to 20 when the variable is unset or is not a valid
    integer.

    Returns:
        A timeout of at least 1 second.
    """
    default_timeout = 20
    raw_value = os.getenv("HF_INFERENCE_TIMEOUT_SECONDS", str(default_timeout))
    try:
        parsed = int(raw_value)
    except ValueError:
        return default_timeout
    # A timeout of 0 puts the socket in non-blocking mode and a negative one is
    # rejected outright, so either would make every request fail. Clamp to a
    # usable minimum, as the other config helpers in this module do.
    return max(1, parsed)
def _hf_inference_endpoints(model_id: str) -> List[str]:
    """List the inference URLs to try for ``model_id``, in priority order.

    When ``HF_INFERENCE_BASE_URL`` is set (surrounding whitespace and trailing
    slashes are ignored), ``<base>/<model_id>`` comes first. The two built-in
    Hugging Face endpoints always follow. Duplicate URLs are dropped, keeping
    the first occurrence.
    """
    custom_base = os.getenv("HF_INFERENCE_BASE_URL", "").strip().rstrip("/")
    candidates = [f"{custom_base}/{model_id}"] if custom_base else []
    candidates += [
        f"https://router.huggingface.co/hf-inference/models/{model_id}",
        f"https://api-inference.huggingface.co/models/{model_id}",
    ]
    # dict keys keep insertion order, so this removes repeats without
    # disturbing the priority order.
    return list(dict.fromkeys(candidates))
| # ---------------------------------------------------------------------- | |
| # Config helpers | |
| # ---------------------------------------------------------------------- | |
def _get_keybert_top_n() -> int:
    """Return how many keywords to extract, from ``KEYBERT_TOP_N``.

    Defaults to 8 when the variable is unset or not an integer; any parsed
    value is limited to the range 1..20.
    """
    raw_value = os.getenv("KEYBERT_TOP_N", "8")
    try:
        requested = int(raw_value)
    except ValueError:
        return 8
    if requested < 1:
        return 1
    if requested > 20:
        return 20
    return requested
def _get_topic_candidate_labels() -> List[str]:
    """Return the fixed topic labels offered to the zero-shot classifier.

    A new list is built on every call, so callers may mutate the result.
    """
    labels = (
        "Work & Productivity",
        "Health & Wellness",
        "Emotions & Mental Health",
        "Relationships & Family",
        "Learning & Growth",
        "Finance",
        "Travel & Leisure",
        "Daily Life",
    )
    return list(labels)
def _get_topic_score_threshold() -> float:
    """Return the minimum score a topic label needs to be reported.

    The value is read from the ``TOPIC_MIN_SCORE`` environment variable and
    clamped to the range 0.0..1.0. The default of 0.2 is used when the
    variable is unset, is not a number, or parses to NaN.
    """
    import math

    default_threshold = 0.2
    raw_value = os.getenv("TOPIC_MIN_SCORE", "0.2")
    try:
        parsed = float(raw_value)
    except ValueError:
        return default_threshold
    # float("nan") parses without error, and the clamp below would turn it
    # into 0.0, silently disabling the threshold. Treat it as invalid input.
    if math.isnan(parsed):
        return default_threshold
    return max(0.0, min(parsed, 1.0))
def _get_topic_max_labels() -> int:
    """Return the maximum number of topics to report, from ``TOPIC_MAX_LABELS``.

    Defaults to 2 when the variable is unset or not an integer; any parsed
    value is limited to the range 1..5.
    """
    raw_value = os.getenv("TOPIC_MAX_LABELS", "2")
    try:
        requested = int(raw_value)
    except ValueError:
        return 2
    if requested < 1:
        return 1
    if requested > 5:
        return 5
    return requested
| # ---------------------------------------------------------------------- | |
| # General helpers | |
| # ---------------------------------------------------------------------- | |
def _flatten_hf_labels(payload: object) -> List[Dict[str, float]]:
    """Normalize a classification payload into ``{"label", "score"}`` dicts.

    Accepts either a flat list of result dicts or a list whose first element
    is such a list (only that first inner list is read). Anything that is not
    a list yields an empty result.

    Labels are stringified, stripped and lowercased; a missing label becomes
    the empty string. A missing score counts as 0.0. Non-dict entries and
    entries whose score cannot be converted to ``float`` are dropped.
    """
    if not isinstance(payload, list):
        return []

    is_nested = bool(payload) and isinstance(payload[0], list)
    entries = payload[0] if is_nested else payload

    flattened: List[Dict[str, float]] = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        normalized_label = str(entry.get("label", "")).strip().lower()
        try:
            numeric_score = float(entry.get("score", 0.0))
        except (TypeError, ValueError):
            continue
        flattened.append({"label": normalized_label, "score": numeric_score})
    return flattened
def _dedupe_text_items(items: List[str]) -> List[str]:
    """Strip each item and drop blanks and case-insensitive duplicates.

    The first spelling of each distinct value is kept, in its original
    position relative to the other kept values.
    """
    first_spelling = {}
    for raw_item in items:
        stripped = raw_item.strip()
        if stripped:
            # setdefault leaves an existing entry alone, so later duplicates
            # never replace the spelling that was seen first.
            first_spelling.setdefault(stripped.lower(), stripped)
    return list(first_spelling.values())
| # ---------------------------------------------------------------------- | |
| # Text cleaning | |
| # ---------------------------------------------------------------------- | |
def clean_text(text: str) -> str:
    """Return the ``"cleaned"`` field of ``TextPreprocessor().preprocess(text)``.

    A new ``TextPreprocessor`` is created on every call.
    """
    return TextPreprocessor().preprocess(text)["cleaned"]
| # ---------------------------------------------------------------------- | |
| # Emotion scoring (Hugging Face API) | |
| # ---------------------------------------------------------------------- | |
# Maps each of the six reported emotion buckets to the model labels (already
# lowercased) that are counted towards it. Labels listed in no bucket are
# ignored when scores are aggregated.
EMOTION_BUCKET_LABELS = {
    "joy": {
        "joy", "amusement", "excitement", "optimism",
        "contentment", "happy", "excited", "content",
    },
    "sadness": {
        "sadness", "disappointment", "grief", "remorse",
        "hurt", "lonely", "disappointed",
    },
    "anger": {
        "anger", "annoyance", "rage", "frustration",
        "frustrated", "annoyed", "furious",
    },
    "fear": {
        "fear", "nervousness", "anxiety", "worry",
        "anxious", "nervous", "worried",
    },
    "surprise": {
        "surprise", "realization", "amazed", "amaze", "shocked",
    },
    "disgust": {
        "disgust", "disapproval", "embarrassment", "dislike", "uncomfortable",
    },
}
def _neutral_emotion_scores() -> Dict[str, float]:
    """Return a fresh score dict with every emotion bucket set to 0.0."""
    bucket_names = ("joy", "sadness", "anger", "fear", "surprise", "disgust")
    return dict.fromkeys(bucket_names, 0.0)
def _bucketize_emotions(label_scores: List[Dict[str, float]]) -> Dict[str, float]:
    """Aggregate per-label scores into the six emotion buckets.

    Each entry's score is added to the first bucket in
    ``EMOTION_BUCKET_LABELS`` whose alias set contains the entry's label;
    entries matching no bucket are ignored. When the bucket totals sum to a
    positive number they are normalized to sum to 1 and rounded to four
    decimals; otherwise the raw totals are returned unchanged.
    """
    totals = _neutral_emotion_scores()
    for entry in label_scores:
        entry_label = entry["label"]
        entry_score = float(entry["score"])
        matching_bucket = next(
            (
                bucket_name
                for bucket_name, aliases in EMOTION_BUCKET_LABELS.items()
                if entry_label in aliases
            ),
            None,
        )
        if matching_bucket is not None:
            totals[matching_bucket] += entry_score

    grand_total = sum(totals.values())
    if not grand_total > 0:
        return totals
    return {
        bucket_name: round(bucket_total / grand_total, 4)
        for bucket_name, bucket_total in totals.items()
    }
def extract_emotion_scores(text: str) -> Dict[str, float]:
    """Score ``text`` against the six emotion buckets via Hugging Face.

    The configured inference endpoints are tried in order and the first one
    that returns usable label scores wins. This function is best-effort: an
    all-zero score dict is returned when the text is blank, when
    ``HF_API_TOKEN`` is not set, or when no endpoint produces a usable answer.

    Args:
        text: The text to classify.

    Returns:
        A dict mapping each emotion bucket to its score.
    """
    if not text or not text.strip():
        return _neutral_emotion_scores()

    hf_api_token = os.getenv("HF_API_TOKEN")
    if not hf_api_token:
        logger.warning("HF_API_TOKEN missing. Returning default emotion scores.")
        return _neutral_emotion_scores()

    hf_timeout_seconds = _get_hf_timeout_seconds()
    body = json.dumps({"inputs": text, "options": {"wait_for_model": True}}).encode("utf-8")
    headers = {
        "Authorization": f"Bearer {hf_api_token}",
        "Content-Type": "application/json",
    }

    for endpoint in _hf_inference_endpoints("AnasAlokla/multilingual_go_emotions"):
        try:
            # Request() itself raises ValueError for a malformed URL (for
            # example a custom base URL without a scheme). Building it inside
            # the try lets a bad endpoint be skipped like any other failure
            # instead of aborting the whole call.
            req = request.Request(endpoint, data=body, method="POST", headers=headers)
            with request.urlopen(req, timeout=hf_timeout_seconds) as res:
                payload = json.loads(res.read().decode("utf-8"))
            if isinstance(payload, dict) and payload.get("error"):
                logger.warning("Hugging Face API error from %s: %s", endpoint, payload.get("error"))
                continue
            label_scores = _flatten_hf_labels(payload)
            if label_scores:
                return _bucketize_emotions(label_scores)
        except error.HTTPError as e:
            # HTTPError must be handled before URLError, its base class.
            logger.warning("HF HTTP error (%s) via %s", e.code, endpoint)
        except error.URLError as e:
            logger.warning("HF network error via %s: %s", endpoint, e.reason)
        except Exception as e:
            logger.warning("HF emotion scoring error via %s: %s", endpoint, str(e))

    return _neutral_emotion_scores()
| # ---------------------------------------------------------------------- | |
| # Keyword extraction (lightweight) | |
| # ---------------------------------------------------------------------- | |
def extract_keywords(text: str) -> List[str]:
    """Extract up to the configured number of keywords from ``text``.

    Delegates to ``TextPreprocessor.extract_keywords``. Blank input yields an
    empty list, and so does any exception raised during extraction (the error
    is logged).
    """
    if not (text and text.strip()):
        return []

    keyword_limit = _get_keybert_top_n()
    try:
        return TextPreprocessor().extract_keywords(text, top_n=keyword_limit)
    except Exception as exc:
        logger.error("Keyword extraction failed: %s", str(exc))
        return []
| # ---------------------------------------------------------------------- | |
| # Topic categorization | |
| # ---------------------------------------------------------------------- | |
def _fallback_topic_classification(text: str) -> List[str]:
    """Guess topics from fixed trigger words when the API cannot be used.

    Matching is a case-insensitive substring test, so a trigger also matches
    inside longer words. Topics are returned in the fixed order below, and
    ``["Daily Life"]`` is returned when nothing matches.
    """
    topic_triggers = (
        ("Work & Productivity", ("work", "email", "project", "deliverable", "deadline")),
        ("Health & Wellness", ("walk", "exercise", "sleep", "health", "tired")),
        ("Emotions & Mental Health", ("grateful", "happy", "sad", "anxious", "stressed")),
    )
    lowered = text.lower()
    matched = [
        topic
        for topic, triggers in topic_triggers
        if any(trigger in lowered for trigger in triggers)
    ]
    return matched or ["Daily Life"]
def _split_topic_response(result: object) -> tuple:
    """Split a zero-shot response into parallel ``(labels, scores)`` lists.

    Handles a dict carrying ``"labels"`` and ``"scores"`` lists as well as a
    list of ``{"label": ..., "score": ...}`` dicts. Any other shape yields two
    empty lists.
    """
    if isinstance(result, dict):
        return list(result.get("labels") or []), list(result.get("scores") or [])
    if isinstance(result, list):
        entries = [
            entry
            for entry in result
            if isinstance(entry, dict) and "label" in entry and "score" in entry
        ]
        return [entry["label"] for entry in entries], [entry["score"] for entry in entries]
    return [], []


def categorize_topics(text: str, keywords: List[str]) -> List[str]:
    """Assign topic labels to ``text`` using zero-shot classification.

    The configured Hugging Face endpoints are tried in order. From the first
    usable answer, labels scoring at least the configured threshold are
    returned in response order, up to the configured maximum; if none reach
    the threshold, the first label of the response is returned on its own.
    The keyword-based fallback classifier is used when ``HF_API_TOKEN`` is not
    set or no endpoint yields labels.

    Args:
        text: The text to classify. Blank text yields ``["Daily Life"]``.
        keywords: Optional keywords; the first ten are appended to the text
            sent to the model as extra context.

    Returns:
        A non-empty list of topic labels.
    """
    if not text or not text.strip():
        return ["Daily Life"]

    hf_api_token = os.getenv("HF_API_TOKEN")
    if not hf_api_token:
        logger.warning("HF_API_TOKEN missing. Using fallback topic classification.")
        return _fallback_topic_classification(text)

    candidate_labels = _get_topic_candidate_labels()
    min_score = _get_topic_score_threshold()
    max_labels = _get_topic_max_labels()
    hf_timeout = _get_hf_timeout_seconds()
    model_id = "joeddav/xlm-roberta-large-xnli"

    text_for_classification = text
    if keywords:
        text_for_classification = f"{text}\nKeywords: {', '.join(keywords[:10])}"

    try:
        body = json.dumps(
            {
                "inputs": text_for_classification,
                "parameters": {"candidate_labels": candidate_labels, "multi_label": True},
                "options": {"wait_for_model": True},
            }
        ).encode("utf-8")
        headers = {
            "Authorization": f"Bearer {hf_api_token}",
            "Content-Type": "application/json",
        }
        for endpoint in _hf_inference_endpoints(model_id):
            try:
                # Request() raises ValueError for a malformed URL. Building it
                # inside the per-endpoint try means one bad endpoint is
                # skipped instead of abandoning the remaining endpoints.
                req = request.Request(endpoint, data=body, method="POST", headers=headers)
                with request.urlopen(req, timeout=hf_timeout) as res:
                    result = json.loads(res.read().decode("utf-8"))
                if isinstance(result, dict) and result.get("error"):
                    logger.warning("HF topic API error: %s", result.get("error"))
                    continue

                labels, scores = _split_topic_response(result)
                ranked_topics: List[str] = []
                for label, score in zip(labels, scores):
                    if float(score) >= min_score:
                        ranked_topics.append(str(label))
                    if len(ranked_topics) >= max_labels:
                        break
                if ranked_topics:
                    return ranked_topics
                if labels:
                    # Nothing cleared the threshold: report the first label
                    # rather than dropping the model's answer entirely.
                    return [str(labels[0])]
            except Exception as e:
                logger.warning("HF topic request failed via %s: %s", endpoint, str(e))
    except Exception as e:
        logger.error("Topic classification failed: %s", str(e))

    return _fallback_topic_classification(text)
| # ---------------------------------------------------------------------- | |
| # Named Entity Recognition (disabled to reduce memory usage) | |
| # ---------------------------------------------------------------------- | |
def extract_entities(text: str) -> List[str]:
    """Return the named entities in ``text``; currently always an empty list.

    Entity recognition is switched off, so ``text`` is not inspected. An INFO
    message is logged on every call.
    """
    no_entities: List[str] = []
    logger.info("NER disabled for memory optimization")
    return no_entities
| # ---------------------------------------------------------------------- | |
| # Embedding generation (disabled/lightweight) | |
| # ---------------------------------------------------------------------- | |
def generate_embedding(text: str) -> Dict:
    """Return embedding metadata for ``text``.

    No vector is computed here: ``text`` is ignored and ``"vector"`` is always
    a new empty list. ``"model"`` holds a fixed sentence-transformers model
    identifier.
    """
    return dict(
        vector=[],
        model="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
    )
| # ---------------------------------------------------------------------- | |
| # FAISS storage (placeholder) | |
| # ---------------------------------------------------------------------- | |
def store_embedding_in_faiss(vector: List[float], memory_id: str, faiss_index) -> int:
    """Placeholder for FAISS storage; nothing is stored.

    All three arguments are ignored and the same fixed identifier is returned
    on every call.
    """
    placeholder_embedding_id = 4271
    return placeholder_embedding_id
| # ---------------------------------------------------------------------- | |
| # Main processing loop (called by scheduler) | |
| # ---------------------------------------------------------------------- | |
def process_unprocessed_memories(batch_size: int = 50) -> Dict:
    """Run the NLP pipeline over a batch of preprocessed memories.

    First runs ``preprocess_unprocessed_memories(batch_size)``, then loads up
    to ``batch_size`` documents from the ``memories`` collection that have a
    ``preprocessing`` field but no ``nlp_insights`` field. For each one it
    computes emotion scores, keywords, topics, entities and an embedding, and
    saves them through ``update_memory_with_nlp``.

    A failure on one memory is recorded and does not stop the batch.

    Args:
        batch_size: Maximum number of memories to preprocess and to analyse.

    Returns:
        A dict with the preprocessing result under ``"preprocessing"`` and,
        under ``"nlp_processing"``, the counts ``"total"``, ``"processed"``
        and ``"failed"`` plus the list of ``"errors"``. Memories skipped for
        having no cleaned text count towards ``"total"`` only.
    """
    col = get_collection("memories")
    preprocessing_result = preprocess_unprocessed_memories(batch_size)
    unprocessed = list(
        col.find(
            {
                "preprocessing": {"$exists": True},
                "nlp_insights": {"$exists": False},
            }
        ).limit(batch_size)
    )

    processed_count = 0
    failed_count = 0
    errors: List[str] = []

    for memory in unprocessed:
        try:
            memory_id = str(memory["_id"])
            # "$exists" also matches a field stored as null, so guard against
            # a None value before reading from it.
            preprocessed = memory.get("preprocessing") or {}
            cleaned_text = preprocessed.get("cleaned", "")
            preprocessing_keywords = preprocessed.get("keywords", [])
            if not cleaned_text:
                # Nothing is written for a skipped memory, so it still matches
                # the query above on the next run; log it so it is visible.
                logger.warning("Skipping memory %s: no cleaned text available", memory_id)
                continue

            logger.info("Processing memory %s...", memory_id)
            emotion_scores = extract_emotion_scores(cleaned_text)
            keywords = extract_keywords(cleaned_text) or preprocessing_keywords
            topics = categorize_topics(cleaned_text, keywords)
            entities = extract_entities(cleaned_text)
            embedding_data = generate_embedding(cleaned_text)
            embedding_id = store_embedding_in_faiss(
                embedding_data["vector"],
                memory_id,
                faiss_index=None,
            )

            # The score dict always carries every bucket, so an "is it empty"
            # check never fires and max() over all-zero scores would report
            # the first bucket ("joy"). Only name a mood when some emotion
            # actually scored above zero.
            mood = "neutral"
            if emotion_scores:
                top_emotion = max(emotion_scores, key=emotion_scores.get)
                if emotion_scores[top_emotion] > 0:
                    mood = top_emotion

            nlp_data = {
                "content_clean": cleaned_text,
                "mood": mood,
                "embedding_id": embedding_id,
                "nlp_insights": {
                    "emotion_scores": emotion_scores,
                    "keywords": keywords,
                    "topics": topics,
                    "entities": entities,
                },
            }
            if update_memory_with_nlp(memory_id, nlp_data):
                processed_count += 1
                logger.info("✓ Processed %s", memory_id)
            else:
                failed_count += 1
                errors.append(f"Failed to update {memory_id}")
        except Exception as e:
            failed_count += 1
            error_msg = f"Error processing {memory.get('_id')}: {str(e)}"
            errors.append(error_msg)
            # exception() logs at ERROR level and adds the traceback.
            logger.exception(error_msg)

    return {
        "preprocessing": preprocessing_result,
        "nlp_processing": {
            "total": len(unprocessed),
            "processed": processed_count,
            "failed": failed_count,
            "errors": errors,
        },
    }