Spaces:
Sleeping
Sleeping
| import httpx | |
| import os | |
| from typing import Dict, Any, Optional | |
| from datetime import datetime | |
| LINKEDIN_API_BASE = "https://api.linkedin.com/v2" | |
| class LinkedInService: | |
| def __init__(self, access_token: str): | |
| self.access_token = access_token | |
| self.headers = { | |
| "Authorization": f"Bearer {access_token}", | |
| "Content-Type": "application/json", | |
| "X-Restli-Protocol-Version": "2.0.0" | |
| } | |
| async def get_user_profile(self) -> Dict[str, Any]: | |
| """Get LinkedIn user profile""" | |
| async with httpx.AsyncClient() as client: | |
| response = await client.get( | |
| f"{LINKEDIN_API_BASE}/userinfo", | |
| headers=self.headers, | |
| timeout=30.0 | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| async def create_post(self, text: str, media_uris: Optional[list] = None) -> Dict[str, Any]: | |
| """Create a LinkedIn post""" | |
| # First, register upload if media is provided | |
| media_urn = None | |
| if media_uris: | |
| # Register media upload | |
| media_urn = await self._register_upload(media_uris[0]) | |
| # Create the post | |
| post_data = { | |
| "author": f"urn:li:person:{os.getenv('LINKEDIN_PERSON_URN', '')}", | |
| "lifecycleState": "PUBLISHED", | |
| "specificContent": { | |
| "com.linkedin.ugc.ShareContent": { | |
| "shareCommentary": { | |
| "text": text | |
| }, | |
| "shareMediaCategory": "IMAGE" if media_urn else "NONE" | |
| } | |
| }, | |
| "visibility": { | |
| "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC" | |
| } | |
| } | |
| if media_urn: | |
| post_data["specificContent"]["com.linkedin.ugc.ShareContent"]["media"] = [ | |
| { | |
| "status": "READY", | |
| "media": media_urn | |
| } | |
| ] | |
| async with httpx.AsyncClient() as client: | |
| response = await client.post( | |
| f"{LINKEDIN_API_BASE}/ugcPosts", | |
| headers=self.headers, | |
| json=post_data, | |
| timeout=30.0 | |
| ) | |
| response.raise_for_status() | |
| return response.json() | |
| async def _register_upload(self, image_url: str) -> str: | |
| """Register an image upload for LinkedIn""" | |
| # This is a simplified version - actual implementation would upload the image | |
| # and get a URN back | |
| async with httpx.AsyncClient() as client: | |
| # Register upload | |
| register_response = await client.post( | |
| f"{LINKEDIN_API_BASE}/assets?action=registerUpload", | |
| headers=self.headers, | |
| json={ | |
| "registerUploadRequest": { | |
| "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"], | |
| "owner": f"urn:li:person:{os.getenv('LINKEDIN_PERSON_URN', '')}", | |
| "serviceRelationships": [ | |
| { | |
| "relationshipType": "OWNER", | |
| "identifier": "urn:li:userGeneratedContent" | |
| } | |
| ] | |
| } | |
| }, | |
| timeout=30.0 | |
| ) | |
| register_response.raise_for_status() | |
| register_data = register_response.json() | |
| # Upload the image | |
| upload_url = register_data["value"]["uploadMechanism"]["com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"]["uploadUrl"] | |
| asset_urn = register_data["value"]["asset"] | |
| # Download and upload the image | |
| image_response = await client.get(image_url, timeout=30.0) | |
| image_response.raise_for_status() | |
| await client.put( | |
| upload_url, | |
| content=image_response.content, | |
| headers={"Content-Type": "application/octet-stream"}, | |
| timeout=30.0 | |
| ) | |
| return asset_urn | |