| import os |
| import json |
| import requests |
| import time |
| from pathlib import Path |
| from dotenv import load_dotenv |
|
|
# The .env file is expected one level above the directory that contains this
# module; hand its path to python-dotenv's load_dotenv.
_module_dir = Path(__file__).resolve().parent
env_path = _module_dir.parent / ".env"
load_dotenv(dotenv_path=env_path)
|
|
# OAuth application settings, read from the environment. The credentials
# default to empty strings when unset; the redirect URI defaults to a local
# development callback.
CLIENT_ID = os.environ.get("LINKEDIN_CLIENT_ID", "")
CLIENT_SECRET = os.environ.get("LINKEDIN_CLIENT_SECRET", "")
REDIRECT_URI = os.environ.get(
    "LINKEDIN_REDIRECT_URI",
    "http://localhost:8000/linkedin/callback/",
)

# Space-separated OAuth scopes sent with the authorization request.
SCOPE = "w_member_social profile email openid"

# Base URLs for the REST API and for the OAuth 2.0 endpoints.
API_BASE = "https://api.linkedin.com"
AUTH_BASE = "https://www.linkedin.com/oauth/v2"
|
|
|
|
def _log(msg):
    """Print *msg* to stdout, prefixed with the ``[LinkedInAPI]`` tag."""
    line = f"[LinkedInAPI] {msg}"
    print(line)
|
|
|
|
def get_oauth_url(state=""):
    """Build the LinkedIn OAuth 2.0 authorization URL.

    Args:
        state: Optional opaque value sent as the ``state`` query parameter.
            It is left out of the URL entirely when falsy.

    Returns:
        The ``{AUTH_BASE}/authorization`` URL whose query string carries
        ``response_type=code``, the configured client id, redirect URI and
        scope, plus ``state`` when one was given. Values are percent-encoded.
    """
    # Function-scope import: use the standard library encoder instead of
    # reaching into requests.utils for its re-exported quote().
    from urllib.parse import quote, urlencode

    params = {
        "response_type": "code",
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "scope": SCOPE,
    }
    if state:
        params["state"] = state

    # quote_via=quote with safe="/" encodes spaces as %20 and leaves "/"
    # literal, i.e. the same encoding the URL has always been built with.
    query = urlencode(params, safe="/", quote_via=quote)
    return f"{AUTH_BASE}/authorization?{query}"
|
|
|
|
def exchange_code(code):
    """Exchange an OAuth authorization code for a LinkedIn access token.

    Args:
        code: Authorization code received on the OAuth redirect callback.

    Returns:
        A ``(result, error)`` tuple. On success ``result`` is
        ``{"access_token": ..., "expires_at": <unix timestamp>}`` and
        ``error`` is ``None``. On any failure ``result`` is ``None`` and
        ``error`` is a human-readable message. Request errors are reported
        through the tuple instead of being raised.
    """
    # Without a code the token endpoint can only reject the request, so fail
    # fast instead of making a network round trip.
    if not code:
        return None, "Missing authorization code"
    if not CLIENT_ID or not CLIENT_SECRET:
        return None, "LinkedIn app not configured. Set LINKEDIN_CLIENT_ID and LINKEDIN_CLIENT_SECRET."

    data = {
        "grant_type": "authorization_code",
        "code": code,
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "redirect_uri": REDIRECT_URI,
    }
    try:
        resp = requests.post(f"{AUTH_BASE}/accessToken", data=data, timeout=30)
    except requests.exceptions.RequestException as e:
        _log(f"Token exchange error: {e}")
        return None, str(e)

    if resp.status_code != 200:
        _log(f"Token exchange failed ({resp.status_code}): {resp.text[:200]}")
        return None, f"Failed to get access token: {resp.text[:100]}"

    # Response.json() raises a ValueError subclass on a non-JSON body; catch
    # it explicitly so a malformed 200 response cannot escape as an exception.
    try:
        body = resp.json()
    except ValueError as e:
        _log(f"Token exchange returned invalid JSON: {e}")
        return None, "Invalid JSON in token response"
    if not isinstance(body, dict):
        _log(f"Token exchange returned unexpected payload: {resp.text[:200]}")
        return None, "Unexpected token response format"

    token = body.get("access_token")
    if not token:
        return None, "No access_token in response"

    # Fall back to one day when expires_in is absent, null or not numeric, so
    # the timestamp arithmetic below cannot raise.
    try:
        expires_in = float(body.get("expires_in", 86400))
    except (TypeError, ValueError):
        expires_in = 86400.0

    _log("Token exchange successful")
    return {"access_token": token, "expires_at": time.time() + expires_in}, None
|
|
|
|
def get_user_info(access_token):
    """Fetch the authenticated member's profile from ``/v2/userinfo``.

    Args:
        access_token: OAuth bearer token sent in the ``Authorization`` header.

    Returns:
        A ``(result, error)`` tuple. On success ``result`` is a dict with
        keys ``urn`` (``urn:li:person:<sub>``), ``name``, ``picture`` and
        ``sub``, and ``error`` is ``None``. On failure ``result`` is ``None``
        and ``error`` is a human-readable message. Request errors are
        reported through the tuple instead of being raised.
    """
    # Same guard style as create_post: no token, no request.
    if not access_token:
        return None, "Missing access_token"

    headers = {"Authorization": f"Bearer {access_token}"}
    try:
        resp = requests.get(f"{API_BASE}/v2/userinfo", headers=headers, timeout=15)
    except requests.exceptions.RequestException as e:
        _log(f"User info error: {e}")
        return None, str(e)

    if resp.status_code != 200:
        _log(f"User info failed ({resp.status_code}): {resp.text[:200]}")
        return None, "Failed to get user info"

    # Response.json() raises a ValueError subclass on a non-JSON body.
    try:
        body = resp.json()
    except ValueError as e:
        _log(f"User info returned invalid JSON: {e}")
        return None, "Invalid JSON in user info response"
    if not isinstance(body, dict):
        _log(f"User info returned unexpected payload: {resp.text[:200]}")
        return None, "Unexpected user info response format"

    sub = body.get("sub", "")
    if not sub:
        # Without a subject id the URN would be the unusable "urn:li:person:".
        _log("User info response has no sub")
        return None, "No sub in user info response"

    name = body.get("name", "")
    picture = body.get("picture", "")
    _log(f"User info: {name} ({sub})")
    return {"urn": f"urn:li:person:{sub}", "name": name, "picture": picture, "sub": sub}, None
|
|
|
|
def _format_hashtags(hashtags, limit=5):
    """Return at most *limit* hashtags as one space-separated ``#tag`` string."""
    if not hashtags:
        return ""
    if isinstance(hashtags, str):
        # A bare string would otherwise be iterated character by character.
        hashtags = hashtags.replace(",", " ").split()
    stripped = (str(tag).strip().lstrip("#") for tag in hashtags)
    tags = [tag for tag in stripped if tag]  # drop blanks so no lone "#" is emitted
    return " ".join(f"#{tag}" for tag in tags[:limit])


def create_post(access_token, author_urn, text, hashtags=None, visibility="PUBLIC"):
    """Publish a text-only post through the ``/v2/ugcPosts`` endpoint.

    Args:
        access_token: OAuth bearer token.
        author_urn: URN of the posting member, sent as ``author``.
        text: Post body. ``None`` is treated as an empty string.
        hashtags: Optional iterable of tags (with or without a leading
            ``#``), or a single comma/space-separated string. At most five
            are appended, separated from the text by a blank line.
        visibility: Either a visibility value such as ``"PUBLIC"``, or an
            already-built visibility object (dict), which is sent unchanged.

    Returns:
        A ``(result, error)`` tuple. On HTTP 200/201 ``result`` is
        ``{"post_id": ..., "post_url": ...}`` (both empty strings when the
        ``X-RestLi-Id`` header is absent) and ``error`` is ``None``. On
        failure ``result`` is ``None`` and ``error`` is a human-readable
        message. Request errors are reported through the tuple instead of
        being raised.
    """
    if not access_token or not author_urn:
        return None, "Missing access_token or author_urn"

    base_text = "" if text is None else str(text)
    tag_str = _format_hashtags(hashtags)
    if tag_str:
        full_text = f"{base_text}\n\n{tag_str}" if base_text else tag_str
    else:
        full_text = base_text

    # The UGC Posts schema expects visibility as an object keyed by
    # com.linkedin.ugc.MemberNetworkVisibility, not as a bare string.
    if isinstance(visibility, dict):
        visibility_field = visibility
    else:
        visibility_field = {"com.linkedin.ugc.MemberNetworkVisibility": visibility}

    body = {
        "author": author_urn,
        "lifecycleState": "PUBLISHED",
        "specificContent": {
            "com.linkedin.ugc.ShareContent": {
                "shareCommentary": {"text": full_text},
                "shareMediaCategory": "NONE",
            }
        },
        "visibility": visibility_field,
    }
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
        "X-Restli-Protocol-Version": "2.0.0",
    }

    try:
        resp = requests.post(f"{API_BASE}/v2/ugcPosts", json=body, headers=headers, timeout=30)
    except requests.exceptions.RequestException as e:
        _log(f"Post request error: {e}")
        return None, str(e)

    if resp.status_code in (200, 201):
        post_id = resp.headers.get("X-RestLi-Id", "")
        post_url = f"https://www.linkedin.com/feed/update/{post_id}" if post_id else ""
        _log(f"Post created: {post_id}")
        return {"post_id": post_id, "post_url": post_url}, None

    _log(f"Post failed ({resp.status_code}): {resp.text[:300]}")

    # Well-known failure codes get a fixed, actionable message.
    known_errors = {
        401: "Access token expired. Please reconnect LinkedIn.",
        403: "Missing permissions. Re-authenticate with w_member_social scope.",
        429: "Rate limited. Try again later.",
    }
    if resp.status_code in known_errors:
        return None, known_errors[resp.status_code]

    # Prefer the API's own "message" field; fall back to the raw body when
    # the response is not JSON, not an object, or has no message.
    try:
        payload = resp.json()
    except ValueError:
        payload = None
    detail = payload.get("message") if isinstance(payload, dict) else None
    return None, f"LinkedIn API error: {detail or resp.text[:100]}"
|
|