import os
import requests
from dotenv import load_dotenv

# Runs at import time. Assuming the standard python-dotenv behaviour, this
# makes values from a local .env file visible to the os.getenv() lookups below.
load_dotenv()

# Model identifier taken from the GEMINI_MODEL environment variable.
# os.getenv returns None when the variable is unset. In the visible part of
# this file the constant is referenced only by the disabled code below.
DEFAULT_MODEL = os.getenv("GEMINI_MODEL")

# ---------------------------------------------------------------------------
# Disabled legacy code, kept for reference only. Nothing below is executed.
# NOTE(review): it references names (base64, time, create_file, completion)
# that are not imported in the visible part of this file.
# ---------------------------------------------------------------------------
# def analyze_file_with_gemini(file_path: str, file_name: str) -> str:
#     # 1. Read file and encode in base64
#     try:
#         with open(file_path, "rb") as f:
#             content = f.read()
#         mime_type = _get_mime_type(file_path)
#         base64_data = base64.b64encode(content).decode("utf-8")
#     except Exception as e:
#         return f"Error reading file: {e}"
#     file = create_file(
#         file=base64_data,
#         purpose="user_data",
#         extra_body={"custom_llm_provider": "gemini"},
#         api_key=os.getenv("GEMINI_API_KEY"),
#     )
#     # 2. Construct Gemini-style multimodal input
#     prompt = (
#         f"Analyze the following {mime_type} file and provide a detailed report. "
#         "The file is encoded in base64 format. "
#         "Please include any relevant information or insights."
#     )
#     try:
#         time.sleep(5)
#         response = completion(
#             model=DEFAULT_MODEL,
#             messages=[
#                 {
#                     "role": "user",
#                     "content": [
#                         {"type": "text", "text": prompt},
#                         {
#                             "type": "file",
#                             "file": {
#                                 "file_id": file.id,
#                                 "filename": file_name,
#                                 "format": "audio/wav",
#                             },
#                         },
#                     ],
#                 },
#             ],
#         )
#         return response.choices[0].message
#     except Exception as e:
#         return f"Error from Gemini: {e}"
#
# def _get_mime_type(file_path: str) -> str:
#     if file_path.endswith(".png"):
#         return "image/png"
#     elif file_path.endswith(".jpg") or file_path.endswith(".jpeg"):
#         return "image/jpeg"
#     elif file_path.endswith(".mp3"):
#         return "audio/mpeg"
#     else:
#         raise ValueError(
#             "Unsupported file type: only .png, .jpg, .jpeg, .mp3 are supported"
#         )
#
# def download_file_from_url(url: str, save_dir: str = "./downloads") -> str:
#     """
#     Downloads a file from a public URL and saves it locally.
#     Args:
#         url (str): The direct URL to the file (must not be a blob: URL).
#         save_dir (str): Directory to save the downloaded file (default: ./downloads).
#     Returns:
#         str: Full path to the downloaded file.
#     """
#     try:
#         os.makedirs(save_dir, exist_ok=True)
#         # Get file name from the URL or fallback
#         local_filename = url.split("/")[-1] or "downloaded_file"
#         file_path = os.path.join(save_dir, local_filename)
#         # Perform streaming download
#         with requests.get(url, stream=True) as r:
#             r.raise_for_status()
#             with open(file_path, "wb") as f:
#                 for chunk in r.iter_content(chunk_size=8192):
#                     f.write(chunk)
#         return file_path
#     except Exception as e:
#         raise RuntimeError(f"Failed to download file from {url}: {e}")

import mimetypes
from email.message import Message
from typing import Mapping
from urllib.parse import unquote, urlsplit

# Name used when neither the response headers nor the URL yield a usable name.
_FALLBACK_FILENAME = "downloaded_file"


def _filename_from_response(url: str, headers: Mapping[str, str]) -> str:
    """Choose a safe local file name for a downloaded response.

    Args:
        url (str): The URL that was requested.
        headers (Mapping[str, str]): Response headers; must support ``.get``
            with lower-case header names.

    Returns:
        str: A bare file name (no directory components) that always carries
        an extension.
    """
    candidate = ""

    # Prefer the server-supplied name. The stdlib header parser copes with
    # quoting, trailing parameters and RFC 2231 ``filename*=`` encoding.
    disposition = headers.get("content-disposition")
    if disposition:
        parsed = Message()
        parsed["content-disposition"] = disposition
        candidate = parsed.get_filename() or ""

    # Otherwise use the last path segment of the URL, ignoring the query
    # string and fragment and undoing percent-encoding.
    if not candidate:
        candidate = unquote(urlsplit(url).path.rsplit("/", 1)[-1])

    # The name is controlled by the remote side: strip every directory
    # component (both separator styles) so it cannot escape save_dir.
    candidate = os.path.basename(candidate.replace("\\", "/")).strip()
    if candidate in ("", ".", ".."):
        candidate = _FALLBACK_FILENAME

    # If there is no extension, try to guess one from Content-Type.
    if not os.path.splitext(candidate)[1]:
        content_type = headers.get("content-type")
        guessed = (
            mimetypes.guess_extension(content_type.split(";")[0].strip().lower())
            if content_type
            else None
        )
        candidate += guessed or ".bin"  # generic fallback extension

    return candidate


def download_file_from_url(
    url: str, save_dir: str = "./downloads", timeout: float = 30.0
) -> str:
    """
    Downloads a file from a public URL and saves it locally with the correct extension.

    The file name is taken from the Content-Disposition header when present,
    otherwise from the URL path. Directory components are stripped from it so
    the file is always written directly inside ``save_dir``.

    Args:
        url (str): The direct URL to the file (must not be a blob: URL).
        save_dir (str): Directory to save the downloaded file (default: ./downloads).
        timeout (float): Seconds to wait for the server to connect/send data
            before giving up (default: 30).

    Returns:
        str: Full path to the downloaded file.

    Raises:
        RuntimeError: If the directory cannot be created, the request fails,
            or the file cannot be written. The original exception is
            attached as ``__cause__``.
    """
    try:
        os.makedirs(save_dir, exist_ok=True)

        # Stream the body so large files are never held in memory at once.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()

            file_path = os.path.join(
                save_dir, _filename_from_response(url, response.headers)
            )

            with open(file_path, "wb") as out:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        out.write(chunk)

        return file_path

    except Exception as e:
        # Keep the single RuntimeError contract, but chain the real cause.
        raise RuntimeError(f"Failed to download file from {url}: {e}") from e