Fred808 committed
Commit 2de604e · verified · 1 Parent(s): c32cd59

Update app.py

Files changed (1):
  1. app.py +170 -247

app.py CHANGED
@@ -1,269 +1,192 @@
 import os
-import numpy as np
-from PIL import Image
-import requests
-import time
-import multiprocessing
 import json
-import sys
-from typing import Tuple, List, Dict, Any
-
-# Add Florence model path to Python path
-florence_path = os.path.join(os.path.dirname(__file__), 'florence-2-large')
-sys.path.append(florence_path)
-
-try:
-    from processing_florence2 import Florence2Processor
-    from configuration_florence2 import Florence2Config
-    import torch
-    import torch.nn.functional as F
-
-    # Initialize processor with local files
-    config = Florence2Config.from_json_file(os.path.join(florence_path, 'config.json'))
-    processor = Florence2Processor(config)
-    HAVE_PROCESSOR = True
-    print("Successfully loaded Florence processor")
-except Exception as e:
-    print(f"Warning: Could not load Florence processor: {e}")
-    print("Using basic output interpretation")
-    HAVE_PROCESSOR = False
-
-# Task-specific configuration
-TASK = "<MORE_DETAILED_CAPTION>"  # For detailed image captioning
-
-# Model configuration
-MODEL_ID = "microsoft/florence-2-base"
-
-def load_and_preprocess_image(image_path):
-    # Load image and resize to 32x32
-    img = Image.open(image_path)
-    img = img.resize((32, 32))
-
-    # Convert to numpy array and normalize to [0,1]
-    img_array = np.array(img).astype(np.float32) / 255.0
-
-    # Ensure array has shape (32, 32, 3)
-    if len(img_array.shape) == 2:
-        img_array = np.stack([img_array] * 3, axis=-1)
-
-    # Add batch dimension
-    img_array = img_array[np.newaxis, ...]
-
-    # Convert tensor to list of single-element lists for API
-    tensor_data = [[float(x)] for x in img_array.flatten()]
-
-    return tensor_data
-
-def run_inference(args: Tuple[str, str, int]) -> dict:
-    """Run inference on a specific server with given chunk ID."""
-    server_url, image_path, chunk_id = args
-
-    try:
-        print(f"\nProcessing server {server_url} with chunk {chunk_id}...")
-
-        # Load and preprocess image
-        input_tensor = load_and_preprocess_image(image_path)
-
-        # Prepare request data
-        data = {
-            "inputs": input_tensor
-        }
-
-        # Send request with timeout
-        print(f"Sending request to {server_url}/compute/{chunk_id}")
-        start_time = time.time()
-        response = requests.post(
-            f"{server_url}/compute/{chunk_id}",
-            json=data,
-            headers={"Content-Type": "application/json"},
-            timeout=10
-        )
-
-        inference_time = time.time() - start_time
-
-        if response.status_code == 200:
-            result = response.json()
-            return {
-                "server": server_url,
-                "chunk_id": chunk_id,
-                "success": True,
-                "time": inference_time,
-                "result": result
-            }
-        else:
-            error_msg = f"HTTP {response.status_code}"
-            if hasattr(response, 'text'):
-                error_msg += f": {response.text}"
-            return {
-                "server": server_url,
-                "chunk_id": chunk_id,
-                "success": False,
-                "error": error_msg,
-                "time": inference_time
-            }
-
-    except Exception as e:
-        return {
-            "server": server_url,
-            "chunk_id": chunk_id,
-            "success": False,
-            "error": str(e),
-            "time": time.time() - start_time if 'start_time' in locals() else None
-        }
-
-def process_model_outputs(outputs, original_shape=(1, -1, 51289)):
-    """Process model outputs using Florence processor for sequence generation."""
-    # Convert outputs to numpy array
-    outputs_array = np.array([x[0] for x in outputs])
-
-    if HAVE_PROCESSOR:
         try:
-            # Reshape logits to [batch, seq_len, vocab_size]
-            logits = outputs_array.reshape(original_shape)
-
-            if torch.is_tensor(logits):
-                # Use torch operations if available
-                token_ids = torch.argmax(logits, dim=-1)
-            else:
-                # Fallback to numpy
-                token_ids = np.argmax(logits, axis=-1)
-
-            # Decode tokens to text
-            text = processor.batch_decode(token_ids, skip_special_tokens=True)
-
-            # Post-process for the specific task
-            processed_text = processor.post_process_generation(
-                text[0] if isinstance(text, list) else text,
-                task=TASK
-            )
-
-            return {
-                'text': processed_text,
-                'tokens': token_ids.tolist() if torch.is_tensor(token_ids) else token_ids.tolist(),
-                'logits_shape': logits.shape,
-                'distribution': {
-                    'min': float(outputs_array.min()),
-                    'max': float(outputs_array.max()),
-                    'mean': float(outputs_array.mean()),
-                    'std': float(outputs_array.std())
-                }
-            }
         except Exception as e:
-            print(f"Warning: Error in sequence processing: {e}")
-
-    # Fallback to basic statistics if processor not available
-    return {
-        'overall_mean': float(outputs_array.mean()),
-        'overall_std': float(outputs_array.std()),
-        'shape': outputs_array.shape,
-        'distribution': {
-            'min': float(outputs_array.min()),
-            'max': float(outputs_array.max()),
-            'median': float(np.median(outputs_array))
-        }
-    }
-
-def process_results(results):
-    """Process and combine results from all servers."""
-    # Filter successful results
-    successful_results = [r for r in results if r['success']]
-    if not successful_results:
-        print("\nError: No servers returned successful results")
-        return
-
-    # Sort successful results by chunk ID
-    successful_results.sort(key=lambda x: x['chunk_id'])
-
-    print(f"\nModel Output Analysis ({len(successful_results)}/{len(results)} servers succeeded):")
-    print("-" * 80)
-
-    # Get total sequence length from all chunks
-    total_outputs = []
-    for result in successful_results:
-        total_outputs.extend(result['result']['outputs'])
-
-    # Process the combined sequence
-    print("\nProcessing complete sequence...")
-    analysis = process_model_outputs(total_outputs, original_shape=(1, -1, 51289))
-
-    if 'text' in analysis:
-        print("\nGenerated Description:")
-        print("-" * 80)
-        print(analysis['text'])
-
-        print("\nSequence Statistics:")
-        print(f"- Logits shape: {analysis['logits_shape']}")
-        print(f"- Distribution:")
-        for key, value in analysis['distribution'].items():
-            print(f" {key}: {value:.4f}")
-    else:
-        print("\nBasic Analysis (Florence processor not available):")
-        print(f"- Sequence length: {len(total_outputs)}")
-        print(f"- Overall activation: {analysis['overall_mean']:.4f} ± {analysis['overall_std']:.4f}")
-        print("\nValue Distribution:")
-        for key, value in analysis['distribution'].items():
-            print(f"- {key}: {value:.4f}")
-
-    # Check server consistency
-    if len(successful_results) > 1:
-        all_outputs = [np.array([x[0] for x in r['result']['outputs']])
-                       for r in successful_results]
-        differences = [np.max(np.abs(all_outputs[0] - tensor))
-                       for tensor in all_outputs[1:]]
-
-        print("\nServer Consistency:")
-        if np.max(differences) < 1e-6:
-            print("Successful servers provided identical results")
-        else:
-            print(f"Variations detected between servers (max diff: {np.max(differences):.6f})")
-
-    # Print timing summary
-    successful_times = [r['time'] for r in successful_results]
-    print(f"\nProcessing Time Summary:")
-    print(f"- Average: {np.mean(successful_times):.2f}s")
-    print(f"- Range: {min(successful_times):.2f}s - {max(successful_times):.2f}s")
-
-def main():
-    # Server configurations with their respective chunk IDs
-    servers = [
-        ("https://fred808-ilob.hf.space", 0),
-        ("https://fred808-tserv.hf.space", 1),
-        ("https://fred808-tserve2.hf.space", 2)
-    ]
-
-    # Image path - using the same image for all servers
-    image_path = "sample_task/test1.png"
-    print(f"\nTesting with image: {image_path}")
-
-    # Create process pool
-    with multiprocessing.Pool() as pool:
-        # Prepare arguments for each server
-        args = [(server_url, image_path, chunk_id) for server_url, chunk_id in servers]
-
-        # Run inference in parallel
-        print("\nStarting parallel inference across all servers...")
-        results = pool.map(run_inference, args)
-
-        # Display individual server results
-        print("\nServer Results:")
-        print("-" * 80)
-        for result in results:
-            print(f"\nServer: {result['server']}")
-            print(f"Chunk ID: {result['chunk_id']}")
-            print(f"Success: {result['success']}")
-            print(f"Time: {result['time']:.4f}s" if result['time'] else "Time: N/A")
-
-            if result['success']:
-                print(f"Output shape: {len(result['result']['outputs'])} elements")
-                print("First few outputs:", result['result']['outputs'][:5])
-            else:
-                print(f"Error: {result['error']}")
-            print("-" * 80)
-
-        # Process and display combined results
-        process_results(results)
-
 if __name__ == "__main__":
-    main()
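
For reference, the removed client exchanged tensors as JSON: every float was wrapped in a one-element list, flattened, and process_model_outputs later reshaped the combined chunks against the hard-coded vocabulary size of 51289 before taking an argmax. A minimal sketch of that round trip with synthetic data (the real sequence length was determined by the servers and is invented here):

import numpy as np

VOCAB_SIZE = 51289  # vocabulary size hard-coded in the removed client
seq_len = 4         # synthetic; the real length depended on the server output

# Wire format used by the old /compute/{chunk_id} servers:
# every value travels as a one-element list
logits = np.random.randn(1, seq_len, VOCAB_SIZE).astype(np.float32)
outputs = [[float(x)] for x in logits.flatten()]

# Reassembly as done in process_model_outputs
outputs_array = np.array([x[0] for x in outputs])
restored = outputs_array.reshape((1, -1, VOCAB_SIZE))
token_ids = np.argmax(restored, axis=-1)

assert restored.shape == (1, seq_len, VOCAB_SIZE)
print(token_ids.shape)  # (1, 4): one token id per sequence position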
 
+import httpx
+import asyncio
 import os
+import uuid
+import aiofiles  # async file I/O used by download_file
+from pathlib import Path
+from typing import Optional, Dict, Any
 import json
+from datetime import datetime
+
+class MiddlewareClient:
+    def __init__(self, base_url: str = "https://fred808-vssee.hf.space"):
+        self.base_url = base_url.rstrip('/')
+        self.client = httpx.AsyncClient(timeout=30.0)  # 30 second timeout
+        self.requester_id = str(uuid.uuid4())  # Unique ID for this client
+        self.download_dir = Path("downloads")
+        self.download_dir.mkdir(exist_ok=True)
+
+        # Keep track of our current locks
+        self.current_course: Optional[str] = None
+        self.current_image: Optional[str] = None
+
+        # Statistics
+        self.stats = {
+            "downloads_started": 0,
+            "downloads_completed": 0,
+            "bytes_downloaded": 0,
+            "start_time": datetime.now().isoformat()
+        }
+
+    async def close(self):
+        """Close the HTTP client"""
+        await self.client.aclose()
+
+    async def get_next_course(self) -> Optional[Dict[str, Any]]:
+        """Get next available course"""
+        try:
+            response = await self.client.get(
+                f"{self.base_url}/middleware/next/course",
+                params={"requester_id": self.requester_id}
+            )
+            response.raise_for_status()
+            course_data = response.json()
+            self.current_course = course_data["course_id"]
+            return course_data
+        except httpx.HTTPStatusError as e:  # only status errors carry a response
+            if e.response.status_code == 404:
+                print("No more courses available")
+                return None
+            raise
+
+    async def get_next_image(self, course_id: str) -> Optional[Dict[str, Any]]:
+        """Get next available image from a course"""
+        try:
+            response = await self.client.get(
+                f"{self.base_url}/middleware/next/image/{course_id}",
+                params={"requester_id": self.requester_id}
+            )
+            response.raise_for_status()
+            image_data = response.json()
+            self.current_image = image_data["file_id"]
+            return image_data
+        except httpx.HTTPStatusError as e:
+            if e.response.status_code == 404:
+                print(f"No more images available in course {course_id}")
+                return None
+            raise
+
+    async def release_course(self, course_id: str):
+        """Release lock on a course"""
+        try:
+            response = await self.client.post(
+                f"{self.base_url}/middleware/release/course/{course_id}",
+                params={"requester_id": self.requester_id}
+            )
+            response.raise_for_status()
+            self.current_course = None
+        except httpx.HTTPError as e:
+            print(f"Error releasing course {course_id}: {e}")
+
+    async def release_image(self, course_id: str, file_id: str):
+        """Release lock on an image"""
+        try:
+            response = await self.client.post(
+                f"{self.base_url}/middleware/release/image/{course_id}/{file_id}",
+                params={"requester_id": self.requester_id}
+            )
+            response.raise_for_status()
+            self.current_image = None
+        except httpx.HTTPError as e:
+            print(f"Error releasing image {file_id}: {e}")
+
+    async def download_file(self, course: str, file_id: str) -> bool:
+        """Download a file to local storage"""
+        save_path = self.download_dir / course / file_id
+        save_path.parent.mkdir(exist_ok=True)
+
         try:
+            response = await self.client.get(
+                f"{self.base_url}/download",
+                params={"course": course, "file": file_id}
+            )
+            response.raise_for_status()
+
+            self.stats["downloads_started"] += 1
+
+            async with aiofiles.open(save_path, 'wb') as f:
+                async for chunk in response.aiter_bytes():
+                    await f.write(chunk)
+                    self.stats["bytes_downloaded"] += len(chunk)
+
+            self.stats["downloads_completed"] += 1
+            return True
         except Exception as e:
+            print(f"Error downloading {file_id}: {e}")
+            return False
+
+    def save_stats(self):
+        """Save download statistics"""
+        self.stats["end_time"] = datetime.now().isoformat()
+        stats_file = self.download_dir / "download_stats.json"
+        with open(stats_file, 'w') as f:
+            json.dump(self.stats, f, indent=2)
+
+    async def download_all(self, max_courses: Optional[int] = None, max_files: Optional[int] = None):
+        """Download all available files with optional limits"""
+        try:
+            courses_processed = 0
+            files_downloaded = 0
+
+            while True:
+                if max_courses and courses_processed >= max_courses:
+                    print(f"Reached maximum courses limit ({max_courses})")
+                    break
+
+                course_data = await self.get_next_course()
+                if not course_data:
+                    print("No more courses available")
+                    break
+
+                course_id = course_data["course_id"]
+                print(f"\nProcessing course: {course_id}")
+                courses_processed += 1
+
+                course_files = 0
+                while True:
+                    if max_files and files_downloaded >= max_files:
+                        print(f"Reached maximum files limit ({max_files})")
+                        break
+
+                    image_data = await self.get_next_image(course_id)
+                    if not image_data:
+                        break
+
+                    file_id = image_data["file_id"]
+                    print(f"Downloading: {file_id}")
+
+                    if await self.download_file(course_id, file_id):
+                        files_downloaded += 1
+                        course_files += 1
+                        print(f"Successfully downloaded: {file_id}")
+
+                    await self.release_image(course_id, file_id)
+
+                print(f"Completed course {course_id} - Downloaded {course_files} files")
+                await self.release_course(course_id)
+
+            print(f"\nDownload complete!")
+            print(f"Processed {courses_processed} courses")
+            print(f"Downloaded {files_downloaded} files")
+            print(f"Total bytes: {self.stats['bytes_downloaded']:,}")
+
+        finally:
+            self.save_stats()
+            await self.close()
+
+async def main():
+    # Create downloads directory
+    Path("downloads").mkdir(exist_ok=True)
+
+    client = MiddlewareClient()
+
+    try:
+        # Download up to 2 courses and at most 10 files in total as an example
+        await client.download_all(max_courses=2, max_files=10)
+    except KeyboardInterrupt:
+        print("\nDownload interrupted by user")
+    finally:
+        await client.close()
+
 if __name__ == "__main__":
+    asyncio.run(main())
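
The new client treats the middleware as a lock-based work queue: claim a course, claim images within it, download each file, and release every lock that was taken. A minimal sketch of driving one claim/download/release cycle by hand, assuming MiddlewareClient from the app.py above is in scope (the endpoints and the course_id/file_id response fields are exactly those used in the diff):

import asyncio

async def fetch_one():
    client = MiddlewareClient()
    try:
        course = await client.get_next_course()
        if course is None:
            return  # queue is empty
        image = await client.get_next_image(course["course_id"])
        if image is not None:
            ok = await client.download_file(course["course_id"], image["file_id"])
            print("downloaded" if ok else "failed", image["file_id"])
            await client.release_image(course["course_id"], image["file_id"])
    finally:
        # Release the course lock so other requesters can claim it
        if client.current_course:
            await client.release_course(client.current_course)
        await client.close()

asyncio.run(fetch_one())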