import os
import shutil
import sys

import pytest

# Add the parent directory to sys.path so the tests can find the src module.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.video_processing_tool import VideoProcessingTool
# --- Test Configuration ---
# URLs for testing different functionalities.
# Ensure these videos are publicly accessible and have the expected features
# (transcripts, specific objects). Chosen from common_questions.json for relevance.
VIDEO_URL_TRANSCRIPT_DIALOGUE = "https://www.youtube.com/watch?v=1htKBjuUWec"  # Stargate SG-1 "Isn't that hot?"
VIDEO_URL_OBJECT_COUNT = "https://www.youtube.com/watch?v=L1vXCYZAYYM"  # Birds video
VIDEO_URL_NO_TRANSCRIPT = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"  # Rick Astley (often no official transcript)
VIDEO_URL_SHORT_GENERAL = "https://www.youtube.com/watch?v=jNQXAC9IVRw"  # Short video ("Me at the zoo")
# --- Fixtures ---
@pytest.fixture
def model_files():
    """Creates dummy model files for testing CV functionality if real ones aren't provided."""
    # Use a sub-directory within the test directory for these files.
    test_model_dir = os.path.join(os.path.dirname(__file__), "test_cv_models")
    os.makedirs(test_model_dir, exist_ok=True)
    cfg_path = os.path.join(test_model_dir, "dummy-yolov3-tiny.cfg")
    weights_path = os.path.join(test_model_dir, "dummy-yolov3-tiny.weights")
    names_path = os.path.join(test_model_dir, "dummy-coco.names")
    # Create minimal dummy files if they don't exist. These won't make OpenCV's
    # DNN module load a functional model, but they allow testing the file
    # handling logic. Actual DNN model loading requires valid model files.
    if not os.path.exists(cfg_path):
        with open(cfg_path, "w") as f:
            f.write("[net]\nwidth=416\nheight=416")
    if not os.path.exists(weights_path):
        with open(weights_path, "wb") as f:
            f.write(b"dummyweights")
    if not os.path.exists(names_path):
        with open(names_path, "w") as f:
            f.write("bird\ncat\ndog\nperson\n")
    yield {
        "cfg": cfg_path,
        "weights": weights_path,
        "names": names_path,
        "dir": test_model_dir,
    }
    # Cleanup: remove the dummy model directory after tests.
    # shutil.rmtree(test_model_dir, ignore_errors=True)  # Keep for inspection if tests fail.

@pytest.fixture
def video_tool(model_files):
    """Initializes VideoProcessingTool with dummy model paths for testing."""
    # Use a dedicated temp directory for test artifacts.
    test_temp_dir_base = os.path.join(os.path.dirname(__file__), "test_temp_videos")
    os.makedirs(test_temp_dir_base, exist_ok=True)
    tool = VideoProcessingTool(
        model_cfg_path=model_files["cfg"],
        model_weights_path=model_files["weights"],
        class_names_path=model_files["names"],
        temp_dir_base=test_temp_dir_base,
    )
    yield tool
    tool.cleanup()  # Ensure temp files for this tool instance are removed.
    # Optional: clean up the base test_temp_dir if it is empty after all tests.
    # if os.path.exists(test_temp_dir_base) and not os.listdir(test_temp_dir_base):
    #     shutil.rmtree(test_temp_dir_base)

# --- Test Cases ---
def test_extract_video_id(video_tool):
    assert video_tool._extract_video_id("https://www.youtube.com/watch?v=1htKBjuUWec") == "1htKBjuUWec"
    assert video_tool._extract_video_id("https://youtu.be/1htKBjuUWec") == "1htKBjuUWec"
    assert video_tool._extract_video_id("https://www.youtube.com/embed/1htKBjuUWec") == "1htKBjuUWec"
    assert video_tool._extract_video_id("https://www.youtube.com/watch?v=1htKBjuUWec&t=10s") == "1htKBjuUWec"
    assert video_tool._extract_video_id("invalid_url") is None
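
# For reference, a minimal sketch of an extractor that would satisfy the
# assertions above. The real implementation lives in src/video_processing_tool.py
# and may differ; this helper and its regexes are illustrative assumptions only.
import re

def _reference_extract_video_id(url):
    """Return the 11-character YouTube video ID, or None if nothing matches."""
    patterns = (
        r"[?&]v=([0-9A-Za-z_-]{11})",      # watch?v=ID, with optional extra params
        r"youtu\.be/([0-9A-Za-z_-]{11})",  # short-link form
        r"embed/([0-9A-Za-z_-]{11})",      # embed form
    )
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None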

# Marked as an integration test (requires network).
@pytest.mark.integration
def test_download_video(video_tool):
    result = video_tool.download_video(VIDEO_URL_SHORT_GENERAL, resolution="240p")
    assert result.get("success"), f"Download failed: {result.get('error')}"
    assert "file_path" in result
    assert os.path.exists(result["file_path"])
    # Compare against the basename: the full path starts with the temp dir, so a
    # startswith check on the whole path would never match the video ID.
    assert result["file_path"].endswith(".mp4") or os.path.basename(result["file_path"]).startswith(
        video_tool._extract_video_id(VIDEO_URL_SHORT_GENERAL)
    )

def test_get_video_transcript_success(video_tool):
    result = video_tool.get_video_transcript(VIDEO_URL_TRANSCRIPT_DIALOGUE)
    assert result.get("success"), f"Transcript fetch failed: {result.get('error')}"
    assert "transcript" in result and len(result["transcript"]) > 0
    assert "transcript_entries" in result and len(result["transcript_entries"]) > 0
    # Case-insensitive check for expected content; trailing "?" dropped for flexibility.
    assert "isn't that hot" in result["transcript"].lower()

def test_get_video_transcript_no_transcript(video_tool):
    # This video is unlikely to have an official transcript in many languages,
    # though YouTube may auto-generate one. The API should handle it gracefully.
    result = video_tool.get_video_transcript(VIDEO_URL_NO_TRANSCRIPT, languages=['xx-YY'])  # Non-existent language
    assert not result.get("success")
    assert "error" in result
    assert "No transcript found" in result["error"] or "Transcripts are disabled" in result["error"]

def test_find_dialogue_response_success(video_tool):
    transcript_data = video_tool.get_video_transcript(VIDEO_URL_TRANSCRIPT_DIALOGUE)
    assert transcript_data.get("success"), f"Transcript fetch failed for dialogue test: {transcript_data.get('error')}"
    result = video_tool.find_dialogue_response(transcript_data["transcript_entries"], "Isn't that hot?")
    assert result.get("success"), f"Dialogue search failed: {result.get('error')}"
    assert "response_text" in result
    # The expected response is "Extremely", but it can vary slightly with transcript generation.
    assert "extremely" in result["response_text"].lower()

def test_find_dialogue_response_not_found(video_tool):
    transcript_data = video_tool.get_video_transcript(VIDEO_URL_TRANSCRIPT_DIALOGUE)
    assert transcript_data.get("success")
    result = video_tool.find_dialogue_response(transcript_data["transcript_entries"], "This phrase is not in the video")
    assert not result.get("success")
    assert "not found in transcript" in result.get("error", "")

# Marks tests that rely on (even dummy) CV model setup; the mark name is assumed.
@pytest.mark.cv_model
def test_object_counting_interface(video_tool):
    """Tests the object counting call, expecting it to run with dummy models even if counts are zero."""
    if not video_tool.object_detection_model:
        pytest.skip("CV model not loaded, skipping object count test.")
    download_result = video_tool.download_video(VIDEO_URL_OBJECT_COUNT, resolution="240p")  # Use a short video.
    assert download_result.get("success"), f"Video download failed for object counting: {download_result.get('error')}"
    video_path = download_result["file_path"]
    result = video_tool.count_objects_in_video(video_path, target_classes=["bird"], confidence_threshold=0.1, frame_skip=30)
    # With dummy models we don't expect actual detections, but the function should complete.
    assert result.get("success"), f"Object counting failed: {result.get('error')}"
    assert "max_simultaneous_bird" in result  # Present even if it is 0.
    # With real models and a video containing birds, assert result["max_simultaneous_bird"] > 0.
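
# count_objects_in_video presumably loads the Darknet files with OpenCV's DNN
# module (cv2.dnn.readNetFromDarknet) and samples every `frame_skip`-th frame.
# The cv2 calls below are real OpenCV APIs; the counting loop itself is an
# assumed sketch, with per-class filtering omitted.
import cv2

def _reference_count_objects(video_path, cfg_path, weights_path, frame_skip=30):
    """Walk the video, running detection only on every `frame_skip`-th frame."""
    net = cv2.dnn.readNetFromDarknet(cfg_path, weights_path)
    cap = cv2.VideoCapture(video_path)
    frame_idx, max_simultaneous = 0, 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if frame_idx % frame_skip == 0:
            blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True)
            net.setInput(blob)
            detections = net.forward()  # raw candidate boxes; real code would filter
            # by target class ("bird") and confidence before counting per frame
            max_simultaneous = max(max_simultaneous, len(detections))
        frame_idx += 1
    cap.release()
    return {"success": True, "max_simultaneous_bird": max_simultaneous}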

@pytest.mark.cv_model
def test_process_video_object_count_flow(video_tool):
    if not video_tool.object_detection_model:
        pytest.skip("CV model not loaded, skipping process_video object count test.")
    query_params = {
        "target_classes": ["bird"],
        "resolution": "240p",
        "confidence_threshold": 0.1,
        "frame_skip": 30,  # Process fewer frames for a faster test.
    }
    result = video_tool.process_video(VIDEO_URL_OBJECT_COUNT, "object_count", query_params=query_params)
    assert result.get("success"), f"process_video for object_count failed: {result.get('error')}"
    assert "max_simultaneous_bird" in result

def test_process_video_dialogue_flow(video_tool):
    query_params = {"query_phrase": "Isn't that hot?"}
    result = video_tool.process_video(VIDEO_URL_TRANSCRIPT_DIALOGUE, "dialogue_response", query_params=query_params)
    assert result.get("success"), f"process_video for dialogue_response failed: {result.get('error')}"
    assert "extremely" in result.get("response_text", "").lower()

def test_process_video_transcript_flow(video_tool):
    result = video_tool.process_video(VIDEO_URL_TRANSCRIPT_DIALOGUE, "transcript")
    assert result.get("success"), f"process_video for transcript failed: {result.get('error')}"
    assert "transcript" in result and len(result["transcript"]) > 0

def test_cleanup_removes_temp_dir(model_files):  # Tests cleanup directly, without the video_tool fixture.
    test_temp_dir_base = os.path.join(os.path.dirname(__file__), "test_temp_cleanup")
    os.makedirs(test_temp_dir_base, exist_ok=True)
    tool = VideoProcessingTool(
        model_cfg_path=model_files["cfg"],
        model_weights_path=model_files["weights"],
        class_names_path=model_files["names"],
        temp_dir_base=test_temp_dir_base,
    )
    # Create a dummy file in the tool's temp dir.
    temp_file_in_tool_dir = os.path.join(tool.temp_dir, "dummy.txt")
    with open(temp_file_in_tool_dir, "w") as f:
        f.write("test")
    assert os.path.exists(tool.temp_dir)
    assert os.path.exists(temp_file_in_tool_dir)
    tool_temp_dir_path = tool.temp_dir  # Store the path before cleanup.
    tool.cleanup()
    assert not os.path.exists(tool_temp_dir_path), f"Temp directory {tool_temp_dir_path} was not removed."
    # shutil.rmtree(test_temp_dir_base, ignore_errors=True)  # Clean up the base dir for this specific test.

# To run these tests:
# 1. Ensure pytest is installed (`pip install pytest`).
# 2. Ensure the libraries required by VideoProcessingTool are installed
#    (yt_dlp, youtube_transcript_api, opencv-python).
# 3. Navigate to the directory containing this test file and the `src` directory.
# 4. Run `pytest` or `python -m pytest` in your terminal.
# 5. Tests requiring network access (integration) need an internet connection.
# 6. For CV-dependent tests to be meaningful beyond interface checks, replace the
#    dummy model files with real ones.
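# 7. The `integration` and `cv_model` marks used above should be registered to
#    avoid PytestUnknownMarkWarning; a minimal pytest.ini (mark names assumed
#    here) would look like:
#
#        [pytest]
#        markers =
#            integration: tests that hit the network
#            cv_model: tests that need the (dummy or real) CV model files
#
#    Network-dependent tests can then be deselected with `pytest -m "not integration"`.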