adgenesis-internal / apis /ourdata.py
userIdc2024's picture
Update apis/ourdata.py
29b66cc verified
import logging
import random
import asyncio
import concurrent.futures
from helpercodes.image_functions import generate_image, _encode_image_to_base64
from helpercodes.ourdata_advanced import get_links, generate_gpt_prompt_our_data, get_dataset_dict
# Configure Logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
def _process_single_style(style_key, keywords_list, data):
"""
Processes a single style synchronously:
1. Samples keywords.
2. Retrieves relevant images/links from DataFrame.
3. Generates GPT prompt and final image.
4. Builds the final response dictionary (title, body, des, images).
Returns:
A dictionary with { title, body, des, images } or None if there's an issue.
"""
try:
logging.info(f"Processing style: '{style_key}'")
if not keywords_list:
logging.warning(f"No keywords found for style: {style_key}")
return None
sampled_keywords = random.sample(keywords_list, min(len(keywords_list), 5))
input_keywords = ", ".join(sampled_keywords)
# Filter DF to match this style
df_subset = data[data['styles'] == style_key].copy()
if df_subset.empty:
return None
# Get top-2 relevant images from DF
results = get_links(df_subset, input_keywords)
if len(results) < 2:
return None
# Generate GPT prompt and final image
prompt = generate_gpt_prompt_our_data(results)
logging.info(f"Prompt: {prompt}")
image = generate_image(prompt, '1:1', "None")
if image:
logging.info(f"Successfully generated image for style: {style_key}")
else:
logging.warning(f"Image generation failed for style: {style_key}")
return image
except Exception as e:
logging.error(f"Error processing style '{style_key}': {e}", exc_info=True)
return None
async def process_our_data():
"""
Randomly picks 5 keywords from each style (if available),
finds top-2 relevant images, calls GPT for a prompt, and
streams the generated images in a Gradio Gallery.
This version uses multi-threading to process each style concurrently.
"""
try:
logging.info("Processing our data...")
response = []
# Get data, style dictionary
data, style_dict = get_dataset_dict()
# We'll gather coroutines so that each style is processed in a thread
loop = asyncio.get_running_loop()
# Collect async tasks here
tasks = []
with concurrent.futures.ThreadPoolExecutor() as executor:
for style_key, keywords_list in style_dict.items():
# Schedule the synchronous processing function in a separate thread
tasks.append(
loop.run_in_executor(
executor,
_process_single_style, # function to run
style_key, # arguments
keywords_list,
data
)
)
# Await all results
results = await asyncio.gather(*tasks)
# Filter out any None values (cases where we skip)
for r in results:
if r is not None:
response.append(r)
if not response:
logging.warning("No valid images were generated.")
logging.info("process_our_data completed successfully.")
return response
except Exception as e:
logging.error(f"Unexpected error in process_our_data: {e}", exc_info=True)
return []
def process_our_data_sync():
try:
logging.info("Executing process_our_data_sync...")
return asyncio.run(process_our_data())
except RuntimeError as e:
logging.error("RuntimeError in process_our_data_sync: Async event loop issue", exc_info=True)
return []
except Exception as e:
logging.error(f"Unexpected error in process_our_data_sync: {e}", exc_info=True)
return []