Spaces:
Runtime error
Runtime error
| import textwrap | |
| from fastapi import FastAPI | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| import asyncio | |
| from api.external_services import InitiazlizeGithubService, InitiazlizeActiveloopService | |
| # Load environment variables | |
| load_dotenv() | |
| github_service = InitiazlizeGithubService() | |
| activeloop_service = InitiazlizeActiveloopService() | |
| async def process_file(owner, repo, file_type): | |
| docs = github_service.load_repo_data(owner, repo, file_type) | |
| activeloop_service.upload_to_activeloop(docs) | |
| return docs | |
| async def main(): | |
| repo_url = "https://github.com/facebookresearch/segment-anything" | |
| owner, repo = github_service.parse_github_url(repo_url) | |
| file_types = [".py", ".js", ".ts", ".md", "ipynb"] | |
| tasks = [] | |
| for file_type in file_types: | |
| task = process_file(owner, repo, file_type) | |
| tasks.append(task) | |
| results = await asyncio.gather(*tasks) | |
| # docs = github_service.load_repo_data(owner, repo, file_type) | |
| # activeloop_service.upload_to_activeloop(docs) | |
| print(results) | |
| # activeloop_service.upload_to_activeloop(docs1) | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |
| # # upload | |
| # owner, repo = github_service.parse_github_url(repo_url) | |
| # docs = github_service.load_repo_data(owner, repo) | |
| # activeloop_service.upload_to_activeloop(docs) | |
| # # retrieve | |
| # intro_question = "what this code is talking about?" | |
| # answer = activeloop_service.query_engine.query(intro_question) | |
| # print(answer.__dict__) | |