Fred808 committed
Commit ba74f1a · verified · 1 Parent(s): 3f391a4

Upload 12 files
annotations/README.txt ADDED
@@ -0,0 +1,2 @@
+ # Annotation outputs (JSON) will be saved here
+ # Example: cursor_positions.json
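For reference, scripts/cursor_tracker.py writes cursor_positions.json as a list of per-frame records shaped like this (the values below are illustrative, not real output):

[
  {
    "frame": "sample_0001.png",
    "cursor_pos": [412, 236],
    "score": 0.91
  }
]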
app.py ADDED
@@ -0,0 +1,22 @@
+ import os
+ from scripts.extract_frames import extract_frames
+ from scripts.cursor_tracker import track_cursor
+
+ def run_pipeline():
+     video_dir = "videos"
+     frames_dir = "frames"
+     cursor_dir = "cursors"
+     annotations_dir = "annotations"
+     os.makedirs(frames_dir, exist_ok=True)
+     os.makedirs(cursor_dir, exist_ok=True)
+     os.makedirs(annotations_dir, exist_ok=True)
+     # Step 1: Extract frames from every video in videos/
+     for video_file in os.listdir(video_dir):
+         if video_file.lower().endswith(('.mp4', '.avi', '.mov')):
+             extract_frames(os.path.join(video_dir, video_file), frames_dir)
+     # Step 2: Track the cursor across the extracted frames
+     track_cursor(frames_dir, cursor_dir, os.path.join(annotations_dir, "cursor_positions.json"))
+     print("Pipeline complete.")
+
+ if __name__ == "__main__":
+     run_pipeline()
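app.py assumes it is run from the repository root, so the relative videos/, frames/, cursors/, and annotations/ paths resolve:

python app.py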
cursors/README.txt ADDED
@@ -0,0 +1,2 @@
+ # Place your cursor template PNGs here
+ # Example: cursor_template.png
frames/README.txt ADDED
@@ -0,0 +1,3 @@
+ # Example extracted frame
+ # This folder will be filled by extract_frames.py
+ # Example filename: sample_0001.png
requirements.txt ADDED
@@ -0,0 +1,11 @@
+ # Core ML and API dependencies
+ opencv-python-headless>=4.8.0
+ numpy>=1.24.0
+ torch>=2.1.0
+ transformers>=4.40.0
+ pillow>=10.0.0
+
+ # Optional: for API or annotation
+ fastapi>=0.110.0
+ uvicorn[standard]>=0.29.0
+ requests>=2.31.0
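With the file above in place, a single install step pulls everything in (a virtual environment is assumed but optional):

pip install -r requirements.txt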
scripts/blip2_infer.py ADDED
@@ -0,0 +1,23 @@
+ import os
+
+ # Cache paths should be set before transformers is imported,
+ # otherwise they have no effect
+ os.environ["HF_HOME"] = "/tmp/hf_cache"
+ os.environ["TRANSFORMERS_CACHE"] = "/tmp/hf_cache"
+
+ import torch
+ from PIL import Image
+ from transformers import BlipProcessor, BlipForConditionalGeneration
+
+ # Note: despite the filename, this loads BLIP, not BLIP-2
+ model_id = "Salesforce/blip-image-captioning-large"
+ processor = BlipProcessor.from_pretrained(model_id)
+ device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+ model = BlipForConditionalGeneration.from_pretrained(model_id).to(device)
+
+ def describe_image(image_path, prompt="Describe this image."):
+     # BLIP uses the prompt as a caption prefix for conditional generation
+     image = Image.open(image_path).convert("RGB")
+     inputs = processor(image, prompt, return_tensors="pt").to(device)
+     output = model.generate(**inputs, max_new_tokens=100)
+     return processor.decode(output[0], skip_special_tokens=True)
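A minimal smoke test for describe_image, assuming a frame has already been extracted (the path is illustrative):

from scripts.blip2_infer import describe_image

print(describe_image("frames/sample_0001.png"))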
scripts/cursor_tracker.py ADDED
@@ -0,0 +1,43 @@
+ import os
+ import json
+ import cv2
+ import numpy as np
+
+ def load_templates(cursor_dir):
+     # Templates are assumed to be BGR or BGRA PNGs
+     templates = []
+     for fname in os.listdir(cursor_dir):
+         if fname.endswith('.png'):
+             template = cv2.imread(os.path.join(cursor_dir, fname), cv2.IMREAD_UNCHANGED)
+             if template is not None:
+                 templates.append(template)
+     return templates
+
+ def track_cursor(frames_dir, cursor_dir, output_json):
+     templates = load_templates(cursor_dir)
+     results = []
+     for frame_file in sorted(os.listdir(frames_dir)):
+         if not frame_file.endswith('.png'):
+             continue
+         frame = cv2.imread(os.path.join(frames_dir, frame_file))
+         if frame is None:
+             continue
+         best_match = None
+         best_val = -np.inf
+         for template in templates:
+             # Drop the alpha channel (if any) to match the BGR frame
+             res = cv2.matchTemplate(frame, template[..., :3], cv2.TM_CCOEFF_NORMED)
+             _, max_val, _, max_loc = cv2.minMaxLoc(res)
+             if max_val > best_val:
+                 best_val = max_val
+                 best_match = (max_loc, max_val)
+         if best_match:
+             (x, y), score = best_match
+             # cursor_pos is the top-left corner of the best match
+             results.append({"frame": frame_file, "cursor_pos": [int(x), int(y)], "score": float(score)})
+     with open(output_json, "w") as f:
+         json.dump(results, f, indent=2)
+     print(f"Cursor tracking complete. Results saved to {output_json}")
+
+ if __name__ == "__main__":
+     track_cursor("frames", "cursors", "annotations/cursor_positions.json")
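Downstream code can load the resulting annotations and drop weak matches; the 0.8 cut-off below is an illustrative assumption, not a tuned threshold:

import json

with open("annotations/cursor_positions.json") as f:
    positions = json.load(f)

# keep only confident detections (threshold is an assumption)
confident = [p for p in positions if p["score"] > 0.8]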
scripts/extract_frames.py ADDED
@@ -0,0 +1,31 @@
+ import os
+ import cv2
+ from pathlib import Path
+
+ def extract_frames(video_path, output_dir, fps=2):
+     os.makedirs(output_dir, exist_ok=True)
+     vidcap = cv2.VideoCapture(video_path)
+     video_name = Path(video_path).stem
+     # Keep every Nth frame; guard against videos that report an FPS
+     # at or below the requested sampling rate (avoids division by zero)
+     source_fps = vidcap.get(cv2.CAP_PROP_FPS)
+     step = max(1, int(source_fps // fps))
+     count = 0
+     frame_id = 1
+     success, image = vidcap.read()
+     while success:
+         if count % step == 0:
+             frame_name = f"{video_name}_{frame_id:04d}.png"
+             cv2.imwrite(os.path.join(output_dir, frame_name), image)
+             frame_id += 1
+         success, image = vidcap.read()
+         count += 1
+     vidcap.release()
+     print(f"Extracted {frame_id - 1} frames from {video_path}")
+
+ if __name__ == "__main__":
+     video_dir = "videos"
+     frames_dir = "frames"
+     for video_file in os.listdir(video_dir):
+         if video_file.lower().endswith(('.mp4', '.avi', '.mov')):
+             extract_frames(os.path.join(video_dir, video_file), frames_dir)
scripts/pipeline.py ADDED
@@ -0,0 +1,22 @@
+ import os
+ from scripts.extract_frames import extract_frames
+ from scripts.cursor_tracker import track_cursor
+
+ def run_pipeline():
+     video_dir = "videos"
+     frames_dir = "frames"
+     cursor_dir = "cursors"
+     annotations_dir = "annotations"
+     os.makedirs(frames_dir, exist_ok=True)
+     os.makedirs(cursor_dir, exist_ok=True)
+     os.makedirs(annotations_dir, exist_ok=True)
+     # Step 1: Extract frames from every video in videos/
+     for video_file in os.listdir(video_dir):
+         if video_file.lower().endswith(('.mp4', '.avi', '.mov')):
+             extract_frames(os.path.join(video_dir, video_file), frames_dir)
+     # Step 2: Track the cursor across the extracted frames
+     track_cursor(frames_dir, cursor_dir, os.path.join(annotations_dir, "cursor_positions.json"))
+     print("Pipeline complete.")
+
+ if __name__ == "__main__":
+     run_pipeline()
scripts/test_vision_api.py ADDED
@@ -0,0 +1,28 @@
+ import json
+ import os
+ import requests
+
+ def test_vision_api(image_path, prompt, api_url, output_json_path):
+     with open(image_path, "rb") as img_file:
+         files = {"image": img_file}
+         data = {"prompt": prompt}
+         response = requests.post(api_url, files=files, data=data)
+     try:
+         result = response.json()
+     except ValueError as e:
+         # The API returned something that is not JSON
+         print("Non-JSON response from API:")
+         print(response.text)
+         result = {"error": str(e), "raw_response": response.text}
+     with open(output_json_path, "w") as f:
+         json.dump({"frame": os.path.basename(image_path), "result": result}, f, indent=2)
+     print(f"Logged response to {output_json_path}")
+
+ if __name__ == "__main__":
+     # Example usage
+     test_vision_api(
+         image_path="frames/sample_task_0001.png",
+         prompt="Describe the user's task, app, actions, and the final goal.",
+         api_url="http://localhost:8000/vision",
+         output_json_path="annotations/test_vision_api_response.json"
+     )
scripts/vision_api.py ADDED
@@ -0,0 +1,20 @@
+ import os
+ import shutil
+ import tempfile
+ from fastapi import FastAPI, File, UploadFile, Form
+ from scripts.blip2_infer import describe_image
+
+ app = FastAPI()
+
+ @app.post("/vision")
+ async def process_vision(image: UploadFile = File(...), prompt: str = Form("Describe this image.")):
+     # Write the upload to a unique temp file so concurrent
+     # requests do not clobber each other
+     with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
+         shutil.copyfileobj(image.file, tmp)
+         temp_path = tmp.name
+     try:
+         desc = describe_image(temp_path, prompt)
+     finally:
+         os.remove(temp_path)
+     return {"description": desc}
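To serve the endpoint locally on the port that scripts/test_vision_api.py expects:

uvicorn scripts.vision_api:app --port 8000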
videos/README.txt ADDED
@@ -0,0 +1,2 @@
+ # Place your input videos here (e.g., .mp4, .avi, .mov)
+ # Example: sample.mp4