wynai commited on
Commit
8e55232
·
verified ·
1 Parent(s): 421371a

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +151 -24
main.py CHANGED
@@ -1,29 +1,156 @@
1
- from fastapi import FastAPI, Request
2
- from pydantic import BaseModel
3
- from typing import Optional
4
- import random
 
 
 
 
 
5
 
6
- app = FastAPI()
7
 
8
- class ImageRequest(BaseModel):
9
  prompt: str
10
- width: Optional[int] = 720
11
- height: Optional[int] = 1280
12
- n: Optional[int] = 1
13
-
14
- @app.post("/v1/images/generations")
15
- async def generate_image(request: ImageRequest):
16
- images = []
17
- for _ in range(request.n):
18
- seed = random.randint(0, 1000000)
19
- url = (
20
- f"https://image.pollinations.ai/prompt/{request.prompt}"
21
- f"?width={request.width}&height={request.height}"
22
- f"&seed={seed}&enhance=true&nologo=true&model=flux"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  )
24
- images.append({"url": url})
25
 
26
- return {
27
- "created": int(random.time()),
28
- "data": images
29
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import fastapi
2
+ import uvicorn
3
+ import httpx
4
+ import time
5
+ import base64
6
+ import urllib.parse
7
+ import os # Thêm import os
8
+ from pydantic import BaseModel, Field
9
+ from typing import List, Optional, Union, Literal
10
 
11
+ # --- Pydantic Models for Request and Response ---
12
 
13
+ class OpenAIImageRequest(BaseModel):
14
  prompt: str
15
+ n: int = Field(default=1, description="Number of images to generate.", ge=1, le=4)
16
+ size: str = Field(default="1024x1024", description="Size of the generated images. e.g., 'widthxheight'.")
17
+ response_format: Optional[Literal['url', 'b64_json']] = "url"
18
+ user: Optional[str] = None
19
+
20
+ # Pollinations specific parameters
21
+ seed_value: Optional[int] = Field(default=None, alias="seed", description="Seed for Pollinations. The query param in Pollinations is 'seed'.")
22
+ model: Optional[str] = Field(default=None, description="Model to use for Pollinations.")
23
+ enhance: Optional[bool] = Field(default=True, description="Enhance parameter for Pollinations.")
24
+ nologo: Optional[bool] = Field(default=True, description="NoLogo parameter for Pollinations.")
25
+
26
+ class Config:
27
+ allow_population_by_field_name = True
28
+
29
+
30
+ class ImageURL(BaseModel):
31
+ url: str
32
+
33
+ class ImageB64(BaseModel):
34
+ b64_json: str
35
+
36
+ class OpenAIImageResponse(BaseModel):
37
+ created: int = Field(default_factory=lambda: int(time.time()))
38
+ data: List[Union[ImageURL, ImageB64]]
39
+
40
+
41
+ # --- FastAPI Application ---
42
+ app = fastapi.FastAPI(
43
+ title="OpenAI-compatible Image Generation API for Pollinations",
44
+ description="This API wraps the image.pollinations.ai service to provide an OpenAI-like interface.",
45
+ version="1.0.0"
46
+ )
47
+
48
+ # --- Helper Functions ---
49
+ def parse_size(size_str: str) -> tuple[Optional[int], Optional[int]]:
50
+ """Parses size string like '1024x768' into (width, height)."""
51
+ parts = size_str.lower().split('x')
52
+ if len(parts) == 2:
53
+ try:
54
+ return int(parts[0]), int(parts[1])
55
+ except ValueError:
56
+ return None, None
57
+ return None, None
58
+
59
+ async def fetch_and_encode_image(client: httpx.AsyncClient, url: str) -> Optional[str]:
60
+ """Fetches an image from a URL and returns its Base64 encoded string."""
61
+ try:
62
+ response = await client.get(url, timeout=60.0)
63
+ response.raise_for_status()
64
+ image_bytes = await response.aread()
65
+ return base64.b64encode(image_bytes).decode('utf-8')
66
+ except httpx.HTTPStatusError as e:
67
+ print(f"HTTP error fetching image from {url}: {e.response.status_code} - {e.response.text}")
68
+ except httpx.RequestError as e:
69
+ print(f"Request error fetching image from {url}: {e}")
70
+ except Exception as e:
71
+ print(f"An unexpected error occurred while fetching/encoding image from {url}: {e}")
72
+ return None
73
+
74
+ # --- API Endpoint ---
75
+ @app.post("/v1/images/generations", response_model=OpenAIImageResponse)
76
+ async def create_image_generation(request: OpenAIImageRequest):
77
+ """
78
+ Mimics the OpenAI image generation endpoint.
79
+ Receives a prompt and other parameters, then calls the Pollinations API.
80
+ """
81
+ pollinations_base_url = "https://image.pollinations.ai/prompt/"
82
+ results_data: List[Union[ImageURL, ImageB64]] = []
83
+
84
+ width, height = parse_size(request.size)
85
+ if not width or not height:
86
+ raise fastapi.HTTPException(
87
+ status_code=400,
88
+ detail="Invalid 'size' format. Expected 'widthxheight', e.g., '1024x1024'."
89
  )
 
90
 
91
+ async with httpx.AsyncClient() as client:
92
+ for _ in range(request.n):
93
+ encoded_prompt = urllib.parse.quote(request.prompt)
94
+ current_pollinations_url_path = f"{pollinations_base_url}{encoded_prompt}"
95
+
96
+ query_params = {}
97
+ if width:
98
+ query_params["width"] = width
99
+ if height:
100
+ query_params["height"] = height
101
+ if request.seed_value is not None:
102
+ query_params["seed"] = request.seed_value
103
+ if request.model:
104
+ query_params["model"] = request.model
105
+ if request.enhance is not None:
106
+ query_params["enhance"] = str(request.enhance).lower()
107
+ if request.nologo is not None:
108
+ query_params["nologo"] = str(request.nologo).lower()
109
+
110
+ if query_params:
111
+ pollinations_image_url = f"{current_pollinations_url_path}?{urllib.parse.urlencode(query_params)}"
112
+ else:
113
+ pollinations_image_url = current_pollinations_url_path
114
+
115
+ print(f"Requesting Pollinations URL: {pollinations_image_url}")
116
+
117
+ if request.response_format == "url":
118
+ results_data.append(ImageURL(url=pollinations_image_url))
119
+ elif request.response_format == "b64_json":
120
+ b64_data = await fetch_and_encode_image(client, pollinations_image_url)
121
+ if b64_data:
122
+ results_data.append(ImageB64(b64_json=b64_data))
123
+ else:
124
+ raise fastapi.HTTPException(
125
+ status_code=500,
126
+ detail=f"Failed to fetch or encode image from Pollinations: {pollinations_image_url}"
127
+ )
128
+ else:
129
+ raise fastapi.HTTPException(status_code=400, detail="Invalid response_format.")
130
+
131
+ if not results_data and request.n > 0:
132
+ raise fastapi.HTTPException(
133
+ status_code=500,
134
+ detail="No images were successfully generated or processed."
135
+ )
136
+
137
+ return OpenAIImageResponse(data=results_data)
138
+
139
+ # --- Main guard for running with Uvicorn ---
140
+ if __name__ == "__main__":
141
+ # Khối này dành cho việc thực thi cục bộ (ví dụ: python main.py)
142
+ # Khi chạy trong Docker trên Hugging Face, CMD trong Dockerfile sẽ được sử dụng.
143
+ port_to_use = 7860 # Mặc định cho HF Spaces và yêu cầu của bạn
144
+ try:
145
+ # Hugging Face Spaces có thể đặt biến môi trường PORT
146
+ port_from_env = os.environ.get("PORT")
147
+ if port_from_env:
148
+ port_to_use = int(port_from_env)
149
+ except ValueError:
150
+ print(f"Warning: Invalid PORT environment variable '{port_from_env}'. Using default port {port_to_use}.")
151
+ except Exception as e:
152
+ print(f"Error reading PORT environment variable: {e}. Using default port {port_to_use}.")
153
+
154
+ print(f"Starting Uvicorn server on host 0.0.0.0, port {port_to_use}")
155
+ # reload=True hữu ích cho phát triển cục bộ, Docker CMD sẽ không dùng reload.
156
+ uvicorn.run("main:app", host="0.0.0.0", port=port_to_use, reload=True)