autoGranted / cli.py
Ig0tU
feat: initial agent deployment to Hugging Face Spaces
2f8f181
#!/usr/bin/env python3
"""
AutoGrantED Command Line Interface
Comprehensive management commands for the AutoGrantED system
"""
import click
import os
import sys
from datetime import datetime, timedelta
from flask import current_app
from flask.cli import with_appcontext
from werkzeug.security import generate_password_hash
# Add the app directory to the path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app import create_app, db
from app.models import User, GrantOpportunity, OpportunityScore, GrantApplication
from app.scrapers.sam_gov_scraper import SAMGovScraper
from app.analysis.scoring_engine import GrantScoringEngine
from app.generators.proposal_generator import ProposalGenerator
# Module-level Flask application instance; every command in this file is
# registered on its `app.cli` group.
app = create_app()
# CLI command that interactively provisions a user account.
# NOTE: the one-line docstring is kept short on purpose -- click prints it
# verbatim as the command's --help text.
@app.cli.command()
@click.option('--username', prompt=True, help='Admin username')
@click.option('--email', prompt=True, help='Admin email')
@click.option('--password', prompt=True, hide_input=True, confirmation_prompt=True, help='Admin password')
@click.option('--company', prompt=True, help='Company name')
@click.option('--state', prompt=True, help='Company state (2-letter code)')
def create_admin(username, email, password, company, state):
    """Create an admin user."""
    # The option is documented as a 2-letter code; reject anything else up
    # front instead of persisting a malformed value.
    state_code = state.strip().upper()
    if len(state_code) != 2 or not state_code.isalpha():
        click.echo(f"❌ Invalid state '{state}': expected a 2-letter code")
        return

    try:
        # Usernames must be unique: bail out if one is already taken.
        if User.query.filter_by(username=username).first():
            click.echo(f"❌ User '{username}' already exists!")
            return

        user = User(
            username=username,
            email=email,
            company_name=company,
            company_state=state_code,
            sam_registered=True,  # Assume admin has registrations
            sba_registered=True,
        )
        # The model stores the password via its own set_password() helper.
        user.set_password(password)
        db.session.add(user)
        db.session.commit()

        click.echo(f"✅ Admin user '{username}' created successfully!")
        click.echo(f" Company: {company}")
        click.echo(f" State: {state_code}")
        click.echo(f" Email: {email}")
    except Exception as e:
        # Report the failure, then discard the half-finished transaction.
        click.echo(f"❌ Error creating admin user: {str(e)}")
        db.session.rollback()
# CLI command: fetch opportunities from a scraper and upsert them into the
# database, keyed by solicitation number.
# NOTE: the one-line docstring doubles as the click --help text.
@app.cli.command()
@click.option('--source', default='sam_gov', help='Scraping source (sam_gov, grants_gov, sbir_gov)')
@click.option('--limit', default=50, help='Maximum opportunities to scrape')
def scan_opportunities(source, limit):
    """Scan for new grant opportunities."""
    click.echo(f"🔍 Scanning {source} for opportunities (limit: {limit})...")

    # Only the SAM.gov scraper is wired up in this module.
    if source != 'sam_gov':
        click.echo(f"❌ Scraper '{source}' not implemented yet")
        return

    try:
        scraper = SAMGovScraper()
        opportunities = scraper.scrape_opportunities()

        new_count = 0
        updated_count = 0
        skipped_count = 0

        # Clamp the limit: a negative slice bound would silently drop items
        # from the END of the list instead of capping the batch.
        for opp_data in opportunities[:max(limit, 0)]:
            solicitation_number = opp_data.get('solicitation_number')
            if not solicitation_number:
                # Without a key the lookup below would match (and overwrite)
                # any stored row whose solicitation number is NULL.
                skipped_count += 1
                continue

            existing = GrantOpportunity.query.filter_by(
                solicitation_number=solicitation_number
            ).first()

            if existing:
                # Update only attributes the model actually has.
                for key, value in opp_data.items():
                    if hasattr(existing, key):
                        setattr(existing, key, value)
                existing.last_updated = datetime.utcnow()
                updated_count += 1
            else:
                db.session.add(GrantOpportunity(**opp_data))
                new_count += 1

        db.session.commit()

        click.echo("✅ Scanning completed!")
        click.echo(f" New opportunities: {new_count}")
        click.echo(f" Updated opportunities: {updated_count}")
        if skipped_count:
            click.echo(f" Skipped (no solicitation number): {skipped_count}")
    except Exception as e:
        click.echo(f"❌ Error during scanning: {str(e)}")
        db.session.rollback()
# CLI command: score every unprocessed opportunity for one user or for all
# users, storing one OpportunityScore row per (user, opportunity) pair.
# NOTE: the one-line docstring doubles as the click --help text.
@app.cli.command()
@click.option('--user-id', type=int, help='Specific user ID to analyze for')
@click.option('--min-score', default=50, help='Minimum score threshold')
def analyze_opportunities(user_id, min_score):
    """Analyze opportunities and generate scores."""
    click.echo(f"🧠 Analyzing opportunities (min score: {min_score})...")
    try:
        # `is not None` so that an explicit ID of 0 is not mistaken for
        # "no user given".
        single_user = user_id is not None
        if single_user:
            user = User.query.get(user_id)
            if not user:
                click.echo(f"❌ User with ID {user_id} not found")
                return
            users = [user]
        else:
            users = User.query.all()
            if not users:
                click.echo("❌ No users found")
                return

        opportunities = GrantOpportunity.query.filter_by(processed=False).all()
        if not opportunities:
            click.echo("ℹ️ No unprocessed opportunities found")
            return

        engine = GrantScoringEngine()
        total_scores = 0
        high_scores = 0

        for user in users:
            click.echo(f" Analyzing for user: {user.username}")

            # One query per user instead of one per (user, opportunity) pair.
            already_scored = {
                row.opportunity_id
                for row in OpportunityScore.query.filter_by(user_id=user.id)
            }

            for opportunity in opportunities:
                if opportunity.id in already_scored:
                    continue

                result = engine.score_opportunity(user, opportunity)
                db.session.add(OpportunityScore(
                    user_id=user.id,
                    opportunity_id=opportunity.id,
                    agency_profile_score=result['agency_profile_score'],
                    market_need_score=result['market_need_score'],
                    competitive_landscape_score=result['competitive_landscape_score'],
                    total_score=result['total_score'],
                    agency_profile_reason=result['agency_profile_reason'],
                    market_need_reason=result['market_need_reason'],
                    competitive_landscape_reason=result['competitive_landscape_reason'],
                    ai_recommendation=result.get('ai_recommendation', ''),
                    confidence_level=result.get('confidence_level', 0.0),
                    model_version='gpt-4',
                ))
                total_scores += 1
                # --min-score only affects the summary count, not what is stored.
                if result['total_score'] >= min_score:
                    high_scores += 1

        # Flag opportunities as processed only after a full run.  Flagging
        # them after a single-user run would hide them from every other
        # user, since only unprocessed opportunities are ever scored.
        if not single_user:
            for opportunity in opportunities:
                opportunity.processed = True

        db.session.commit()

        click.echo("✅ Analysis completed!")
        click.echo(f" Total scores generated: {total_scores}")
        click.echo(f" High-scoring opportunities: {high_scores}")
    except Exception as e:
        click.echo(f"❌ Error during analysis: {str(e)}")
        db.session.rollback()
# CLI command: generate a proposal for one (user, opportunity) pair and store
# it as a GrantApplication row.  Refuses to run if an application for that
# pair already exists.
# NOTE: the one-line docstring doubles as the click --help text.
@app.cli.command()
@click.option('--opportunity-id', type=int, required=True, help='Opportunity ID to generate proposal for')
@click.option('--user-id', type=int, required=True, help='User ID to generate proposal for')
def generate_proposal(opportunity_id, user_id):
    """Generate a proposal for a specific opportunity."""
    click.echo(f"📝 Generating proposal for opportunity {opportunity_id}, user {user_id}...")
    try:
        user = User.query.get(user_id)
        if not user:
            click.echo(f"❌ User with ID {user_id} not found")
            return

        opportunity = GrantOpportunity.query.get(opportunity_id)
        if not opportunity:
            click.echo(f"❌ Opportunity with ID {opportunity_id} not found")
            return

        existing_app = GrantApplication.query.filter_by(
            user_id=user_id,
            opportunity_id=opportunity_id
        ).first()
        if existing_app:
            click.echo(f"⚠️ Application already exists (ID: {existing_app.id})")
            return

        generator = ProposalGenerator()
        proposal_data = generator.generate_complete_proposal(user, opportunity)
        # The budget breakdown is read from the nested 'budget' mapping; a
        # missing key raises KeyError and is reported by the handler below.
        budget = proposal_data['budget']

        application = GrantApplication(
            user_id=user_id,
            opportunity_id=opportunity_id,
            project_title=proposal_data['project_title'],
            project_summary=proposal_data['project_summary'],
            technical_narrative=proposal_data['technical_narrative'],
            budget_narrative=proposal_data['budget_narrative'],
            commercialization_plan=proposal_data.get('commercialization_plan', ''),
            total_budget=budget['total'],
            personnel_costs=budget['personnel'],
            equipment_costs=budget['equipment'],
            travel_costs=budget['travel'],
            other_costs=budget['other'],
            indirect_costs=budget['indirect'],
        )
        db.session.add(application)
        db.session.commit()

        click.echo("✅ Proposal generated successfully!")
        click.echo(f" Application ID: {application.id}")
        click.echo(f" Project Title: {proposal_data['project_title']}")
        click.echo(f" Total Budget: ${budget['total']:,}")
    except Exception as e:
        click.echo(f"❌ Error generating proposal: {str(e)}")
        db.session.rollback()
# CLI command: print row counts, 7-day activity, the five highest scores and
# a few basic health checks.  Read-only.
# NOTE: the one-line docstring doubles as the click --help text.
@app.cli.command()
def system_status():
    """Display system status and statistics."""
    click.echo("📊 AutoGrantED System Status")
    click.echo("=" * 50)
    try:
        # Take one timestamp so every window in this report is consistent.
        now = datetime.utcnow()
        week_ago = now - timedelta(days=7)

        # Database statistics
        user_count = User.query.count()
        opportunity_count = GrantOpportunity.query.count()
        active_opportunities = GrantOpportunity.query.filter(
            GrantOpportunity.application_due_date > now
        ).count()
        application_count = GrantApplication.query.count()
        score_count = OpportunityScore.query.count()

        click.echo(f"👥 Users: {user_count}")
        click.echo(f"🎯 Total Opportunities: {opportunity_count}")
        click.echo(f"⏰ Active Opportunities: {active_opportunities}")
        click.echo(f"📄 Applications: {application_count}")
        click.echo(f"🧠 Opportunity Scores: {score_count}")

        # Recent activity
        recent_opportunities = GrantOpportunity.query.filter(
            GrantOpportunity.last_updated > week_ago
        ).count()
        recent_applications = GrantApplication.query.filter(
            GrantApplication.created_at > week_ago
        ).count()

        click.echo("\n📈 Recent Activity (7 days):")
        click.echo(f" New/Updated Opportunities: {recent_opportunities}")
        click.echo(f" New Applications: {recent_applications}")

        # Top scoring opportunities
        top_scores = db.session.query(OpportunityScore, GrantOpportunity).join(
            GrantOpportunity
        ).order_by(OpportunityScore.total_score.desc()).limit(5).all()
        if top_scores:
            click.echo("\n🏆 Top Scoring Opportunities:")
            for score, opp in top_scores:
                # NOTE(review): the `3d` format assumes total_score is an int
                # -- confirm against the model.
                click.echo(f" {score.total_score:3d} - {opp.title[:60]}...")

        # System health checks
        click.echo("\n🔧 System Health:")
        try:
            # SQLAlchemy 2.x rejects plain-string statements, so wrap the
            # probe in text(); otherwise a healthy database is reported as
            # unreachable.
            db.session.execute(db.text('SELECT 1'))
            click.echo(" ✅ Database: Connected")
        except Exception:
            # Not a bare `except:` -- Ctrl-C / SystemExit must still propagate.
            click.echo(" ❌ Database: Connection failed")

        # Check for required environment variables
        for var in ('OPENAI_API_KEY', 'SECRET_KEY'):
            if os.environ.get(var):
                click.echo(f" ✅ {var}: Set")
            else:
                click.echo(f" ⚠️ {var}: Not set")
    except Exception as e:
        click.echo(f"❌ Error getting system status: {str(e)}")
# CLI command: delete opportunities whose due date is more than --days in the
# past, keeping any that still have applications attached.
# NOTE: the one-line docstring doubles as the click --help text.
@app.cli.command()
# IntRange(min=0): a negative value would push the cutoff into the future and
# delete opportunities that are still open.
@click.option('--days', default=30, type=click.IntRange(min=0), help='Number of days to keep')
def cleanup_old_data(days):
    """Clean up old data from the database."""
    click.echo(f"🧹 Cleaning up data older than {days} days...")
    try:
        cutoff_date = datetime.utcnow() - timedelta(days=days)

        # Candidates: opportunities whose deadline is before the cutoff.
        old_opportunities = GrantOpportunity.query.filter(
            GrantOpportunity.application_due_date < cutoff_date
        ).all()

        deleted_count = 0
        for opp in old_opportunities:
            # Only delete if no applications exist
            if not opp.applications.count():
                db.session.delete(opp)
                deleted_count += 1

        db.session.commit()

        click.echo("✅ Cleanup completed!")
        click.echo(f" Deleted {deleted_count} old opportunities")
    except Exception as e:
        click.echo(f"❌ Error during cleanup: {str(e)}")
        db.session.rollback()
# CLI command: create all tables known to the SQLAlchemy metadata.
# NOTE: the one-line docstring doubles as the click --help text.
@app.cli.command()
def init_db():
    """Initialize the database with tables."""
    click.echo("🗄️ Initializing database...")
    try:
        db.create_all()
    except Exception as e:
        click.echo(f"❌ Error initializing database: {str(e)}")
    else:
        # Reached only when create_all() raised nothing.
        click.echo("✅ Database initialized successfully!")
# CLI command: optional smoke tests for external tooling.  Each check runs
# only when its flag is passed; with no flags only the banner is printed.
# NOTE: the one-line docstring doubles as the click --help text.
@app.cli.command()
@click.option('--test-gemini', is_flag=True, help='Test Gemini CLI connection')
@click.option('--test-scraping', is_flag=True, help='Test web scraping capabilities')
def test_system(test_gemini, test_scraping):
    """Test system components."""
    click.echo("🧪 Testing system components...")

    if test_gemini:
        click.echo(" Testing Gemini CLI...")
        try:
            # Since we're using the CLI, we can just check for the executable
            import shutil
            if shutil.which(current_app.config['GEMINI_CLI_PATH']):
                click.echo(" ✅ Gemini CLI: Executable found")
            else:
                click.echo(" ❌ Gemini CLI: Executable not found")
        except Exception as e:
            click.echo(f" ❌ Gemini CLI: {str(e)}")

    if test_scraping:
        click.echo(" Testing web scraping...")
        try:
            # Imported lazily so the rest of the CLI works without selenium.
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options

            options = Options()
            options.add_argument('--headless')
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')

            driver = webdriver.Chrome(options=options)
            try:
                driver.get('https://www.google.com')
            finally:
                # Always shut the browser down; otherwise a failed page load
                # leaves a headless Chrome process behind.
                driver.quit()
            click.echo(" ✅ Web scraping: Working")
        except Exception as e:
            click.echo(f" ❌ Web scraping: {str(e)}")
# Script entry point: when the file is executed directly, invoke the app's
# click command group with an application context already pushed.
if __name__ == '__main__':
    with app.app_context():
        app.cli()