| """ |
| PIOE - Personal Intelligence & Opportunity Engine |
| |
| FastAPI Backend Application |
| """ |
| from fastapi import FastAPI, Depends, HTTPException, Query, BackgroundTasks |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import HTMLResponse, JSONResponse |
| from fastapi.middleware.cors import CORSMiddleware |
| from sqlalchemy.orm import Session |
| from datetime import datetime |
| from typing import Optional |
| from pathlib import Path |
|
|
| from .database import get_db, init_db |
| from .models import Opportunity, OpportunityCategory, OpportunityStatus, Domain |
| from .delivery import DigestGenerator |
| from .ingestion import IngestionScheduler |
|
|
| |
| app = FastAPI( |
| title="PIOE - Personal Intelligence & Opportunity Engine", |
| description="Signal intelligence system for opportunities in AI, Robotics, and more", |
| version="1.0.0" |
| ) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| |
| scheduler: Optional[IngestionScheduler] = None |
|
|
|
|
| @app.on_event("startup") |
| async def startup(): |
| """Initialize database and scheduler on startup.""" |
| init_db() |
| global scheduler |
| scheduler = IngestionScheduler() |
| |
| print("PIOE Backend started. Run /api/ingest/start to begin ingestion.") |
|
|
|
|
| @app.on_event("shutdown") |
| async def shutdown(): |
| """Cleanup on shutdown.""" |
| global scheduler |
| if scheduler: |
| scheduler.stop() |
|
|
|
|
| |
|
|
| @app.get("/", response_class=HTMLResponse) |
| async def serve_dashboard(): |
| """Serve the frontend dashboard.""" |
| frontend_path = Path(__file__).parent.parent / "frontend" / "index.html" |
| if frontend_path.exists(): |
| return HTMLResponse(content=frontend_path.read_text(), status_code=200) |
| return HTMLResponse(content="<h1>PIOE Dashboard - Frontend not found</h1>", status_code=200) |
|
|
|
|
| |
|
|
| @app.get("/api/opportunities") |
| async def get_opportunities( |
| db: Session = Depends(get_db), |
| category: Optional[str] = None, |
| domain: Optional[str] = None, |
| status: Optional[str] = None, |
| min_score: float = 0.0, |
| limit: int = Query(default=50, le=200), |
| offset: int = 0 |
| ): |
| """Get filtered list of opportunities.""" |
| query = db.query(Opportunity).filter( |
| Opportunity.combined_score >= min_score |
| ) |
| |
| if category: |
| try: |
| query = query.filter(Opportunity.category == OpportunityCategory(category)) |
| except ValueError: |
| pass |
| |
| if domain: |
| try: |
| query = query.filter(Opportunity.domain == Domain(domain)) |
| except ValueError: |
| pass |
| |
| if status: |
| try: |
| query = query.filter(Opportunity.status == OpportunityStatus(status)) |
| except ValueError: |
| pass |
| |
| total = query.count() |
| |
| opportunities = query.order_by( |
| Opportunity.combined_score.desc() |
| ).offset(offset).limit(limit).all() |
| |
| return { |
| "total": total, |
| "limit": limit, |
| "offset": offset, |
| "opportunities": [ |
| { |
| "id": o.id, |
| "title": o.title, |
| "category": o.category.value if o.category else None, |
| "domain": o.domain.value if o.domain else None, |
| "source_name": o.source_name, |
| "url": o.url, |
| "deadline": o.deadline.isoformat() if o.deadline else None, |
| "relevance_score": o.relevance_score, |
| "novelty_score": o.novelty_score, |
| "credibility_score": o.credibility_score, |
| "combined_score": o.combined_score, |
| |
| "roi_score": getattr(o, 'roi_score', None), |
| "risk_level": o.risk_level.value if hasattr(o, 'risk_level') and o.risk_level else "medium", |
| "region": o.region.value if hasattr(o, 'region') and o.region else "global", |
| "status": o.status.value if o.status else None, |
| "discovered_at": o.discovered_at.isoformat() if o.discovered_at else None, |
| "raw_text": o.raw_text[:500] if o.raw_text else None |
| } |
| for o in opportunities |
| ] |
| } |
|
|
|
|
| @app.get("/api/opportunities/{opportunity_id}") |
| async def get_opportunity(opportunity_id: str, db: Session = Depends(get_db)): |
| """Get single opportunity by ID with full PIOE 2.0 details.""" |
| opp = db.query(Opportunity).filter(Opportunity.id == opportunity_id).first() |
| if not opp: |
| raise HTTPException(status_code=404, detail="Opportunity not found") |
| |
| return { |
| "id": opp.id, |
| "title": opp.title, |
| "category": opp.category.value if opp.category else None, |
| "domain": opp.domain.value if opp.domain else None, |
| "source_name": opp.source_name, |
| "source_type": opp.source_type.value if opp.source_type else None, |
| "url": opp.url, |
| "deadline": opp.deadline.isoformat() if opp.deadline else None, |
| "published_at": opp.published_at.isoformat() if opp.published_at else None, |
| "discovered_at": opp.discovered_at.isoformat() if opp.discovered_at else None, |
| "raw_text": opp.raw_text, |
| |
| "relevance_score": opp.relevance_score, |
| "novelty_score": opp.novelty_score, |
| "credibility_score": opp.credibility_score, |
| "signal_strength": opp.signal_strength, |
| "combined_score": opp.combined_score, |
| |
| "roi_score": getattr(opp, 'roi_score', None), |
| "unlock_potential": getattr(opp, 'unlock_potential', None), |
| "risk_level": opp.risk_level.value if hasattr(opp, 'risk_level') and opp.risk_level else "medium", |
| "competition_level": getattr(opp, 'competition_level', None), |
| |
| "region": opp.region.value if hasattr(opp, 'region') and opp.region else "global", |
| "region_weight": getattr(opp, 'region_weight', 1.0), |
| |
| "status": opp.status.value if opp.status else None, |
| "metadata": opp.extra_data |
| } |
|
|
|
|
| @app.get("/api/opportunities/{opportunity_id}/guidance") |
| async def get_action_guidance(opportunity_id: str, db: Session = Depends(get_db)): |
| """PIOE 2.0: Get AI-powered action guidance for an opportunity.""" |
| from .intelligence import LLMClient |
| |
| opp = db.query(Opportunity).filter(Opportunity.id == opportunity_id).first() |
| if not opp: |
| raise HTTPException(status_code=404, detail="Opportunity not found") |
| |
| |
| opp_dict = { |
| "title": opp.title, |
| "category": opp.category.value if opp.category else "other", |
| "domain": opp.domain.value if opp.domain else "mixed", |
| "deadline": opp.deadline.isoformat() if opp.deadline else None, |
| "raw_text": opp.raw_text or "", |
| "roi_score": getattr(opp, 'roi_score', 0.5), |
| "competition_level": getattr(opp, 'competition_level', 0.5), |
| "region": opp.region.value if hasattr(opp, 'region') and opp.region else "global", |
| } |
| |
| |
| llm = LLMClient.get_client() |
| guidance = llm.recommend_action(opp_dict) |
| |
| return { |
| "opportunity_id": opportunity_id, |
| "guidance": guidance |
| } |
|
|
|
|
| @app.patch("/api/opportunities/{opportunity_id}/status") |
| async def update_opportunity_status( |
| opportunity_id: str, |
| status: str, |
| db: Session = Depends(get_db) |
| ): |
| """Update opportunity status (save, apply, dismiss, etc.).""" |
| opp = db.query(Opportunity).filter(Opportunity.id == opportunity_id).first() |
| if not opp: |
| raise HTTPException(status_code=404, detail="Opportunity not found") |
| |
| try: |
| opp.status = OpportunityStatus(status) |
| db.commit() |
| return {"success": True, "new_status": status} |
| except ValueError: |
| raise HTTPException(status_code=400, detail=f"Invalid status: {status}") |
|
|
|
|
| |
|
|
| @app.get("/api/digest/daily") |
| async def get_daily_digest(db: Session = Depends(get_db), limit: int = 10): |
| """Get today's opportunity digest.""" |
| generator = DigestGenerator(db) |
| digest = generator.generate_daily(limit) |
| return {"digest": digest} |
|
|
|
|
| @app.get("/api/digest/weekly") |
| async def get_weekly_digest(db: Session = Depends(get_db), limit: int = 25): |
| """Get weekly opportunity digest.""" |
| generator = DigestGenerator(db) |
| digest = generator.generate_weekly(limit) |
| return {"digest": digest} |
|
|
|
|
| @app.get("/api/digest/urgent") |
| async def get_urgent_digest(db: Session = Depends(get_db), limit: int = 10): |
| """Get urgent opportunities with approaching deadlines.""" |
| generator = DigestGenerator(db) |
| digest = generator.generate_urgent(limit) |
| return {"digest": digest} |
|
|
|
|
| @app.get("/api/digest/{category}") |
| async def get_category_digest( |
| category: str, |
| db: Session = Depends(get_db), |
| limit: int = 10 |
| ): |
| """Get digest for specific category.""" |
| try: |
| cat = OpportunityCategory(category) |
| except ValueError: |
| raise HTTPException(status_code=400, detail=f"Invalid category: {category}") |
| |
| generator = DigestGenerator(db) |
| digest = generator.generate_by_category(cat, limit) |
| return {"digest": digest} |
|
|
|
|
| |
|
|
| @app.post("/api/ingest/run") |
| async def run_ingestion(background_tasks: BackgroundTasks): |
| """Trigger full ingestion manually.""" |
| global scheduler |
| if not scheduler: |
| scheduler = IngestionScheduler() |
| |
| background_tasks.add_task(scheduler.run_full_ingestion) |
| return {"message": "Ingestion started in background"} |
|
|
|
|
| @app.post("/api/ingest/source/{source_name}") |
| async def run_source_ingestion(source_name: str, background_tasks: BackgroundTasks): |
| """Trigger ingestion for specific source.""" |
| global scheduler |
| if not scheduler: |
| scheduler = IngestionScheduler() |
| |
| background_tasks.add_task(scheduler.ingest_single_source, source_name) |
| return {"message": f"Ingestion started for {source_name}"} |
|
|
|
|
| @app.post("/api/ingest/start") |
| async def start_scheduler(): |
| """Start the automatic ingestion scheduler.""" |
| global scheduler |
| if not scheduler: |
| scheduler = IngestionScheduler() |
| |
| scheduler.start() |
| return {"message": "Scheduler started"} |
|
|
|
|
| @app.post("/api/ingest/stop") |
| async def stop_scheduler(): |
| """Stop the automatic ingestion scheduler.""" |
| global scheduler |
| if scheduler: |
| scheduler.stop() |
| return {"message": "Scheduler stopped"} |
|
|
|
|
| |
|
|
| @app.get("/api/stats") |
| async def get_stats(db: Session = Depends(get_db)): |
| """Get overview statistics.""" |
| from sqlalchemy import func |
| |
| total = db.query(Opportunity).count() |
| new_count = db.query(Opportunity).filter( |
| Opportunity.status == OpportunityStatus.NEW |
| ).count() |
| |
| |
| categories = db.query( |
| Opportunity.category, func.count(Opportunity.id) |
| ).group_by(Opportunity.category).all() |
| |
| |
| domains = db.query( |
| Opportunity.domain, func.count(Opportunity.id) |
| ).group_by(Opportunity.domain).all() |
| |
| return { |
| "total_opportunities": total, |
| "new_opportunities": new_count, |
| "by_category": { |
| cat.value if cat else "unknown": count |
| for cat, count in categories |
| }, |
| "by_domain": { |
| dom.value if dom else "unknown": count |
| for dom, count in domains |
| } |
| } |
|
|
|
|
| |
|
|
| from pydantic import BaseModel |
|
|
| class ChatMessage(BaseModel): |
| message: str |
|
|
| @app.post("/api/chat") |
| async def chat_with_opportunities( |
| chat: ChatMessage, |
| db: Session = Depends(get_db) |
| ): |
| """ |
| PIOE 2.0: AI-powered chat to search and explore opportunities. |
| Ask questions like: |
| - "Find me hackathons in Nigeria" |
| - "What grants are available for AI projects?" |
| - "Show me high ROI opportunities with low competition" |
| """ |
| from .intelligence import LLMClient |
| |
| user_message = chat.message.strip() |
| if not user_message: |
| return {"response": "Please ask a question about opportunities.", "opportunities": []} |
| |
| |
| opportunities = db.query(Opportunity).filter( |
| Opportunity.combined_score >= 0.3 |
| ).order_by(Opportunity.combined_score.desc()).limit(100).all() |
| |
| |
| opp_summaries = [] |
| for o in opportunities: |
| summary = f"[{o.id}] {o.title} | Category: {o.category.value if o.category else 'other'} | Domain: {o.domain.value if o.domain else 'mixed'} | Region: {o.region.value if hasattr(o, 'region') and o.region else 'global'} | ROI: {getattr(o, 'roi_score', 0.5):.0%} | Risk: {o.risk_level.value if hasattr(o, 'risk_level') and o.risk_level else 'medium'}" |
| opp_summaries.append(summary) |
| |
| opp_context = "\n".join(opp_summaries[:50]) if opp_summaries else "No opportunities found in database." |
| |
| |
| prompt = f"""You are PIOE, a Personal Intelligence & Opportunity Engine assistant. |
| The user is from Nigeria and interested in AI, Computer Vision, Robotics, and Web3 opportunities. |
| |
| AVAILABLE OPPORTUNITIES: |
| {opp_context} |
| |
| USER QUESTION: {user_message} |
| |
| Instructions: |
| 1. Answer the user's question based on the opportunities above |
| 2. If they're searching for specific types, list the most relevant opportunity IDs |
| 3. Provide actionable advice |
| 4. Be concise but helpful |
| 5. If no matching opportunities exist, suggest what to search for |
| |
| Return a JSON response: |
| {{ |
| "response": "Your helpful answer here", |
| "matched_ids": ["id1", "id2"] or [] if none match, |
| "suggested_action": "What the user should do next" |
| }}""" |
| |
| try: |
| llm = LLMClient.get_client() |
| result = llm._generate(prompt) if hasattr(llm, '_generate') else '{"response": "AI not configured", "matched_ids": [], "suggested_action": "Configure Gemini API key"}' |
| |
| import json |
| |
| start = result.find('{') |
| end = result.rfind('}') + 1 |
| if start != -1 and end > start: |
| parsed = json.loads(result[start:end]) |
| response_text = parsed.get("response", result) |
| matched_ids = parsed.get("matched_ids", []) |
| suggested_action = parsed.get("suggested_action", "") |
| else: |
| response_text = result |
| matched_ids = [] |
| suggested_action = "" |
| |
| |
| matched_opps = [] |
| if matched_ids: |
| for opp in opportunities: |
| if opp.id in matched_ids: |
| matched_opps.append({ |
| "id": opp.id, |
| "title": opp.title, |
| "category": opp.category.value if opp.category else None, |
| "domain": opp.domain.value if opp.domain else None, |
| "url": opp.url, |
| "roi_score": getattr(opp, 'roi_score', None), |
| "risk_level": opp.risk_level.value if hasattr(opp, 'risk_level') and opp.risk_level else "medium", |
| "region": opp.region.value if hasattr(opp, 'region') and opp.region else "global", |
| }) |
| |
| return { |
| "response": response_text, |
| "opportunities": matched_opps[:10], |
| "suggested_action": suggested_action, |
| "total_searched": len(opportunities) |
| } |
| |
| except Exception as e: |
| |
| keywords = user_message.lower().split() |
| matched = [] |
| for o in opportunities: |
| text = f"{o.title} {o.raw_text or ''}".lower() |
| if any(kw in text for kw in keywords): |
| matched.append({ |
| "id": o.id, |
| "title": o.title, |
| "category": o.category.value if o.category else None, |
| "url": o.url, |
| "roi_score": getattr(o, 'roi_score', None), |
| }) |
| |
| return { |
| "response": f"Found {len(matched)} opportunities matching your search. (AI unavailable: {str(e)[:50]})", |
| "opportunities": matched[:10], |
| "suggested_action": "Click on any opportunity for details", |
| "total_searched": len(opportunities) |
| } |
|
|
|
|
| |
| frontend_dir = Path(__file__).parent.parent / "frontend" |
| if frontend_dir.exists(): |
| app.mount("/static", StaticFiles(directory=str(frontend_dir)), name="static") |
|
|
|
|