Spaces:
Sleeping
Sleeping
File size: 7,327 Bytes
79d285f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 | from fastapi import FastAPI, Query, HTTPException
from bson import ObjectId
import sys
import httpx
import os
import crawler
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse
from introlix_api.app.routes import auth, posts, run_spider, similarity
from typing import List
from dotenv import load_dotenv, dotenv_values
from introlix_api.app.appwrite import databases, APPWRITE_DATABASE_ID, ID, APPWRITE_ACCOUNT_COLLECTION_ID, get_interests
from introlix_api.app.database import startup_db_client, shutdown_db_client
from introlix_api.ml.recommendation import Recommendation
from introlix_api.utils.tags import fetch_tags
from introlix_api.exception import CustomException
from contextlib import asynccontextmanager
from pydantic import BaseModel, Field
load_dotenv()
YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
class FeedModel(BaseModel):
    """Response shape for a single feed article.

    The ``id`` field is populated from MongoDB's ``_id`` key via the alias,
    so documents coming straight from the ``feedData`` collection validate
    without renaming.
    """
    # Mongo ObjectId, already converted to str by the handlers before return.
    id: str = Field(..., alias="_id")
    title: str
    desc: str
    url: str
    publication_date: str
    image_url: str
    category: str
    source: str
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: open the MongoDB client on startup and
    close it on shutdown.

    The shutdown call is placed in a ``finally`` block so the connection is
    released even if an exception propagates into the lifespan generator
    while the app is running (the original skipped cleanup in that case).
    """
    # Start the database connection
    await startup_db_client(app)
    try:
        yield
    finally:
        # Close the database connection — runs on normal and error shutdown.
        await shutdown_db_client(app)
app = FastAPI(lifespan=lifespan)

# NOTE: browsers send the Origin header WITHOUT a trailing slash, so entries
# like "https://introlixfeed.vercel.app/" can never match and CORS preflight
# would fail for those sites. Origins below are listed without the slash.
origins = [
    "http://localhost:3000",
    "http://192.168.1.64:3000",
    "https://introlixfeed.vercel.app",
    "https://introlixfeed.vercel.com",
    # Add other allowed origins here if needed
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,  # Specify allowed origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/", tags=["authentication"])
async def index():
    """Redirect the bare root URL to the interactive API documentation."""
    docs_url = '/docs'
    return RedirectResponse(url=docs_url)
@app.get("/feed_data", response_model=List[FeedModel])
async def get_feed_data(page: int = 1, limit: int = 20, user_id: str = Query(...), category=None):
    """Return a page of recommended feed articles for a user.

    Args:
        page: 1-based page number.
        limit: Maximum number of articles per page.
        user_id: Appwrite account document id, used to load the user's
            stored interests.
        category: Optional single category filter; when omitted, all of the
            user's interest categories are queried.

    Raises:
        CustomException: Wraps any error raised while building the feed.
    """
    try:
        skip = (page - 1) * limit

        # Default to the global interest list (interest names only, no keywords).
        user_interests = [interest['interest'] for interest in get_interests()]

        # If this user has stored interests in Appwrite, prefer those.
        users = databases.list_documents(
            database_id=APPWRITE_DATABASE_ID,
            collection_id=APPWRITE_ACCOUNT_COLLECTION_ID,
        )
        for doc in users['documents']:
            if user_id == doc['$id']:
                user_interests = doc['interests']
                break  # document ids are unique; stop scanning once found
        # Entries are stored as "interest:keyword" — keep only the interest part.
        user_interests = [item.split(':')[0] for item in user_interests]

        # `category is None` (not `== None`) — identity check for the sentinel.
        if category is None:
            query = {"category": {"$in": user_interests}}
        else:
            query = {"category": category}
        response = await app.mongodb['feedData'].find(query).skip(skip).limit(limit).to_list(limit)

        # Drop items missing a title or description in a single pass.
        response = [item for item in response if item.get('title') and item.get('desc')]

        # Keep only the articles the recommendation model selects.
        article_titles = [item['title'] for item in response]
        recommended_titles = Recommendation(user_interests, article_titles).recommend()
        response = [post for post in response if post['title'] in recommended_titles]

        # Normalize to the FeedModel shape: stringify _id and blank out nulls.
        for item in response:
            item['_id'] = str(item['_id'])
            for key in ('title', 'desc', 'url', 'publication_date',
                        'image_url', 'category', 'source'):
                item[key] = item.get(key) or ''
        return response
    except Exception as e:
        raise CustomException(e, sys) from e
@app.get("/fetch_post", response_model=FeedModel)
async def fetch_post(post_id: str = Query(...)):
    """Fetch a single feed article by its MongoDB ObjectId string.

    Renamed from ``get_feed_data`` — the original redefined (and shadowed)
    the ``/feed_data`` handler's function name at module level.

    Raises:
        HTTPException: 404 when no post with the given id exists.
        CustomException: Wraps any other error.
    """
    try:
        response = await app.mongodb['feedData'].find_one({"_id": ObjectId(post_id)})
        if not response:
            raise HTTPException(status_code=404, detail="Post not found")

        # Convert _id to string for the response model.
        response["_id"] = str(response["_id"])
        # Fill null fields with placeholders; desc is clipped to a 90-char preview.
        response["desc"] = (response.get("desc") or "No Description")[:90]
        response["publication_date"] = response.get("publication_date") or "Unknown Date"
        response["image_url"] = response.get("image_url") or "No Image URL"
        response["category"] = response.get("category") or "Uncategorized"
        response["source"] = response.get("source") or "Unknown Source"
        return response
    except HTTPException:
        # Let the intended 404 through instead of wrapping it into a 500.
        raise
    except Exception as e:
        raise CustomException(e, sys) from e
@app.get("/test_recommendation")
async def test_recommendation(
    user_interests: list[str] = Query(..., description="Comma-separated list of user interests"),
    articles: list[str] = Query(..., description="Comma-separated list of articles")
):
    """Test endpoint for recommendations.

    Takes user interests and candidate articles as query parameters and
    returns the subset of articles the recommendation model selects.
    """
    engine = Recommendation(user_interests, articles)
    return {
        "user_interests": user_interests,
        "recommended_articles": engine.recommend(),
    }
@app.get("/youtube/videos")
async def get_youtube_videos(query: str = None):
    """Search YouTube for videos matching *query* (defaults to "trending").

    Returns the raw YouTube Data API v3 search response, ordered by view
    count with at most 10 results.

    Raises:
        HTTPException: 502 when the upstream YouTube API request fails
            (previously this surfaced as an unhandled 500).
    """
    url = "https://www.googleapis.com/youtube/v3/search"
    params = {
        "key": YOUTUBE_API_KEY,
        "part": "snippet",
        "q": query or "trending",
        "type": "video",
        "maxResults": 10,
        "order": "viewCount",  # change to 'date' for most recent uploads
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, params=params)
            response.raise_for_status()  # 4xx/5xx from YouTube raises here
        except httpx.HTTPError as e:
            # Report the upstream failure as a gateway error with context.
            raise HTTPException(status_code=502, detail=f"YouTube API request failed: {e}") from e
    return response.json()
@app.get("/tags")
async def get_tags():
    """Return the list of available tags from the tags utility."""
    return fetch_tags()
# Mount the feature routers onto the main application.
app.include_router(auth.router, prefix="/auth")
app.include_router(run_spider.router, prefix="/spider")
app.include_router(similarity.router, prefix="/feed")
app.include_router(crawler.router)
app.include_router(posts.router)