Spaces:
Sleeping
Sleeping
| import requests | |
| import time | |
| from openpyxl import load_workbook | |
| def read_excel(file_path): | |
| # 读取Excel文件 | |
| wb = load_workbook('music_id.xlsx') | |
| # 获取指定表 | |
| ws = wb['Results'] | |
| # 创建一个music_id到title的字典 | |
| music_dict = {} | |
| # 读取数据,并填充字典,假设第一行为表头 | |
| for row in ws.iter_rows(min_row=2, values_only=True): # 从第二行开始,跳过表头 | |
| title = row[0] # 第一列为 title | |
| music_id = row[1] # 第二列为 music_id | |
| if music_id and title: # 确保数据有效 | |
| music_dict[music_id] = title | |
| return music_dict | |
| # 匹配search_score中的MusicId返回对应的Title | |
| def match_title_from_json(excel_dict, json_data): | |
| search_scores = json_data.get('search_score', []) | |
| matched_titles = {music_id: excel_dict.get(music_id, '') for music_id in search_scores} | |
| first_value = next(iter(matched_titles.values())) | |
| return first_value | |
| class EvaluationProcessor: | |
| def __init__(self, api_key): | |
| self.api_key = api_key | |
| self.audio_url = 'https://test.aitanzou.com/web/api/submit_audio' | |
| self.video_url = 'https://test.aitanzou.com/web/api/submit_video' | |
| self.result_url = 'https://test.aitanzou.com/web/api/getEvaluationResult' | |
| self.headers = { | |
| 'API-Key': self.api_key | |
| } | |
| self.excel_file_path = 'music_id.xlsx' | |
| def submit_evaluation(self, file_path, is_video=False, music_id=None, hand=-1, order_start=0, order_end=-1, repeat_type=0): | |
| url = self.video_url if is_video else self.audio_url | |
| print(url) | |
| # 根据文件类型选择 MIME 类型 | |
| file_type = 'video/mp4' if is_video else 'audio/mp3' | |
| file_name = "video_file.mp4" if is_video else "audio_file.mp3" | |
| # 打开文件并指定文件类型 | |
| files = { | |
| 'file': (file_name, open(file_path, 'rb'), file_type) | |
| } | |
| data = { | |
| 'musicId': music_id, | |
| 'hand': hand, | |
| 'orderStart': order_start, | |
| 'orderEnd': order_end, | |
| 'repeatType': repeat_type | |
| } | |
| response = requests.post(url, headers=self.headers, files=files, data=data) | |
| if response.status_code == 200: | |
| data = response.json() | |
| task_id = data['data']['taskId'] | |
| return task_id | |
| else: | |
| raise Exception(f'Error: {response.status_code}, {response.text}') | |
| def get_evaluation_result(self, task_id): | |
| params = {'taskId': task_id} | |
| while True: | |
| result_response = requests.get(self.result_url, headers=self.headers, params=params) | |
| #print(result_response) | |
| if result_response.status_code == 200: | |
| result_data = result_response.json() | |
| status = result_data['data']['status'] | |
| #print(result_data) | |
| if status in ['pending', 'processing']: | |
| print(f'Task is still {status}...') | |
| time.sleep(2) | |
| elif status == 'completed': | |
| title_musicid_dict = read_excel(self.excel_file_path) | |
| # 匹配并输出对应的Title | |
| title = match_title_from_json(title_musicid_dict, result_data['data']) | |
| return result_data['data'], title | |
| else: | |
| raise Exception(f'Task failed: {result_data["message"]}') | |
| else: | |
| raise Exception(f'Error: {result_response.status_code}, {result_response.text}') | |
| def process_evaluation(self, file_path, is_video, music_id=None, hand=-1, order_start=0, order_end=-1, repeat_type=0): | |
| task_id = self.submit_evaluation(file_path, is_video, music_id, hand, order_start, order_end, repeat_type) | |
| return self.get_evaluation_result(task_id) | |