import os import requests import gradio as gr from fastapi import FastAPI, Request from fastapi.responses import JSONResponse import uvicorn # Configuration API_URL = "https://router.huggingface.co/v1/chat/completions" headers = { "Authorization": f"Bearer {os.environ.get('HF_TOKEN', '')}", } MODEL = "deepseek-ai/DeepSeek-V3.2:novita" app = FastAPI() def query_ai(prompt): payload = { "messages": [{"role": "user", "content": prompt}], "model": MODEL } try: response = requests.post(API_URL, headers=headers, json=payload, timeout=30) return response.json() except Exception as e: return {"error": str(e)} def search_osm_logic(query, location="Nigeria"): params = { "q": f"{query} in {location}", "format": "json", "addressdetails": 1, "limit": 10 } try: resp = requests.get("https://nominatim.openstreetmap.org/search", params=params, headers={"User-Agent": "Internly/1.0"}, timeout=10) return resp.json() except Exception as e: return [] @app.get("/api/search") async def api_search(q: str, loc: str = "Nigeria"): osm_results = search_osm_logic(q, loc) enriched_results = [] for item in osm_results: name = item.get("display_name").split(",")[0] address = item.get("display_name") lat = item.get("lat") lon = item.get("lon") osm_id = item.get("osm_id") prompt = ( f"Analyze '{name}' at '{address}' for SIWES internship in Nigeria. " "Return ONLY a JSON object with: 'orgType' (Private/Government/Unknown), " "'siwesCategory' (e.g. Healthcare, Engineering), 'summary' (1 sentence), 'relevance' (1 sentence)." ) ai_resp = query_ai(prompt) result = { "osmId": str(osm_id), "name": name, "address": address, "location": loc, "coordinates": {"lat": float(lat) if lat else 0, "lon": float(lon) if lon else 0}, "type": item.get("type", "unknown") } try: content = ai_resp["choices"][0]["message"]["content"] # Basic JSON extraction if model wraps in markdown if "```json" in content: content = content.split("```json")[1].split("```")[0] import json ai_data = json.loads(content.strip()) result.update(ai_data) except: result.update({ "orgType": "Unknown", "siwesCategory": "Unknown", "summary": "AI enrichment failed.", "relevance": "Unknown" }) enriched_results.append(result) return enriched_results # Gradio Interface for testing def gradio_search(query, location): import asyncio import json # Use the same logic for the UI results = search_osm_logic(query, location) formatted = [] for r in results: formatted.append(f"### {r.get('display_name')}\n") return "\n".join(formatted) if formatted else "No results found." with gr.Blocks(title="Internly API Service") as demo: gr.Markdown("# Internly API Service") gr.Markdown("This space serves as the backend API for Internly. Search is exposed at `/api/search`.") with gr.Row(): q_input = gr.Textbox(label="Query") l_input = gr.Textbox(label="Location", value="Nigeria") btn = gr.Button("Test Search") output = gr.Markdown() btn.click(fn=gradio_search, inputs=[q_input, l_input], outputs=output) app = gr.mount_gradio_app(app, demo, path="/") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)