Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Query, HTTPException | |
| from bson import ObjectId | |
| import sys | |
| import httpx | |
| import os | |
| import crawler | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from starlette.responses import RedirectResponse | |
| from introlix_api.app.routes import auth, posts, run_spider, similarity | |
| from typing import List | |
| from dotenv import load_dotenv, dotenv_values | |
| from introlix_api.app.appwrite import databases, APPWRITE_DATABASE_ID, ID, APPWRITE_ACCOUNT_COLLECTION_ID, get_interests | |
| from introlix_api.app.database import startup_db_client, shutdown_db_client | |
| from introlix_api.ml.recommendation import Recommendation | |
| from introlix_api.utils.tags import fetch_tags | |
| from introlix_api.exception import CustomException | |
| from contextlib import asynccontextmanager | |
| from pydantic import BaseModel, Field | |
| load_dotenv() | |
| YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY") | |
| class FeedModel(BaseModel): | |
| id: str = Field(..., alias="_id") | |
| title: str | |
| desc: str | |
| url: str | |
| publication_date: str | |
| image_url: str | |
| category: str | |
| source: str | |
| async def lifespan(app: FastAPI): | |
| # Start the database connection | |
| await startup_db_client(app) | |
| yield | |
| # Close the database connection | |
| await shutdown_db_client(app) | |
| app = FastAPI(lifespan=lifespan) | |
| origins = [ | |
| "http://localhost:3000", | |
| "http://192.168.1.64:3000", | |
| "https://introlixfeed.vercel.app/", | |
| "https://introlixfeed.vercel.com/" | |
| # Add other allowed origins here if needed | |
| ] | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=origins, # Specify allowed origins | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| async def index(): | |
| return RedirectResponse(url='/docs') | |
| async def get_feed_data(page: int = 1, limit: int = 20, user_id: str = Query(...), category=None): | |
| try: | |
| skip = (page - 1) * limit | |
| response = get_interests() | |
| user_interests = [] | |
| # getting only the interests not keywords | |
| for interest in response: | |
| user_interests.append(interest['interest']) | |
| users = databases.list_documents( | |
| database_id=APPWRITE_DATABASE_ID, | |
| collection_id=APPWRITE_ACCOUNT_COLLECTION_ID | |
| ) | |
| for doc in users['documents']: | |
| if user_id == doc['$id']: | |
| user_interests = doc['interests'] | |
| user_interests = [item.split(':')[0] for item in user_interests] | |
| # response = await app.mongodb['feedData'].find({"category": {"$in": user_interests}}).skip(skip).limit(limit).to_list(limit) | |
| # Perform the aggregation | |
| if category == None: | |
| response = await app.mongodb['feedData'].find({"category": {"$in": user_interests}}).skip(skip).limit(limit).to_list(limit) | |
| else: | |
| response = await app.mongodb['feedData'].find({"category": category}).skip(skip).limit(limit).to_list(limit) | |
| # random.shuffle(response) | |
| # Filter out items that do not have a title | |
| response = [item for item in response if item.get('title')] | |
| response = [item for item in response if item.get('desc')] | |
| article_titles = [item['title'] for item in response] | |
| recommendation_system = Recommendation(user_interests, article_titles) | |
| recommended_titles = recommendation_system.recommend() | |
| response = [post for post in response if post['title'] in recommended_titles] | |
| for item in response: | |
| item['_id'] = str(item['_id']) | |
| item['title'] = item.get('title') or '' | |
| item['desc'] = item.get('desc') or '' | |
| item['url'] = item.get('url') or '' | |
| item['publication_date'] = item.get('publication_date') or '' | |
| item['image_url'] = item.get('image_url') or '' | |
| item['category'] = item.get('category') or '' | |
| item['source'] = item.get('source') or '' | |
| return response | |
| except Exception as e: | |
| raise CustomException(e, sys) from e | |
| async def get_feed_data(post_id: str = Query(...)): | |
| try: | |
| post_id = ObjectId(post_id) | |
| response = await app.mongodb['feedData'].find_one({"_id": post_id}) | |
| if not response: | |
| raise HTTPException(status_code=404, detail="Post not found") | |
| # Convert _id to string | |
| response["_id"] = str(response["_id"]) | |
| # Check for null values and set defaults if needed | |
| response["desc"] = (response.get("desc") or "No Description")[:90] | |
| response["publication_date"] = response.get("publication_date") or "Unknown Date" | |
| response["image_url"] = response.get("image_url") or "No Image URL" | |
| response["category"] = response.get("category") or "Uncategorized" | |
| response["source"] = response.get("source") or "Unknown Source" | |
| # for item in response: | |
| # item['title'] = item.get('title') or '' | |
| # item['desc'] = item.get('desc') or '' | |
| # item['url'] = item.get('url') or '' | |
| # item['publication_date'] = item.get('publication_date') or '' | |
| # item['image_url'] = item.get('image_url') or '' | |
| # item['category'] = item.get('category') or '' | |
| # item['source'] = item.get('source') or '' | |
| return response | |
| except Exception as e: | |
| raise CustomException(e, sys) from e | |
| async def test_recommendation( | |
| user_interests: list[str] = Query(..., description="Comma-separated list of user interests"), | |
| articles: list[str] = Query(..., description="Comma-separated list of articles") | |
| ): | |
| """ | |
| Test endpoint for recommendations. | |
| Takes user interests and articles as query parameters and returns recommended articles. | |
| """ | |
| # Create a recommendation instance | |
| recommendation = Recommendation(user_interests, articles) | |
| # Get the recommended articles | |
| recommended_articles = recommendation.recommend() | |
| return { | |
| "user_interests": user_interests, | |
| "recommended_articles": recommended_articles, | |
| } | |
| async def get_youtube_videos(query: str = None): | |
| url = "https://www.googleapis.com/youtube/v3/search" | |
| params = { | |
| "key": YOUTUBE_API_KEY, | |
| "part": "snippet", | |
| "q": query or "trending", | |
| "type": "video", | |
| "maxResults": 10, | |
| "order": "viewCount" # You can change this to 'date' for recent uploads | |
| } | |
| async with httpx.AsyncClient() as client: | |
| response = await client.get(url, params=params) | |
| response.raise_for_status() # Raise an error for bad responses | |
| return response.json() | |
| async def get_tags(): | |
| tags = fetch_tags() | |
| return tags | |
| app.include_router(auth.router, prefix="/auth") | |
| app.include_router(run_spider.router, prefix="/spider") | |
| app.include_router(similarity.router, prefix="/feed") | |
| app.include_router(crawler.router) | |
| app.include_router(posts.router) |