WebashalarForML commited on
Commit
45de5e4
·
verified ·
1 Parent(s): 80ea083

Update server.py

Browse files
Files changed (1) hide show
  1. server.py +92 -19
server.py CHANGED
@@ -182,42 +182,115 @@ async def process_text(file_bytes: bytes) -> str:
182
  except UnicodeDecodeError:
183
  return "Error: File is not valid UTF-8 text"
184
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  @mcp.tool
186
  async def analyze_file(
187
  ctx: Context,
188
- file: bytes,
189
  file_type: Optional[Literal["text", "image", "pdf"]] = None
190
  ) -> str:
191
  """
192
- Analyzes a file.
193
 
194
  Args:
195
- file: The file content
196
- file_type: Type of file (text/image/pdf). If not provided, will ask user.
197
  """
198
- # Basic info
199
- info = f"Received file of size {len(file)} bytes.\n\n"
 
 
 
200
 
201
- # If file_type not provided, elicit it from user
 
 
202
  if file_type is None:
203
- result = await ctx.elicit(
204
- message="What is the file type?",
205
- response_type=["text", "image", "pdf"]
206
- )
207
 
208
- if result.action == "accept":
209
- file_type = result.data
210
- else:
211
- return "Elicitation cancelled by user"
 
 
 
 
 
 
212
 
213
- # Route to appropriate processor
214
  try:
215
  if file_type == "pdf":
216
- result = await process_pdf(file)
217
  elif file_type == "image":
218
- result = await process_image(file)
219
  elif file_type == "text":
220
- result = await process_text(file)
221
  else:
222
  return info + f"Unknown file type: {file_type}"
223
 
 
182
  except UnicodeDecodeError:
183
  return "Error: File is not valid UTF-8 text"
184
 
185
+ def decode_file(file_data: str | bytes) -> bytes:
186
+ """Decode base64 file data to bytes"""
187
+ if isinstance(file_data, bytes):
188
+ return file_data
189
+
190
+ # If it's a string, decode from base64
191
+ try:
192
+ return base64.b64decode(file_data)
193
+ except Exception as e:
194
+ raise ValueError(f"Failed to decode base64: {e}")
195
+
196
+ def detect_file_type(file_bytes: bytes) -> str:
197
+ """Detect file type from magic bytes"""
198
+ if file_bytes.startswith(b'%PDF'):
199
+ return "pdf"
200
+ elif file_bytes.startswith(b'\x89PNG'):
201
+ return "image"
202
+ elif file_bytes.startswith(b'\xff\xd8\xff'):
203
+ return "image" # JPEG
204
+ elif file_bytes.startswith(b'GIF87a') or file_bytes.startswith(b'GIF89a'):
205
+ return "image" # GIF
206
+ elif file_bytes.startswith(b'\x42\x4d'):
207
+ return "image" # BMP
208
+ else:
209
+ # Try to decode as text
210
+ try:
211
+ file_bytes[:1000].decode('utf-8')
212
+ return "text"
213
+ except:
214
+ return "unknown"
215
+
216
+ async def process_pdf(file_bytes: bytes) -> str:
217
+ """Process PDF file and extract text/info"""
218
+ try:
219
+ pdf_file = io.BytesIO(file_bytes)
220
+ reader = PdfReader(pdf_file)
221
+
222
+ num_pages = len(reader.pages)
223
+ text = ""
224
+ for page in reader.pages:
225
+ text += page.extract_text()
226
+
227
+ return f"PDF Analysis:\n- Pages: {num_pages}\n- Text length: {len(text)} chars\n\nFirst 500 chars:\n{text[:500]}"
228
+ except Exception as e:
229
+ return f"PDF processing failed: {str(e)}\nThis might not be a valid PDF file."
230
+
231
+ async def process_image(file_bytes: bytes) -> str:
232
+ """Process image file"""
233
+ try:
234
+ img = Image.open(io.BytesIO(file_bytes))
235
+ return f"Image Analysis:\n- Format: {img.format}\n- Size: {img.size[0]}x{img.size[1]} pixels\n- Mode: {img.mode}"
236
+ except Exception as e:
237
+ return f"Image processing failed: {str(e)}"
238
+
239
+ async def process_text(file_bytes: bytes) -> str:
240
+ """Process text file"""
241
+ try:
242
+ text = file_bytes.decode('utf-8')
243
+ lines = text.split('\n')
244
+ words = len(text.split())
245
+ return f"Text Analysis:\n- Lines: {len(lines)}\n- Words: {words}\n- Characters: {len(text)}\n\nFirst 500 chars:\n{text[:500]}"
246
+ except UnicodeDecodeError:
247
+ return "Error: File is not valid UTF-8 text"
248
+
249
  @mcp.tool
250
  async def analyze_file(
251
  ctx: Context,
252
+ file: str, # MCP sends base64-encoded string
253
  file_type: Optional[Literal["text", "image", "pdf"]] = None
254
  ) -> str:
255
  """
256
+ Analyzes a file with auto-detection.
257
 
258
  Args:
259
+ file: The file content (base64-encoded)
260
+ file_type: Optional. If not provided, will auto-detect.
261
  """
262
+ # Decode base64 to bytes
263
+ try:
264
+ file_bytes = decode_file(file)
265
+ except ValueError as e:
266
+ return f"Error decoding file: {e}"
267
 
268
+ info = f"Received file of size {len(file_bytes)} bytes.\n"
269
+
270
+ # Auto-detect if not provided
271
  if file_type is None:
272
+ file_type = detect_file_type(file_bytes)
273
+ info += f"Auto-detected type: {file_type}\n\n"
 
 
274
 
275
+ if file_type == "unknown":
276
+ return info + "Could not detect file type. Please call again with file_type parameter (text/image/pdf)"
277
+ else:
278
+ # Verify the file_type matches actual content
279
+ detected = detect_file_type(file_bytes)
280
+ info += f"Specified type: {file_type}\n"
281
+ info += f"Detected type: {detected}\n\n"
282
+
283
+ if detected != file_type and detected != "unknown":
284
+ info += f"⚠️ Warning: Specified type '{file_type}' doesn't match detected type '{detected}'\n\n"
285
 
286
+ # Process based on file_type
287
  try:
288
  if file_type == "pdf":
289
+ result = await process_pdf(file_bytes)
290
  elif file_type == "image":
291
+ result = await process_image(file_bytes)
292
  elif file_type == "text":
293
+ result = await process_text(file_bytes)
294
  else:
295
  return info + f"Unknown file type: {file_type}"
296