File size: 4,956 Bytes
618f472 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | import os
import base64
import io
from PIL import Image
from concurrent.futures import ThreadPoolExecutor
import math
if os.name == 'nt':
import ctypes
from ctypes import wintypes
import comtypes
from comtypes import IUnknown, GUID, HRESULT, COMMETHOD
class SIZE(ctypes.Structure):
_fields_ = [("cx", wintypes.LONG), ("cy", wintypes.LONG)]
class IShellItemImageFactory(IUnknown):
_case_insensitive_ = True
_iid_ = GUID('{bcc18b79-ba16-442f-80c4-8a59c30c463b}')
_idlflags_ = []
_methods_ = [COMMETHOD([], HRESULT, 'GetImage', (['in'], SIZE, 'size'), (['in'], ctypes.c_uint, 'flags'), (['out', 'retval'], ctypes.POINTER(wintypes.HBITMAP), 'phbm'))]
SIIGBF_THUMBNAILONLY = 0x8
shell32 = ctypes.windll.shell32
gdi32 = ctypes.windll.gdi32
SHCreateItemFromParsingName = shell32.SHCreateItemFromParsingName
SHCreateItemFromParsingName.argtypes = [
wintypes.LPCWSTR,
ctypes.c_void_p,
ctypes.POINTER(GUID),
ctypes.POINTER(ctypes.c_void_p)
]
SHCreateItemFromParsingName.restype = HRESULT
class BITMAP(ctypes.Structure):
_fields_ = [("bmType", wintypes.LONG), ("bmWidth", wintypes.LONG), ("bmHeight", wintypes.LONG), ("bmWidthBytes", wintypes.LONG), ("bmPlanes", wintypes.WORD), ("bmBitsPixel", wintypes.WORD), ("bmBits", wintypes.LPVOID)]
GetObjectW = gdi32.GetObjectW
GetObjectW.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p]
GetObjectW.restype = ctypes.c_int
DeleteObject = gdi32.DeleteObject
DeleteObject.argtypes = [ctypes.c_void_p]
DeleteObject.restype = wintypes.BOOL
def get_thumbnail_as_base64(file_path, size=128):
hbitmap_handle = 0
bmp_copy = None
factory_ptr = None
ppv = ctypes.c_void_p(None)
try:
hr = SHCreateItemFromParsingName(
file_path,
None,
ctypes.byref(IShellItemImageFactory._iid_),
ctypes.byref(ppv)
)
if hr == 0 and ppv.value:
factory_ptr = ctypes.cast(ppv, ctypes.POINTER(IShellItemImageFactory))
size_struct = SIZE(size, size)
hbitmap_handle = factory_ptr.GetImage(size_struct, SIIGBF_THUMBNAILONLY)
if hbitmap_handle:
try:
bitmap_info = BITMAP()
if GetObjectW(hbitmap_handle, ctypes.sizeof(bitmap_info), ctypes.byref(bitmap_info)) == 0: return None, file_path
bmp = Image.frombuffer('RGB', (bitmap_info.bmWidth, bitmap_info.bmHeight), ctypes.string_at(bitmap_info.bmBits, bitmap_info.bmWidthBytes * bitmap_info.bmHeight), 'raw', 'BGRX', bitmap_info.bmWidthBytes, -1)
bmp_copy = bmp.transpose(Image.FLIP_TOP_BOTTOM)
finally:
DeleteObject(hbitmap_handle)
except comtypes.COMError:
return None, file_path
except Exception as e:
print(f"Error extracting native thumbnail for {os.path.basename(file_path)}: {e}")
return None, file_path
finally:
if factory_ptr:
del factory_ptr
if bmp_copy:
try:
buffer = io.BytesIO()
bmp_copy.save(buffer, format="JPEG", quality=80)
return base64.b64encode(buffer.getvalue()).decode('utf-8'), file_path
except Exception as e:
print(f"Failed to convert or save image for '{os.path.basename(file_path)}': {e}")
return None, file_path
def process_thumbnail_chunk(file_paths_chunk):
comtypes.CoInitialize()
results = []
try:
for file_path in file_paths_chunk:
results.append(get_thumbnail_as_base64(file_path))
finally:
comtypes.CoUninitialize()
return results
else:
def get_thumbnail_as_base64(file_path, size=256):
return None, file_path
def process_thumbnail_chunk(file_paths_chunk):
return [get_thumbnail_as_base64(path) for path in file_paths_chunk]
def get_thumbnails_in_batch_windows(file_paths):
if not file_paths or os.name != 'nt':
return {}
results = {}
num_workers = min(os.cpu_count() * 2, 16)
chunk_size = math.ceil(len(file_paths) / num_workers) if file_paths else 0
if chunk_size == 0: return {}
chunks = [file_paths[i:i + chunk_size] for i in range(0, len(file_paths), chunk_size)]
with ThreadPoolExecutor(max_workers=num_workers) as executor:
for chunk_result in executor.map(process_thumbnail_chunk, chunks):
for thumbnail, path in chunk_result:
if thumbnail:
results[path] = thumbnail
return results |