# CandidateExplorer / services/knowledge_base/PipelineKBIngestion.py
# Last commit: 478dec6 by ishaq101 ("clean init")
import io
import time
import base64
import asyncio
# import requests
import pandas as pd
import asyncio, aiohttp
from PyPDF2 import PdfReader
# from typing import List
from services.agents.AutograderAgent import AutograderAgent
# from multiprocessing import Pool
from functools import wraps
from typing import List
# LOAD URLS CV
# Read the CV link spreadsheet and keep only the "url_cv" column as a plain list.
# The path is relative, so it is resolved against the current working directory.
urls = pd.read_excel("src/knowledge_base/CV_urls.xlsx", engine='openpyxl')["url_cv"].tolist()
# Report how many URLs were loaded.
print(len(urls))
# Module-level grading agent instance (not referenced in the visible code of this file).
autograder_agent = AutograderAgent()
def measure_runtime(func):
    """Decorate *func* so every call prints its wall-clock duration.

    Plain functions get a synchronous wrapper; coroutine functions get an
    ``async`` wrapper, so the timed span covers the awaited body rather than
    just coroutine creation. Durations come from ``time.perf_counter`` and
    are printed only when the call returns normally (an exception propagates
    without a timing line). The wrapped callable's return value, name and
    docstring are preserved.
    """
    if not asyncio.iscoroutinefunction(func):
        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            t0 = time.perf_counter()
            value = func(*args, **kwargs)
            elapsed = time.perf_counter() - t0
            print(f"⏱️ Function '{func.__name__}' executed in {elapsed:.10f} seconds")
            return value

        return sync_wrapper

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        value = await func(*args, **kwargs)
        elapsed = time.perf_counter() - t0
        print(f"⏱️ Async function '{func.__name__}' executed in {elapsed:.10f} seconds")
        return value

    return async_wrapper
async def response_builder(url, session):
    """GET *url* through *session* and wrap the body as ``{url: body_bytes}``.

    Returns ``None`` after printing a diagnostic (status code and response
    object) when the server answers with any status other than 200.
    Exceptions raised by the request itself are not caught here.
    """
    async with session.get(url) as response:
        if response.status != 200:
            print(f"Failed to fetch {url} with status {response.status}, {response}")
            return None
        # The body is kept as raw bytes, not base64-encoded.
        body = await response.read()
        return {url: body}
@measure_runtime
async def afetch_urls(urls):
    """Download every URL in *urls* concurrently over one shared aiohttp session.

    Returns a list aligned index-for-index with *urls*. Each entry is either
    the ``{url: body_bytes}`` dict produced by ``response_builder``, or
    ``None`` when that URL could not be fetched. A URL fails either because
    of a non-200 status (reported by ``response_builder``) or because an
    exception was raised while fetching it (reported here).

    One failing URL does not discard the rest of the batch. Exceptions are
    collected with ``return_exceptions=True`` instead of aborting the whole
    ``gather``, then logged and mapped to ``None``.
    """
    # Materialise once so the same sequence drives both the tasks and the zip below.
    url_list = list(urls)
    async with aiohttp.ClientSession() as session:
        tasks = [response_builder(url, session) for url in url_list]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)

    results = []
    for url, outcome in zip(url_list, outcomes):
        # gather() returns the exception object in place of a result when a task raised.
        if isinstance(outcome, BaseException):
            print(f"Failed to fetch {url}: {outcome!r}")
            results.append(None)
        else:
            results.append(outcome)
    return results
# INGEST TO DB
# res = asyncio.run(afetch_urls(urls[:3]))
# len(res)
# res[-1].keys()
# reader = PdfReader(io.BytesIO(base64.decodebytes(res[-1])))
# reader.pages[0].extract_text()
from externals.databases._pgdb import execute_query, execute_insert
# template_ingest_to_cv_raw = """
# insert into cv_raw (filename, file_content)
# values ('{filename}', '{file_content}');
# """.format(filename=list(res[-1].keys())[0],
# file_content=res[-1][list(res[-1].keys())[0]]
# )
async def RawIngest(urls: List[str]):
    """Download the CVs at *urls* and insert each one into ``cv_raw``.

    Every successful download is handed to ``execute_insert``, with the
    source URL as ``filename`` and the downloaded bytes as ``file_content``.

    This is a best-effort batch step: errors are printed, not raised.
    - A failed download (an empty/``None`` entry from ``afetch_urls``) is
      skipped.
    - An insert that raises is logged with its URL and skipped.
    Neither stops the remaining documents in the batch from being ingested.
    """
    try:
        response = await afetch_urls(urls)
    except Exception as err:
        print(f"❌ Error when Ingesting to cv_raw, {err}")
        return

    for res in response:
        if not res:
            # Fetch failed; the failure was already printed while fetching.
            continue
        for filename, file_content in res.items():
            try:
                await execute_insert(filename=filename, file_content=file_content)
            except Exception as err:
                print(f"❌ Error when Ingesting to cv_raw, {filename}: {err}")
async def run_rawingest_pipeline(chunk_size: int = 20, start: int = -2000, stop: int = -1000):
    """Ingest the slice ``urls[start:stop]`` of the module-level URL list, chunk by chunk.

    Args:
        chunk_size: Number of URLs passed to each ``RawIngest`` call. These
            are downloaded concurrently within the call.
        start: Slice start (inclusive). Negative values count from the end
            of ``urls``, as in ordinary slicing.
        stop: Slice stop (exclusive).

    The defaults reproduce the original hard-coded run: ``urls[-2000:-1000]``
    in chunks of 20. Chunks are awaited one after another, so at most
    ``chunk_size`` downloads are in flight at any time.

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")
    urls_sample = urls[start:stop]
    for offset in range(0, len(urls_sample), chunk_size):
        await RawIngest(urls=urls_sample[offset:offset + chunk_size])
if __name__ == "__main__":
    # Start the ingestion run only when this file is executed as a script.
    # Without the guard, importing the module (e.g. to reuse RawIngest)
    # would immediately start downloading and inserting CVs.
    asyncio.run(run_rawingest_pipeline())
# async def get_cv_from_url(url:str) -> dict:
# try:
# response = requests.get(url)
# return {f"{url}":response.content}
# except:
# print(f"Failed to get cv from {url}")
# return {f"{url}":b""}
# def get_cv_from_urls_na(urls: List) -> List:
# _st = time.time()
# cvs = []
# n_urls = len(urls)
# for i, url in enumerate(urls):
# response = requests.get(url)
# cv = {f"{url}":response.content}
# cvs.append(cv)
# print(f"Loading... {round((i+1)*100/n_urls)}%")
# _rt = time.time() - _st
# print(f"✅ Get CV from urls finished in {round(_rt,2)}s")
# return cvs
# async def get_cv_from_urls(urls: List, BATCH_SIZE:int=1000):
# _st = time.time()
# print(f"Loading... 0%")
# N_BATCH = (len(urls) // BATCH_SIZE) + (1 if (len(urls) % BATCH_SIZE) > 0 else 0)
# batched_urls = []
# for i in range(N_BATCH):
# one_batch_urls = urls[(i)*BATCH_SIZE:(i+1)*BATCH_SIZE]
# batched_urls.append(one_batch_urls)
# cvs = []
# for i, one_batch_urls in enumerate(batched_urls):
# tasks = []
# for url in one_batch_urls:
# task = asyncio.create_task(get_cv_from_url(url))
# tasks.append(task)
# one_batch_cvs = await asyncio.gather(*tasks)
# print(f"Loading... {round((i+1)*100/len(batched_urls))}%")
# cvs.extend(one_batch_cvs)
# _rt = time.time() - _st
# print(f"✅ Get CV from urls finished in {round(_rt,2)}s")
# return cvs
# import io
# from src.utils.utils import pdf_reader
# from PyPDF2 import PdfReader
# cvs = asyncio.run(get_cv_from_urls(urls[:100], BATCH_SIZE=2)) #154.68s
# cvs = asyncio.run(get_cv_from_urls(urls[:100], BATCH_SIZE=2)) #154.68s
# len(cvs)
# cvs = get_cv_from_urls_na(urls[:100]) #163.93s
# url_img = "https://api.typeform.com/responses/files/915940175d9054dec0a8cec98bc3d5345bed0153ed7ffdd60514a94e1055ee57/CV_Dhanendra_Wiryohutomo.pdf"
# res = asyncio.run(get_cv_from_url(url_img))
# cv_img = res[list(res.keys())[0]]
# cv = cvs[0][list(cvs[0].keys())[0]]
# reader = asyncio.run(pdf_reader(cv))
# print(reader)
# reader = PdfReader(io.BytesIO(cv_img))
# reader = asyncio.run(pdf_reader(io.BytesIO(cv_img)))
# reader = asyncio.run(pdf_reader(cv_img))
# user_profile = ""
# for page in reader.pages:
# text = page.extract_text()
# if text:
# user_profile += text + "\n"
# print(user_profile)