File size: 4,807 Bytes
655c38a
 
 
 
 
 
 
 
 
 
 
57b5eed
 
 
 
 
 
 
 
 
655c38a
 
 
 
57b5eed
 
 
 
 
 
 
655c38a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57b5eed
655c38a
 
57b5eed
 
655c38a
 
 
57b5eed
655c38a
 
57b5eed
655c38a
57b5eed
655c38a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import os
import logging
import asyncio
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import List
from query_processing import process_query, vector_search, extract_attributes
from scraper import is_url, scrape_job_description

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)

# Module-wide logger used by the middleware and endpoint handlers below.
logger = logging.getLogger('shl-api')

# Initialize FastAPI app
app = FastAPI()

@app.middleware("http")
async def log_requests(request, call_next):
    """Log every HTTP request on entry and exit.

    Args:
        request: The incoming request object.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The response produced by the downstream handler.
    """
    # Lazy %-style args defer string formatting until the record is emitted.
    logger.info("Incoming %s request to %s", request.method, request.url)
    response = await call_next(request)
    logger.info(
        "Completed %s request to %s with status %s",
        request.method, request.url, response.status_code,
    )
    return response

# Pydantic models for request/response validation
class Query(BaseModel):
    # Raw user input: either a free-text query/job description or a URL
    # (URLs are detected and scraped in the /recommend handler).
    query: str

class Assessment(BaseModel):
    """One recommended assessment entry returned by /recommend."""
    url: str  # catalog URL of the assessment
    adaptive_support: str  # "Yes"/"No" string flag
    description: str
    duration: int  # numeric duration (presumably minutes — confirm upstream data)
    remote_support: str  # "Yes"/"No" string flag
    test_type: List[str]  # cleaned Key_Features entries

class RecommendationResponse(BaseModel):
    """Response envelope: the list of up to 10 recommended assessments."""
    recommended_assessments: List[Assessment]

@app.get("/health")
async def health_check():
    """Liveness probe: confirm the service is up and responding."""
    return dict(status="healthy")

def _parse_duration(raw_duration, default=60):
    """Pull the integer out of a duration string (e.g. '45 minutes' -> 45).

    Concatenates every digit in the string ('1 hour 30' -> 130), matching
    the original inline parsing. Falls back to *default* when the value is
    missing or contains no digits.
    """
    if raw_duration and any(c.isdigit() for c in raw_duration):
        return int(''.join(filter(str.isdigit, raw_duration)))
    return default


@app.post("/recommend", response_model=RecommendationResponse)
async def recommend(query: Query):
    """Endpoint to get assessment recommendations based on job description or query.

    If the query is a URL, the job description is scraped from it first.
    The text is then embedded, matched against the vector index, and the
    top results are shaped into the response model.

    Raises:
        HTTPException: 400 for empty input or scraping failures,
            500 for any other pipeline error.
    """
    try:
        query_text = query.query.strip()
        if not query_text:
            logger.error("Empty query received")
            raise HTTPException(status_code=400, detail="Query cannot be empty")

        logger.info("Processing query: %s...", query_text[:100])

        # Check if input is a URL and scrape if needed
        if is_url(query_text):
            # Keep the URL: query_text is overwritten with the scraped text,
            # so logging query_text after the scrape would print the wrong value.
            source_url = query_text
            try:
                logger.info("Scraping job description from URL: %s", source_url)
                query_text = await asyncio.to_thread(scrape_job_description, source_url)
                if not query_text:
                    logger.error("Failed to extract job description from URL: %s", source_url)
                    raise HTTPException(status_code=400, detail="Could not extract job description from URL")
                logger.info("Successfully scraped job description")
            except HTTPException:
                raise  # preserve the 400 raised above instead of re-wrapping it
            except Exception as e:
                logger.error("Error scraping URL %s: %s", source_url, e)
                raise HTTPException(status_code=400, detail="Failed to scrape job description from URL")

        # Embed the query in a worker thread (process_query is blocking)
        query_embedding = await asyncio.to_thread(process_query, query_text)

        # Perform vector search asynchronously
        distances, indices = await asyncio.to_thread(vector_search, query_embedding)

        # Normalize flat search output to the 2-D (1, k) shape used below
        if len(indices.shape) == 1:
            indices = indices.reshape(1, -1)
            distances = distances.reshape(1, -1)

        # Extract attributes from search results asynchronously
        raw_results = await asyncio.to_thread(extract_attributes, distances=distances, indices=indices)

        # Transform results to match required response format
        recommended_assessments = []
        for result in raw_results[:10]:  # Limit to 10 assessments
            key_features = result.get('Key_Features', [])
            recommended_assessments.append(Assessment(
                url=result['URL'],
                description=result.get('description', '').strip(),
                duration=_parse_duration(result.get('Duration')),
                remote_support="Yes" if result.get('Remote_Testing', False) else "No",
                adaptive_support="Yes" if any("adaptive" in feature.lower() for feature in key_features) else "No",
                test_type=[feature.strip() for feature in key_features if feature.strip()]
            ))

        return RecommendationResponse(recommended_assessments=recommended_assessments)

    except HTTPException:
        # Let deliberate 4xx responses pass through instead of becoming 500s
        raise
    except Exception as e:
        logger.exception("Error in recommendation pipeline: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error")

if __name__ == "__main__":
    import uvicorn

    # Launch the ASGI server bound to all interfaces on port 8000.
    uvicorn.run("api:app", host="0.0.0.0", port=8000, workers=1, log_level="info")