import os
import requests
from dotenv import load_dotenv
# Load variables from a local .env file (python-dotenv) into the process
# environment before any settings are read below.
load_dotenv()
# Gemini model name taken from the GEMINI_MODEL environment variable; None when
# the variable is unset. In the visible part of this file it is referenced only
# by the commented-out analyze_file_with_gemini code.
DEFAULT_MODEL = os.getenv("GEMINI_MODEL")
# def analyze_file_with_gemini(file_path: str, file_name: str) -> str:
# # 1. Read file and encode in base64
# try:
# with open(file_path, "rb") as f:
# content = f.read()
# mime_type = _get_mime_type(file_path)
# base64_data = base64.b64encode(content).decode("utf-8")
# except Exception as e:
# return f"Error reading file: {e}"
# file = create_file(
# file=base64_data,
# purpose="user_data",
# extra_body={"custom_llm_provider": "gemini"},
# api_key=os.getenv("GEMINI_API_KEY"),
# )
# # 2. Construct Gemini-style multimodal input
# prompt = (
# f"Analyze the following {mime_type} file and provide a detailed report. "
# "The file is encoded in base64 format. "
# "Please include any relevant information or insights."
# )
# try:
# time.sleep(5)
# response = completion(
# model=DEFAULT_MODEL,
# messages=[
# {
# "role": "user",
# "content": [
# {"type": "text", "text": prompt},
# {
# "type": "file",
# "file": {
# "file_id": file.id,
# "filename": file_name,
# "format": "audio/wav",
# },
# },
# ],
# },
# ],
# )
# return response.choices[0].message
# except Exception as e:
# return f"Error from Gemini: {e}"
# def _get_mime_type(file_path: str) -> str:
# if file_path.endswith(".png"):
# return "image/png"
# elif file_path.endswith(".jpg") or file_path.endswith(".jpeg"):
# return "image/jpeg"
# elif file_path.endswith(".mp3"):
# return "audio/mpeg"
# else:
# raise ValueError(
# "Unsupported file type: only .png, .jpg, .jpeg, .mp3 are supported"
# )
# def download_file_from_url(url: str, save_dir: str = "./downloads") -> str:
# """
# Downloads a file from a public URL and saves it locally.
# Args:
# url (str): The direct URL to the file (must not be a blob: URL).
# save_dir (str): Directory to save the downloaded file (default: ./downloads).
# Returns:
# str: Full path to the downloaded file.
# """
# try:
# os.makedirs(save_dir, exist_ok=True)
# # Get file name from the URL or fallback
# local_filename = url.split("/")[-1] or "downloaded_file"
# file_path = os.path.join(save_dir, local_filename)
# # Perform streaming download
# with requests.get(url, stream=True) as r:
# r.raise_for_status()
# with open(file_path, "wb") as f:
# for chunk in r.iter_content(chunk_size=8192):
# f.write(chunk)
# return file_path
# except Exception as e:
# raise RuntimeError(f"Failed to download file from {url}: {e}")
import mimetypes
def _filename_from_response(url: str, headers) -> str:
    """Choose a safe local file name for a download from its headers and URL."""
    # Function-scope imports keep this block self-contained in the file.
    from email.message import Message
    from urllib.parse import unquote, urlsplit

    name = ""
    disposition = headers.get("content-disposition")
    if disposition:
        # Use the stdlib header parser instead of string splitting so quoted
        # values, trailing parameters (e.g. "; size=123") and RFC 2231
        # "filename*=" values are handled correctly.
        parsed = Message()
        parsed["content-disposition"] = disposition
        name = parsed.get_filename() or ""

    if not name:
        # Fall back to the last segment of the URL *path*, so the query
        # string and fragment never end up in the file name.
        name = unquote(urlsplit(url).path.rsplit("/", 1)[-1])

    # The name comes from the remote side: drop any directory components so
    # it cannot point outside the target directory ("../x", "/etc/x", "a\\b").
    name = os.path.basename(name.replace("\\", "/"))
    if name in ("", ".", ".."):
        name = "downloaded_file"

    # If there is no extension, try to guess one from Content-Type.
    if not os.path.splitext(name)[1]:
        content_type = headers.get("content-type")
        guessed = (
            mimetypes.guess_extension(content_type.split(";")[0].strip())
            if content_type
            else None
        )
        name += guessed or ".bin"  # ".bin" when the type is missing/unknown
    return name


def download_file_from_url(
    url: str, save_dir: str = "./downloads", *, timeout: float = 30.0
) -> str:
    """
    Downloads a file from a public URL and saves it locally with the correct extension.

    The file name is taken from the Content-Disposition header when present,
    otherwise from the last path segment of the URL ("downloaded_file" if that
    is empty). Directory components are stripped from the name so the file is
    always written directly inside ``save_dir``. When the name has no
    extension, one is guessed from Content-Type, falling back to ".bin".

    Args:
        url (str): The direct URL to the file (must not be a blob: URL).
        save_dir (str): Directory to save the downloaded file (default: ./downloads).
        timeout (float): Seconds passed to ``requests.get`` as its timeout, so a
            stalled server cannot block the call forever (default: 30).

    Returns:
        str: Full path to the downloaded file.

    Raises:
        RuntimeError: If the directory cannot be created, the request fails or
            returns an error status, or the file cannot be written. The
            original exception is attached as ``__cause__``.
    """
    try:
        os.makedirs(save_dir, exist_ok=True)
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            file_path = os.path.join(
                save_dir, _filename_from_response(url, r.headers)
            )
            # Stream to disk in chunks so large files are never held in memory.
            with open(file_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return file_path
    except Exception as e:
        raise RuntimeError(f"Failed to download file from {url}: {e}") from e
|