GeorgeSherif committed on
Commit
179fca4
·
1 Parent(s): d34d332
Files changed (1) hide show
  1. app.py +62 -56
app.py CHANGED
@@ -4,6 +4,8 @@ import threading
4
  import random
5
  from datasets import load_dataset, Dataset, Features, Value, concatenate_datasets
6
  from huggingface_hub import login
 
 
7
 
8
  # Authenticate with Hugging Face
9
  token = os.getenv("HUGGINGFACE_TOKEN")
@@ -13,62 +15,70 @@ else:
13
  print("HUGGINGFACE_TOKEN environment variable not set.")
14
  dataset_name = "GeorgeIbrahim/EGYCOCO" # Replace with your dataset name
15
 
16
- # Load the existing dataset or create it if not available
17
  try:
18
  dataset = load_dataset(dataset_name, split="train")
19
  print("Loaded existing dataset:", dataset)
20
  except Exception as e:
21
- print("Failed to load dataset:", e)
22
- dataset = None
23
-
24
- # Check if "annotation_count" exists, if not, add it
25
- if dataset is not None:
26
- if "annotation_count" not in dataset.column_names:
27
- # Define the updated features with annotation_count added
28
- features = dataset.features.copy()
29
- features["annotation_count"] = Value(dtype="int32")
30
-
31
- # Update dataset with new feature, initializing annotation_count based on existing annotations
32
- dataset = dataset.map(
33
- lambda row: {"annotation_count": 1 if "val" in row["image_id"] else 0},
34
- features=features
35
- )
36
-
37
- # Push the updated dataset with the new feature to Hugging Face Hub
38
- dataset.push_to_hub(dataset_name)
39
- print("Updated dataset with annotation_count and pushed to Hub")
40
 
41
  image_folder = "images"
42
  image_files = [f for f in os.listdir(image_folder) if f.endswith(('.png', '.jpg', '.jpeg'))]
43
  lock = threading.Lock()
44
 
45
- # Function to get a random image that hasn’t been fully annotated
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
46
  def get_next_image(session_data):
47
  with lock:
48
- # Retrieve set of annotated images with counts
49
- annotated_images = {item["image_id"]: item["annotation_count"] for item in dataset}
50
-
51
- # Available images filter
52
- available_images = [
53
- img for img in image_files
54
- if img not in annotated_images or
55
- ("val" in img and annotated_images[img] < 2) or
56
- ("val" not in img and annotated_images[img] == 0)
57
- ]
58
-
59
  # Check if the user already has an image
60
  if session_data["current_image"] is None and available_images:
61
  # Assign a new random image to the user
62
  session_data["current_image"] = random.choice(available_images)
63
-
64
  return os.path.join(image_folder, session_data["current_image"]) if session_data["current_image"] else None
65
 
66
- # Function to save the annotation to the Hugging Face dataset and fetch the next image
67
  def save_annotation(caption, session_data):
68
- global dataset # Declare global at the start of the function
69
 
70
  if session_data["current_image"] is None:
71
- return gr.update(visible=False), gr.update(value="All images have been annotated!")
72
 
73
  with lock:
74
  image_id = session_data["current_image"]
@@ -79,7 +89,8 @@ def save_annotation(caption, session_data):
79
 
80
  # Check if image is already in dataset to update count
81
  existing_image = dataset.filter(lambda x: x["image_id"] == image_id)
82
- if len(existing_image):
 
83
  annotation_count = existing_image[0]["annotation_count"]
84
  else:
85
  annotation_count = 0
@@ -88,56 +99,51 @@ def save_annotation(caption, session_data):
88
  new_data = Dataset.from_dict({
89
  "image_id": [image_id],
90
  "caption": [caption],
91
- "annotation_count": [annotation_count + 1]
92
  })
93
 
 
94
  dataset = concatenate_datasets([dataset, new_data])
95
-
96
- # Save updated dataset to Hugging Face
97
  dataset.push_to_hub(dataset_name)
98
  print("Pushed updated dataset")
99
 
100
- # Clear user's current image if the validation image has been annotated five times
101
- if ("val" not in image_id) or (annotation_count + 1 >= 5):
102
- session_data["current_image"] = None
103
 
104
  # Fetch the next image
105
  next_image = get_next_image(session_data)
106
  if next_image:
107
- return gr.update(value=next_image), gr.update(value="")
 
108
  else:
109
- return gr.update(visible=False), gr.update(value="All images have been annotated!")
110
-
111
- # Function to skip the current image
112
- def skip_image(session_data):
113
- return save_annotation("skip", session_data)
114
 
115
- # Function to initialize the interface
116
  def initialize_interface(session_data):
117
  next_image = get_next_image(session_data)
118
  if next_image:
119
- return gr.update(value=next_image), gr.update(value="")
 
 
120
  else:
121
  return gr.update(visible=False), gr.update(value="All images have been annotated!")
122
 
123
  # Build the Gradio interface
124
  with gr.Blocks() as demo:
125
  gr.Markdown("# Image Captioning Tool")
126
- gr.Markdown("Please provide a caption for each image displayed. Click 'Submit' after writing your caption, or type 'skip' if you don’t want to annotate this image.")
127
 
128
  session_data = gr.State({"current_image": None}) # Session-specific state
129
 
130
  with gr.Row():
131
  image = gr.Image()
132
  caption = gr.Textbox(placeholder="Enter caption here...")
 
133
  submit = gr.Button("Submit")
134
- skip = gr.Button("Skip") # Skip button
135
 
136
  # Define actions for buttons
137
- submit.click(fn=save_annotation, inputs=[caption, session_data], outputs=[image, caption])
138
- skip.click(fn=skip_image, inputs=session_data, outputs=[image, caption])
139
 
140
  # Load initial image
141
- demo.load(fn=initialize_interface, inputs=session_data, outputs=[image, caption])
142
 
143
  demo.launch(share=True)
 
4
  import random
5
  from datasets import load_dataset, Dataset, Features, Value, concatenate_datasets
6
  from huggingface_hub import login
7
+ import json
8
+ import re
9
 
10
  # Authenticate with Hugging Face
11
  token = os.getenv("HUGGINGFACE_TOKEN")
 
15
  print("HUGGINGFACE_TOKEN environment variable not set.")
16
  dataset_name = "GeorgeIbrahim/EGYCOCO" # Replace with your dataset name
17
 
18
# Load the existing annotation dataset from the Hub; if it cannot be
# loaded (e.g. it does not exist yet), create an empty one and push it.
try:
    dataset = load_dataset(dataset_name, split="train")
    print("Loaded existing dataset:", dataset)
except Exception as e:
    # Surface the failure reason instead of swallowing it silently: a
    # transient network/auth error lands here too, and pushing an empty
    # dataset would clobber the remote copy — the printed error at least
    # makes that visible in the logs.
    print("Failed to load dataset, creating a new empty one:", e)
    features = Features({
        'image_id': Value(dtype='string'),
        'caption': Value(dtype='string'),
        'annotation_count': Value(dtype='int32'),  # times each image has been annotated
    })
    dataset = Dataset.from_dict(
        {'image_id': [], 'caption': [], 'annotation_count': []},
        features=features,
    )
    dataset.push_to_hub(dataset_name)  # seed the Hub repo with the empty dataset
 
 
 
 
 
 
 
 
 
 
 
31
 
32
# Folder of candidate images to caption. The extension check is done
# case-insensitively so files like "IMG_1.JPG" are not silently skipped
# (the original `endswith` comparison was case-sensitive).
image_folder = "images"
image_files = [
    f for f in os.listdir(image_folder)
    if f.lower().endswith(('.png', '.jpg', '.jpeg'))
]
# Serializes reads/updates of the shared `dataset` across Gradio sessions.
lock = threading.Lock()
35
 
36
# Precomputed nearest-neighbour captions keyed by (zero-stripped) image id.
# Explicit encoding: JSON files are UTF-8 by spec, while a bare open()
# would fall back to the platform default encoding.
with open('nearest_neighbors_with_captions.json', 'r', encoding='utf-8') as f:
    results = json.load(f)
38
+
39
def get_caption_for_image_id(image_path, caption_data=None):
    """Look up the reference caption for an image file.

    The numeric id is extracted from the filename — the digits between the
    last underscore and the extension, e.g. ``COCO_val2014_000000000042.jpg``
    yields ``"42"``. The id is first tried as a top-level test-image key,
    then searched among every test image's nearest neighbours.

    Args:
        image_path: image filename (or path ending in the filename).
        caption_data: mapping of image id -> {"caption", "nearest_neighbors"};
            defaults to the module-level ``results`` loaded from JSON, so
            existing callers are unaffected.

    Returns:
        The caption string, or None when the filename contains no numeric id
        or the id is not present in the data.
    """
    if caption_data is None:
        caption_data = results  # module-level JSON lookup table

    match = re.search(r'_(\d+)\.', image_path)
    if not match:
        return None

    # Normalise away leading zeros. int() keeps an all-zero id as "0",
    # whereas the previous lstrip('0') collapsed it to the empty string.
    image_id = str(int(match.group(1)))

    # Direct hit: the image is itself a test image.
    if image_id in caption_data:
        return caption_data[image_id]["caption"]

    # Otherwise search every test image's nearest-neighbour list.
    for test_image_data in caption_data.values():
        for neighbor in test_image_data["nearest_neighbors"]:
            if neighbor["image_id"] == image_id:
                return neighbor["caption"]

    return None
64
+
65
def get_next_image(session_data):
    """Assign the session a random not-yet-annotated image and return its path.

    A session that already holds an image keeps it. Returns None once every
    file in ``image_files`` has an ``image_id`` entry in the dataset.
    """
    with lock:
        # Images that already have at least one row in the dataset.
        already_done = set(dataset["image_id"])
        remaining = [name for name in image_files if name not in already_done]

        # Only hand out a new image when the session has none in flight.
        if session_data["current_image"] is None and remaining:
            session_data["current_image"] = random.choice(remaining)

    current = session_data["current_image"]
    if current:
        return os.path.join(image_folder, current)
    return None
75
 
76
+ # Function to save the annotation to Hugging Face dataset and fetch the next image
77
  def save_annotation(caption, session_data):
78
+ global dataset # Declare global dataset at the start of the function
79
 
80
  if session_data["current_image"] is None:
81
+ return gr.update(visible=False), gr.update(value="All images have been annotated!"), gr.update(value="")
82
 
83
  with lock:
84
  image_id = session_data["current_image"]
 
89
 
90
  # Check if image is already in dataset to update count
91
  existing_image = dataset.filter(lambda x: x["image_id"] == image_id)
92
+ if len(existing_image) > 0:
93
+ # Get current annotation count
94
  annotation_count = existing_image[0]["annotation_count"]
95
  else:
96
  annotation_count = 0
 
99
  new_data = Dataset.from_dict({
100
  "image_id": [image_id],
101
  "caption": [caption],
102
+ "annotation_count": [annotation_count + 1] # Increment the annotation count
103
  })
104
 
105
+ # Concatenate with the existing dataset and push the updated dataset to Hugging Face
106
  dataset = concatenate_datasets([dataset, new_data])
 
 
107
  dataset.push_to_hub(dataset_name)
108
  print("Pushed updated dataset")
109
 
110
+ # Clear user's current image so they get a new one next time
111
+ session_data["current_image"] = None
 
112
 
113
  # Fetch the next image
114
  next_image = get_next_image(session_data)
115
  if next_image:
116
+ next_caption = get_caption_for_image_id(os.path.basename(next_image)) # Retrieve the caption for the new image
117
+ return gr.update(value=next_image), gr.update(value=""), gr.update(value=next_caption or "")
118
  else:
119
+ return gr.update(visible=False), gr.update(value="All images have been annotated!"), gr.update(value="")
 
 
 
 
120
 
 
121
def initialize_interface(session_data):
    """Load the first image (and its reference caption) for a new session.

    Returns a pair of gr.update objects for the image component and the
    existing-caption textbox; hides the image component when there is
    nothing left to annotate.
    """
    next_image = get_next_image(session_data)
    if next_image is None:
        return gr.update(visible=False), gr.update(value="All images have been annotated!")
    # Show the precomputed reference caption next to the image
    # (empty string when no caption is known for this image).
    next_caption = get_caption_for_image_id(os.path.basename(next_image))
    return gr.update(value=next_image), gr.update(value=next_caption or "")
129
 
130
# Build the Gradio interface: one image, a free-text caption box, a
# read-only reference caption, and a submit button wired to save_annotation.
# NOTE(review): original indentation was lost in the diff rendering — the
# component nesting inside gr.Row() below is an assumption; verify layout.
with gr.Blocks() as demo:
    gr.Markdown("# Image Captioning Tool")
    gr.Markdown("Please provide your caption in Egyptian Arabic 'Masri'")

    session_data = gr.State({"current_image": None})  # Session-specific state

    with gr.Row():
        image = gr.Image()
        caption = gr.Textbox(placeholder="Enter caption here...")
        existing_caption = gr.Textbox(label="Existing Caption", interactive=False)  # Display existing caption
        submit = gr.Button("Submit")

    # Define actions for buttons: saving also advances to the next image,
    # so submit updates all three components.
    submit.click(fn=save_annotation, inputs=[caption, session_data], outputs=[image, caption, existing_caption])

    # Load initial image (initialize_interface returns image + reference caption)
    demo.load(fn=initialize_interface, inputs=session_data, outputs=[image, existing_caption])

demo.launch(share=True)