File size: 4,219 Bytes
f80e9b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import httpx
import os
from typing import Dict, Any, Optional
from datetime import datetime

LINKEDIN_API_BASE = "https://api.linkedin.com/v2"

class LinkedInService:
    """Small async client for the LinkedIn REST API.

    Supports reading the authenticated member's profile and publishing a
    UGC post, optionally with one image. Every LinkedIn request carries the
    bearer token supplied at construction time. The posting member is read
    from the ``LINKEDIN_PERSON_URN`` environment variable.
    """

    # Timeout, in seconds, applied to every HTTP request made by this class.
    _TIMEOUT = 30.0

    def __init__(self, access_token: str):
        """Store the OAuth access token and pre-build the request headers."""
        self.access_token = access_token
        self.headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
            "X-Restli-Protocol-Version": "2.0.0"
        }

    @staticmethod
    def _author_urn() -> str:
        """Return the ``urn:li:person:...`` URN built from ``LINKEDIN_PERSON_URN``.

        The variable may hold either the bare member id or an already
        qualified ``urn:li:...`` URN; a qualified value is returned unchanged
        so the prefix is never doubled. An unset variable yields the bare
        prefix, exactly as before, and the API call is left to reject it.
        """
        person = os.getenv("LINKEDIN_PERSON_URN", "").strip()
        if person.startswith("urn:li:"):
            return person
        return f"urn:li:person:{person}"

    @staticmethod
    def _build_post_payload(
        author: str, text: str, media_urn: Optional[str] = None
    ) -> Dict[str, Any]:
        """Build the JSON body sent to the ``ugcPosts`` endpoint.

        Args:
            author: URN of the member publishing the post.
            text: Commentary text of the post.
            media_urn: Asset URN of an uploaded image, or ``None`` for a
                text-only post.

        Returns:
            A publicly visible, immediately published share payload.
        """
        share_content: Dict[str, Any] = {
            "shareCommentary": {"text": text},
            "shareMediaCategory": "IMAGE" if media_urn else "NONE",
        }
        if media_urn:
            share_content["media"] = [{"status": "READY", "media": media_urn}]

        return {
            "author": author,
            "lifecycleState": "PUBLISHED",
            "specificContent": {"com.linkedin.ugc.ShareContent": share_content},
            "visibility": {"com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"},
        }

    async def get_user_profile(self) -> Dict[str, Any]:
        """Get LinkedIn user profile.

        Returns:
            The decoded JSON body of the ``/userinfo`` response.

        Raises:
            httpx.HTTPStatusError: If LinkedIn answers with a 4xx/5xx status.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{LINKEDIN_API_BASE}/userinfo",
                headers=self.headers,
                timeout=self._TIMEOUT
            )
            response.raise_for_status()
            return response.json()

    async def create_post(self, text: str, media_uris: Optional[list] = None) -> Dict[str, Any]:
        """Create a LinkedIn post.

        Args:
            text: Commentary text of the post.
            media_uris: Optional list of image URLs. Only the first entry is
                uploaded and attached; any further entries are ignored.

        Returns:
            The decoded JSON body of the ``ugcPosts`` response.

        Raises:
            httpx.HTTPStatusError: If the upload or the post request fails
                with a 4xx/5xx status.
        """
        # The image has to be uploaded first so the post can reference its URN.
        media_urn = None
        if media_uris:
            media_urn = await self._register_upload(media_uris[0])

        post_data = self._build_post_payload(self._author_urn(), text, media_urn)

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{LINKEDIN_API_BASE}/ugcPosts",
                headers=self.headers,
                json=post_data,
                timeout=self._TIMEOUT
            )
            response.raise_for_status()
            return response.json()

    async def _register_upload(self, image_url: str) -> str:
        """Register an image upload for LinkedIn and push the image bytes.

        Registers a feed-share image asset, downloads the image found at
        ``image_url`` and uploads its bytes to the URL LinkedIn returned.

        Args:
            image_url: URL the image is downloaded from.

        Returns:
            The asset URN to reference from the post.

        Raises:
            httpx.HTTPStatusError: If the registration, the image download
                or the binary upload answers with a 4xx/5xx status.
            KeyError: If the registration response lacks the expected fields.
        """
        async with httpx.AsyncClient() as client:
            register_response = await client.post(
                f"{LINKEDIN_API_BASE}/assets?action=registerUpload",
                headers=self.headers,
                json={
                    "registerUploadRequest": {
                        "recipes": ["urn:li:digitalmediaRecipe:feedshare-image"],
                        "owner": self._author_urn(),
                        "serviceRelationships": [
                            {
                                "relationshipType": "OWNER",
                                "identifier": "urn:li:userGeneratedContent"
                            }
                        ]
                    }
                },
                timeout=self._TIMEOUT
            )
            register_response.raise_for_status()
            upload_info = register_response.json()["value"]

            upload_url = upload_info["uploadMechanism"][
                "com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest"
            ]["uploadUrl"]
            asset_urn = upload_info["asset"]

            # Fetch the source image WITHOUT the LinkedIn headers so the access
            # token is never sent to a third-party host.
            image_response = await client.get(image_url, timeout=self._TIMEOUT)
            image_response.raise_for_status()

            # NOTE(review): LinkedIn's image-upload example sends the bearer
            # token with the binary PUT - confirm against the current API docs.
            upload_response = await client.put(
                upload_url,
                content=image_response.content,
                headers={
                    "Authorization": self.headers["Authorization"],
                    "Content-Type": "application/octet-stream",
                },
                timeout=self._TIMEOUT
            )
            # A failed upload must not go unnoticed: the post would otherwise
            # reference an asset that holds no image data.
            upload_response.raise_for_status()

            return asset_urn