ChandimaPrabath commited on
Commit
40685b6
·
1 Parent(s): 86056c3
Files changed (6) hide show
  1. .gitignore +4 -0
  2. app.py +47 -0
  3. benchmark.py +105 -0
  4. requirements.txt +3 -0
  5. smart_search.py +68 -0
  6. test_api.py +38 -0
.gitignore ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ # venv
2
+ .venv
3
+ # pycache
4
+ __pycache__
app.py ADDED
@@ -0,0 +1,47 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ from flask import Flask, request, jsonify
3
+ from smart_search import SmartSearch
4
+ from tempfile import NamedTemporaryFile
5
+
6
+ app = Flask(__name__)
7
+
8
# Function to download data from API and save to a temporary file
def download_and_save_data(url: str) -> str:
    """Download JSON from *url* and persist it to a temporary file.

    Returns the path of the temporary file. The file is created with
    ``delete=False`` so it outlives the ``with`` block; the caller owns
    cleanup of the file.

    Raises requests.HTTPError for non-2xx responses and requests.Timeout
    if the upstream stalls.
    """
    # FIX: requests has no default timeout; without one a stalled upstream
    # hangs module import (these downloads run at app start-up) forever.
    response = requests.get(url, timeout=30)
    response.raise_for_status()  # Ensure we raise an error for bad responses
    # Persist the payload to a named temp file the search engine can re-read.
    with NamedTemporaryFile(delete=False, suffix='.json') as tmp_file:
        tmp_file.write(response.content)
    # Return after the with-block so the file is flushed and closed.
    return tmp_file.name
16
+
17
# Upstream endpoints serving the full film / TV-series catalogues.
films_url = 'https://unicone-studio-load-balancer.hf.space/api/film/all'
tv_series_url = 'https://unicone-studio-load-balancer.hf.space/api/tv/all'

# Fetch both catalogues once at start-up, cache them on disk, and point a
# single shared SmartSearch instance at the cached files.
films_file = download_and_save_data(films_url)
tv_series_file = download_and_save_data(tv_series_url)
search_system = SmartSearch(films_file, tv_series_file)
27
+
28
@app.route('/api/search', methods=['POST'])
def search():
    """POST /api/search — expects JSON body {"query": "..."}; returns matches.

    Responds 400 when the body is not JSON or the 'query' field is absent.
    """
    if not request.is_json:
        return jsonify({"error": "Request must be JSON"}), 400

    payload = request.get_json()
    query = payload.get('query')
    if not query:
        return jsonify({"error": "Missing 'query' field in JSON body"}), 400

    # Delegate the actual matching to the shared SmartSearch instance.
    return jsonify(search_system.search(query))

if __name__ == '__main__':
    # NOTE(review): debug=True combined with host 0.0.0.0 exposes the Werkzeug
    # debugger to the whole network — confirm this never runs in production.
    app.run(debug=True, host="0.0.0.0")
benchmark.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+ from smart_search import SmartSearch
3
+ import matplotlib.pyplot as plt
4
+ from sklearn.metrics import precision_recall_fscore_support
5
+
6
# Point the search engine under test at the locally cached catalogue files.
films_file = 'films.json'
tv_series_file = 'tv_series.json'
search_system = SmartSearch(films_file, tv_series_file)

# Benchmark cases: each query (some deliberately misspelled to exercise the
# fuzzy-matching path) is paired with the titles it is expected to return.
test_queries = [
    {"query": "my spy", "expected_films": ["My spy 2020", "My spy 2024"], "expected_tv_series": []},
    {"query": "my sp", "expected_films": ["My spy 2020", "My spy 2024"], "expected_tv_series": []},
    {"query": "funky monk", "expected_films": ["Funky Monkey 2004"], "expected_tv_series": []},
    {"query": "yaariyan", "expected_films": ["Yaariayan (2014)", "Yaariyan 2 (2023)"], "expected_tv_series": []},
    {"query": "grand blu", "expected_films": [], "expected_tv_series": ["Grand Blue"]},
    {"query": "aho girl", "expected_films": [], "expected_tv_series": ["Aho Girl"]},
]
20
+
21
# Benchmarking function
def _set_prf(expected, found):
    """Set-overlap precision/recall/F1 between expected and returned titles.

    An empty side scores 1.0 only when the other side is empty as well,
    0.0 otherwise; F1 is 0.0 when precision + recall is zero.
    """
    exp, got = set(expected), set(found)
    tp = len(exp & got)
    precision = tp / len(got) if got else (1.0 if not exp else 0.0)
    recall = tp / len(exp) if exp else (1.0 if not got else 0.0)
    denom = precision + recall
    f1 = (2 * precision * recall / denom) if denom else 0.0
    return precision, recall, f1

def benchmark_search(search_system, queries):
    """Run every test case through *search_system* and collect metrics.

    Each entry of the returned ``detailed_results`` list records the query,
    the raw search result, elapsed wall-clock time, and precision/recall/F1
    for both films and TV series; the second return value is the average
    elapsed time per query (0.0 for an empty query list).

    BUG FIX: the original passed the expected and returned title lists
    straight into sklearn's ``precision_recall_fscore_support``, which treats
    them as aligned y_true/y_pred label vectors and raises ``ValueError``
    whenever their lengths differ. Title matching is a set-retrieval problem,
    so the metrics are now computed from set overlap instead.
    """
    detailed_results = []
    total_time = 0.0

    for test_case in queries:
        query = test_case['query']
        expected_films = test_case['expected_films']
        expected_tv_series = test_case['expected_tv_series']

        start_time = time.time()
        result = search_system.search(query)
        elapsed_time = time.time() - start_time
        total_time += elapsed_time

        films_precision, films_recall, films_f1 = _set_prf(
            expected_films, result['films'])
        tv_series_precision, tv_series_recall, tv_series_f1 = _set_prf(
            expected_tv_series, result['tv_series'])

        detailed_results.append({
            "query": query,
            "result": result,
            "elapsed_time": elapsed_time,
            "films_precision": films_precision,
            "films_recall": films_recall,
            "films_f1": films_f1,
            "tv_series_precision": tv_series_precision,
            "tv_series_recall": tv_series_recall,
            "tv_series_f1": tv_series_f1
        })

    # Guard the average against an empty benchmark suite.
    avg_time = total_time / len(queries) if queries else 0.0
    return detailed_results, avg_time
58
+
59
# Execute the benchmark suite once at import time.
detailed_results, avg_time = benchmark_search(search_system, test_queries)
61
+
62
# Displaying results
def display_results(detailed_results, avg_time):
    """Print per-query metrics, then chart query timings and F1 scores."""
    queries = [entry['query'] for entry in detailed_results]
    times = [entry['elapsed_time'] for entry in detailed_results]
    films_f1_scores = [entry['films_f1'] for entry in detailed_results]
    tv_series_f1_scores = [entry['tv_series_f1'] for entry in detailed_results]

    print(f"Average search time: {avg_time:.4f} seconds\n")

    # One text report per benchmark case.
    for entry in detailed_results:
        print(f"Query: '{entry['query']}'")
        print(f"Time taken: {entry['elapsed_time']:.4f} seconds")
        print("Films found:", entry['result']['films'])
        print("TV Series found:", entry['result']['tv_series'])
        print(f"Films Precision: {entry['films_precision']:.2f}")
        print(f"Films Recall: {entry['films_recall']:.2f}")
        print(f"Films F1 Score: {entry['films_f1']:.2f}")
        print(f"TV Series Precision: {entry['tv_series_precision']:.2f}")
        print(f"TV Series Recall: {entry['tv_series_recall']:.2f}")
        print(f"TV Series F1 Score: {entry['tv_series_f1']:.2f}")
        print("-" * 50)

    # Horizontal bar chart: wall-clock time per query.
    plt.figure(figsize=(10, 6))
    plt.barh(queries, times, color='skyblue')
    plt.xlabel('Time (seconds)')
    plt.title('Time Taken for Each Search Query')
    plt.show()

    # Grouped bar chart: film vs TV-series F1 per query.
    plt.figure(figsize=(10, 6))
    width = 0.35
    indices = range(len(queries))
    plt.bar(indices, films_f1_scores, width, label='Films F1 Score', color='green')
    plt.bar([i + width for i in indices], tv_series_f1_scores, width, label='TV Series F1 Score', color='orange')
    plt.xticks([i + width / 2 for i in indices], queries)
    plt.xlabel('Queries')
    plt.ylabel('F1 Score')
    plt.title('F1 Scores for Films and TV Series')
    plt.legend(loc='best')
    plt.show()

# Run the display function to show results
display_results(detailed_results, avg_time)
requirements.txt ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
Flask
fuzzywuzzy
python-Levenshtein
requests
smart_search.py ADDED
@@ -0,0 +1,68 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import re
3
+ from fuzzywuzzy import fuzz
4
+ from typing import List, Dict, Tuple, Union
5
+
6
class SmartSearch:
    """Fuzzy title search over film and TV-series catalogues stored as JSON.

    Expected file shapes (assumed from the normalisation code below — confirm
    against the upstream API): the films file is a JSON list of strings like
    "films/My Spy 2020"; the TV file is a JSON object mapping "tv/<series>"
    to a list of episode dicts with "episode", "path" and "season" keys.
    """

    def __init__(self, films_file: str, tv_series_file: str):
        # films: normalised (lower-cased, "films/"-stripped) title -> display title
        # tv_series: normalised series title -> {"original": ..., "episodes": [...]}
        self.films, self.tv_series = self.load_data(films_file, tv_series_file)
        self.index = self.create_index(self.films, self.tv_series)

    def load_data(self, films_file: str, tv_series_file: str) -> Tuple[Dict[str, str], Dict[str, Dict[str, Union[str, List[Dict[str, str]]]]]]:
        """Load both JSON files and return (films_normalized, tv_series_normalized).

        FIX: the original return annotation claimed ``List[str]`` for the
        films component although a dict is returned; corrected here. Files
        are opened with an explicit UTF-8 encoding so behaviour does not
        depend on the platform's locale.
        """
        with open(films_file, 'r', encoding='utf-8') as f:
            films = json.load(f)

        with open(tv_series_file, 'r', encoding='utf-8') as f:
            tv_series = json.load(f)

        # Map lower-cased, prefix-stripped film titles to their display titles.
        films_normalized = {}
        for film in films:
            title = re.sub(r'^films/', '', film)  # strip once, reuse (was computed twice)
            films_normalized[title.lower()] = title

        # Same idea for series; episodes keep their path/season but lose the
        # "tv/<series>/<season>/" prefix on the episode label.
        tv_series_normalized = {}
        for series, episodes in tv_series.items():
            series_normalized = re.sub(r'^tv/', '', series).lower()
            episodes_normalized = [
                {
                    "episode": re.sub(r'^tv/.*?/.*?/', '', ep['episode']),
                    "path": ep['path'],
                    "season": ep['season']
                }
                for ep in episodes
            ]
            tv_series_normalized[series_normalized] = {
                "original": series,
                "episodes": episodes_normalized
            }

        return films_normalized, tv_series_normalized

    def create_index(self, films: Dict[str, str], tv_series: Dict[str, dict]) -> Dict[str, dict]:
        """Bundle the two normalised catalogues into one lookup structure."""
        return {
            'films': films,
            'tv_series': tv_series
        }

    def search(self, query: str) -> Dict[str, List[str]]:
        """Return {'films': [...], 'tv_series': [...]} titles matching *query*.

        A title matches on case-insensitive substring containment or a
        fuzzywuzzy partial-ratio above 80. A series also matches when any of
        its episode labels match; each series is reported at most once.
        """
        query = query.lower()
        results = {'films': [], 'tv_series': []}

        # Search films
        films = self.index['films']
        results['films'] = [films[film] for film in films
                            if query in film or fuzz.partial_ratio(query, film) > 80]

        # Search TV series (series title first, then its episode labels).
        tv_series = self.index['tv_series']
        for series, data in tv_series.items():
            if query in series or fuzz.partial_ratio(query, series) > 80:
                results['tv_series'].append(data['original'])
            else:
                for episode in data['episodes']:
                    ep_label = episode['episode'].lower()  # lower-case once (was computed twice)
                    if query in ep_label or fuzz.partial_ratio(query, ep_label) > 80:
                        results['tv_series'].append(data['original'])
                        break

        return results
test_api.py ADDED
@@ -0,0 +1,38 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ import json
3
+
4
# Endpoint of the locally running search API.
url = "http://127.0.0.1:5000/api/search"

# Queries to exercise the API with; several are deliberately misspelled so
# the fuzzy-matching path gets hit.
test_queries = [
    {"query": "my spy"},
    {"query": "ahoh girl"},
    {"query": "yarian"},
    {"query": "grand blue"},
    {"query": "Non-existent Title"},
    {"query": "yariyan 203"},
    {"query": "My spey"}  # intentional typo to test fuzzy matching
]
17
+
18
def test_search_api():
    """POST each query to the running API and pretty-print the response."""
    for case in test_queries:
        print(f"Testing query: {case['query']}")

        # Fire the request at the live server.
        response = requests.post(url, json=case)

        if response.status_code != 200:
            print(f"Failed to get results for '{case['query']}'. Status code: {response.status_code}")
            print(response.text)
        else:
            results = response.json()
            print(f"Results for '{case['query']}':")
            print(json.dumps(results, indent=4))

        print("\n" + "="*50 + "\n")

if __name__ == "__main__":
    test_search_api()