key_word_Fast_API / services /linkedin_api.py
ihtesham0345's picture
feat: Add LinkedIn API posting + enhanced analyzer + schema
639b959
Raw
History Blame Contribute Delete
4.72 kB
import json
import os
import time
from pathlib import Path
from urllib.parse import quote, urlencode

import requests
from dotenv import load_dotenv
# Module configuration. Order matters: the .env file must be loaded before the
# os.getenv() reads below so that values defined there are visible to them.

# Path to a ".env" file in the parent of the directory that contains this module.
env_path = Path(__file__).resolve().parent.parent / ".env"
load_dotenv(dotenv_path=env_path)
# LinkedIn app credentials; both default to "" when unset, which exchange_code()
# reports as "app not configured".
CLIENT_ID = os.getenv("LINKEDIN_CLIENT_ID", "")
CLIENT_SECRET = os.getenv("LINKEDIN_CLIENT_SECRET", "")
# Callback URL sent with both the authorization request and the token exchange.
# NOTE(review): presumably must match a redirect URL registered on the LinkedIn app - confirm.
REDIRECT_URI = os.getenv("LINKEDIN_REDIRECT_URI", "http://localhost:8000/linkedin/callback/")
# Space-separated OAuth scopes requested in get_oauth_url(); w_member_social is
# the scope create_post() asks users to re-authenticate with on a 403.
SCOPE = "w_member_social profile email openid"
# Base URLs for REST calls (userinfo, ugcPosts) and for the OAuth endpoints.
API_BASE = "https://api.linkedin.com"
AUTH_BASE = "https://www.linkedin.com/oauth/v2"
def _log(msg):
    """Print *msg* to stdout, prefixed with the ``[LinkedInAPI]`` tag."""
    print("[LinkedInAPI] {}".format(msg))
def get_oauth_url(state=""):
    """Build the LinkedIn OAuth 2.0 authorization URL for this app.

    Args:
        state: Optional OAuth ``state`` value. It is added to the query
            string only when non-empty.

    Returns:
        The ``{AUTH_BASE}/authorization`` URL with ``response_type``,
        ``client_id``, ``redirect_uri``, ``scope`` and (optionally) ``state``
        as percent-encoded query parameters.
    """
    params = {
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "scope": SCOPE,
    }
    if state:
        params["state"] = state
    # urlencode escapes both names and values (including "/" in the redirect
    # URI); quote_via=quote keeps spaces in the scope list as %20 rather
    # than "+".
    query = urlencode(params, quote_via=quote)
    return f"{AUTH_BASE}/authorization?{query}"
def exchange_code(code):
    """Exchange an OAuth authorization code for a LinkedIn access token.

    Args:
        code: The authorization code received on the redirect callback.

    Returns:
        A ``(token_info, error)`` tuple. On success ``token_info`` is
        ``{"access_token": ..., "expires_at": <unix timestamp>}`` and
        ``error`` is ``None``. On any failure ``token_info`` is ``None`` and
        ``error`` is a human-readable message.
    """
    # A callback without a code (e.g. the user declined consent) can never be
    # exchanged, so fail fast instead of making a doomed network call.
    if not code:
        return None, "Missing authorization code"
    if not CLIENT_ID or not CLIENT_SECRET:
        return None, "LinkedIn app not configured. Set LINKEDIN_CLIENT_ID and LINKEDIN_CLIENT_SECRET."
    data = {
        "grant_type": "authorization_code",
        "code": code,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "redirect_uri": REDIRECT_URI,
    }
    default_ttl = 86400  # fallback lifetime (seconds) when expires_in is unusable
    try:
        resp = requests.post(f"{AUTH_BASE}/accessToken", data=data, timeout=30)
        if resp.status_code != 200:
            _log(f"Token exchange failed ({resp.status_code}): {resp.text[:200]}")
            return None, f"Failed to get access token: {resp.text[:100]}"
        try:
            body = resp.json()
        except ValueError:
            # Older requests versions raise a plain ValueError for a non-JSON
            # body, which the RequestException handler below would not catch.
            _log("Token exchange returned a non-JSON body")
            return None, "Invalid token response from LinkedIn"
        if not isinstance(body, dict):
            _log("Token exchange returned an unexpected JSON payload")
            return None, "Invalid token response from LinkedIn"
        token = body.get("access_token")
        if not token:
            return None, "No access_token in response"
        # expires_in may be missing, null or a numeric string; never let it
        # break the timestamp arithmetic.
        try:
            ttl = float(body.get("expires_in", default_ttl))
        except (TypeError, ValueError):
            ttl = default_ttl
        _log("Token exchange successful")
        return {"access_token": token, "expires_at": time.time() + ttl}, None
    except requests.exceptions.RequestException as e:
        _log(f"Token exchange error: {e}")
        return None, str(e)
def get_user_info(access_token):
    """Fetch the authenticated member's profile from the userinfo endpoint.

    Args:
        access_token: OAuth bearer token for the member.

    Returns:
        A ``(user, error)`` tuple. On success ``user`` is
        ``{"urn": "urn:li:person:<sub>", "name": ..., "picture": ..., "sub": ...}``
        and ``error`` is ``None``. On failure ``user`` is ``None`` and
        ``error`` is a message.
    """
    if not access_token:
        return None, "Missing access_token"
    headers = {"Authorization": f"Bearer {access_token}"}
    try:
        resp = requests.get(f"{API_BASE}/v2/userinfo", headers=headers, timeout=15)
        if resp.status_code != 200:
            _log(f"User info failed ({resp.status_code}): {resp.text[:200]}")
            return None, "Failed to get user info"
        try:
            body = resp.json()
        except ValueError:
            # Older requests versions raise a plain ValueError for a non-JSON
            # body, which the RequestException handler below would not catch.
            _log("User info returned a non-JSON body")
            return None, "Failed to get user info"
        if not isinstance(body, dict):
            _log("User info returned an unexpected JSON payload")
            return None, "Failed to get user info"
        sub = body.get("sub", "")
        if not sub:
            # Without the member id the URN would be the truthy but invalid
            # "urn:li:person:", which only fails later when posting.
            _log("User info response has no 'sub' field")
            return None, "User info response did not include a member id"
        name = body.get("name", "")
        picture = body.get("picture", "")
        _log(f"User info: {name} ({sub})")
        return {"urn": f"urn:li:person:{sub}", "name": name, "picture": picture, "sub": sub}, None
    except requests.exceptions.RequestException as e:
        _log(f"User info error: {e}")
        return None, str(e)
def create_post(access_token, author_urn, text, hashtags=None, visibility="PUBLIC"):
    """Publish a text-only post through LinkedIn's ``/v2/ugcPosts`` endpoint.

    Args:
        access_token: OAuth bearer token of the posting member.
        author_urn: Author URN, e.g. the ``urn`` returned by ``get_user_info``.
        text: Post commentary.
        hashtags: Optional iterable of tags (a single string is treated as one
            tag). Surrounding whitespace and leading ``#`` are stripped, blank
            entries are skipped, and at most five tags are appended to the
            text after a blank line.
        visibility: Either a visibility name such as ``"PUBLIC"``, or an
            already-formed visibility object, which is sent unchanged.

    Returns:
        A ``(result, error)`` tuple. On success ``result`` is
        ``{"post_id": ..., "post_url": ...}`` and ``error`` is ``None``. On
        failure ``result`` is ``None`` and ``error`` is a message.
    """
    if not access_token or not author_urn:
        return None, "Missing access_token or author_urn"

    # The UGC Posts schema models visibility as a typed union object, not a
    # bare string; wrap plain names and pass through caller-built objects.
    if isinstance(visibility, str):
        visibility_field = {"com.linkedin.ugc.MemberNetworkVisibility": visibility}
    else:
        visibility_field = visibility

    # A lone string would otherwise be iterated character by character.
    if isinstance(hashtags, str):
        hashtags = [hashtags]
    stripped = (str(tag).strip().lstrip("#").strip() for tag in (hashtags or ()))
    tags = [tag for tag in stripped if tag][:5]
    full_text = text
    if tags:
        tag_str = " ".join(f"#{tag}" for tag in tags)
        full_text = f"{text}\n\n{tag_str}"

    body = {
        "author": author_urn,
        "lifecycleState": "PUBLISHED",
        "visibility": visibility_field,
        "specificContent": {
            "com.linkedin.ugc.ShareContent": {
                "shareCommentary": {"text": full_text},
                "shareMediaCategory": "NONE",
            }
        },
    }
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "X-Restli-Protocol-Version": "2.0.0",
    }
    # Status codes that have a specific, actionable message for the user.
    known_errors = {
        401: "Access token expired. Please reconnect LinkedIn.",
        403: "Missing permissions. Re-authenticate with w_member_social scope.",
        429: "Rate limited. Try again later.",
    }
    try:
        resp = requests.post(f"{API_BASE}/v2/ugcPosts", json=body, headers=headers, timeout=30)
        if resp.status_code in (200, 201):
            # The id of the created post is read from the X-RestLi-Id header.
            post_id = resp.headers.get("X-RestLi-Id", "")
            post_url = f"https://www.linkedin.com/feed/update/{post_id}" if post_id else ""
            _log(f"Post created: {post_id}")
            return {"post_id": post_id, "post_url": post_url}, None
        _log(f"Post failed ({resp.status_code}): {resp.text[:300]}")
        if resp.status_code in known_errors:
            return None, known_errors[resp.status_code]
        # Prefer the API's own "message" field; fall back to the raw body when
        # the response is not a JSON object.
        try:
            detail = resp.json().get("message", resp.text[:100])
        except (ValueError, AttributeError):
            detail = resp.text[:100]
        return None, f"LinkedIn API error: {detail}"
    except requests.exceptions.RequestException as e:
        _log(f"Post request error: {e}")
        return None, str(e)