#!/usr/bin/env python3
"""
AutoGrantED Command Line Interface
Comprehensive management commands for the AutoGrantED system
"""

import click
import os
import sys
from datetime import datetime, timedelta
from flask import current_app
from flask.cli import with_appcontext
from werkzeug.security import generate_password_hash

# Add the app directory to the path (must run before the `app` imports below)
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from app import create_app, db
from app.models import User, GrantOpportunity, OpportunityScore, GrantApplication
from app.scrapers.sam_gov_scraper import SAMGovScraper
from app.analysis.scoring_engine import GrantScoringEngine
from app.generators.proposal_generator import ProposalGenerator

app = create_app()

# NOTE: click renders each command's docstring as its --help text, so the
# docstrings below are kept to a single summary line; implementation notes
# live in `#` comments instead.


# create-admin: prompts for any option not given on the command line, refuses
# to overwrite an existing username, and stores the account with both
# registration flags set to True. Errors are reported and rolled back.
@app.cli.command()
@click.option('--username', prompt=True, help='Admin username')
@click.option('--email', prompt=True, help='Admin email')
@click.option('--password', prompt=True, hide_input=True,
              confirmation_prompt=True, help='Admin password')
@click.option('--company', prompt=True, help='Company name')
@click.option('--state', prompt=True, help='Company state (2-letter code)')
def create_admin(username, email, password, company, state):
    """Create an admin user."""
    try:
        # Check if user already exists
        existing_user = User.query.filter_by(username=username).first()
        if existing_user:
            click.echo(f"❌ User '{username}' already exists!")
            return

        # The state code is stored and echoed upper-cased.
        state_code = state.upper()

        # Create new admin user
        user = User(
            username=username,
            email=email,
            company_name=company,
            company_state=state_code,
            sam_registered=True,  # Assume admin has registrations
            sba_registered=True
        )
        user.set_password(password)

        db.session.add(user)
        db.session.commit()

        click.echo(f"✅ Admin user '{username}' created successfully!")
        click.echo(f" Company: {company}")
        click.echo(f" State: {state_code}")
        click.echo(f" Email: {email}")

    except Exception as e:
        click.echo(f"❌ Error creating admin user: {str(e)}")
        db.session.rollback()


# scan-opportunities: runs the scraper for `source` and upserts at most
# `limit` results, matching existing rows by solicitation_number. Only the
# 'sam_gov' source is implemented; any other value prints a message.
@app.cli.command()
@click.option('--source', default='sam_gov',
              help='Scraping source (sam_gov, grants_gov, sbir_gov)')
@click.option('--limit', default=50, help='Maximum opportunities to scrape')
def scan_opportunities(source, limit):
    """Scan for new grant opportunities."""
    click.echo(f"🔍 Scanning {source} for opportunities (limit: {limit})...")

    try:
        if source == 'sam_gov':
            scraper = SAMGovScraper()
            opportunities = scraper.scrape_opportunities()

            new_count = 0
            updated_count = 0

            # The limit is applied after scraping, by slicing the result.
            for opp_data in opportunities[:limit]:
                existing = GrantOpportunity.query.filter_by(
                    solicitation_number=opp_data.get('solicitation_number')
                ).first()

                if existing:
                    # Update existing opportunity; keys that are not
                    # attributes of the model are ignored.
                    for key, value in opp_data.items():
                        if hasattr(existing, key):
                            setattr(existing, key, value)
                    existing.last_updated = datetime.utcnow()
                    updated_count += 1
                else:
                    # Create new opportunity
                    opportunity = GrantOpportunity(**opp_data)
                    db.session.add(opportunity)
                    new_count += 1

            db.session.commit()

            click.echo("✅ Scanning completed!")
            click.echo(f" New opportunities: {new_count}")
            click.echo(f" Updated opportunities: {updated_count}")

        else:
            click.echo(f"❌ Scraper '{source}' not implemented yet")

    except Exception as e:
        click.echo(f"❌ Error during scanning: {str(e)}")
        db.session.rollback()


# analyze-opportunities: scores every unprocessed opportunity for one user
# (--user-id) or for all users, skipping (user, opportunity) pairs that
# already have a score, then flags the opportunities as processed.
# `min_score` only affects the "high-scoring" counter in the summary.
@app.cli.command()
@click.option('--user-id', type=int, help='Specific user ID to analyze for')
@click.option('--min-score', default=50, help='Minimum score threshold')
def analyze_opportunities(user_id, min_score):
    """Analyze opportunities and generate scores."""
    click.echo(f"🧠 Analyzing opportunities (min score: {min_score})...")

    try:
        # Get users to analyze for. Compare against None so that an explicit
        # ID of 0 is looked up instead of silently selecting every user.
        if user_id is not None:
            users = [User.query.get(user_id)]
            if not users[0]:
                click.echo(f"❌ User with ID {user_id} not found")
                return
        else:
            users = User.query.all()

        if not users:
            click.echo("❌ No users found")
            return

        # Get unprocessed opportunities
        opportunities = GrantOpportunity.query.filter_by(processed=False).all()

        if not opportunities:
            click.echo("ℹ️ No unprocessed opportunities found")
            return

        engine = GrantScoringEngine()
        total_scores = 0
        high_scores = 0

        for user in users:
            click.echo(f" Analyzing for user: {user.username}")

            for opportunity in opportunities:
                # Check if score already exists
                existing_score = OpportunityScore.query.filter_by(
                    user_id=user.id,
                    opportunity_id=opportunity.id
                ).first()

                if existing_score:
                    continue

                # Generate score
                result = engine.score_opportunity(user, opportunity)

                # Save score
                score = OpportunityScore(
                    user_id=user.id,
                    opportunity_id=opportunity.id,
                    agency_profile_score=result['agency_profile_score'],
                    market_need_score=result['market_need_score'],
                    competitive_landscape_score=result['competitive_landscape_score'],
                    total_score=result['total_score'],
                    agency_profile_reason=result['agency_profile_reason'],
                    market_need_reason=result['market_need_reason'],
                    competitive_landscape_reason=result['competitive_landscape_reason'],
                    ai_recommendation=result.get('ai_recommendation', ''),
                    confidence_level=result.get('confidence_level', 0.0),
                    model_version='gpt-4'
                )

                db.session.add(score)
                total_scores += 1

                if result['total_score'] >= min_score:
                    high_scores += 1

        # Mark opportunities as processed
        # NOTE(review): this happens even when only one user was analyzed, so
        # later runs will not pick these opportunities up for other users --
        # confirm that is intended.
        for opportunity in opportunities:
            opportunity.processed = True

        db.session.commit()

        click.echo("✅ Analysis completed!")
        click.echo(f" Total scores generated: {total_scores}")
        click.echo(f" High-scoring opportunities: {high_scores}")

    except Exception as e:
        click.echo(f"❌ Error during analysis: {str(e)}")
        db.session.rollback()


# generate-proposal: builds a proposal for one (user, opportunity) pair and
# stores it as a GrantApplication. Does nothing if the user or opportunity is
# missing, or if an application for the pair already exists.
@app.cli.command()
@click.option('--opportunity-id', type=int, required=True,
              help='Opportunity ID to generate proposal for')
@click.option('--user-id', type=int, required=True,
              help='User ID to generate proposal for')
def generate_proposal(opportunity_id, user_id):
    """Generate a proposal for a specific opportunity."""
    click.echo(f"📝 Generating proposal for opportunity {opportunity_id}, user {user_id}...")

    try:
        # Get user and opportunity
        user = User.query.get(user_id)
        opportunity = GrantOpportunity.query.get(opportunity_id)

        if not user:
            click.echo(f"❌ User with ID {user_id} not found")
            return

        if not opportunity:
            click.echo(f"❌ Opportunity with ID {opportunity_id} not found")
            return

        # Check if application already exists
        existing_app = GrantApplication.query.filter_by(
            user_id=user_id,
            opportunity_id=opportunity_id
        ).first()

        if existing_app:
            click.echo(f"⚠️ Application already exists (ID: {existing_app.id})")
            return

        # Generate proposal
        generator = ProposalGenerator()
        proposal_data = generator.generate_complete_proposal(user, opportunity)

        # Create application record; every key except 'commercialization_plan'
        # is required in proposal_data (a missing one raises KeyError, which
        # is reported by the handler below).
        budget = proposal_data['budget']
        application = GrantApplication(
            user_id=user_id,
            opportunity_id=opportunity_id,
            project_title=proposal_data['project_title'],
            project_summary=proposal_data['project_summary'],
            technical_narrative=proposal_data['technical_narrative'],
            budget_narrative=proposal_data['budget_narrative'],
            commercialization_plan=proposal_data.get('commercialization_plan', ''),
            total_budget=budget['total'],
            personnel_costs=budget['personnel'],
            equipment_costs=budget['equipment'],
            travel_costs=budget['travel'],
            other_costs=budget['other'],
            indirect_costs=budget['indirect']
        )

        db.session.add(application)
        db.session.commit()

        click.echo("✅ Proposal generated successfully!")
        click.echo(f" Application ID: {application.id}")
        click.echo(f" Project Title: {proposal_data['project_title']}")
        click.echo(f" Total Budget: ${budget['total']:,}")

    except Exception as e:
        click.echo(f"❌ Error generating proposal: {str(e)}")
        db.session.rollback()


# system-status: read-only report of row counts, 7-day activity, the five
# highest total scores, a database connectivity probe and whether the
# required environment variables are set.
@app.cli.command()
def system_status():
    """Display system status and statistics."""
    click.echo("📊 AutoGrantED System Status")
    click.echo("=" * 50)

    try:
        # Database statistics
        user_count = User.query.count()
        opportunity_count = GrantOpportunity.query.count()
        active_opportunities = GrantOpportunity.query.filter(
            GrantOpportunity.application_due_date > datetime.utcnow()
        ).count()
        application_count = GrantApplication.query.count()
        score_count = OpportunityScore.query.count()

        click.echo(f"👥 Users: {user_count}")
        click.echo(f"🎯 Total Opportunities: {opportunity_count}")
        click.echo(f"⏰ Active Opportunities: {active_opportunities}")
        click.echo(f"📄 Applications: {application_count}")
        click.echo(f"🧠 Opportunity Scores: {score_count}")

        # Recent activity
        recent_opportunities = GrantOpportunity.query.filter(
            GrantOpportunity.last_updated > datetime.utcnow() - timedelta(days=7)
        ).count()

        recent_applications = GrantApplication.query.filter(
            GrantApplication.created_at > datetime.utcnow() - timedelta(days=7)
        ).count()

        click.echo("\n📈 Recent Activity (7 days):")
        click.echo(f" New/Updated Opportunities: {recent_opportunities}")
        click.echo(f" New Applications: {recent_applications}")

        # Top scoring opportunities
        top_scores = db.session.query(OpportunityScore, GrantOpportunity).join(
            GrantOpportunity
        ).order_by(OpportunityScore.total_score.desc()).limit(5).all()

        if top_scores:
            click.echo("\n🏆 Top Scoring Opportunities:")
            for score, opp in top_scores:
                click.echo(f" {score.total_score:3d} - {opp.title[:60]}...")

        # System health checks
        click.echo("\n🔧 System Health:")

        # Check database connection. Textual SQL must be wrapped in text():
        # SQLAlchemy 2.x rejects a plain string passed to Session.execute().
        # Catch Exception rather than everything so Ctrl-C still interrupts.
        try:
            db.session.execute(db.text('SELECT 1'))
            click.echo(" ✅ Database: Connected")
        except Exception:
            click.echo(" ❌ Database: Connection failed")

        # Check for required environment variables
        required_vars = ['OPENAI_API_KEY', 'SECRET_KEY']
        for var in required_vars:
            if os.environ.get(var):
                click.echo(f" ✅ {var}: Set")
            else:
                click.echo(f" ⚠️ {var}: Not set")

    except Exception as e:
        click.echo(f"❌ Error getting system status: {str(e)}")


# cleanup-old-data: deletes opportunities whose application due date is more
# than `days` days in the past, but only those with no applications attached.
@app.cli.command()
@click.option('--days', default=30, help='Number of days to keep')
def cleanup_old_data(days):
    """Clean up old data from the database."""
    click.echo(f"🧹 Cleaning up data older than {days} days...")

    try:
        cutoff_date = datetime.utcnow() - timedelta(days=days)

        # Clean up old opportunities that are no longer active
        old_opportunities = GrantOpportunity.query.filter(
            GrantOpportunity.application_due_date < cutoff_date
        ).all()

        deleted_count = 0
        for opp in old_opportunities:
            # Only delete if no applications exist
            # (assumes `applications` is a query-like relationship exposing
            # .count() -- TODO confirm against the model definition)
            if not opp.applications.count():
                db.session.delete(opp)
                deleted_count += 1

        db.session.commit()

        click.echo("✅ Cleanup completed!")
        click.echo(f" Deleted {deleted_count} old opportunities")

    except Exception as e:
        click.echo(f"❌ Error during cleanup: {str(e)}")
        db.session.rollback()


# init-db: creates all tables known to `db` via db.create_all().
@app.cli.command()
def init_db():
    """Initialize the database with tables."""
    click.echo("🗄️ Initializing database...")

    try:
        db.create_all()
        click.echo("✅ Database initialized successfully!")
    except Exception as e:
        click.echo(f"❌ Error initializing database: {str(e)}")


# test-system: optional smoke tests, each enabled by its own flag. With no
# flag given only the header line is printed.
@app.cli.command()
@click.option('--test-gemini', is_flag=True, help='Test Gemini CLI connection')
@click.option('--test-scraping', is_flag=True, help='Test web scraping capabilities')
def test_system(test_gemini, test_scraping):
    """Test system components."""
    click.echo("🧪 Testing system components...")

    if test_gemini:
        click.echo(" Testing Gemini CLI...")
        try:
            # Since we're using the CLI, we can just check for the executable
            import shutil
            if shutil.which(current_app.config['GEMINI_CLI_PATH']):
                click.echo(" ✅ Gemini CLI: Executable found")
            else:
                click.echo(" ❌ Gemini CLI: Executable not found")
        except Exception as e:
            click.echo(f" ❌ Gemini CLI: {str(e)}")

    if test_scraping:
        click.echo(" Testing web scraping...")
        try:
            # Imported lazily so the other commands work without selenium.
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options

            options = Options()
            options.add_argument('--headless')
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')

            driver = webdriver.Chrome(options=options)
            try:
                driver.get('https://www.google.com')
            finally:
                # Always shut the browser down, even when the page load
                # fails, so no headless Chrome process is left behind.
                driver.quit()

            click.echo(" ✅ Web scraping: Working")
        except Exception as e:
            click.echo(f" ❌ Web scraping: {str(e)}")


if __name__ == '__main__':
    # Run the command group directly, inside an application context.
    with app.app_context():
        app.cli()