| import os | |
| import base64 | |
| import io | |
| from PIL import Image | |
| from concurrent.futures import ThreadPoolExecutor | |
| import math | |
| if os.name == 'nt': | |
| import ctypes | |
| from ctypes import wintypes | |
| import comtypes | |
| from comtypes import IUnknown, GUID, HRESULT, COMMETHOD | |
| class SIZE(ctypes.Structure): | |
| _fields_ = [("cx", wintypes.LONG), ("cy", wintypes.LONG)] | |
| class IShellItemImageFactory(IUnknown): | |
| _case_insensitive_ = True | |
| _iid_ = GUID('{bcc18b79-ba16-442f-80c4-8a59c30c463b}') | |
| _idlflags_ = [] | |
| _methods_ = [COMMETHOD([], HRESULT, 'GetImage', (['in'], SIZE, 'size'), (['in'], ctypes.c_uint, 'flags'), (['out', 'retval'], ctypes.POINTER(wintypes.HBITMAP), 'phbm'))] | |
| SIIGBF_THUMBNAILONLY = 0x8 | |
| shell32 = ctypes.windll.shell32 | |
| gdi32 = ctypes.windll.gdi32 | |
| SHCreateItemFromParsingName = shell32.SHCreateItemFromParsingName | |
| SHCreateItemFromParsingName.argtypes = [ | |
| wintypes.LPCWSTR, | |
| ctypes.c_void_p, | |
| ctypes.POINTER(GUID), | |
| ctypes.POINTER(ctypes.c_void_p) | |
| ] | |
| SHCreateItemFromParsingName.restype = HRESULT | |
| class BITMAP(ctypes.Structure): | |
| _fields_ = [("bmType", wintypes.LONG), ("bmWidth", wintypes.LONG), ("bmHeight", wintypes.LONG), ("bmWidthBytes", wintypes.LONG), ("bmPlanes", wintypes.WORD), ("bmBitsPixel", wintypes.WORD), ("bmBits", wintypes.LPVOID)] | |
| GetObjectW = gdi32.GetObjectW | |
| GetObjectW.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.c_void_p] | |
| GetObjectW.restype = ctypes.c_int | |
| DeleteObject = gdi32.DeleteObject | |
| DeleteObject.argtypes = [ctypes.c_void_p] | |
| DeleteObject.restype = wintypes.BOOL | |
| def get_thumbnail_as_base64(file_path, size=128): | |
| hbitmap_handle = 0 | |
| bmp_copy = None | |
| factory_ptr = None | |
| ppv = ctypes.c_void_p(None) | |
| try: | |
| hr = SHCreateItemFromParsingName( | |
| file_path, | |
| None, | |
| ctypes.byref(IShellItemImageFactory._iid_), | |
| ctypes.byref(ppv) | |
| ) | |
| if hr == 0 and ppv.value: | |
| factory_ptr = ctypes.cast(ppv, ctypes.POINTER(IShellItemImageFactory)) | |
| size_struct = SIZE(size, size) | |
| hbitmap_handle = factory_ptr.GetImage(size_struct, SIIGBF_THUMBNAILONLY) | |
| if hbitmap_handle: | |
| try: | |
| bitmap_info = BITMAP() | |
| if GetObjectW(hbitmap_handle, ctypes.sizeof(bitmap_info), ctypes.byref(bitmap_info)) == 0: return None, file_path | |
| bmp = Image.frombuffer('RGB', (bitmap_info.bmWidth, bitmap_info.bmHeight), ctypes.string_at(bitmap_info.bmBits, bitmap_info.bmWidthBytes * bitmap_info.bmHeight), 'raw', 'BGRX', bitmap_info.bmWidthBytes, -1) | |
| bmp_copy = bmp.transpose(Image.FLIP_TOP_BOTTOM) | |
| finally: | |
| DeleteObject(hbitmap_handle) | |
| except comtypes.COMError: | |
| return None, file_path | |
| except Exception as e: | |
| print(f"Error extracting native thumbnail for {os.path.basename(file_path)}: {e}") | |
| return None, file_path | |
| finally: | |
| if factory_ptr: | |
| del factory_ptr | |
| if bmp_copy: | |
| try: | |
| buffer = io.BytesIO() | |
| bmp_copy.save(buffer, format="JPEG", quality=80) | |
| return base64.b64encode(buffer.getvalue()).decode('utf-8'), file_path | |
| except Exception as e: | |
| print(f"Failed to convert or save image for '{os.path.basename(file_path)}': {e}") | |
| return None, file_path | |
| def process_thumbnail_chunk(file_paths_chunk): | |
| comtypes.CoInitialize() | |
| results = [] | |
| try: | |
| for file_path in file_paths_chunk: | |
| results.append(get_thumbnail_as_base64(file_path)) | |
| finally: | |
| comtypes.CoUninitialize() | |
| return results | |
| else: | |
| def get_thumbnail_as_base64(file_path, size=256): | |
| return None, file_path | |
| def process_thumbnail_chunk(file_paths_chunk): | |
| return [get_thumbnail_as_base64(path) for path in file_paths_chunk] | |
| def get_thumbnails_in_batch_windows(file_paths): | |
| if not file_paths or os.name != 'nt': | |
| return {} | |
| results = {} | |
| num_workers = min(os.cpu_count() * 2, 16) | |
| chunk_size = math.ceil(len(file_paths) / num_workers) if file_paths else 0 | |
| if chunk_size == 0: return {} | |
| chunks = [file_paths[i:i + chunk_size] for i in range(0, len(file_paths), chunk_size)] | |
| with ThreadPoolExecutor(max_workers=num_workers) as executor: | |
| for chunk_result in executor.map(process_thumbnail_chunk, chunks): | |
| for thumbnail, path in chunk_result: | |
| if thumbnail: | |
| results[path] = thumbnail | |
| return results |