Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, Request | |
| from fastapi.responses import JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from application_layer_agent import ApplicationLayerAgent | |
| from datetime import datetime, timedelta | |
| from databaseengine import DatabaseEngine | |
| import uuid | |
| app = FastAPI() | |
| dbe=DatabaseEngine() | |
| # CORS configuration | |
| origins = ["*"] # Allow all origins; specify domains in production | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=origins, # Allows all origins | |
| allow_credentials=True, | |
| allow_methods=["*"], # Allows all HTTP methods | |
| allow_headers=["*"], # Allows all headers | |
| ) | |
| class PlannerInstruction(BaseModel): | |
| uid:str | |
| pid:str | |
| target:str | |
| high_level_bio_query:str | |
| class Registration(BaseModel): | |
| name:str | |
| operation:str | |
| username:str | |
| password:str | |
| def Home(): | |
| return """ | |
| newMATTER | |
| Version 1.0 | |
| """ | |
| ''' | |
| @app.post("/registerorg") | |
| def Register(request:Registration): | |
| try: | |
| data={ | |
| "name":request.name, | |
| "operation":request.operation, | |
| "username":request.username, | |
| "password":request.password, | |
| "projects":[] | |
| } | |
| dbe.RegisterUser(data) | |
| return {"status":True} | |
| except Exception as e: | |
| return {"status":False} | |
| ''' | |
| def BCL_PLAN(request:PlannerInstruction): | |
| try: | |
| bio_query=request.high_level_bio_query | |
| user_id=request.uid | |
| project_id=request.pid | |
| target=request.target | |
| uid=f"origin_ai_bio_{user_id}_{project_id}" | |
| response=ApplicationLayerAgent(bio_query,uid,user_id,project_id,target) | |
| return { "message": response ,"status":"ok" } | |
| except Exception as e: | |
| return {"message":str(e),"status":str(e)} | |
| def GetUserOps(user_id:str): | |
| try: | |
| userops=dbe.Find_User(uid=user_id) | |
| return {"ops":userops} | |
| except Exception as e: | |
| return {"ops":str(e)} | |
| def GetOperationStatus(user_id:str,project_id:str): | |
| bcl_id=f"origin_ai_bio_{user_id}_{project_id}" | |
| status=dbe.Fetch_Status(bcl_id) | |
| return status | |
| def GetIndividualExperiments(user_id:str,project_id:str): | |
| try: | |
| individual_experiments=dbe.Fetch_IE(f"origin_ai_bio_{user_id}_{project_id}") | |
| return {"exp":str(individual_experiments)} | |
| except Exception as e: | |
| return {"exp":str(e)} | |
| def FetchProject_based_on_Applayer(user_id): | |
| try: | |
| projects=dbe.Fetch_Projects(user_id) | |
| if projects is None : | |
| return {"projects":None} | |
| else: | |
| return {"projects":projects} | |
| except Exception as e: | |
| return { "projects":str(e) } | |