# eu-scrapper/app/api/scraper/db_requests.py
"""
Scraper database requests.
"""
import asyncio
import re
from datetime import timedelta, datetime
from app.api.common.schemas import FilterRequest
from app.api.scraper.dto import JobFilter
from app.api.scraper.models import JobModel
from app.core.config import settings
from app.api.scraper.schemas import StatisticsResponse
async def filter_jobs(
    filter_request: FilterRequest[JobFilter],
) -> tuple[list[JobModel], int]:
    """
    Filter jobs based on the filter request.

    Builds a MongoDB query from the optional filter fields, then fetches the
    requested page (newest first) and the total match count concurrently.

    :param filter_request: paginated request carrying a ``JobFilter``.
    :return: tuple of (page of matching jobs, total number of matches).
    """
    query: dict = {}
    skip = filter_request.pageSize * filter_request.pageIndex
    job_filter = filter_request.filter
    # Each non-empty list filter becomes one case-insensitive alternation regex.
    for field, values in (
        ("title", job_filter.titles),
        ("company", job_filter.companies),
        ("location", job_filter.locations),
    ):
        if values:
            query[field] = {
                "$regex": "|".join(re.escape(value) for value in values),
                "$options": "i",
            }
    if job_filter.minSalary:
        query["salary.min"] = {"$gte": job_filter.minSalary}
    if job_filter.maxSalary:
        query["salary.max"] = {"$lte": job_filter.maxSalary}
    if job_filter.isTop5:
        query["isTop5"] = job_filter.isTop5
    # Merge both date bounds into a single range condition. The previous code
    # assigned query["datetimeInserted"] twice, so a maxDate silently
    # discarded a simultaneously supplied minDate.
    date_range = {}
    if job_filter.minDate:
        date_range["$gte"] = job_filter.minDate.isoformat()
    if job_filter.maxDate:
        date_range["$lte"] = job_filter.maxDate.isoformat()
    if date_range:
        query["datetimeInserted"] = date_range
    jobs, total_count = await asyncio.gather(
        settings.DB_CLIENT.jobs.find(query)
        .sort("_id", -1)  # newest documents first
        .skip(skip)
        .limit(filter_request.pageSize)
        .to_list(length=filter_request.pageSize),
        settings.DB_CLIENT.jobs.count_documents(query),
    )
    return [JobModel.from_mongo(job) for job in jobs], total_count
async def search_field_options(field: str, value: str) -> list[str]:
    """
    Search field options based on the field and value.

    Returns up to five distinct, sorted values of ``field`` that contain
    ``value`` (case-insensitive). Unknown fields yield an empty list.

    :param field: one of ``title``, ``company`` or ``location``.
    :param value: substring to match against the field.
    :return: up to five distinct non-blank option strings.
    """
    if field not in ("title", "company", "location"):
        return []
    pipeline = [
        # Substring match, escaped so user input is treated literally.
        {"$match": {field: {"$regex": re.escape(value), "$options": "i"}}},
        {"$group": {"_id": f"${field}"}},
        {"$project": {"_id": 0, "value": "$_id"}},
        {"$sort": {"value": 1}},
        {"$limit": 5},
    ]
    documents = await settings.DB_CLIENT.jobs.aggregate(pipeline).to_list(length=5)
    options = []
    for document in documents:
        candidate = document["value"]
        # Drop null/empty and whitespace-only values.
        if candidate and candidate.strip():
            options.append(candidate)
    return options
async def get_statistics() -> StatisticsResponse:
"""
Get the statistics.
"""
result = (
await settings.DB_CLIENT.jobs.find({}, {"_id": 0, "datetimeInserted": 1})
.sort("_id", -1)
.limit(1)
.to_list(length=1)
)
lastUpdate = datetime.fromisoformat(result[0]["datetimeInserted"])
now = datetime.now()
days_until_sunday = (6 - now.weekday()) % 7
if days_until_sunday == 0 and now.hour < 23:
nextUpdate = now.replace(hour=23, minute=0, second=0, microsecond=0)
else:
if days_until_sunday == 0:
days_until_sunday = 7
nextUpdate = (now + timedelta(days=days_until_sunday)).replace(
hour=23, minute=0, second=0, microsecond=0
)
return StatisticsResponse(lastUpdate=lastUpdate, nextUpdate=nextUpdate)
async def save_job_obj(job: JobModel):
    """
    Save a job object.

    Skips insertion (and logs a skip message) when a job with the same
    title and description is already stored.

    :param job: job document to persist.
    """
    already_stored = await check_if_job_exists(job)
    if already_stored:
        print(f"Skipped job [{job.title}]!")
        return
    await settings.DB_CLIENT.jobs.insert_one(job.to_mongo())
    print(f"Job [{job.title}] saved!")
async def check_if_job_exists(job: JobModel) -> bool:
    """
    Check if a job exists.

    A job is considered a duplicate when a stored document has the same
    title and description.

    :param job: job to look up.
    :return: True if a matching document already exists.
    """
    # Project only _id: we need existence, not the full document. Bind the
    # result to a new name instead of shadowing the ``job`` parameter.
    existing = await settings.DB_CLIENT.jobs.find_one(
        {"title": job.title, "description": job.description}, {"_id": 1}
    )
    return existing is not None