Update utils/instaloader_utils.py
Browse files- utils/instaloader_utils.py +17 -15
utils/instaloader_utils.py
CHANGED
|
@@ -1,13 +1,12 @@
|
|
| 1 |
import instaloader
|
| 2 |
-
from typing import List, Dict
|
| 3 |
-
import
|
|
|
|
| 4 |
|
| 5 |
# Initialize Instaloader
|
| 6 |
L = instaloader.Instaloader()
|
| 7 |
|
| 8 |
-
|
| 9 |
-
|
| 10 |
-
async def fetch_user_posts(username: str, max_posts: int = 20):
|
| 11 |
try:
|
| 12 |
print(f"Fetching posts for user: {username}")
|
| 13 |
profile = instaloader.Profile.from_username(L.context, username)
|
|
@@ -17,7 +16,7 @@ async def fetch_user_posts(username: str, max_posts: int = 20):
|
|
| 17 |
if count >= max_posts:
|
| 18 |
break
|
| 19 |
posts.append({
|
| 20 |
-
"caption": post.caption,
|
| 21 |
"likes": post.likes,
|
| 22 |
"comments": post.comments,
|
| 23 |
"date": post.date_utc.isoformat(),
|
|
@@ -25,12 +24,13 @@ async def fetch_user_posts(username: str, max_posts: int = 20):
|
|
| 25 |
})
|
| 26 |
|
| 27 |
if not posts:
|
| 28 |
-
|
| 29 |
-
|
| 30 |
-
|
|
|
|
| 31 |
|
| 32 |
except instaloader.exceptions.ProfileNotExistsException:
|
| 33 |
-
raise HTTPException(status_code=404, detail="Profile does not exist.")
|
| 34 |
except instaloader.exceptions.ConnectionException:
|
| 35 |
raise HTTPException(status_code=503, detail="Instagram connection failed. Try again later.")
|
| 36 |
except Exception as e:
|
|
@@ -64,7 +64,8 @@ def find_similar_accounts(username: str, rapidapi_key: str) -> List[str]:
|
|
| 64 |
print(f"API request failed: {e}")
|
| 65 |
return []
|
| 66 |
|
| 67 |
-
|
|
|
|
| 68 |
"""
|
| 69 |
Fetch posts for similar accounts (competitors) using the RapidAPI endpoint.
|
| 70 |
"""
|
|
@@ -78,7 +79,8 @@ def fetch_competitors_posts(username: str, rapidapi_key: str, max_posts: int = 5
|
|
| 78 |
all_posts = []
|
| 79 |
for account in similar_accounts:
|
| 80 |
print(f"Fetching posts for competitor: {account}")
|
| 81 |
-
|
| 82 |
-
|
| 83 |
-
|
| 84 |
-
|
|
|
|
|
|
| 1 |
import instaloader
|
| 2 |
+
from typing import List, Dict
|
| 3 |
+
from fastapi import HTTPException
|
| 4 |
+
import requests
|
| 5 |
|
| 6 |
# Initialize Instaloader
|
| 7 |
L = instaloader.Instaloader()
|
| 8 |
|
| 9 |
+
async def fetch_user_posts(username: str, max_posts: int = 20) -> List[Dict]:
|
|
|
|
|
|
|
| 10 |
try:
|
| 11 |
print(f"Fetching posts for user: {username}")
|
| 12 |
profile = instaloader.Profile.from_username(L.context, username)
|
|
|
|
| 16 |
if count >= max_posts:
|
| 17 |
break
|
| 18 |
posts.append({
|
| 19 |
+
"caption": post.caption or "No Caption",
|
| 20 |
"likes": post.likes,
|
| 21 |
"comments": post.comments,
|
| 22 |
"date": post.date_utc.isoformat(),
|
|
|
|
| 24 |
})
|
| 25 |
|
| 26 |
if not posts:
|
| 27 |
+
print("No posts found.")
|
| 28 |
+
return []
|
| 29 |
+
|
| 30 |
+
return posts
|
| 31 |
|
| 32 |
except instaloader.exceptions.ProfileNotExistsException:
|
| 33 |
+
raise HTTPException(status_code=404, detail=f"Profile '{username}' does not exist.")
|
| 34 |
except instaloader.exceptions.ConnectionException:
|
| 35 |
raise HTTPException(status_code=503, detail="Instagram connection failed. Try again later.")
|
| 36 |
except Exception as e:
|
|
|
|
| 64 |
print(f"API request failed: {e}")
|
| 65 |
return []
|
| 66 |
|
| 67 |
+
|
| 68 |
+
async def fetch_competitors_posts(username: str, rapidapi_key: str, max_posts: int = 50) -> List[Dict]:
|
| 69 |
"""
|
| 70 |
Fetch posts for similar accounts (competitors) using the RapidAPI endpoint.
|
| 71 |
"""
|
|
|
|
| 79 |
all_posts = []
|
| 80 |
for account in similar_accounts:
|
| 81 |
print(f"Fetching posts for competitor: {account}")
|
| 82 |
+
try:
|
| 83 |
+
competitor_posts = await fetch_user_posts(account, max_posts)
|
| 84 |
+
all_posts.extend(competitor_posts)
|
| 85 |
+
except HTTPException as e:
|
| 86 |
+
print(f"Error fetching posts for {account}: {e.detail}")
|