"""LLM and speech-to-text helpers for the Trailhead assistant.

The module exposes a single entry point, :func:`generate`, which streams text
from an in-process llama-cpp-python model and degrades to a deterministic mock
backend whenever the model (or one of its optional dependencies) is missing,
fails, or is too slow. Voice input is transcribed with whisper.cpp, then a
transformers Whisper pipeline, then a keyword-based mock.
"""

import os
import re
import time
import json
import base64
import threading

# NOTE(review): `requests` and `PIL.Image` are not referenced anywhere in the
# code visible here. They stay importable from this module for backward
# compatibility, but are guarded like every other optional dependency so the
# mock fallback still works in environments where they are not installed.
try:
    import requests
except ImportError:
    requests = None

try:
    from PIL import Image
except ImportError:
    Image = None

# Serialises access to the single in-process llama.cpp model.
_model_lock = threading.Lock()

# Auto-detected backend. NOTE: this value is currently informational only,
# because BACKEND below is hard-coded.
try:
    import llama_cpp
    default_backend = "llama_cpp"
except ImportError:
    default_backend = "mock"

BACKEND = "llama_cpp"  # Force llama_cpp backend

# Constants for Hugging Face Space model loading
MODEL_REPO = "bartowski/gemma-1.1-2b-it-GGUF"
MODEL_FILE = "gemma-1.1-2b-it-Q4_K_M.gguf"
LOCAL_MODEL_DIR = os.environ.get("MODEL_DIR", "./model")

_llama_model = None

# Per-word pause (seconds) used by the mock backend to imitate streaming.
_MOCK_STORY_WORD_DELAY = 0.02
_MOCK_REPLY_WORD_DELAY = 0.03

# Model load / prompt evaluation budget (seconds) before llama_cpp is disabled.
_LLAMA_TIME_LIMIT = 120.0

# Patterns used by the mock storyteller to pull structured data from a prompt.
_VOICE_LOG_RE = re.compile(
    r"- Log #(\d+)\s+\((.*?)\)\s+at Km\s+([\d\.]+)\s+\(Alt:\s+([\d\.]+)m\):\s*\"(.*?)\"",
    re.DOTALL,
)
_AMENITY_RE = re.compile(
    r"- (.*?)\s+\((.*?)\)\s+at approx\.\s+Km\s+([\d\.]+)\s+\(located\s+([\d\.]+) meters off the trail\)"
)

# Ordered (keywords, reply) table for the mock backend; first match wins.
_MOCK_REPLIES = (
    (
        ("checkpoint", "narration", "current position"),
        "🧭 **Trailhead Contextual Guide:**\n"
        "You are approaching **Km 2.0 Checkpoint**. The terrain ahead is moderately steep with an elevation gain of ~45m over the next kilometer.\n\n"
        "⚠️ **Advisory:** Watch your water supply; the next reliable spring is at Km 3.5. Ensure you reach the shelter before 17:00 as temperatures drop rapidly to 5°C.",
    ),
    (
        ("gear", "checklist", "pack"),
        "🎒 **Suggested Gear Checklist (Pace- & Altitude-Adjusted):**\n"
        "Based on your 1-day trek details, here is a highly tailored packing guide:\n\n"
        "- **Navigation:** Offline map download, compass, backup physical map.\n"
        "- **Hydration:** 2.5L water capacity + iodine tablets (water sources tagged at Km 3.5).\n"
        "- **Apparel:** Windbreaker/rain shell, moisture-wicking base layers, wool socks.\n"
        "- **Safety:** First-aid kit (with blister care), whistle, multi-tool, space blanket.\n"
        "- **Nutrition:** 2500 kcal high-density trail snacks (nuts, bars, jerky).",
    ),
    (
        ("first-aid", "first aid", "medical", "injury", "sprain", "ams", "sick"),
        "🩹 **Wilderness First-Aid Protocol (CITED):**\n"
        "For managing a **Sprained Ankle / Strain** in the backcountry, use the **R.I.C.E.** protocol:\n\n"
        "1. **Rest:** Stop hiking immediately. Remove weight from the injured limb.\n"
        "2. **Ice / Cold:** Apply a cold pack or submerge in cold trail stream for 20 mins to reduce swelling.\n"
        "3. **Compression:** Wrap firmly with an elastic bandage (do not restrict circulation).\n"
        "4. **Elevation:** Elevate the ankle above the heart level whenever resting.\n\n"
        "📖 *CITED SOURCE: Wilderness Medicine Field Guide, Section 7: Musculoskeletal Injuries.*",
    ),
    (
        ("route", "off-route", "deviate", "map"),
        "⚠️ **Navigation Warning:**\n"
        "You have deviated from the planned polyline by **42 meters**. \n\n"
        "**Action:** Look for physical trail markers or backtrack to your last known coordinate. Do not proceed off-trail through dense underbrush.",
    ),
)

_MOCK_DEFAULT_REPLY = (
    "🌲 **Welcome to Trailhead Navigation Assistant!**\n"
    "I am your offline-first trail computer. I can analyze your uploaded GPX files, estimate Naismith trekking durations, auto-partition checkpoints, and offer grounded AI advice.\n\n"
    "Ask me about gear checklists, route narration, deviation warnings, or wilderness first-aid emergency protocols."
)


def _download_gguf():
    """Download GGUF model from Hugging Face if not already present.

    Returns:
        The local path of the model file, or ``None`` when the download (or
        the import of ``huggingface_hub``) fails.
    """
    os.makedirs(LOCAL_MODEL_DIR, exist_ok=True)
    local_path = os.path.join(LOCAL_MODEL_DIR, MODEL_FILE)

    if os.path.exists(local_path):
        print(f"[llm.py] Model GGUF already exists at {local_path}")
        return local_path

    print(f"[llm.py] Downloading {MODEL_FILE} from HF repo {MODEL_REPO}...")
    try:
        from huggingface_hub import hf_hub_download
        downloaded_path = hf_hub_download(
            repo_id=MODEL_REPO,
            filename=MODEL_FILE,
            local_dir=LOCAL_MODEL_DIR,
            local_dir_use_symlinks=False,
        )
        print(f"[llm.py] Model downloaded successfully to {downloaded_path}")
        return downloaded_path
    except Exception as e:
        # Best-effort: any failure here simply means "no model available".
        print(f"[llm.py] Error downloading model from Hugging Face: {e}")
        return None


def init_llama_cpp():
    """Lazy initialization of llama_cpp model.

    Returns:
        The cached ``Llama`` instance, or ``None`` when llama-cpp-python is not
        installed, the model file is unavailable, or loading fails.
    """
    global _llama_model
    if _llama_model is not None:
        return _llama_model

    try:
        from llama_cpp import Llama
    except ImportError:
        print("[llm.py] Warning: llama-cpp-python is not installed. Falling back to mock backend.")
        return None

    model_path = _download_gguf()
    if not model_path or not os.path.exists(model_path):
        print("[llm.py] Error: Model file not found. Cannot load llama_cpp.")
        return None

    print(f"[llm.py] Loading model into memory: {model_path}")
    # A single thread is used when the SPACE_ID environment variable is set.
    num_threads = 1 if os.environ.get("SPACE_ID") else 4

    try:
        _llama_model = Llama(
            model_path=model_path,
            n_ctx=2048,
            n_threads=num_threads,
            verbose=False,
        )
        print("[llm.py] llama_cpp model loaded successfully!")
        return _llama_model
    except Exception as e:
        print(f"[llm.py] Error loading llama_cpp: {e}")
        return None


# --- Whisper.cpp ASR (Speech-to-Text) ---
_whisper_model = None


def _init_whisper():
    """Lazy initialization of whisper.cpp model for offline ASR.

    Returns:
        The cached pywhispercpp model, or ``None`` if it cannot be loaded.
    """
    global _whisper_model
    if _whisper_model is not None:
        return _whisper_model

    try:
        from pywhispercpp.model import Model as WhisperModel
        print("[llm.py] Loading whisper.cpp 'tiny' model for ASR...")
        _whisper_model = WhisperModel(
            'tiny',
            n_threads=2 if not os.environ.get("SPACE_ID") else 1,
        )
        print("[llm.py] whisper.cpp ASR model loaded successfully!")
        return _whisper_model
    except ImportError:
        print("[llm.py] pywhispercpp not installed. ASR will try transformers fallback.")
        return None
    except Exception as e:
        print(f"[llm.py] Error loading whisper.cpp ASR model: {e}")
        return None


# --- Transformers ASR fallback ---
_transformers_asr = None


def _init_transformers_asr():
    """Lazy initialization of transformers Whisper pipeline for fallback ASR.

    Returns:
        The cached transformers pipeline, or ``None`` if it cannot be loaded.
    """
    global _transformers_asr
    if _transformers_asr is not None:
        return _transformers_asr

    try:
        from transformers import pipeline
        print("[llm.py] Loading transformers Whisper-tiny model for fallback ASR...")
        _transformers_asr = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-tiny",
            device="cpu",
        )
        print("[llm.py] transformers ASR model loaded successfully!")
        return _transformers_asr
    except ImportError:
        print("[llm.py] transformers or torch not installed. ASR will use mock fallback.")
        return None
    except Exception as e:
        print(f"[llm.py] Error loading transformers ASR model: {e}")
        return None


def _remove_quietly(path):
    """Delete *path* if it exists, ignoring filesystem errors (best-effort cleanup)."""
    if path and os.path.exists(path):
        try:
            os.remove(path)
        except OSError:
            pass


def _resample_to_wav(audio_path):
    """Transcode *audio_path* to a temporary 16 kHz mono 16-bit WAV via miniaudio.

    Returns:
        The temporary WAV path, or ``None`` when miniaudio is missing or the
        transcoding fails (the caller then feeds the original file to whisper).
    """
    temp_wav_path = None
    try:
        import miniaudio
        import wave
        print(f"[llm.py] Decoding and resampling audio to 16kHz mono WAV using miniaudio...")
        sound = miniaudio.decode_file(audio_path, nchannels=1, sample_rate=16000)

        # Save to temp WAV file
        temp_wav_path = audio_path + ".temp_16k.wav"
        with wave.open(temp_wav_path, "wb") as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(2)  # 16-bit PCM
            wav_file.setframerate(16000)
            wav_file.writeframes(sound.samples)
        print(f"[llm.py] Resampled audio saved to: {temp_wav_path}")
        return temp_wav_path
    except ImportError:
        print("[llm.py] miniaudio not installed. Passing audio file directly to whisper.cpp.")
    except Exception as e:
        print(f"[llm.py] miniaudio transcoding failed: {e}. Passing original file directly.")
        # Do not leave a half-written temp file behind.
        _remove_quietly(temp_wav_path)
    return None


def transcribe_audio(audio_path, prompt=""):
    """
    Transcribe audio file to text using whisper.cpp (offline, lightweight).
    Falls back to transformers or mock transcription if whisper.cpp is unavailable.

    Args:
        audio_path: Path of the audio file to transcribe.
        prompt: Text hint, only used to pick the canned mock transcription.

    Returns:
        The transcription string (never empty: the mock is the last resort).
    """
    if not audio_path or not os.path.exists(audio_path):
        print("[llm.py] Audio file not found, using mock ASR.")
        return _mock_transcribe_audio(prompt)

    whisper = _init_whisper()
    if whisper is not None:
        # Keep the caller's path untouched: the temp WAV is deleted below, and
        # the transformers fallback must still see the original file.
        temp_wav_path = _resample_to_wav(audio_path)
        whisper_input = temp_wav_path or audio_path
        try:
            print(f"[llm.py] Transcribing audio: {whisper_input}")
            segments = whisper.transcribe(whisper_input)
            transcription = " ".join(seg.text.strip() for seg in segments).strip()
            if transcription:
                print(f"[llm.py] ASR Transcription: \"{transcription}\"")
                return transcription
            print("[llm.py] Whisper returned empty transcription, trying transformers fallback.")
        except Exception as e:
            print(f"[llm.py] Error during whisper.cpp transcription: {e}")
        finally:
            _remove_quietly(temp_wav_path)

    # Fallback to transformers ASR
    asr_pipe = _init_transformers_asr()
    if asr_pipe is not None:
        try:
            print(f"[llm.py] Transcribing audio using transformers: {audio_path}")
            result = asr_pipe(audio_path)
            transcription = result.get("text", "").strip()
            if transcription:
                print(f"[llm.py] ASR (transformers) Transcription: \"{transcription}\"")
                return transcription
        except Exception as e:
            print(f"[llm.py] Error during transformers transcription: {e}")

    return _mock_transcribe_audio(prompt)


def _mock_transcribe_audio(prompt=""):
    """Mock ASR fallback when whisper.cpp is not available.

    Picks one of three canned questions based on keywords found in *prompt*.
    """
    prompt_lower = str(prompt).lower() if prompt else ""
    if "first" in prompt_lower or "injury" in prompt_lower or "ems" in prompt_lower:
        return "How do I treat a sprained ankle on the trail?"
    elif "gear" in prompt_lower or "backpack" in prompt_lower:
        return "What gear list do I need for a 3-day high-altitude trek?"
    else:
        return "Am I on the correct route right now?"


# Keep backward-compatible alias
mock_transcribe_audio = _mock_transcribe_audio


def _parse_voice_logs(prompt):
    """Extract geotagged voice-log entries from a storyteller prompt."""
    voice_logs = [
        {
            "num": num,
            "time": timestamp,
            "km": float(km),
            "alt": alt,
            "transcript": transcript.strip(),
        }
        for num, timestamp, km, alt, transcript in _VOICE_LOG_RE.findall(prompt)
    ]
    if voice_logs:
        return voice_logs

    # Fallback line-by-line parsing: a "- Log #" header line followed by a
    # line holding the quoted transcript.
    current_log = None
    for line in prompt.split("\n"):
        if "- Log #" in line:
            try:
                header_part, km_alt_part = line.split(" at Km ")[:2]
                num_time = header_part.replace("- Log #", "").strip()
                num = num_time.split(" ")[0]
                # Drop only the leading log number; str.replace would also eat
                # matching digits inside the timestamp ("10:15" -> "0:5").
                time_str = num_time[len(num):].strip("() ")
                km = km_alt_part.split(" ")[0]
                alt = km_alt_part.split("Alt: ")[1].split("m")[0]
                current_log = {
                    "num": num,
                    "time": time_str,
                    "km": float(km),
                    "alt": alt,
                    "transcript": "",
                }
            except (IndexError, ValueError):
                current_log = None
        elif current_log and line.strip().startswith('"'):
            current_log["transcript"] = line.strip().strip('"')
            voice_logs.append(current_log)
            current_log = None
    return voice_logs


def _parse_amenities(prompt):
    """Extract amenity / point-of-interest entries from a storyteller prompt."""
    amenities = [
        {"name": name, "type": type_str, "km": float(km), "offset": offset}
        for name, type_str, km, offset in _AMENITY_RE.findall(prompt)
    ]
    if amenities:
        return amenities

    # Fallback: split-based parsing of lines that mention the trail offset.
    for line in prompt.split("\n"):
        if "meters off the trail" not in line:
            continue
        try:
            clean_line = line.strip().lstrip("- ")
            name_part = clean_line.split(" (")[0]
            rest = clean_line.split(" (")[1]
            type_part = rest.split(") at approx. Km ")[0]
            km_offset = rest.split(") at approx. Km ")[1]
            km = km_offset.split(" (located ")[0]
            offset = km_offset.split(" (located ")[1].split(" meters off the trail")[0]
            amenities.append({
                "name": name_part,
                "type": type_part,
                "km": float(km),
                "offset": offset,
            })
        except (IndexError, ValueError):
            # Malformed line: skip it, the report is best-effort.
            continue
    return amenities


def _render_technical_report(total_dist, ele_gain, alt_range, amenities, voice_logs):
    """Render the terse, telemetry-style trek report."""
    out = [
        "🧭 **Trailhead Technical Trek Report**\n",
        "*Compiled by Trailhead AI Storyteller*\n\n",
        "### 📊 Trek Telemetry\n",
        f"- **Total Distance:** {total_dist} km\n",
        f"- **Total Elevation Gain:** {ele_gain} m\n",
        f"- **Altitude Profile:** {alt_range}\n\n",
        "### 🎒 Amenities & Points of Interest\n",
    ]
    if amenities:
        out.extend(
            f"- **{am['name']}** ({am['type']}) at approx. Km {am['km']:.2f} ({am['offset']}m off-trail)\n"
            for am in amenities
        )
    else:
        out.append("- No significant amenities detected along the route.\n")

    out.append("\n### 🎙️ Geotagged Voice Logs\n")
    if voice_logs:
        out.extend(
            f"- **Km {log['km']:.2f}** (Alt: {log['alt']}m) | *{log['time']}*:\n > \"{log['transcript']}\"\n"
            for log in voice_logs
        )
    else:
        out.append("- No voice logs recorded.\n")
    return "".join(out)


def _render_narrative_report(total_dist, ele_gain, alt_range, amenities, voice_logs):
    """Render the first-person, social-media style trek story."""
    water_count = sum(
        1 for am in amenities
        if "water" in am["name"].lower() or "fountain" in am["name"].lower()
    )
    camp_count = sum(
        1 for am in amenities
        if "camp" in am["name"].lower() or "shelter" in am["name"].lower()
    )
    other_count = len(amenities) - water_count - camp_count

    out = [
        "🌲 **MY WILDERNESS EXPEDITION REPORT** 🌲\n",
        "*Powered by Trailhead Tactical Trail Computer*\n\n",
        f"What an absolute journey! 🏔️ Just finished an intense trek covering **{total_dist} km** with **{ele_gain} m** of vertical climb! ",
        f"The altitude range profile spanned from **{alt_range}**, offering challenging terrain but rewarding views.\n\n",
        "### 🥾 The Journey & Resource Milestones\n",
        "Setting off, the trail presented a rugged path but was well-equipped for resource management. ",
    ]

    if water_count > 0 or camp_count > 0 or other_count > 0:
        parts = []
        if water_count > 0:
            parts.append(f"{water_count} drinking water and fountain stations")
        if camp_count > 0:
            parts.append(f"{camp_count} campsite/shelter areas")
        if other_count > 0:
            parts.append(f"{other_count} other points of interest")
        out.append(
            f"Along the way, I passed through **{', '.join(parts)}** situated conveniently off the path, ensuring hydration and safety were never compromised. "
        )
    out.append("Navigating these waypoints required careful planning, but it paid off beautifully.\n\n")

    if voice_logs:
        out.append("### 🎙️ Trail Reflections & Audio Log Highlights\n")
        for log in voice_logs:
            transcript_lower = log['transcript'].lower()
            icon = "🎙️"
            title = "Trail Observation"
            if "water" in transcript_lower or "waterfall" in transcript_lower:
                icon = "💧"
                title = "Water Source & Hydration Check"
            elif "view" in transcript_lower or "scenic" in transcript_lower:
                icon = "👁️"
                title = "Scenic Viewpoint Reflection"
            elif "finish" in transcript_lower or "complete" in transcript_lower:
                icon = "🏁"
                title = "Trek Completion Signoff"

            out.append(f"{icon} **Km {log['km']:.2f} | {title}** 📝\n")
            out.append(f"Recorded voice entry at {log['alt']}m altitude:\n")
            out.append(f"> *\"{log['transcript']}\"*\n\n")

    out.append("🏁 **Trek Complete!**\n")
    out.append("Every step was worth it. Pushed my limits, managed my resources, and conquered the route. 🥾\n\n")
    out.append("---\n")
    out.append("#HikingAdventure #BackcountryExploration #TrailheadAI #WildernessLiving #TrekTelemetry #OptOutside\n")
    return "".join(out)


def _build_story_report(prompt, system_lower):
    """Parse trek stats, voice logs and amenities from *prompt* and render a report."""
    total_dist_match = re.search(r"Total Distance: ([\d\.]+) km", prompt)
    ele_gain_match = re.search(r"Total Elevation Gain: ([\d\.]+) m", prompt)
    alt_range_match = re.search(r"Altitude Range: (.*?)\n", prompt)

    # Canned defaults keep the mock output plausible when stats are missing.
    total_dist = total_dist_match.group(1) if total_dist_match else "3.49"
    ele_gain = ele_gain_match.group(1) if ele_gain_match else "120.0"
    alt_range = alt_range_match.group(1) if alt_range_match else "100m - 250m"

    voice_logs = sorted(_parse_voice_logs(prompt), key=lambda entry: entry["km"])
    amenities = _parse_amenities(prompt)

    render = _render_technical_report if "technical" in system_lower else _render_narrative_report
    return render(total_dist, ele_gain, alt_range, amenities, voice_logs)


def _canned_reply(prompt_lower):
    """Return the first canned mock reply whose keywords occur in *prompt_lower*."""
    for keywords, reply in _MOCK_REPLIES:
        if any(keyword in prompt_lower for keyword in keywords):
            return reply
    return _MOCK_DEFAULT_REPLY


def _stream_words(text, delay):
    """Yield *text* one space-terminated word at a time, pausing *delay* seconds after each."""
    for word in text.split(" "):
        yield word + " "
        if delay > 0:
            time.sleep(delay)


def generate_mock(prompt, system="", image_path=None, audio_path=None, history=None, word_delay=None):
    """Simulate streaming for the mock backend tailored for Trailhead.

    Args:
        prompt: User query; ``None`` is treated as an empty string.
        system: System instructions; used to detect storyteller / technical mode.
        image_path: Accepted for interface parity with the real backend; unused.
        audio_path: Optional audio file; when given, its transcription replaces
            *prompt* and is echoed at the start of the response.
        history: Accepted for interface parity with the real backend; unused.
        word_delay: Seconds to pause after each yielded word. ``None`` keeps the
            built-in pacing (0.02 s for stories, 0.03 s for canned replies).

    Yields:
        The response one word at a time, each followed by a single space.
    """
    prompt = "" if prompt is None else str(prompt)
    system_lower = "" if system is None else str(system).lower()
    pieces = []

    # 0. Handle Voice Audio ASR
    if audio_path:
        transcription = transcribe_audio(audio_path, prompt)
        pieces.append(f"[🎙️ **Voice Journal Transcription:** \"{transcription}\"]\n\n")
        prompt = transcription

    prompt_lower = prompt.lower()

    # 0.5 Check if this is a Storyteller request (before other keyword matches)
    is_story = (
        "first-person adventure story of my trek" in prompt_lower
        or "storyteller" in system_lower
        or "adventure story" in system_lower
    )
    if is_story:
        pieces.append(_build_story_report(prompt, system_lower))
        default_delay = _MOCK_STORY_WORD_DELAY
    else:
        # 1-5. Keyword-matched canned replies (checkpoint, gear, first-aid,
        # off-route) with a generic welcome message as the default.
        pieces.append(_canned_reply(prompt_lower))
        default_delay = _MOCK_REPLY_WORD_DELAY

    delay = default_delay if word_delay is None else word_delay
    yield from _stream_words("".join(pieces), delay)


def _build_chat_messages(prompt, system, history):
    """Build the chat message list sent to llama.cpp.

    The system instructions are folded into a user turn instead of being sent
    as a separate system-role message: into the first user turn of *history*
    when there is one, otherwise into the final prompt. Nothing is prepended
    when *system* is empty.
    """
    combined_prompt = prompt
    if system:
        combined_prompt = f"System Instructions:\n{system}\n\nUser Query: {prompt}"

    if not history:
        return [{"role": "user", "content": combined_prompt}]

    messages = []
    system_injected = False
    for msg in history:
        role = msg.get("role", "user")
        content = msg.get("content", "")
        if role == "system":
            continue
        if system and not system_injected and role == "user":
            content = f"System Instructions:\n{system}\n\nUser Query: {content}"
            system_injected = True
        messages.append({"role": role, "content": content})

    # If history held no user turn, carry the system text on the final prompt.
    messages.append({"role": "user", "content": prompt if system_injected else combined_prompt})
    return messages


def generate_llama_cpp(prompt, system="", image_path=None, audio_path=None, history=None):
    """Query the in-process llama-cpp-python model with a timeout fallback to mock.

    The model lock is held for the whole generation, so concurrent callers are
    served one after another. If loading the model or evaluating the prompt
    takes longer than 120 s, llama_cpp is disabled for subsequent calls (via
    the ``disabled`` attribute on this function) and the mock backend answers.

    Yields:
        Text fragments of the response (optionally preceded by the ASR echo).
    """
    if getattr(generate_llama_cpp, "disabled", False):
        print("[llm.py] llama_cpp is disabled (too slow or failed). Using mock backend.")
        yield from generate_mock(prompt, system, image_path, audio_path, history)
        return

    # A blocking acquire always succeeds, so a context manager is sufficient
    # and guarantees release even if the consumer abandons the generator.
    with _model_lock:
        start_time = time.time()
        model = None
        try:
            model = init_llama_cpp()
        except Exception as e:
            print(f"[llm.py] Exception during init_llama_cpp: {e}")

        if model is None:
            print("[llm.py] Fallback to mock backend.")
            yield from generate_mock(prompt, system, image_path, audio_path, history)
            return

        init_duration = time.time() - start_time
        if init_duration > _LLAMA_TIME_LIMIT:
            print(f"[llm.py] Warning: Model loading took {init_duration:.2f}s (exceeded 120s limit). Disabling llama_cpp and falling back to mock backend.")
            generate_llama_cpp.disabled = True
            yield from generate_mock(prompt, system, image_path, audio_path, history)
            return

        voice_prefix = ""
        if audio_path:
            transcription = transcribe_audio(audio_path, prompt)
            voice_prefix = f"[🎙️ **ASR Transcribed:** \"{transcription}\"]\n\n"
            prompt = f"The hiker asked by voice: '{transcription}'. Respond directly to this query."

        if image_path:
            # The image itself is not passed to the model; only a marker is added.
            prompt = f"[📸 Image uploaded] {prompt}"

        messages = _build_chat_messages(prompt, system, history)

        print(f"\n--- [llama.cpp INPUT MESSAGES] ---\n{messages}\n--------------------------------")
        print("--- [llama.cpp STREAMING RESPONSE] ---")

        try:
            response = model.create_chat_completion(
                messages=messages,
                max_tokens=512,
                temperature=0.3,
                top_p=0.9,
                stream=True,
            )

            first_token_timeout = _LLAMA_TIME_LIMIT
            response_iter = iter(response)

            # Time-to-first-chunk is used as the prompt evaluation duration.
            first_chunk_start = time.time()
            try:
                first_chunk = next(response_iter)
            except StopIteration:
                first_chunk = None
            prefill_duration = time.time() - first_chunk_start

            if prefill_duration > first_token_timeout:
                print(f"[llm.py] Prompt evaluation took {prefill_duration:.2f}s (exceeded {first_token_timeout}s limit). Disabling llama_cpp and falling back to mock.")
                generate_llama_cpp.disabled = True
                yield from generate_mock(prompt, system, image_path, audio_path, history)
                return

            if voice_prefix:
                yield voice_prefix

            if first_chunk:
                # `or ''` guards against a delta whose content is explicitly None.
                text = first_chunk['choices'][0]['delta'].get('content', '') or ''
                print(text, end="", flush=True)
                yield text

            for chunk in response_iter:
                text = chunk['choices'][0]['delta'].get('content', '') or ''
                print(text, end="", flush=True)
                yield text
            print("\n--------------------------------------")
        except Exception as e:
            print(f"[llm.py] Error running llama.cpp: {e}. Falling back to mock.")
            yield from generate_mock(prompt, system, image_path, audio_path, history)


def generate(prompt, system="", image_path=None, audio_path=None, history=None, stream=True):
    """Entry point for LLM generation supporting text, image, and voice inputs.

    Args:
        prompt: User query text.
        system: Optional system instructions.
        image_path: Optional path of an uploaded image.
        audio_path: Optional path of a voice recording to transcribe.
        history: Optional list of prior ``{"role", "content"}`` messages.
        stream: When true, return a generator of text fragments; otherwise
            return the complete response as one string.
    """
    print(f"[llm.py] Using backend: {BACKEND}")

    if BACKEND == "llama_cpp":
        generator = generate_llama_cpp(prompt, system, image_path, audio_path, history)
    else:  # mock
        generator = generate_mock(prompt, system, image_path, audio_path, history)

    if stream:
        return generator
    return "".join(generator)