# jebin2's picture
# fix: Remediate CodeQL security vulnerabilities
# d7c2fbc
from fastapi import FastAPI, HTTPException, Query
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from typing import Optional, Dict, Any
import os
import subprocess
import dotenv
import json
# Load environment variables
dotenv.load_dotenv(os.path.join(os.path.dirname(__file__), '..', '.env'))
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from src.config import get_config_value
# FastAPI application instance; the route decorators in this module register
# their handlers on it.
app = FastAPI(title="GitHub Workflow Runner")
# Mount static files
# NOTE(review): the mounted directory is the one that contains this module, so
# the files stored next to it (this source file included) appear to be
# reachable under /static - confirm that this exposure is intended.
app.mount("/static", StaticFiles(directory=os.path.dirname(__file__)), name="static")
def validate_safe_arg(value: str, param_name: str, allow_hyphen: bool = False):
    """
    Check that ``value`` is acceptable as a single subprocess argument.

    Empty or missing values pass without any checks. Any other value is
    rejected with an HTTP 400 when it begins with '-' (unless
    ``allow_hyphen`` is set) or when it contains one of the characters
    commonly used for shell injection.
    """
    if value:
        # A leading '-' would be read by the called CLI as an option, not data.
        hyphen_led = (not allow_hyphen) and value.startswith('-')
        if hyphen_led:
            raise HTTPException(status_code=400, detail=f"Invalid {param_name}: Cannot start with '-'")

        # Commands are run with shell=False, so this is defense in depth only.
        for forbidden in (';', '&', '|', '`', '$', '(', ')', '<', '>', '\\'):
            if forbidden in value:
                raise HTTPException(status_code=400, detail=f"Invalid {param_name}: Contains illegal characters")
def get_github_token_from_git_credentials():
    """Return the first GitHub token found in ~/.git-credentials, or None.

    Only lines mentioning github.com are considered, and only a password
    field starting with 'ghp_', 'gho_' or 'github_pat_' counts as a token.
    Read errors are printed and treated as "no token".
    """
    credentials_file = os.path.expanduser('~/.git-credentials')
    if not os.path.exists(credentials_file):
        return None

    token_prefixes = ('ghp_', 'gho_', 'github_pat_')
    try:
        with open(credentials_file, 'r') as handle:
            for raw_entry in handle:
                entry = raw_entry.strip()
                # Expected shape: https://username:token@github.com
                looks_like_github = 'github.com' in entry and ':' in entry and '@' in entry
                if not looks_like_github:
                    continue
                try:
                    # Everything before the first '@' is "[scheme://]user:password".
                    user_info = entry.split('@')[0]
                    if '://' in user_info:
                        user_info = user_info.split('://')[1]
                    fields = user_info.split(':')
                    if len(fields) < 2:
                        continue
                    candidate = fields[1]
                    if candidate.startswith(token_prefixes):
                        return candidate
                except Exception:
                    continue
    except Exception as e:
        print(f"Error reading git-credentials: {e}")
    return None
# Configuration - Priority: env var > git-credentials
# The token is resolved once, when this module is imported: the GITHUB_TOKEN
# config value first, then GITHUB_PAT, then a token parsed from
# ~/.git-credentials. It may end up None/empty when none of them yields a value.
# NOTE(review): get_config_value comes from src.config and is not visible
# here; presumably it reads the environment / .env - confirm.
GITHUB_TOKEN = get_config_value("GITHUB_TOKEN") or get_config_value("GITHUB_PAT") or get_github_token_from_git_credentials()
# Repository targeted by the gh commands in this module (passed as
# "--repo <owner>/<name>").
REPO_OWNER = "ElvoroLtd"
REPO_NAME = "Elvoro"
# Default workflow file name.
# NOTE(review): the handlers visible in this file repeat the same literal as
# their own defaults instead of reading this constant.
WORKFLOW_FILE = "process_csv.yml"
@app.get("/health")
async def health_check():
    """Liveness probe for container orchestration; always reports healthy."""
    payload = {"status": "healthy"}
    return payload
@app.get("/")
async def index():
    """Serve the index.html file located next to this module."""
    module_dir = os.path.dirname(__file__)
    index_page = os.path.join(module_dir, 'index.html')
    return FileResponse(index_page)
@app.get("/api/auth/status")
async def auth_status():
    """
    Report whether the configured GitHub token is accepted by GitHub.

    Runs `gh api user` with the module-level token exported as GITHUB_TOKEN.

    Returns:
        {"authenticated": True, "user": <login>} when gh exits with status 0,
        otherwise {"authenticated": False, "message": <reason>}. Failures are
        never raised; they are reported through the "message" field.
    """
    # Imported locally so this handler does not depend on a file-level import;
    # fastapi is already a dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    token = GITHUB_TOKEN
    if not token:
        return {"authenticated": False, "message": "Token not found in .env"}
    try:
        env = os.environ.copy()
        env['GITHUB_TOKEN'] = token
        cmd = ['gh', 'api', 'user']
        # subprocess.run blocks until gh exits. Calling it directly inside an
        # async handler would stall the event loop (and every other request)
        # for that long, so it is executed in a worker thread instead.
        result = await run_in_threadpool(
            subprocess.run, cmd, capture_output=True, text=True, env=env
        )
        if result.returncode != 0:
            return {"authenticated": False, "message": "Invalid token"}
        user_data = json.loads(result.stdout)
        return {"authenticated": True, "user": user_data.get('login')}
    except Exception as e:
        # Best effort: e.g. a missing gh binary or unparsable output is
        # reported as "not authenticated" rather than as a server error.
        return {"authenticated": False, "message": str(e)}
@app.get("/api/env-vars")
async def get_env_vars(workflow: str = Query(default="process_csv.yml")):
    """
    Return the environment variables relevant to the selected workflow.

    Values always come from the project's .env file. The set of keys comes
    from a template file: publisher.env for 'publisher.yml' and
    video_generate.env for every other workflow. Keys missing from .env are
    returned with an empty string. When the template file does not exist,
    every entry of .env is returned instead.
    """
    project_root = os.path.join(os.path.dirname(__file__), '..')

    # Source of truth for the values.
    env_values = dotenv.dotenv_values(os.path.join(project_root, '.env'))

    # Template that decides which keys are exposed.
    template_filename = 'publisher.env' if workflow == 'publisher.yml' else 'video_generate.env'
    template_path = os.path.join(project_root, template_filename)

    if not os.path.exists(template_path):
        return {"vars": dict(env_values)}

    vars_dict = {}
    try:
        with open(template_path, 'r') as template:
            for raw_line in template:
                entry = raw_line.strip()
                # Blank lines and comments carry no key.
                if entry and not entry.startswith('#'):
                    # The key is the text before the first '=' (or the whole
                    # line when there is no '=').
                    key = entry.partition('=')[0].strip()
                    vars_dict[key] = env_values.get(key, '')
    except Exception as e:
        # Keys collected before the failure are still returned.
        print(f"Error reading template {template_filename}: {e}")
    return {"vars": vars_dict}
class TriggerRequest(BaseModel):
    """Request body accepted by POST /api/trigger."""
    # Optional GitHub token; the handler falls back to the module-level
    # GITHUB_TOKEN when this is absent.
    token: Optional[str] = None
    # Workflow inputs; the handler forwards each truthy entry to gh as
    # `-f key=value`.
    inputs: Optional[Dict[str, Any]] = {}
    # Git ref handed to gh through --ref.
    ref: Optional[str] = "feature/video-revamp"
    # Workflow file name handed to `gh workflow run`.
    workflow: Optional[str] = "process_csv.yml"
@app.post("/api/trigger")
async def trigger_workflow(data: TriggerRequest):
    """
    Dispatch a GitHub Actions workflow through `gh workflow run`.

    Args:
        data: Request body. ``token`` falls back to the module-level
            GITHUB_TOKEN; ``ref`` and ``workflow`` select what to run;
            truthy entries of ``inputs`` are forwarded as ``-f key=value``.

    Returns:
        {"success": True, "message": ...} when gh exits with status 0.

    Raises:
        HTTPException: 401 when no token is available; 400 when ref,
            workflow or an input is missing/unsafe, or when gh exits with a
            non-zero status (the detail carries gh's stderr); 500 for any
            other failure while running gh.
    """
    # Imported locally so this handler does not depend on a file-level import;
    # fastapi is already a dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    token = data.token or GITHUB_TOKEN
    if not token:
        raise HTTPException(status_code=401, detail="No GitHub token provided")

    inputs = data.inputs or {}
    ref = data.ref
    workflow_file = data.workflow

    # Both fields are Optional on the request model and validate_safe_arg lets
    # empty values through. Without this guard a null ref/workflow would reach
    # subprocess.run as None and surface as an opaque 500.
    if not ref:
        raise HTTPException(status_code=400, detail="Invalid ref: Must not be empty")
    if not workflow_file:
        raise HTTPException(status_code=400, detail="Invalid workflow: Must not be empty")

    # Validate inputs to prevent argument injection
    validate_safe_arg(ref, "ref")
    validate_safe_arg(workflow_file, "workflow")

    cmd = [
        'gh', 'workflow', 'run', workflow_file,
        '--repo', f"{REPO_OWNER}/{REPO_NAME}",
        '--ref', ref
    ]

    for key, value in inputs.items():
        # NOTE(review): every falsy value (None, "", 0, False) is skipped, so
        # an explicit False/0 input is never sent to gh - confirm intended.
        if not value:
            continue
        # Validate input key and value to prevent command injection
        validate_safe_arg(key, "input key")
        validate_safe_arg(str(value), "input value")
        # '=' separates key from value in `-f key=value`; a key containing it
        # would be submitted under a different field name.
        if '=' in key:
            raise HTTPException(status_code=400, detail="Invalid input key: Cannot contain '='")
        cmd.extend(['-f', f"{key}={value}"])

    try:
        # The token travels through the environment, so it is not part of the
        # logged command line.
        print(f"Executing: {' '.join(cmd)}")
        env = os.environ.copy()
        env['GITHUB_TOKEN'] = token

        # subprocess.run blocks until gh exits; run it in a worker thread so
        # the event loop keeps serving other requests in the meantime.
        result = await run_in_threadpool(
            subprocess.run, cmd, capture_output=True, text=True, env=env
        )

        if result.returncode == 0:
            return {"success": True, "message": "Workflow triggered successfully"}

        print(f"Error triggering workflow: {result.stderr}")
        raise HTTPException(status_code=400, detail=f"Failed: {result.stderr}")
    except HTTPException:
        # Keep the deliberate 400 above from being rewrapped as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/runs")
async def list_runs(
    token: Optional[str] = Query(default=None),
    workflow: str = Query(default="process_csv.yml")
):
    """
    List the five most recent runs of a workflow using `gh run list`.

    Args:
        token: GitHub token; falls back to the module-level GITHUB_TOKEN.
        workflow: Workflow file name handed to gh through --workflow.

    Returns:
        {"workflow_runs": [...]} on success, each entry holding run_number,
        name, status, conclusion, created_at and html_url. Without a token
        the result is {"runs": []}; when gh fails it is
        {"runs": [], "error": <message>}.

    Raises:
        HTTPException: 400 when ``workflow`` fails validate_safe_arg.
    """
    # Imported locally so this handler does not depend on a file-level import;
    # fastapi is already a dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    token = token or GITHUB_TOKEN
    if not token:
        return {"runs": []}

    # Validate inputs
    validate_safe_arg(workflow, "workflow")

    cmd = [
        'gh', 'run', 'list',
        '--workflow', workflow,
        '--repo', f"{REPO_OWNER}/{REPO_NAME}",
        '--limit', '5',
        '--json', 'number,status,conclusion,createdAt,url,name'
    ]

    try:
        env = os.environ.copy()
        env['GITHUB_TOKEN'] = token

        # subprocess.run blocks until gh exits; run it in a worker thread so
        # the event loop keeps serving other requests in the meantime.
        result = await run_in_threadpool(
            subprocess.run, cmd, capture_output=True, text=True, env=env
        )

        if result.returncode != 0:
            return {"runs": [], "error": result.stderr}

        # Rename gh's JSON fields to the key names this endpoint returns.
        # NOTE(review): success uses the "workflow_runs" key while the
        # failure paths use "runs"; kept as-is because clients may rely on it.
        normalized_runs = [
            {
                'run_number': run.get('number'),
                'name': run.get('name'),
                'status': run.get('status'),
                'conclusion': run.get('conclusion'),
                'created_at': run.get('createdAt'),
                'html_url': run.get('url'),
            }
            for run in json.loads(result.stdout)
        ]
        return {"workflow_runs": normalized_runs}
    except Exception as e:
        # Best effort: any failure is reported in the payload, not raised.
        return {"runs": [], "error": str(e)}
if __name__ == '__main__':
    # Script entry point: print a short banner, then serve the app locally.
    import uvicorn

    for banner_line in (
        "GitHub Workflow Runner (FastAPI)",
        "Open your browser to: http://localhost:5002",
    ):
        print(banner_line)
    uvicorn.run(app, host="127.0.0.1", port=5002)