# introlix_api/app.py
# Initial commit 79d285f (satyam998)
from fastapi import FastAPI, Query, HTTPException
from bson import ObjectId
import sys
import httpx
import os
import crawler
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse
from introlix_api.app.routes import auth, posts, run_spider, similarity
from typing import List
from dotenv import load_dotenv, dotenv_values
from introlix_api.app.appwrite import databases, APPWRITE_DATABASE_ID, ID, APPWRITE_ACCOUNT_COLLECTION_ID, get_interests
from introlix_api.app.database import startup_db_client, shutdown_db_client
from introlix_api.ml.recommendation import Recommendation
from introlix_api.utils.tags import fetch_tags
from introlix_api.exception import CustomException
from contextlib import asynccontextmanager
from pydantic import BaseModel, Field
# Load variables from a .env file before any setting is read from the environment.
load_dotenv()

# Key sent as the "key" parameter of YouTube search requests; None when the
# environment variable is not set.
YOUTUBE_API_KEY = os.environ.get("YOUTUBE_API_KEY")
class FeedModel(BaseModel):
    # Response schema for a single feed post, used as the response_model of
    # the /feed_data and /fetch_post endpoints.

    # Populated from (and exposed as) the "_id" key of the post document.
    id: str = Field(..., alias="_id")

    # Remaining fields are all required strings.
    title: str
    desc: str
    url: str
    publication_date: str
    image_url: str
    category: str
    source: str
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage the database client for the lifetime of the application.

    Opens the database connection before control is handed back to the
    framework and closes it again once the application stops.

    Args:
        app: The FastAPI application passed to ``startup_db_client`` and
            ``shutdown_db_client``.

    Yields:
        None. The application runs while the generator is suspended here.
    """
    # Start the database connection
    await startup_db_client(app)
    try:
        yield
    finally:
        # Close the database connection even when the application exits
        # through an exception or the lifespan task is cancelled.
        await shutdown_db_client(app)
app = FastAPI(lifespan=lifespan)

# Origins allowed to make cross-origin requests. Each entry must be a bare
# origin (scheme://host[:port]) without a trailing slash or path: browsers
# send the Origin header in that form and the middleware compares it exactly,
# so an entry ending in "/" never matches.
origins = [
    "http://localhost:3000",
    "http://192.168.1.64:3000",
    "https://introlixfeed.vercel.app",
    "https://introlixfeed.vercel.com",
    # Add other allowed origins here if needed
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,  # Specify allowed origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Root endpoint: answers with a redirect to the /docs page.
@app.get("/", tags=["authentication"])
async def index():
    docs_redirect = RedirectResponse(url="/docs")
    return docs_redirect
@app.get("/feed_data", response_model=List[FeedModel])
async def get_feed_data(page: int = 1, limit: int = 20, user_id: str = Query(...), category=None):
    """Return one page of feed posts for a user.

    Posts are read from the ``feedData`` collection, matched on the explicit
    ``category`` or, when none is given, on the user's interests. Posts
    without a title or description are dropped and the remainder is narrowed
    to the titles returned by ``Recommendation.recommend()``.

    Args:
        page: 1-based page number.
        limit: Maximum number of posts fetched from the database for the page.
        user_id: ``$id`` of the account document whose interests drive the feed.
        category: Optional category; when given it replaces the interest filter.

    Returns:
        A list of post dicts with ``_id`` converted to ``str`` and missing
        fields replaced by empty strings.

    Raises:
        HTTPException: 400 when ``page`` is below 1 or ``limit`` is negative.
        CustomException: Wraps any other error raised while building the feed.
    """
    try:
        # The database driver rejects a negative skip or length; report that
        # as a client error instead of letting it surface as a server error.
        if page < 1 or limit < 0:
            raise HTTPException(
                status_code=400,
                detail="page must be >= 1 and limit must be >= 0",
            )

        skip = (page - 1) * limit

        # Start from the interests returned by get_interests() (only the
        # "interest" field, not the keywords) in case the user is not found.
        user_interests = [interest['interest'] for interest in get_interests()]

        # NOTE(review): this scans the listed account documents for the user;
        # confirm list_documents is not paginated, otherwise some users may
        # never be found here.
        users = databases.list_documents(
            database_id=APPWRITE_DATABASE_ID,
            collection_id=APPWRITE_ACCOUNT_COLLECTION_ID
        )
        for doc in users['documents']:
            if user_id == doc['$id']:
                user_interests = doc['interests']

        # Keep only the text before the first ':' of every entry.
        user_interests = [item.split(':')[0] for item in user_interests]

        # An explicit category wins over the interest-based filter.
        if category is None:
            query = {"category": {"$in": user_interests}}
        else:
            query = {"category": category}
        response = await app.mongodb['feedData'].find(query).skip(skip).limit(limit).to_list(limit)

        # Filter out items that do not have a title or a description.
        response = [item for item in response if item.get('title') and item.get('desc')]

        article_titles = [item['title'] for item in response]
        recommendation_system = Recommendation(user_interests, article_titles)
        recommended_titles = recommendation_system.recommend()
        response = [post for post in response if post['title'] in recommended_titles]

        # FeedModel requires plain strings: stringify the id and replace
        # missing or null fields with empty strings.
        for item in response:
            item['_id'] = str(item['_id'])
            for key in ('title', 'desc', 'url', 'publication_date', 'image_url', 'category', 'source'):
                item[key] = item.get(key) or ''

        return response
    except HTTPException:
        # Deliberate HTTP errors must reach the client unchanged.
        raise
    except Exception as e:
        raise CustomException(e, sys) from e
@app.get("/fetch_post", response_model=FeedModel)
async def get_feed_data(post_id: str = Query(...)):
    """Return a single post from the ``feedData`` collection by its id.

    Args:
        post_id: String form of the post's ``_id``.

    Returns:
        The post dict with ``_id`` converted to ``str``, the description
        truncated to 90 characters and placeholders for missing fields.

    Raises:
        HTTPException: 400 when ``post_id`` is not a valid ObjectId, 404 when
            no post has that id.
        CustomException: Wraps any other error raised while loading the post.
    """
    # Reject malformed ids up front so the client gets a 400 instead of a
    # server error from the ObjectId constructor.
    if not ObjectId.is_valid(post_id):
        raise HTTPException(status_code=400, detail="Invalid post id")

    try:
        response = await app.mongodb['feedData'].find_one({"_id": ObjectId(post_id)})

        if not response:
            raise HTTPException(status_code=404, detail="Post not found")

        # Convert _id to string
        response["_id"] = str(response["_id"])

        # FeedModel requires strings for title and url as well; fall back to
        # empty strings the same way the /feed_data handler does.
        response["title"] = response.get("title") or ""
        response["url"] = response.get("url") or ""

        # Check for null values and set defaults if needed
        response["desc"] = (response.get("desc") or "No Description")[:90]
        response["publication_date"] = response.get("publication_date") or "Unknown Date"
        response["image_url"] = response.get("image_url") or "No Image URL"
        response["category"] = response.get("category") or "Uncategorized"
        response["source"] = response.get("source") or "Unknown Source"

        return response
    except HTTPException:
        # Let deliberate HTTP errors (such as the 404 above) reach the client
        # instead of being re-wrapped as a generic failure.
        raise
    except Exception as e:
        raise CustomException(e, sys) from e
@app.get("/test_recommendation")
async def test_recommendation(
    user_interests: list[str] = Query(..., description="User interests; repeat the parameter once per interest"),
    articles: list[str] = Query(..., description="Articles; repeat the parameter once per article"),
):
    """
    Test endpoint for recommendations.

    Takes user interests and articles as repeated query parameters (for
    example ``?user_interests=ai&user_interests=python``) and returns the
    recommended articles. Values are not split on commas, so an article
    containing a comma is passed through intact.
    """
    # Create a recommendation instance and get the recommended articles
    recommended_articles = Recommendation(user_interests, articles).recommend()

    return {
        "user_interests": user_interests,
        "recommended_articles": recommended_articles,
    }
@app.get("/youtube/videos")
async def get_youtube_videos(query: str = None):
    """Search YouTube for videos and return the API response.

    Args:
        query: Search terms; "trending" is searched when omitted or empty.

    Returns:
        The decoded JSON body of the search response.

    Raises:
        HTTPException: 502 when the YouTube API answers with an error status
            or cannot be reached.
    """
    url = "https://www.googleapis.com/youtube/v3/search"
    params = {
        "key": YOUTUBE_API_KEY,
        "part": "snippet",
        "q": query or "trending",
        "type": "video",
        "maxResults": 10,
        "order": "viewCount"  # You can change this to 'date' for recent uploads
    }

    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, params=params)
            response.raise_for_status()  # Raise an error for bad responses
            return response.json()
    except httpx.HTTPStatusError as exc:
        # Report only the upstream status: the exception text contains the
        # request URL, which carries the API key.
        raise HTTPException(
            status_code=502,
            detail=f"YouTube API responded with status {exc.response.status_code}",
        ) from exc
    except httpx.RequestError as exc:
        # Connection failures, timeouts and similar transport problems.
        raise HTTPException(status_code=502, detail="Could not reach the YouTube API") from exc
# Endpoint that returns the result of fetch_tags() unchanged.
@app.get("/tags")
async def get_tags():
    return fetch_tags()
# Routers mounted under their own URL prefix.
app.include_router(router=auth.router, prefix="/auth")
app.include_router(router=run_spider.router, prefix="/spider")
app.include_router(router=similarity.router, prefix="/feed")

# Routers mounted without an extra prefix.
app.include_router(router=crawler.router)
app.include_router(router=posts.router)