ReRolls commited on
Commit
ac7fbce
·
verified ·
1 Parent(s): dc65261

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +304 -397
app.py CHANGED
@@ -7,523 +7,430 @@ import os
7
  from PIL import Image
8
  from io import BytesIO
9
  from datetime import datetime
10
- from typing import Tuple, Dict, Optional, Any, List, Union
11
  import logging
12
 
13
  from requests.adapters import HTTPAdapter
14
  from urllib3.util.retry import Retry
 
15
 
16
- # Setup logging
17
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
18
  logger = logging.getLogger(__name__)
19
 
20
-
21
  class DanbooruAPI:
22
- """Class to handle all Danbooru API interactions"""
23
- # Load base URL from environment variable, fallback to default
24
  BASE_URL = os.environ.get('DANBOORU_API_URL', 'https://danbooru.donmai.us')
 
 
 
 
 
25
 
26
  def __init__(self, username: Optional[str] = None, api_key: Optional[str] = None):
27
- self.username = username or os.environ.get('DANBOORU_USERNAME')
28
- self.api_key = api_key or os.environ.get('DANBOORU_API_KEY')
29
  self.session = self._create_retrying_session()
30
 
31
  def _create_retrying_session(self) -> requests.Session:
32
  session = requests.Session()
 
33
  retries = Retry(
34
- total=5,
35
- backoff_factor=1,
36
- status_forcelist=[429, 500, 502, 503, 504],
37
  allowed_methods={"GET"}
38
  )
39
  session.mount('https://', HTTPAdapter(max_retries=retries))
40
- session.mount('http://', HTTPAdapter(max_retries=retries))
41
  return session
42
 
43
  def _get(self, url: str, params: Optional[Dict] = None) -> requests.Response:
44
  auth = (self.username, self.api_key) if self.username and self.api_key else None
45
- return self.session.get(url, params=params, auth=auth)
46
 
47
  @staticmethod
48
  def ensure_https_url(url: str) -> str:
49
- """Ensure URL starts with https"""
50
  if url and not (url.startswith("https") or url.startswith("http")):
51
  return f"https:{url}"
52
  return url
53
 
54
  def fetch_post_by_id(self, post_id: str) -> Tuple[dict, Optional[str]]:
55
- """Fetch a post by its ID"""
56
  try:
57
  response = self._get(f'{self.BASE_URL}/posts/{post_id}.json')
58
  response.raise_for_status()
59
  return response.json(), None
60
- except requests.exceptions.RequestException as e:
61
- logger.error(f"Error fetching post {post_id}: {e}")
62
- return {}, f"Error fetching post: {e}"
63
- except json.JSONDecodeError as e:
64
- logger.error(f"Error decoding JSON for post {post_id}: {e}")
65
- return {}, f"Error decoding JSON: {e}"
66
-
67
- def fetch_random_posts(self, keywords: Optional[str] = None, limit: int = 100) -> Tuple[List[dict], Optional[str]]:
68
- """Fetch random posts based on keywords"""
69
  params = {'limit': limit}
70
  if keywords:
71
  params['tags'] = keywords
 
 
72
  else:
73
- params['random'] = 'true' # True randomness when no keywords are provided
74
 
75
  try:
76
  response = self._get(f'{self.BASE_URL}/posts.json', params=params)
77
  response.raise_for_status()
78
- return response.json(), None
79
- except requests.exceptions.RequestException as e:
80
- logger.error(f"Error fetching random posts with keywords '{keywords}': {e}")
81
- return [], f"Error fetching random posts: {e}"
82
- except json.JSONDecodeError as e:
83
- logger.error(f"Error decoding JSON for random posts with keywords '{keywords}': {e}")
84
- return [], f"Error decoding JSON: {e}"
85
-
86
- @staticmethod
87
- def fetch_image(image_url: str) -> Tuple[Optional[Image.Image], Optional[str]]:
88
- """Fetch an image from URL"""
89
  try:
90
- # Using requests.get directly here as it's typically for public CDN URLs and doesn't need API auth/retries
91
- response = requests.get(image_url, stream=True)
92
  response.raise_for_status()
93
- return Image.open(BytesIO(response.content)), None
 
 
94
  except Exception as e:
95
- logger.error(f"Error loading image from {image_url}: {e}")
96
- return None, f"Error loading image: {e}"
97
 
98
 
99
  class TagProcessor:
100
- """Class to handle tag processing"""
101
-
102
- DEFAULT_CENSOR_TAGS = {'mosaic_censoring', 'bar_censor', 'censored', 'artist_name', 'pixelated'}
 
 
103
 
104
  @staticmethod
105
- def format_text(text: str) -> str:
106
- """Format text by escaping parentheses"""
107
- if not isinstance(text, str):
108
- return ""
109
- return re.sub(r'([()])', r'\\\1', text)
 
 
110
 
111
- @staticmethod
112
- def humanize_text(text: str) -> str:
113
- """Convert underscores to spaces for human-readable output"""
114
- if not isinstance(text, str):
 
 
115
  return ""
116
- return text.replace('_', ' ')
117
 
118
- @staticmethod
119
- def join_tags(tags: str) -> str:
120
- """Join tags with commas"""
121
- if not isinstance(tags, str):
122
- return ""
123
- # Filter out empty strings that might result from splitting or filtering
124
- return ', '.join(filter(None, tags.split()))
125
 
126
- @classmethod
127
- def filter_censor_tags(cls, tags: str, censor_tags: Optional[List[str]] = None) -> str:
128
- """Remove censorship-related tags"""
129
- if not isinstance(tags, str):
130
- return ""
131
- tags_to_censor = set(censor_tags) if censor_tags else cls.DEFAULT_CENSOR_TAGS
132
- return ' '.join([tag for tag in tags.split() if tag not in tags_to_censor])
 
 
 
 
133
 
134
  @classmethod
135
- def process_post_data(cls, data: dict, prompt_template: str = "{character}, {artist}, {origin}, {tags}", user_censor_tags: Optional[List[str]] = None) -> Dict[str, Any]:
136
- """Process post data to extract and format tags and metadata"""
137
- character = data.get('tag_string_character', "")
138
- origin = data.get('tag_string_copyright', "")
139
- general_tags = data.get('tag_string_general', "")
140
- artist_tags = data.get('tag_string_artist', "")
141
- meta_tags = data.get('tag_string_meta', "")
142
-
143
- # Filter and format general tags using configurable censor tags
144
- filtered_general_tags = cls.filter_censor_tags(general_tags, user_censor_tags)
145
-
146
- # Format other tag categories for prompt creation
147
- formatted_character = cls.format_text(character)
148
- formatted_origin = cls.format_text(origin)
149
- formatted_artist = cls.format_text(artist_tags)
150
- formatted_meta = cls.format_text(meta_tags)
151
-
152
- # Join tags for display and prompt
153
- tags_with_commas = cls.join_tags(filtered_general_tags)
154
- artist_tags_with_commas = cls.join_tags(artist_tags)
155
- meta_tags_with_commas = cls.join_tags(meta_tags)
156
-
157
- # Prepare context for prompt template
158
  prompt_context = {
159
- 'character': formatted_character,
160
- 'artist': formatted_artist,
161
- 'origin': formatted_origin,
162
- 'meta': formatted_meta,
163
- 'tags': tags_with_commas,
164
  }
165
 
166
- # Create prompt using the template and available data
167
- prompt = ""
168
  try:
169
  prompt = prompt_template.format(**prompt_context)
170
- # Remove any leading/trailing commas or spaces that might result from empty fields
171
- prompt = re.sub(r'(, )+', ', ', prompt).strip(', ').strip()
172
- except KeyError as e:
173
- logger.warning(f"Prompt template contains invalid key: {e}. Falling back to default prompt.")
174
- # Fallback prompt, similar to original but including artist
175
- default_fallback_prompt = f'{formatted_character}, {formatted_artist}, {formatted_origin}, {tags_with_commas}'
176
- prompt = re.sub(r'(, )+', ', ', default_fallback_prompt).strip(', ').strip()
177
- except Exception as e:
178
- logger.warning(f"Error formatting prompt with template '{prompt_template}': {e}. Falling back to default.")
179
- default_fallback_prompt = f'{formatted_character}, {formatted_artist}, {formatted_origin}, {tags_with_commas}'
180
- prompt = re.sub(r'(, )+', ', ', default_fallback_prompt).strip(', ').strip()
 
 
 
 
181
 
182
-
183
- # Humanize for display
184
  return {
185
- 'character': cls.humanize_text(formatted_character),
186
- 'origin': cls.humanize_text(formatted_origin),
187
- 'artist': cls.humanize_text(artist_tags_with_commas),
188
- 'meta': cls.humanize_text(meta_tags_with_commas),
189
- 'tags': cls.humanize_text(tags_with_commas),
190
- 'prompt': cls.humanize_text(prompt),
191
  'rating': data.get('rating', 'unknown'),
192
  'score': str(data.get('score', 0)),
193
  'created_at': data.get('created_at', 'unknown'),
194
- # Include various image URLs for the processor to pick from
195
- 'file_url': data.get('file_url', ''),
196
- 'large_file_url': data.get('large_file_url', ''),
197
- 'preview_file_url': data.get('preview_file_url', '')
198
  }
199
 
 
 
 
 
 
 
200
 
201
  class FileManager:
202
- """Class to handle file operations"""
 
 
 
203
 
204
  @staticmethod
205
- def save_to_file(data: Dict[str, str], output_dir: str = ".", filename_prefix: str = "danbooru_output", image_url: Optional[str] = None) -> str:
206
- """Save processed data to a file"""
207
  try:
208
- # Create directory if it doesn't exist
209
  os.makedirs(output_dir, exist_ok=True)
 
 
 
 
 
 
 
 
 
 
 
210
 
211
- # Generate filename with current timestamp
212
- timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
213
- filename = os.path.join(output_dir, f"{filename_prefix}_{timestamp}.txt")
214
-
215
- with open(filename, "a", encoding="utf-8") as file:
216
- file.write(f"--- Entry from {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ---\n")
217
- if image_url:
218
- file.write(f"Image Post URL: {image_url}\n")
219
- file.write(f"Character: {data.get('character', 'N/A')}\n")
220
- file.write(f"Origin: {data.get('origin', 'N/A')}\n")
221
- file.write(f"Artist: {data.get('artist', 'N/A')}\n")
222
- file.write(f"Meta Tags: {data.get('meta', 'N/A')}\n")
223
- file.write(f"General Tags: {data.get('tags', 'N/A')}\n")
224
- file.write(f"Rating: {data.get('rating', 'N/A')}\n")
225
- file.write(f"Score: {data.get('score', 'N/A')}\n")
226
- file.write(f"Created At: {data.get('created_at', 'N/A')}\n")
227
- file.write(f"Generated Prompt: {data.get('prompt', 'N/A')}\n\n")
228
- return f"✅ Data saved to '{filename}'"
229
  except Exception as e:
230
- logger.error(f"Error saving to file '{filename}': {e}")
231
- return f"❌ Error saving to file: {e}"
232
 
233
 
234
  class DanbooruProcessor:
235
- """Main class to handle the entire process"""
236
-
237
- def __init__(self):
238
- # API keys are loaded from environment variables in DanbooruAPI.__init__
239
- self.api = DanbooruAPI()
240
  self.tag_processor = TagProcessor()
241
  self.file_manager = FileManager()
242
 
243
- def process_by_id(self, image_id: str, prompt_template: str, user_censor_tags: Optional[List[str]]) -> Tuple[Dict[str, Any], Optional[str]]:
244
- """Process a post by ID"""
245
  post_data, error = self.api.fetch_post_by_id(image_id)
246
- if error:
247
- return {}, error
248
-
249
- # Prioritize large_file_url for display, fallback to file_url, then preview
250
- display_image_url_cdn = self.api.ensure_https_url(
251
- post_data.get('large_file_url') or
252
- post_data.get('file_url') or
253
- post_data.get('preview_file_url', '')
254
- )
255
-
256
- post_page_url = f"{DanbooruAPI.BASE_URL}/posts/{image_id}" # Link to the Danbooru post page
257
 
258
- image, img_error = (None, "No image URL found")
259
- if display_image_url_cdn:
260
- image, img_error = self.api.fetch_image(display_image_url_cdn)
261
- else:
262
- logger.warning(f"No displayable image URL found for post ID {image_id}")
263
-
264
- # Log image fetching errors but don't halt the entire process
265
- if img_error and not image:
266
- logger.warning(f"Could not fetch image for post ID {image_id}: {img_error}")
267
-
268
- processed_data = self.tag_processor.process_post_data(
269
- post_data,
270
- prompt_template=prompt_template,
271
- user_censor_tags=user_censor_tags
272
- )
 
 
 
 
 
 
 
 
 
 
 
 
273
 
274
  return {
275
  **processed_data,
276
- 'image': image,
277
- 'post_page_url': post_page_url, # Link to the Danbooru post page
278
- 'display_image_cdn_url': display_image_url_cdn, # Direct CDN URL for the displayed image
279
  }, None
280
 
281
- def process_random(self, keywords: Optional[str] = None, prompt_template: str = "{character}, {artist}, {origin}, {tags}", user_censor_tags: Optional[List[str]] = None) -> Tuple[Dict[str, Any], Optional[str]]:
282
- """Process a random post based on keywords"""
283
- posts, error = self.api.fetch_random_posts(keywords)
284
- if error:
285
- return {}, error
286
-
287
- if not posts:
288
- return {}, "No posts found with provided keywords."
289
-
290
- random_post = random.choice(posts)
291
- post_id = str(random_post.get('id', ''))
292
- if not post_id:
293
- return {}, "Selected random post has no ID."
294
-
295
- # Now use the process_by_id function to get all details
296
- return self.process_by_id(post_id, prompt_template, user_censor_tags)
297
-
298
- def save_results(self, data: Dict[str, Any], output_dir: str = ".", filename_prefix: str = "danbooru_output") -> str:
299
- """Save results to file"""
300
- save_data = {
301
- k: data.get(k, 'N/A') for k in [
302
- 'character', 'origin', 'artist', 'meta', 'tags', 'prompt',
303
- 'rating', 'score', 'created_at'
304
- ]
305
- }
306
- # Use post_page_url for the saved record, as it's the stable link to the post
307
- return self.file_manager.save_to_file(
308
- save_data,
309
- output_dir=output_dir,
310
- filename_prefix=filename_prefix,
311
- image_url=data.get('post_page_url')
312
- )
313
 
314
 
315
  def process_danbooru(
316
  choice: str,
317
  image_id_input: str,
318
  keywords_input: str,
 
 
319
  prompt_template_input: str,
320
  censor_tags_input: str,
 
321
  output_directory_input: str,
322
- output_filename_prefix_input: str
323
- ) -> Tuple[
324
- Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], Optional[str], # character, origin, artist, meta, tags, prompt
325
- Optional[str], Optional[str], Optional[str], # rating, score, created_at
326
- Optional[str], Optional[str], # post_page_url, message
327
- Optional[Image.Image]
328
- ]:
329
- """Main function to process Danbooru requests for the Gradio interface"""
330
- # Parse censor tags from input string
331
- user_censor_tags = [tag.strip() for tag in censor_tags_input.split(',') if tag.strip()] if censor_tags_input else None
332
-
333
- # Initialize processor. API keys are loaded from environment variables in DanbooruAPI.__init__
334
- processor = DanbooruProcessor()
335
-
336
- # Helper for returning a consistent error state across all 12 outputs
337
  def default_error_return(msg: str):
338
- return (
339
- None, None, None, None, None, None, # character, origin, artist, meta, tags, prompt (6)
340
- None, None, None, # rating, score, created_at (3)
341
- None, msg, # post_page_url, message (2)
342
- None # image_out (1)
343
- ) # Total 12
344
 
345
  try:
346
  if choice == 'Enter Image ID':
347
- if not image_id_input:
348
- return default_error_return("❌ Image ID cannot be empty!")
349
- try:
350
- image_id = str(int(image_id_input)) # Validate it's an integer
351
- except ValueError:
352
- return default_error_return("❌ Image ID must be a valid number!")
353
-
354
- result, error = processor.process_by_id(image_id, prompt_template_input, user_censor_tags)
355
- elif choice == 'Find Random Image':
356
- result, error = processor.process_random(keywords_input, prompt_template_input, user_censor_tags)
357
  else:
358
- return default_error_return("❌ Invalid choice")
359
-
360
- if error:
361
- return default_error_return(f"❌ {error}")
362
-
363
- # Extract results for UI display
364
- character_out = result.get('character', '')
365
- origin_out = result.get('origin', '')
366
- artist_out = result.get('artist', '')
367
- meta_out = result.get('meta', '')
368
- tags_out = result.get('tags', '')
369
- prompt_out = result.get('prompt', '')
370
-
371
- rating_out = result.get('rating', '')
372
- score_out = result.get('score', '')
373
- created_at_out = result.get('created_at', '')
374
-
375
- post_page_url_out = result.get('post_page_url', '')
376
- display_image_cdn_url_out = result.get('display_image_cdn_url', '')
377
- image_out = result.get('image')
378
-
379
- # Save results to file
380
- save_message = processor.save_results(
381
- result,
382
- output_dir=output_directory_input,
383
- filename_prefix=output_filename_prefix_input
384
- )
385
- message_out = save_message
386
 
387
- except Exception as e:
388
- logger.exception("Unexpected error in process_danbooru") # Log exception details
389
- return default_error_return(f"❌ Unexpected error: {e}")
390
 
391
- return (
392
- character_out, origin_out, artist_out, meta_out, tags_out, prompt_out,
393
- rating_out, score_out, created_at_out,
394
- post_page_url_out, message_out,
395
- image_out
396
- )
397
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
398
 
399
- def create_interface():
400
- """Creates the Gradio interface for the Danbooru Tag Explorer."""
401
- # Initial values for advanced settings
402
- default_prompt_template = "{character}, {artist}, {origin}, {tags}"
403
- default_censor_tags_str = ", ".join(TagProcessor.DEFAULT_CENSOR_TAGS)
404
- default_output_dir = "danbooru_outputs"
405
- default_output_filename_prefix = "danbooru_tags"
406
 
407
- with gr.Blocks(theme=gr.themes.Soft(), title="Danbooru Tag Explorer") as iface:
408
- gr.Markdown("""
409
- # 🏷️ Danbooru Tag Explorer
410
 
411
- Extract tags from Danbooru posts by ID or find random images with keywords.
412
- Results will be saved to a text file for your reference.
413
- """)
 
 
 
 
414
 
415
  with gr.Row():
416
  with gr.Column(scale=1):
417
- choice = gr.Radio(
418
- ["Enter Image ID", "Find Random Image"],
419
- label="Choose an option",
420
- value="Find Random Image"
421
- )
422
-
423
  with gr.Group():
424
- image_id = gr.Number(
425
- label="Enter Image ID",
426
- precision=0,
427
- interactive=True,
428
- visible=False # Initially hidden as "Find Random Image" is default
429
- )
430
-
431
- keywords = gr.Textbox(
432
- label="Enter keywords (space-separated; use underscores for multi-word tags; leave blank for any random)",
433
- placeholder="Example: landscape blue_sky 1girl",
434
- visible=True # Initially visible
435
- )
436
-
437
- with gr.Accordion("⚙️ Advanced Settings", open=False):
438
- prompt_template = gr.Textbox(
439
- label="Prompt Template",
440
- value=default_prompt_template,
441
- placeholder="Example: {character}, {artist}, {origin}, {tags}",
442
- info="Available placeholders: {character}, {artist}, {origin}, {meta}, {tags}"
443
- )
444
- censor_tags = gr.Textbox(
445
- label="Censor Tags (comma-separated)",
446
- value=default_censor_tags_str,
447
- placeholder="Example: mosaic_censoring, bar_censor",
448
- info="Tags to be removed from the 'General Tags' section."
449
- )
450
- output_directory = gr.Textbox(
451
- label="Output Directory",
452
- value=default_output_dir,
453
- placeholder="e.g., danbooru_outputs"
454
- )
455
- output_filename_prefix = gr.Textbox(
456
- label="Output Filename Prefix",
457
- value=default_output_filename_prefix,
458
- placeholder="e.g., danbooru_tags"
459
- )
460
-
461
- submit_btn = gr.Button("🔍 Search", variant="primary")
462
 
463
  with gr.Column(scale=2):
464
- with gr.Tab("Results"):
 
 
 
 
 
 
465
  with gr.Row():
466
- image_display = gr.Image(label="Image Preview", type="pil", height=400)
467
-
468
- with gr.Group():
469
- post_page_url_display = gr.Textbox(label="🔗 Danbooru Post URL", show_copy_button=True)
470
- display_image_cdn_url_display = gr.Textbox(label="🖼️ Displayed Image CDN URL", show_copy_button=True)
471
- character = gr.Textbox(label="👤 Character", show_copy_button=True)
472
- origin = gr.Textbox(label="🌍 Origin", show_copy_button=True)
473
- artist = gr.Textbox(label="🎨 Artist", show_copy_button=True)
474
- meta_tags = gr.Textbox(label="⚙️ Meta Tags", show_copy_button=True)
475
-
476
- with gr.Accordion("General Tags", open=False):
477
- tags = gr.Textbox(label="🏷️ General Tags", show_copy_button=True, lines=5)
478
-
479
- prompt = gr.Textbox(
480
- label="✨ Generated Prompt",
481
- show_copy_button=True,
482
- lines=3
483
- )
484
-
485
- with gr.Accordion("Additional Metadata", open=False):
486
- rating = gr.Textbox(label="🔞 Rating", show_copy_button=True)
487
- score = gr.Textbox(label="👍 Score", show_copy_button=True)
488
- created_at = gr.Textbox(label="🗓️ Created At", show_copy_button=True)
489
-
490
- with gr.Tab("Status"):
491
- message = gr.Textbox(label="Status Messages")
492
-
493
- # Logic for showing/hiding input fields based on choice
494
- def update_visibility(selected_choice):
495
- return [
496
- gr.update(visible=(selected_choice == "Enter Image ID")),
497
- gr.update(visible=(selected_choice == "Find Random Image"))
498
- ]
499
-
500
- choice.change(
501
- update_visibility,
502
- inputs=[choice],
503
- outputs=[image_id, keywords]
504
- )
505
 
506
- # Connect submit button to processing function
507
- submit_btn.click(
508
- process_danbooru,
509
- inputs=[
510
- choice, image_id, keywords,
511
- prompt_template, censor_tags, output_directory, output_filename_prefix
512
- ],
513
- outputs=[
514
- character, origin, artist, meta_tags, tags, prompt,
515
- rating, score, created_at,
516
- post_page_url_display, message,
517
- image_display
518
- ]
519
- )
520
 
521
- # Set initial visibility when the interface loads
522
- iface.load(
523
- update_visibility,
524
- inputs=[choice], # Use the default value of the choice radio button
525
- outputs=[image_id, keywords],
526
- queue=False # Not strictly necessary to queue this initial setup
527
  )
528
 
529
  return iface
 
7
  from PIL import Image
8
  from io import BytesIO
9
  from datetime import datetime
10
+ from typing import Tuple, Dict, Optional, Any, List
11
  import logging
12
 
13
  from requests.adapters import HTTPAdapter
14
  from urllib3.util.retry import Retry
15
+ from requests.exceptions import Timeout, RequestException
16
 
17
+ # Configuración de Logging
18
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
19
  logger = logging.getLogger(__name__)
20
 
 
21
  class DanbooruAPI:
 
 
22
  BASE_URL = os.environ.get('DANBOORU_API_URL', 'https://danbooru.donmai.us')
23
+
24
+ HEADERS = {
25
+ 'User-Agent': 'DanbooruTagExplorer/2.2 (Gradio; +http://github.com/yourusername)',
26
+ 'Accept': 'application/json'
27
+ }
28
 
29
  def __init__(self, username: Optional[str] = None, api_key: Optional[str] = None):
30
+ self.username = username if username and username.strip() else os.environ.get('DANBOORU_USERNAME')
31
+ self.api_key = api_key if api_key and api_key.strip() else os.environ.get('DANBOORU_API_KEY')
32
  self.session = self._create_retrying_session()
33
 
34
  def _create_retrying_session(self) -> requests.Session:
35
  session = requests.Session()
36
+ session.headers.update(self.HEADERS)
37
  retries = Retry(
38
+ total=3,
39
+ backoff_factor=0.5,
40
+ status_forcelist=[403, 429, 500, 502, 503, 504],
41
  allowed_methods={"GET"}
42
  )
43
  session.mount('https://', HTTPAdapter(max_retries=retries))
 
44
  return session
45
 
46
  def _get(self, url: str, params: Optional[Dict] = None) -> requests.Response:
47
  auth = (self.username, self.api_key) if self.username and self.api_key else None
48
+ return self.session.get(url, params=params, auth=auth, timeout=10)
49
 
50
  @staticmethod
51
  def ensure_https_url(url: str) -> str:
 
52
  if url and not (url.startswith("https") or url.startswith("http")):
53
  return f"https:{url}"
54
  return url
55
 
56
  def fetch_post_by_id(self, post_id: str) -> Tuple[dict, Optional[str]]:
 
57
  try:
58
  response = self._get(f'{self.BASE_URL}/posts/{post_id}.json')
59
  response.raise_for_status()
60
  return response.json(), None
61
+ except Timeout:
62
+ return {}, "Error: Timeout conectando a Danbooru."
63
+ except RequestException as e:
64
+ return {}, f"Error de red: {e}"
65
+ except json.JSONDecodeError:
66
+ return {}, "Error decodificando JSON."
67
+
68
+ def fetch_random_posts(self, keywords: Optional[str] = None, limit: int = 20) -> Tuple[List[dict], Optional[str]]:
 
69
  params = {'limit': limit}
70
  if keywords:
71
  params['tags'] = keywords
72
+ if 'order:' not in keywords:
73
+ params['tags'] += ' order:random'
74
  else:
75
+ params['random'] = 'true'
76
 
77
  try:
78
  response = self._get(f'{self.BASE_URL}/posts.json', params=params)
79
  response.raise_for_status()
80
+ data = response.json()
81
+ if not isinstance(data, list):
82
+ return [], "Error API: La respuesta no es una lista."
83
+ return data, None
84
+ except Timeout:
85
+ return [], "Error: Búsqueda lenta (Timeout)."
86
+ except RequestException as e:
87
+ return [], f"Error fetching posts: {e}"
88
+
89
+ def fetch_image(self, image_url: str) -> Tuple[Optional[Image.Image], Optional[str]]:
90
+ """Descarga la imagen. Para preview usa timeout corto, para guardado uno más largo."""
91
  try:
92
+ # Timeout de 15s para evitar bloqueos eternos
93
+ response = self.session.get(image_url, stream=True, timeout=20)
94
  response.raise_for_status()
95
+ img = Image.open(BytesIO(response.content))
96
+ img.load()
97
+ return img, None
98
  except Exception as e:
99
+ logger.error(f"Error cargando imagen {image_url}: {e}")
100
+ return None, str(e)
101
 
102
 
103
  class TagProcessor:
104
+ DEFAULT_CENSOR_TAGS = {'mosaic_censoring', 'bar_censor', 'censored', 'artist_name', 'pixelated', 'censor'}
105
+ DEFAULT_BLACKLIST_TAGS = {
106
+ 'highres', 'absurdres', 'commentary', 'translated', 'text_bubble',
107
+ 'speech_bubble', 'comic', 'monochrome', 'greyscale', 'bad_id', 'bad_pixiv_id'
108
+ }
109
 
110
  @staticmethod
111
+ def format_tag(tag: str) -> str:
112
+ """Limpia una sola etiqueta: quita guiones bajos y escapa paréntesis"""
113
+ # 1. Reemplazar guión bajo por espacio
114
+ tag = tag.replace('_', ' ')
115
+ # 2. Escapar paréntesis para el prompt
116
+ tag = re.sub(r'([()])', r'\\\1', tag)
117
+ return tag
118
 
119
+ @classmethod
120
+ def process_category(cls, raw_string: str, is_general: bool, censor: List[str], blacklist: List[str]) -> str:
121
+ """
122
+ Toma un string de tags (ej: 'tag1_a tag2_(b)'), los separa, filtra, formatea y une con comas.
123
+ """
124
+ if not raw_string or not isinstance(raw_string, str):
125
  return ""
 
126
 
127
+ tags_list = raw_string.split()
128
+ processed_tags = []
129
+
130
+ banned = set(censor).union(set(blacklist))
 
 
 
131
 
132
+ for tag in tags_list:
133
+ # Filtrar si está en lista negra (solo para general tags normalmente, pero aplicamos a todo por seguridad)
134
+ if is_general and tag in banned:
135
+ continue
136
+
137
+ # Formatear (humanizar + escapar)
138
+ formatted = cls.format_tag(tag)
139
+ processed_tags.append(formatted)
140
+
141
+ # Unir con coma y espacio
142
+ return ', '.join(processed_tags)
143
 
144
  @classmethod
145
+ def process_post_data(cls, data: dict, prompt_template: str, user_censor_tags: List[str], user_blacklist_tags: List[str]) -> Dict[str, Any]:
146
+ # Obtener strings crudos (separados por espacios)
147
+ raw_char = data.get('tag_string_character', "")
148
+ raw_copy = data.get('tag_string_copyright', "")
149
+ raw_artist = data.get('tag_string_artist', "")
150
+ raw_meta = data.get('tag_string_meta', "")
151
+ raw_general = data.get('tag_string_general', "")
152
+
153
+ # Procesar cada categoría: Separa -> Limpia -> Une con comas
154
+ p_char = cls.process_category(raw_char, False, user_censor_tags, user_blacklist_tags)
155
+ p_origin = cls.process_category(raw_copy, False, user_censor_tags, user_blacklist_tags)
156
+ p_artist = cls.process_category(raw_artist, False, user_censor_tags, user_blacklist_tags)
157
+ p_meta = cls.process_category(raw_meta, False, user_censor_tags, user_blacklist_tags)
158
+ p_tags = cls.process_category(raw_general, True, user_censor_tags, user_blacklist_tags)
159
+
 
 
 
 
 
 
 
 
160
  prompt_context = {
161
+ 'character': p_char,
162
+ 'artist': p_artist,
163
+ 'origin': p_origin,
164
+ 'meta': p_meta,
165
+ 'tags': p_tags,
166
  }
167
 
168
+ # Generar Prompt
 
169
  try:
170
  prompt = prompt_template.format(**prompt_context)
171
+ # Limpieza final de comas fantasmas generadas por campos vacíos
172
+ prompt = re.sub(r',\s*,', ',', prompt) # ", ," -> ","
173
+ prompt = re.sub(r'\s+,', ',', prompt) # " ," -> ","
174
+ prompt = re.sub(r'^,\s*', '', prompt) # Inicio sucio
175
+ prompt = re.sub(r',\s*$', '', prompt) # Final sucio
176
+ prompt = re.sub(r',\s+', ', ', prompt) # Asegurar espacio tras coma
177
+ prompt = prompt.strip()
178
+ except Exception:
179
+ # Fallback simple
180
+ parts = [p_char, p_origin, p_artist, p_tags]
181
+ prompt = ', '.join([p for p in parts if p])
182
+
183
+ # URLs
184
+ file_url = data.get('file_url') or data.get('large_file_url')
185
+ preview_url = data.get('preview_file_url') or file_url # Fallback al grande si no hay pequeño
186
 
 
 
187
  return {
188
+ 'character': p_char.replace('\\', ''), # Sin escapes para display
189
+ 'origin': p_origin.replace('\\', ''),
190
+ 'artist': p_artist.replace('\\', ''),
191
+ 'meta': p_meta.replace('\\', ''),
192
+ 'tags': p_tags.replace('\\', ''),
193
+ 'prompt': prompt,
194
  'rating': data.get('rating', 'unknown'),
195
  'score': str(data.get('score', 0)),
196
  'created_at': data.get('created_at', 'unknown'),
197
+ 'full_file_url': cls.ensure_https(data.get('large_file_url') or data.get('file_url')),
198
+ 'preview_file_url': cls.ensure_https(preview_url)
 
 
199
  }
200
 
201
+ @staticmethod
202
+ def ensure_https(url):
203
+ if url and not url.startswith(('http', 'https')):
204
+ return f"https:{url}"
205
+ return url
206
+
207
 
208
  class FileManager:
209
+ @staticmethod
210
+ def get_filename(output_dir: str, prefix: str, ext: str) -> str:
211
+ timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
212
+ return os.path.join(output_dir, f"{prefix}_{timestamp}.{ext}")
213
 
214
  @staticmethod
215
+ def save_data_file(data: Dict[str, str], output_dir: str, filename_prefix: str) -> str:
 
216
  try:
 
217
  os.makedirs(output_dir, exist_ok=True)
218
+ filename = FileManager.get_filename(output_dir, filename_prefix, "txt")
219
+
220
+ with open(filename, "w", encoding="utf-8") as file:
221
+ file.write(f"--- Danbooru Data {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ---\n")
222
+ file.write(f"Post URL: {data.get('post_page_url', 'N/A')}\n")
223
+ file.write(f"Prompt: {data.get('prompt', '')}\n\n")
224
+ file.write(f"Tags: {data.get('tags', '')}\n")
225
+ file.write(f"Character: {data.get('character', '')}\n")
226
+ return f"📝 Texto guardado en '{filename}'"
227
+ except Exception as e:
228
+ return f"❌ Error texto: {e}"
229
 
230
+ @staticmethod
231
+ def save_image_locally(api: DanbooruAPI, image_url: str, output_dir: str, filename_prefix: str) -> str:
232
+ """Descarga la imagen HD en el momento del guardado"""
233
+ if not image_url: return "❌ No hay URL de imagen HD para descargar."
234
+
235
+ try:
236
+ os.makedirs(output_dir, exist_ok=True)
237
+ filename = FileManager.get_filename(output_dir, filename_prefix, "png") # O jpg según corresponda
238
+
239
+ # Usamos la API para descargar (reutiliza sesión y headers)
240
+ img, err = api.fetch_image(image_url)
241
+ if err or not img:
242
+ return f" Falló descarga HD: {err}"
243
+
244
+ img.save(filename)
245
+ return f"🖼️ Imagen HD guardada en '{filename}'"
 
 
246
  except Exception as e:
247
+ return f"Error guardando imagen: {e}"
 
248
 
249
 
250
  class DanbooruProcessor:
251
+ def __init__(self, username: str, api_key: str):
252
+ self.api = DanbooruAPI(username, api_key)
 
 
 
253
  self.tag_processor = TagProcessor()
254
  self.file_manager = FileManager()
255
 
256
+ def process_by_id(self, image_id: str, template: str, censor: List[str], blacklist: List[str]) -> Tuple[Dict[str, Any], Optional[str]]:
 
257
  post_data, error = self.api.fetch_post_by_id(image_id)
258
+ if error: return {}, error
259
+ if 'id' not in post_data: return {}, "Post no encontrado."
260
+ return self._finalize(post_data, template, censor, blacklist)
 
 
 
 
 
 
 
 
261
 
262
+ def process_random(self, keywords: str, template: str, censor: List[str], blacklist: List[str]) -> Tuple[Dict[str, Any], Optional[str]]:
263
+ posts, error = self.api.fetch_random_posts(keywords)
264
+ if error: return {}, error
265
+ if not posts: return {}, "No se encontraron posts."
266
+
267
+ # Intentar buscar uno que tenga imagen válida
268
+ valid_posts = [p for p in posts if 'preview_file_url' in p or 'file_url' in p]
269
+ if not valid_posts: return {}, "Posts encontrados pero sin URLs de imagen válidas."
270
+
271
+ random_post = random.choice(valid_posts)
272
+ return self._finalize(random_post, template, censor, blacklist)
273
+
274
+ def _finalize(self, post_data: dict, template: str, censor: List[str], blacklist: List[str]) -> Tuple[Dict[str, Any], Optional[str]]:
275
+ # Procesar texto (Rápido)
276
+ processed_data = self.tag_processor.process_post_data(post_data, template, censor, blacklist)
277
+
278
+ post_id = post_data.get('id')
279
+ post_page_url = f"{DanbooruAPI.BASE_URL}/posts/{post_id}"
280
+
281
+ # OPTIMIZACIÓN: Usar preview para la UI (Rápido)
282
+ ui_image_url = processed_data.get('preview_file_url')
283
+ image = None
284
+
285
+ if ui_image_url:
286
+ # Descargar solo la miniatura para mostrar en pantalla
287
+ image, err = self.api.fetch_image(ui_image_url)
288
+ if err: logger.warning(f"Fallo preview: {err}")
289
 
290
  return {
291
  **processed_data,
292
+ 'image': image, # Objeto PIL pequeño
293
+ 'post_page_url': post_page_url,
 
294
  }, None
295
 
296
+ def save_results(self, data: Dict[str, Any], out_dir: str, prefix: str, save_img_bool: bool) -> str:
297
+ # Guardar Texto
298
+ msg_text = self.file_manager.save_data_file(data, out_dir, prefix)
299
+
300
+ msg_img = ""
301
+ if save_img_bool:
302
+ # AQUÍ descargamos la imagen GRANDE solo si el usuario quiere guardarla
303
+ full_url = data.get('full_file_url')
304
+ if full_url:
305
+ msg_img = self.file_manager.save_image_locally(self.api, full_url, out_dir, prefix)
306
+ else:
307
+ msg_img = "⚠️ No se encontró URL de alta calidad para descargar."
308
+
309
+ return f"{msg_text}\n{msg_img}".strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
310
 
311
 
312
  def process_danbooru(
313
  choice: str,
314
  image_id_input: str,
315
  keywords_input: str,
316
+ api_username: str,
317
+ api_key: str,
318
  prompt_template_input: str,
319
  censor_tags_input: str,
320
+ blacklist_tags_input: str,
321
  output_directory_input: str,
322
+ output_filename_prefix_input: str,
323
+ save_image_toggle: bool
324
+ ):
325
+ def parse_csv(text):
326
+ return [t.strip() for t in text.split(',') if t.strip()] if text else []
327
+
328
+ user_censor = parse_csv(censor_tags_input)
329
+ user_blacklist = parse_csv(blacklist_tags_input)
330
+
331
+ processor = DanbooruProcessor(api_username, api_key)
332
+
333
+ # 13 salidas
 
 
 
334
  def default_error_return(msg: str):
335
+ return (None,) * 11 + (msg, None)
 
 
 
 
 
336
 
337
  try:
338
  if choice == 'Enter Image ID':
339
+ if not image_id_input: return default_error_return("❌ ID vacío")
340
+ result, error = processor.process_by_id(str(int(image_id_input)), prompt_template_input, user_censor, user_blacklist)
 
 
 
 
 
 
 
 
341
  else:
342
+ result, error = processor.process_random(keywords_input, prompt_template_input, user_censor, user_blacklist)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
343
 
344
+ if error: return default_error_return(f"❌ {error}")
 
 
345
 
346
+ # Guardar (Esto puede tardar un poco más si "Guardar Imagen" está activo, pero la búsqueda ya terminó)
347
+ save_msg = processor.save_results(
348
+ result, output_directory_input, output_filename_prefix_input, save_image_toggle
349
+ )
 
 
350
 
351
+ return (
352
+ result.get('character', ''),
353
+ result.get('origin', ''),
354
+ result.get('artist', ''),
355
+ result.get('meta', ''),
356
+ result.get('tags', ''),
357
+ result.get('prompt', ''),
358
+ result.get('rating', ''),
359
+ result.get('score', ''),
360
+ result.get('created_at', ''),
361
+ result.get('post_page_url', ''),
362
+ result.get('full_file_url', ''), # Mostramos link a la full
363
+ save_msg,
364
+ result.get('image') # Preview para UI
365
+ )
366
 
367
+ except Exception as e:
368
+ logger.exception("Error")
369
+ return default_error_return(f"❌ Error Inesperado: {e}")
 
 
 
 
370
 
 
 
 
371
 
372
+ def create_interface():
373
+ default_template = "{character}, {artist}, {origin}, {tags}"
374
+ default_censor = ", ".join(TagProcessor.DEFAULT_CENSOR_TAGS)
375
+ default_blacklist = ", ".join(TagProcessor.DEFAULT_BLACKLIST_TAGS)
376
+
377
+ with gr.Blocks(theme=gr.themes.Soft(), title="Danbooru Tag Explorer V2.2") as iface:
378
+ gr.Markdown("# 🏷️ Danbooru Tag Explorer V2.2 (Optimized)")
379
 
380
  with gr.Row():
381
  with gr.Column(scale=1):
382
+ choice = gr.Radio(["Enter Image ID", "Find Random Image"], label="Modo", value="Find Random Image")
383
+
 
 
 
 
384
  with gr.Group():
385
+ image_id = gr.Number(label="ID Imagen", precision=0, visible=False)
386
+ keywords = gr.Textbox(label="Keywords", placeholder="Ej: 1girl blue_hair")
387
+
388
+ with gr.Accordion("🔑 Credenciales", open=False):
389
+ api_user = gr.Textbox(label="Usuario", placeholder="Opcional")
390
+ api_key = gr.Textbox(label="API Key", type="password", placeholder="Opcional")
391
+
392
+ with gr.Accordion("🛡️ Filtros", open=True):
393
+ censor_tags = gr.Textbox(label="Censura", value=default_censor)
394
+ blacklist_tags = gr.Textbox(label="Blacklist", value=default_blacklist)
395
+
396
+ with gr.Accordion("💾 Guardado", open=True):
397
+ save_image_chk = gr.Checkbox(label="Descargar Imagen Original (Lento)", value=False)
398
+ out_dir = gr.Textbox(label="Carpeta", value="danbooru_outputs")
399
+ out_prefix = gr.Textbox(label="Prefijo", value="img")
400
+ prompt_template = gr.Textbox(label="Template", value=default_template)
401
+
402
+ btn = gr.Button("🔍 Buscar", variant="primary", size="lg")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
403
 
404
  with gr.Column(scale=2):
405
+ with gr.Tab("Vista Previa"):
406
+ img_display = gr.Image(label="Preview (Baja Resolución)", height=500, type="pil")
407
+ status_msg = gr.Textbox(label="Log", lines=2)
408
+
409
+ with gr.Tab("Datos"):
410
+ prompt = gr.Textbox(label="Prompt Final", show_copy_button=True, lines=3)
411
+ tags = gr.Textbox(label="Tags", show_copy_button=True, lines=4)
412
  with gr.Row():
413
+ char = gr.Textbox(label="Personaje")
414
+ origin = gr.Textbox(label="Origen")
415
+ with gr.Row():
416
+ artist = gr.Textbox(label="Artista")
417
+ meta = gr.Textbox(label="Meta")
418
+ with gr.Row():
419
+ rating = gr.Textbox(label="Rating")
420
+ score = gr.Textbox(label="Score")
421
+ created = gr.Textbox(label="Fecha")
422
+ post_url = gr.Textbox(label="Link Post", show_copy_button=True)
423
+ full_url = gr.Textbox(label="Link HD", show_copy_button=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
424
 
425
+ def update_vis(val):
426
+ return [gr.update(visible=(val == "Enter Image ID")), gr.update(visible=(val == "Find Random Image"))]
427
+
428
+ choice.change(update_vis, inputs=choice, outputs=[image_id, keywords])
 
 
 
 
 
 
 
 
 
 
429
 
430
+ btn.click(
431
+ process_danbooru,
432
+ inputs=[choice, image_id, keywords, api_user, api_key, prompt_template, censor_tags, blacklist_tags, out_dir, out_prefix, save_image_chk],
433
+ outputs=[char, origin, artist, meta, tags, prompt, rating, score, created, post_url, full_url, status_msg, img_display]
 
 
434
  )
435
 
436
  return iface