File size: 4,300 Bytes
b60402f
 
 
 
 
 
ab0a73d
b60402f
ab0a73d
b60402f
 
ab0a73d
b60402f
 
 
 
 
 
 
 
 
 
ab0a73d
b60402f
 
ab0a73d
 
 
 
b60402f
ab0a73d
b60402f
 
ab0a73d
 
 
 
b60402f
ab0a73d
b60402f
 
ab0a73d
 
 
 
b60402f
ab0a73d
b60402f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab0a73d
b60402f
 
 
 
 
 
ab0a73d
 
 
 
 
 
 
 
 
 
 
 
b60402f
 
 
 
ab0a73d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
"""
Scraper database requests.
"""

import asyncio
import re
from datetime import timedelta, datetime
from app.api.common.schemas import FilterRequest
from app.api.scraper.dto import JobFilter
from app.api.scraper.models import JobModel
from app.core.config import settings
from app.api.scraper.schemas import StatisticsResponse


async def filter_jobs(
    filter_request: FilterRequest[JobFilter],
) -> tuple[list[JobModel], int]:
    """
    Filter jobs based on the filter request.

    Builds a MongoDB query from the optional filter fields, then concurrently
    fetches one page of matching jobs (newest first) and the total match
    count.

    :param filter_request: paging info plus a ``JobFilter`` with optional
        title/company/location patterns, salary bounds, a top-5 flag and a
        date range.
    :return: ``(jobs, total_count)`` for the requested page.
    """
    query: dict = {}
    skip = filter_request.pageSize * filter_request.pageIndex
    job_filter = filter_request.filter

    def _any_of(values: list[str]) -> dict:
        # Case-insensitive "match any of these literal strings" regex.
        return {
            "$regex": "|".join(re.escape(value) for value in values),
            "$options": "i",
        }

    if job_filter.titles:
        query["title"] = _any_of(job_filter.titles)
    if job_filter.companies:
        query["company"] = _any_of(job_filter.companies)
    if job_filter.locations:
        query["location"] = _any_of(job_filter.locations)

    # Explicit None checks so a legitimate bound of 0 is not silently ignored.
    if job_filter.minSalary is not None:
        query["salary.min"] = {"$gte": job_filter.minSalary}
    if job_filter.maxSalary is not None:
        query["salary.max"] = {"$lte": job_filter.maxSalary}
    if job_filter.isTop5:
        query["isTop5"] = job_filter.isTop5

    # Merge both date bounds into a single condition. Previously each bound
    # assigned to the same key, so maxDate overwrote minDate when both were
    # provided.
    date_range = {}
    if job_filter.minDate:
        date_range["$gte"] = job_filter.minDate.isoformat()
    if job_filter.maxDate:
        date_range["$lte"] = job_filter.maxDate.isoformat()
    if date_range:
        query["datetimeInserted"] = date_range

    jobs, total_count = await asyncio.gather(
        settings.DB_CLIENT.jobs.find(query)
        .sort("_id", -1)  # newest documents first
        .skip(skip)
        .limit(filter_request.pageSize)
        .to_list(length=filter_request.pageSize),
        settings.DB_CLIENT.jobs.count_documents(query),
    )
    return [JobModel.from_mongo(job) for job in jobs], total_count


async def search_field_options(field: str, value: str) -> list[str]:
    """
    Return up to five distinct values of *field* whose text contains *value*.

    Only the ``title``, ``company`` and ``location`` fields are searchable;
    any other field name yields an empty list. Matching is case-insensitive,
    results are sorted alphabetically, and blank values are filtered out.
    """
    if field not in ("title", "company", "location"):
        return []

    pipeline = [
        # Case-insensitive substring match on the requested field.
        {"$match": {field: {"$regex": re.escape(value), "$options": "i"}}},
        # Collapse to distinct values, sorted, capped at five suggestions.
        {"$group": {"_id": f"${field}"}},
        {"$project": {"_id": 0, "value": "$_id"}},
        {"$sort": {"value": 1}},
        {"$limit": 5},
    ]

    documents = await settings.DB_CLIENT.jobs.aggregate(pipeline).to_list(length=5)
    return [doc["value"] for doc in documents if doc["value"] and doc["value"].strip()]


def _next_weekly_refresh(now: datetime) -> datetime:
    """Return the next Sunday 23:00 refresh moment strictly after *now*."""
    days_until_sunday = (6 - now.weekday()) % 7
    if days_until_sunday == 0 and now.hour < 23:
        # It is Sunday before 23:00 — the refresh happens later today.
        return now.replace(hour=23, minute=0, second=0, microsecond=0)
    if days_until_sunday == 0:
        # Sunday at/after 23:00 — the next refresh is a full week away.
        days_until_sunday = 7
    return (now + timedelta(days=days_until_sunday)).replace(
        hour=23, minute=0, second=0, microsecond=0
    )


async def get_statistics() -> StatisticsResponse:
    """
    Get the statistics.

    :return: a ``StatisticsResponse`` with the insertion time of the most
        recently stored job (``lastUpdate``) and the next scheduled weekly
        refresh, Sunday 23:00 local time (``nextUpdate``).
    """
    # Fetch only the newest document's datetimeInserted field.
    result = (
        await settings.DB_CLIENT.jobs.find({}, {"_id": 0, "datetimeInserted": 1})
        .sort("_id", -1)
        .limit(1)
        .to_list(length=1)
    )

    # NOTE(review): result[0] raises IndexError if the collection is empty —
    # presumably the scraper always seeds it first; confirm against callers.
    lastUpdate = datetime.fromisoformat(result[0]["datetimeInserted"])
    nextUpdate = _next_weekly_refresh(datetime.now())
    return StatisticsResponse(lastUpdate=lastUpdate, nextUpdate=nextUpdate)


async def save_job_obj(job: JobModel):
    """
    Persist *job* to the jobs collection unless a duplicate already exists.

    Duplicates are detected via ``check_if_job_exists`` (same title and
    description); duplicates are skipped with a console note.
    """
    already_stored = await check_if_job_exists(job)
    if not already_stored:
        await settings.DB_CLIENT.jobs.insert_one(job.to_mongo())
        print(f"Job [{job.title}] saved!")
        return
    print(f"Skipped job [{job.title}]!")


async def check_if_job_exists(job: JobModel) -> bool:
    """
    Check whether a job with the same title and description is already stored.

    :param job: candidate job to look up.
    :return: ``True`` if a matching document exists, ``False`` otherwise.
    """
    # Project only _id: we need existence, not the document. A distinct name
    # avoids shadowing the `job` parameter (the original rebound it).
    existing = await settings.DB_CLIENT.jobs.find_one(
        {"title": job.title, "description": job.description}, {"_id": 1}
    )
    return existing is not None