Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import cv2 | |
| from ultralytics import YOLO # For street detection | |
| import folium | |
| from streamlit_folium import st_folium | |
| import requests | |
| from PIL import Image | |
| from io import BytesIO | |
| import numpy as np | |
| import torch | |
| from sklearn.utils.extmath import softmax | |
| import open_clip | |
| import os | |
# --- Model and data configuration ---
# Archive opened by load_knn(); knn_get_score() reads '<category>_vecs' and
# '<category>_scores' entries from it.
knnpath = '20241204-ams-no-env-open_clip_ViT-H-14-378-quickgelu.npz'
# OpenCLIP architecture name and pretrained tag passed to open_clip in load_model().
clip_model_name = 'ViT-H-14-378-quickgelu'
pretrained_name = 'dfn5b'
# Categories for which a rating is printed per image.
categories = ['walkability', 'bikeability', 'pleasantness', 'greenness', 'safety']
# When True, knn_get_score() and main() write intermediate array shapes to the page.
debug = False

# --- Page setup ---
st.set_page_config(page_title="Percept", layout="wide")

# --- Mapillary credentials ---
# The token is read from the environment so it never lives in the source.
MAPILLARY_ACCESS_TOKEN = os.getenv('MAPILLARY_ACCESS_TOKEN')
if not MAPILLARY_ACCESS_TOKEN:
    # Without a token no image can be fetched, so halt the script here.
    st.error("Mapillary access token not found. Please configure it in the Space secrets.")
    st.stop()
def detect_and_crop_street(panorama_url, use_yolo=True):
    """
    Download a panoramic image and return a cropped, normal-sized view of it.

    With ``use_yolo`` enabled, the crop is the padded bounding box of the
    largest class-0 detection. If nothing is detected, or ``use_yolo`` is
    False, the crop comes from ``edge_based_crop``.

    NOTE(review): the stock COCO-trained ``yolov8n.pt`` weights have no
    road/street class; class index 0 is "person" there. The class filter is
    kept as it was, but it presumably does not select streets - confirm
    against the weights that are actually deployed.

    Args:
        panorama_url: URL of the panoramic image
        use_yolo: Whether to use YOLOv8 (True) or simple edge detection (False)
    Returns:
        cropped_image: PIL Image, 1024 pixels wide, with the crop's aspect
            ratio preserved
    Raises:
        requests.exceptions.RequestException: if the download fails, times
            out, or returns an HTTP error status
    """
    padding = 200
    target_width = 1024

    # Download the image; fail fast instead of hanging or decoding an error page.
    response = requests.get(panorama_url, timeout=30)
    response.raise_for_status()

    # Force 3-channel RGB so the RGB->BGR conversion cannot fail on grayscale,
    # palette or RGBA inputs.
    img = Image.open(BytesIO(response.content)).convert('RGB')
    cv_img = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
    height, width = cv_img.shape[:2]

    cropped = None
    if use_yolo:
        # Load YOLOv8 model and run detection
        model = YOLO('yolov8n.pt')
        results = model(cv_img)

        # Collect the boxes of every class-0 detection (see NOTE in docstring)
        street_boxes = []
        for result in results:
            for box, cls in zip(result.boxes.xyxy, result.boxes.cls):
                if int(cls) == 0:
                    street_boxes.append(box.cpu().numpy())

        if street_boxes:
            # Take the detection with the largest area
            largest_box = max(
                street_boxes,
                key=lambda b: (b[2] - b[0]) * (b[3] - b[1]),
            )
            x1, y1, x2, y2 = map(int, largest_box)
            # Pad the box, clamping every edge to the image bounds
            x1 = max(0, x1 - padding)
            y1 = max(0, y1 - padding)
            x2 = min(width, x2 + padding)
            y2 = min(height, y2 + padding)
            cropped = cv_img[y1:y2, x1:x2]

    if cropped is None:
        # Edge detection: requested explicitly, or fallback when YOLO found nothing
        cropped = edge_based_crop(cv_img)

    if cropped.size == 0:
        # A degenerate crop would divide by zero below; use the full frame.
        cropped = cv_img

    # Convert back to PIL Image
    cropped_pil = Image.fromarray(cv2.cvtColor(cropped, cv2.COLOR_BGR2RGB))

    # Resize to the standard width while maintaining aspect ratio
    aspect_ratio = cropped.shape[1] / cropped.shape[0]
    # Never request a zero-pixel height for extremely wide crops.
    target_height = max(1, int(target_width / aspect_ratio))
    return cropped_pil.resize((target_width, target_height), Image.Resampling.LANCZOS)
def edge_based_crop(cv_img):
    """
    Crop a BGR image around its largest Canny-edge contour.

    The image is converted to grayscale, blurred and edge-detected. The
    bounding rectangle of the contour with the largest area is padded by
    200 pixels on every side and clamped to the image bounds. When no
    contour is found, a centred crop of roughly one third of each dimension
    is returned instead.

    Args:
        cv_img: image array in OpenCV BGR channel order
    Returns:
        A slice (view) of ``cv_img``; the whole image if it is too small for
        the centred fallback crop to contain any pixels.
    """
    padding = 200
    height, width = cv_img.shape[:2]

    # Grayscale -> blur -> Canny edges -> outer contours
    gray = cv2.cvtColor(cv_img, cv2.COLOR_BGR2GRAY)
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    edges = cv2.Canny(blurred, 50, 150)
    contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    if contours:
        largest_contour = max(contours, key=cv2.contourArea)
        x, y, w, h = cv2.boundingRect(largest_contour)
        # Clamp each edge independently so that clamping the left/top edge
        # does not push the unused padding onto the right/bottom edge.
        left = max(0, x - padding)
        top = max(0, y - padding)
        right = min(width, x + w + padding)
        bottom = min(height, y + h + padding)
        return cv_img[top:bottom, left:right]

    # No contours found: fall back to a centre crop
    half_w = (width // 3) // 2
    half_h = (height // 3) // 2
    if half_w == 0 or half_h == 0:
        # Image too small for a non-empty centre crop; return it unchanged.
        return cv_img
    center_x = width // 2
    center_y = height // 2
    return cv_img[center_y - half_h:center_y + half_h,
                  center_x - half_w:center_x + half_w]
# Streamlit-facing wrapper: reports failures in the UI instead of raising.
def process_panorama(panorama_url):
    """
    Return a street-centered crop of the panorama at ``panorama_url``.

    Any exception raised by ``detect_and_crop_street`` is shown on the page
    with ``st.error`` and turned into a ``None`` result.
    """
    try:
        return detect_and_crop_street(panorama_url)
    except Exception as err:
        st.error(f"Error processing panorama: {str(err)}")
    return None
def get_bounding_box(lat, lon, *, lat_offset=0.000224, lon_offset=0.000368):
    """
    Create a bounding box centred on a point.

    The default offsets extend roughly 25 meters in each direction at
    Amsterdam's latitude (52.37°N):
    - 0.000224 degrees latitude = 25 meters N/S
    - 0.000368 degrees longitude = 25 meters E/W

    The longitude default is specific to that latitude; pass ``lon_offset``
    (and ``lat_offset``) explicitly for other locations or box sizes.

    Args:
        lat: latitude of the centre point, in degrees
        lon: longitude of the centre point, in degrees
        lat_offset: half-height of the box, in degrees of latitude
        lon_offset: half-width of the box, in degrees of longitude
    Returns:
        ``[min_lon, min_lat, max_lon, max_lat]``
    """
    return [
        lon - lon_offset,  # min longitude
        lat - lat_offset,  # min latitude
        lon + lon_offset,  # max longitude
        lat + lat_offset,  # max latitude
    ]
def get_nearest_image(lat, lon):
    """
    Get a Mapillary image located near the given coordinates.

    The Mapillary images endpoint is queried for a single image inside the
    box returned by ``get_bounding_box(lat, lon)``. The first record returned
    is used as-is; no distance ranking is performed here.

    Args:
        lat: latitude of the clicked point, in degrees
        lon: longitude of the clicked point, in degrees
    Returns:
        The image record (a dict with the requested ``id``,
        ``thumb_1024_url`` and ``is_pano`` fields), or None when the box
        contains no image or the request failed. Request failures are also
        reported on the page with ``st.error``.
    """
    bbox = get_bounding_box(lat, lon)
    params = {
        'fields': 'id,thumb_1024_url,is_pano',
        'limit': 1,
        'bbox': f'{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}',
    }
    header = {'Authorization': f'OAuth {MAPILLARY_ACCESS_TOKEN}'}
    try:
        response = requests.get(
            "https://graph.mapillary.com/images",
            params=params,
            headers=header,
            # Without a timeout a stalled connection would block the app
            # indefinitely; a timeout surfaces as a RequestException below.
            timeout=30,
        )
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        st.error(f"Error fetching Mapillary data: {str(e)}")
        return None

    images = data.get('data') if isinstance(data, dict) else None
    return images[0] if images else None
@st.cache_resource(show_spinner=False)
def load_model():
    """
    Load the OpenCLIP model once and reuse it across Streamlit reruns.

    Streamlit re-executes the script on every interaction; caching the
    result avoids rebuilding the model each time. The caller shows its own
    spinner, so the cache's built-in spinner is disabled.

    Returns:
        ``(model, preprocess, tokenizer)``: the model in eval mode, the image
        preprocessing transform, and the tokenizer for ``clip_model_name``.
    """
    model, _, preprocess = open_clip.create_model_and_transforms(
        clip_model_name, pretrained=pretrained_name
    )
    # The model is only used for inference, so switch off training-mode behaviour.
    model.eval()
    tokenizer = open_clip.get_tokenizer(clip_model_name)
    return model, preprocess, tokenizer
def process_image(image, preprocess):
    """
    Turn an image (or image URL) into a batched, preprocessed input.

    Args:
        image: a PIL image, or a string URL from which the image is downloaded
        preprocess: callable applied to the RGB image; its result must
            provide ``unsqueeze``
    Returns:
        ``preprocess(image).unsqueeze(0)``, i.e. the preprocessed image with
        a leading batch axis added.
    Raises:
        requests.exceptions.RequestException: if ``image`` is a URL and the
            download fails, times out, or returns an HTTP error status
    """
    if isinstance(image, str):
        # If image is a URL, fetch it; fail on HTTP errors instead of trying
        # to decode an error page as an image.
        response = requests.get(image, timeout=30)
        response.raise_for_status()
        image = Image.open(BytesIO(response.content))

    # Ensure image is in RGB mode
    if image.mode != 'RGB':
        image = image.convert('RGB')

    return preprocess(image).unsqueeze(0)
def knn_get_score(knn, k, cat, vec):
    """
    Rate ``vec`` for category ``cat`` from its ``k`` most similar references.

    Reads ``knn['<cat>_vecs']`` and ``knn['<cat>_scores']``, ranks the
    reference vectors by dot product with ``vec``, and returns the sum of the
    top-``k`` reference scores weighted by ``softmax(10 ** similarity)``.

    The dot product is treated as cosine similarity, which assumes both
    ``vec`` and the stored vectors are already normalized.
    ``vec`` is indexed as a 2-D array (one row per query).
    """
    ref_vecs = knn[f'{cat}_vecs']
    if debug:
        st.write('allvecs.shape', ref_vecs.shape)
    ref_scores = knn[f'{cat}_scores']
    if debug:
        st.write('scores.shape', ref_scores.shape)

    # Similarity of every query row against every reference vector
    similarities = vec @ ref_vecs.T
    if debug:
        st.write('cos_sim_table.shape', similarities.shape)

    # Reference indices per row, most similar first
    ranking = np.argsort(similarities, axis=1)[:, ::-1]
    if debug:
        st.write('sortinds.shape', ranking.shape)

    # Keep only the k best matches before gathering scores and similarities
    top_k = ranking[:, :k]
    top_scores = ref_scores[top_k]
    if debug:
        st.write('kscores.shape', top_scores.shape)

    row_index = np.arange(ranking.shape[0])[:, np.newaxis]
    top_sims = similarities[row_index, top_k]
    if debug:
        st.write('ksims.shape', top_sims.shape)

    # Exponentiate, then normalize into weights
    weights = softmax(10 ** top_sims)

    # Weighted sum of the neighbours' scores
    return np.sum(top_scores * weights)
@st.cache_resource(show_spinner=False)
def load_knn():
    """
    Open the KNN reference archive at ``knnpath`` once and reuse it.

    Streamlit re-executes the script on every interaction; caching the
    result of ``np.load`` avoids reopening the file on each rerun. The
    caller shows its own spinner, so the cache's built-in spinner is
    disabled.

    Returns:
        The object returned by ``np.load(knnpath)``.
    """
    return np.load(knnpath)
def _load_street_view(image_data):
    """Return ``(image, jpeg_bytes)`` for a Mapillary record, or ``(None, None)`` if panorama processing failed."""
    url = image_data['thumb_1024_url']
    if image_data['is_pano']:
        st.write('Processing panoramic image')
        image = process_panorama(url)
        if image is None:
            # process_panorama already reported the failure with st.error.
            return None, None
        # Re-encode the crop as JPEG so the displayed image and the
        # downloadable bytes are the same file.
        buffer = BytesIO()
        image.save(buffer, format='JPEG')
        buffer.seek(0)
        return Image.open(buffer), buffer.getvalue()

    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return Image.open(BytesIO(response.content)), response.content


def main():
    """
    Run the Percept map explorer page.

    Loads the CLIP model and the KNN reference data, shows a map centred on
    Amsterdam, and, once a location is clicked, fetches a Mapillary image for
    it, displays it with a download button, and writes one rating per entry
    in ``categories``.
    """
    st.title("Percept: Map Explorer")
    try:
        with st.spinner('Loading CLIP model... This may take a moment.'):
            model, preprocess, _ = load_model()
            device = "cuda" if torch.cuda.is_available() else "cpu"
            model = model.to(device)
    except Exception as e:
        st.error(f"Error loading model: {str(e)}")
        st.info("Please make sure you have enough memory and the correct dependencies installed.")
        # Nothing below can work without the model; continuing would only
        # fail later with a NameError on `model`/`device`.
        return

    with st.spinner('Loading KNN model... This may take a moment.'):
        knn = load_knn()

    # Initialize the map centered on Amsterdam
    amsterdam_coords = [52.3676, 4.9041]
    m = folium.Map(location=amsterdam_coords, zoom_start=13)
    # Create a LayerGroup for the marker
    marker_group = folium.FeatureGroup(name="Marker")
    m.add_child(marker_group)

    # Display the map and get clicked coordinates
    map_data = st_folium(m, height=400, width=700)

    # Nothing to do until a location has been clicked
    clicked = map_data.get('last_clicked') if map_data else None
    if not clicked:
        return
    lat = clicked['lat']
    lng = clicked['lng']

    # NOTE(review): the map was already handed to st_folium above and is not
    # rendered again in this function, so this marker is presumably never
    # shown. Kept as-is; displaying it would need the click to be known
    # before the map is drawn.
    marker_group.add_child(folium.Marker(
        [lat, lng],
        popup=f"Selected Location\n{lat:.4f}, {lng:.4f}",
        icon=folium.Icon(color="red", icon="info-sign")
    ))
    st.write(f"Selected coordinates: {lat:.4f}, {lng:.4f}")

    # Get a Mapillary image for the clicked location
    with st.spinner('Fetching street view image...'):
        image_data = get_nearest_image(lat, lng)
    if not image_data:
        st.warning("No street view images found at this location. Try a different spot.")
        return

    try:
        image, image_bytes = _load_street_view(image_data)
        if image is None:
            return

        # Display the image
        st.image(image, caption="Street View", width=400, output_format='JPEG')
        # Add download button
        st.download_button(
            label="Download Image",
            data=image_bytes,
            file_name=f"streetview_{lat}_{lng}.jpg",
            mime="image/jpeg"
        )

        # Process image
        with st.spinner('Processing image...'):
            processed_image = process_image(image, preprocess)
            processed_image = processed_image.to(device)
            # Encode into CLIP vector
            with torch.no_grad():
                vec = model.encode_image(processed_image)
                # Normalize vector
                vec /= vec.norm(dim=-1, keepdim=True)
            if debug:
                st.write(vec.shape)
            # Move to host memory first: .numpy() raises on CUDA tensors.
            vec = vec.cpu().numpy()

        k = 40  # number of nearest reference images used per rating
        for cat in categories:
            st.write(cat, f'rating = {knn_get_score(knn, k, cat, vec):.1f}')
    except Exception as e:
        st.error(f"Error displaying image: {str(e)}")


if __name__ == "__main__":
    main()