"""Bulk-ingest CV PDFs.

Downloads every CV URL listed in the Excel sheet asynchronously and (further
down in this module) stores the raw bytes in the ``cv_raw`` table.

NOTE(review): this module runs its pipeline at import time via
``asyncio.run`` — consider guarding with ``if __name__ == "__main__":``.
"""
import asyncio
import base64
import io
import time
from functools import wraps
from typing import List

import aiohttp
import pandas as pd
from PyPDF2 import PdfReader

from services.agents.AutograderAgent import AutograderAgent

# LOAD URLS CV — the spreadsheet column `url_cv` holds one download URL per CV.
urls = pd.read_excel("src/knowledge_base/CV_urls.xlsx", engine='openpyxl')
urls = urls["url_cv"].tolist()
print(len(urls))

autograder_agent = AutograderAgent()


def measure_runtime(func):
    """Decorator that prints the wall-clock runtime of *func*.

    Handles both coroutine functions (awaited) and plain sync functions.
    Returns the wrapped callable; the wrapped call's result is passed
    through unchanged.
    """
    if asyncio.iscoroutinefunction(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = await func(*args, **kwargs)
            end = time.perf_counter()
            print(f"⏱️ Async function '{func.__name__}' executed in {end - start:.10f} seconds")
            return result
        return async_wrapper
    else:
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            end = time.perf_counter()
            print(f"⏱️ Function '{func.__name__}' executed in {end - start:.10f} seconds")
            return result
        return sync_wrapper


async def response_builder(url, session):
    """Fetch *url* with *session*; return ``{url: raw_bytes}`` on success.

    On any non-200 status the failure is logged and ``None`` is returned —
    callers are expected to filter those out.
    """
    async with session.get(url) as response:
        if response.status == 200:
            content = await response.read()
            # enc_content = base64.encodebytes(content)
            return {url: content}
        else:
            print(f"Failed to fetch {url} with status {response.status}, {response}")
            return None  # explicit, so the contract is visible to callers


@measure_runtime
async def afetch_urls(urls):
    """Download *urls* concurrently.

    Returns a list of ``{url: raw_bytes}`` dicts for the SUCCESSFUL
    fetches only.

    BUGFIX: failed fetches previously yielded ``None`` entries in the
    gathered result, which crashed downstream consumers that call
    ``res.keys()`` — they are now dropped here.
    """
    async with aiohttp.ClientSession() as session:
        tasks = [response_builder(url, session) for url in urls]
        result = await asyncio.gather(*tasks)
        return [res for res in result if res is not None]


# INGEST TO DB
# res = asyncio.run(afetch_urls(urls[:3]))
# len(res)
# res[-1].keys()
# reader = PdfReader(io.BytesIO(base64.decodebytes(res[-1])))
# reader.pages[0].extract_text()

from externals.databases._pgdb import execute_query, execute_insert

# template_ingest_to_cv_raw = """
# insert into cv_raw (filename, file_content)
# values ('(unknown)', '{file_content}');
# """.format(filename=list(res[-1].keys())[0],
#            file_content=res[-1][list(res[-1].keys())[0]]
#            )


async def RawIngest(urls: List[str]):
    """Download each CV in *urls* and insert its raw bytes into ``cv_raw``.

    Each successful download is a single-key dict ``{url: raw_bytes}``;
    the URL doubles as the stored filename. Errors are logged and
    swallowed so one bad chunk does not abort the whole pipeline.
    """
    try:
        response = await afetch_urls(urls)
        for res in response:
            # Defensive: a failed fetch may surface as None/empty — skip it
            # instead of crashing on `.keys()`.
            if not res:
                continue
            url = next(iter(res))  # single-key dict: avoid double list(res.keys())[0]
            await execute_insert(filename=url, file_content=res[url])
    except Exception as E:
        print(f"❌ Error when Ingesting to cv_raw, {E}")


async def run_rawingest_pipeline():
    """Ingest a slice of the loaded URL list into cv_raw, 20 URLs per chunk."""
    chunk_size = 20
    # NOTE(review): hard-coded slice of the url list — confirm which range
    # is still pending ingestion before re-running.
    urls_sample = urls[-2000:-1000]
    for chunk in range(0, len(urls_sample), chunk_size):
        chunk_urls = urls_sample[chunk:chunk+chunk_size]
        await RawIngest(urls=chunk_urls)


asyncio.run(run_rawingest_pipeline())

# async def get_cv_from_url(url:str) -> dict:
#     try:
#         response = requests.get(url)
#         return {f"{url}":response.content}
#     except:
#         print(f"Failed to get cv from {url}")
#         return {f"{url}":b""}

# def get_cv_from_urls_na(urls: List) -> List:
#     _st = time.time()
#     cvs = []
#     n_urls = len(urls)
#     for i, url in enumerate(urls):
#         response = requests.get(url)
#         cv = {f"{url}":response.content}
#         cvs.append(cv)
#         print(f"Loading... {round((i+1)*100/n_urls)}%")
#     _rt = time.time() - _st
#     print(f"✅ Get CV from urls finished in {round(_rt,2)}s")
#     return cvs

# async def get_cv_from_urls(urls: List, BATCH_SIZE:int=1000):
#     _st = time.time()
#     print(f"Loading... 0%")
#     N_BATCH = (len(urls) // BATCH_SIZE) + (1 if (len(urls) % BATCH_SIZE) > 0 else 0)
#     batched_urls = []
#     for i in range(N_BATCH):
#         one_batch_urls = urls[(i)*BATCH_SIZE:(i+1)*BATCH_SIZE]
#         batched_urls.append(one_batch_urls)
#     cvs = []
#     for i, one_batch_urls in enumerate(batched_urls):
#         tasks = []
#         for url in one_batch_urls:
#             task = asyncio.create_task(get_cv_from_url(url))
#             tasks.append(task)
#         one_batch_cvs = await asyncio.gather(*tasks)
#         print(f"Loading... 
{round((i+1)*100/len(batched_urls))}%") # cvs.extend(one_batch_cvs) # _rt = time.time() - _st # print(f"✅ Get CV from urls finished in {round(_rt,2)}s") # return cvs # import io # from src.utils.utils import pdf_reader # from PyPDF2 import PdfReader # cvs = asyncio.run(get_cv_from_urls(urls[:100], BATCH_SIZE=2)) #154.68s # cvs = asyncio.run(get_cv_from_urls(urls[:100], BATCH_SIZE=2)) #154.68s # len(cvs) # cvs = get_cv_from_urls_na(urls[:100]) #163.93s # url_img = "https://api.typeform.com/responses/files/915940175d9054dec0a8cec98bc3d5345bed0153ed7ffdd60514a94e1055ee57/CV_Dhanendra_Wiryohutomo.pdf" # res = asyncio.run(get_cv_from_url(url_img)) # cv_img = res[list(res.keys())[0]] # cv = cvs[0][list(cvs[0].keys())[0]] # reader = asyncio.run(pdf_reader(cv)) # print(reader) # reader = PdfReader(io.BytesIO(cv_img)) # reader = asyncio.run(pdf_reader(io.BytesIO(cv_img))) # reader = asyncio.run(pdf_reader(cv_img)) # user_profile = "" # for page in reader.pages: # text = page.extract_text() # if text: # user_profile += text + "\n" # print(user_profile)