Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| AutoGrantED Command Line Interface | |
| Comprehensive management commands for the AutoGrantED system | |
| """ | |
| import click | |
| import os | |
| import sys | |
| from datetime import datetime, timedelta | |
| from flask import current_app | |
| from flask.cli import with_appcontext | |
| from werkzeug.security import generate_password_hash | |
| # Add the app directory to the path | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| from app import create_app, db | |
| from app.models import User, GrantOpportunity, OpportunityScore, GrantApplication | |
| from app.scrapers.sam_gov_scraper import SAMGovScraper | |
| from app.analysis.scoring_engine import GrantScoringEngine | |
| from app.generators.proposal_generator import ProposalGenerator | |
| app = create_app() | |
| def create_admin(username, email, password, company, state): | |
| """Create an admin user.""" | |
| try: | |
| # Check if user already exists | |
| existing_user = User.query.filter_by(username=username).first() | |
| if existing_user: | |
| click.echo(f"β User '{username}' already exists!") | |
| return | |
| # Create new admin user | |
| user = User( | |
| username=username, | |
| email=email, | |
| company_name=company, | |
| company_state=state.upper(), | |
| sam_registered=True, # Assume admin has registrations | |
| sba_registered=True | |
| ) | |
| user.set_password(password) | |
| db.session.add(user) | |
| db.session.commit() | |
| click.echo(f"β Admin user '{username}' created successfully!") | |
| click.echo(f" Company: {company}") | |
| click.echo(f" State: {state.upper()}") | |
| click.echo(f" Email: {email}") | |
| except Exception as e: | |
| click.echo(f"β Error creating admin user: {str(e)}") | |
| db.session.rollback() | |
| def scan_opportunities(source, limit): | |
| """Scan for new grant opportunities.""" | |
| click.echo(f"π Scanning {source} for opportunities (limit: {limit})...") | |
| try: | |
| if source == 'sam_gov': | |
| scraper = SAMGovScraper() | |
| opportunities = scraper.scrape_opportunities() | |
| new_count = 0 | |
| updated_count = 0 | |
| for opp_data in opportunities[:limit]: | |
| existing = GrantOpportunity.query.filter_by( | |
| solicitation_number=opp_data.get('solicitation_number') | |
| ).first() | |
| if existing: | |
| # Update existing opportunity | |
| for key, value in opp_data.items(): | |
| if hasattr(existing, key): | |
| setattr(existing, key, value) | |
| existing.last_updated = datetime.utcnow() | |
| updated_count += 1 | |
| else: | |
| # Create new opportunity | |
| opportunity = GrantOpportunity(**opp_data) | |
| db.session.add(opportunity) | |
| new_count += 1 | |
| db.session.commit() | |
| click.echo(f"β Scanning completed!") | |
| click.echo(f" New opportunities: {new_count}") | |
| click.echo(f" Updated opportunities: {updated_count}") | |
| else: | |
| click.echo(f"β Scraper '{source}' not implemented yet") | |
| except Exception as e: | |
| click.echo(f"β Error during scanning: {str(e)}") | |
| db.session.rollback() | |
| def analyze_opportunities(user_id, min_score): | |
| """Analyze opportunities and generate scores.""" | |
| click.echo(f"π§ Analyzing opportunities (min score: {min_score})...") | |
| try: | |
| # Get users to analyze for | |
| if user_id: | |
| users = [User.query.get(user_id)] | |
| if not users[0]: | |
| click.echo(f"β User with ID {user_id} not found") | |
| return | |
| else: | |
| users = User.query.all() | |
| if not users: | |
| click.echo("β No users found") | |
| return | |
| # Get unprocessed opportunities | |
| opportunities = GrantOpportunity.query.filter_by(processed=False).all() | |
| if not opportunities: | |
| click.echo("βΉοΈ No unprocessed opportunities found") | |
| return | |
| engine = GrantScoringEngine() | |
| total_scores = 0 | |
| high_scores = 0 | |
| for user in users: | |
| click.echo(f" Analyzing for user: {user.username}") | |
| for opportunity in opportunities: | |
| # Check if score already exists | |
| existing_score = OpportunityScore.query.filter_by( | |
| user_id=user.id, | |
| opportunity_id=opportunity.id | |
| ).first() | |
| if existing_score: | |
| continue | |
| # Generate score | |
| result = engine.score_opportunity(user, opportunity) | |
| # Save score | |
| score = OpportunityScore( | |
| user_id=user.id, | |
| opportunity_id=opportunity.id, | |
| agency_profile_score=result['agency_profile_score'], | |
| market_need_score=result['market_need_score'], | |
| competitive_landscape_score=result['competitive_landscape_score'], | |
| total_score=result['total_score'], | |
| agency_profile_reason=result['agency_profile_reason'], | |
| market_need_reason=result['market_need_reason'], | |
| competitive_landscape_reason=result['competitive_landscape_reason'], | |
| ai_recommendation=result.get('ai_recommendation', ''), | |
| confidence_level=result.get('confidence_level', 0.0), | |
| model_version='gpt-4' | |
| ) | |
| db.session.add(score) | |
| total_scores += 1 | |
| if result['total_score'] >= min_score: | |
| high_scores += 1 | |
| # Mark opportunities as processed | |
| for opportunity in opportunities: | |
| opportunity.processed = True | |
| db.session.commit() | |
| click.echo(f"β Analysis completed!") | |
| click.echo(f" Total scores generated: {total_scores}") | |
| click.echo(f" High-scoring opportunities: {high_scores}") | |
| except Exception as e: | |
| click.echo(f"β Error during analysis: {str(e)}") | |
| db.session.rollback() | |
| def generate_proposal(opportunity_id, user_id): | |
| """Generate a proposal for a specific opportunity.""" | |
| click.echo(f"π Generating proposal for opportunity {opportunity_id}, user {user_id}...") | |
| try: | |
| # Get user and opportunity | |
| user = User.query.get(user_id) | |
| opportunity = GrantOpportunity.query.get(opportunity_id) | |
| if not user: | |
| click.echo(f"β User with ID {user_id} not found") | |
| return | |
| if not opportunity: | |
| click.echo(f"β Opportunity with ID {opportunity_id} not found") | |
| return | |
| # Check if application already exists | |
| existing_app = GrantApplication.query.filter_by( | |
| user_id=user_id, | |
| opportunity_id=opportunity_id | |
| ).first() | |
| if existing_app: | |
| click.echo(f"β οΈ Application already exists (ID: {existing_app.id})") | |
| return | |
| # Generate proposal | |
| generator = ProposalGenerator() | |
| proposal_data = generator.generate_complete_proposal(user, opportunity) | |
| # Create application record | |
| application = GrantApplication( | |
| user_id=user_id, | |
| opportunity_id=opportunity_id, | |
| project_title=proposal_data['project_title'], | |
| project_summary=proposal_data['project_summary'], | |
| technical_narrative=proposal_data['technical_narrative'], | |
| budget_narrative=proposal_data['budget_narrative'], | |
| commercialization_plan=proposal_data.get('commercialization_plan', ''), | |
| total_budget=proposal_data['budget']['total'], | |
| personnel_costs=proposal_data['budget']['personnel'], | |
| equipment_costs=proposal_data['budget']['equipment'], | |
| travel_costs=proposal_data['budget']['travel'], | |
| other_costs=proposal_data['budget']['other'], | |
| indirect_costs=proposal_data['budget']['indirect'] | |
| ) | |
| db.session.add(application) | |
| db.session.commit() | |
| click.echo(f"β Proposal generated successfully!") | |
| click.echo(f" Application ID: {application.id}") | |
| click.echo(f" Project Title: {proposal_data['project_title']}") | |
| click.echo(f" Total Budget: ${proposal_data['budget']['total']:,}") | |
| except Exception as e: | |
| click.echo(f"β Error generating proposal: {str(e)}") | |
| db.session.rollback() | |
| def system_status(): | |
| """Display system status and statistics.""" | |
| click.echo("π AutoGrantED System Status") | |
| click.echo("=" * 50) | |
| try: | |
| # Database statistics | |
| user_count = User.query.count() | |
| opportunity_count = GrantOpportunity.query.count() | |
| active_opportunities = GrantOpportunity.query.filter( | |
| GrantOpportunity.application_due_date > datetime.utcnow() | |
| ).count() | |
| application_count = GrantApplication.query.count() | |
| score_count = OpportunityScore.query.count() | |
| click.echo(f"π₯ Users: {user_count}") | |
| click.echo(f"π― Total Opportunities: {opportunity_count}") | |
| click.echo(f"β° Active Opportunities: {active_opportunities}") | |
| click.echo(f"π Applications: {application_count}") | |
| click.echo(f"π§ Opportunity Scores: {score_count}") | |
| # Recent activity | |
| recent_opportunities = GrantOpportunity.query.filter( | |
| GrantOpportunity.last_updated > datetime.utcnow() - timedelta(days=7) | |
| ).count() | |
| recent_applications = GrantApplication.query.filter( | |
| GrantApplication.created_at > datetime.utcnow() - timedelta(days=7) | |
| ).count() | |
| click.echo(f"\nπ Recent Activity (7 days):") | |
| click.echo(f" New/Updated Opportunities: {recent_opportunities}") | |
| click.echo(f" New Applications: {recent_applications}") | |
| # Top scoring opportunities | |
| top_scores = db.session.query(OpportunityScore, GrantOpportunity).join( | |
| GrantOpportunity | |
| ).order_by(OpportunityScore.total_score.desc()).limit(5).all() | |
| if top_scores: | |
| click.echo(f"\nπ Top Scoring Opportunities:") | |
| for score, opp in top_scores: | |
| click.echo(f" {score.total_score:3d} - {opp.title[:60]}...") | |
| # System health checks | |
| click.echo(f"\nπ§ System Health:") | |
| # Check database connection | |
| try: | |
| db.session.execute('SELECT 1') | |
| click.echo(" β Database: Connected") | |
| except: | |
| click.echo(" β Database: Connection failed") | |
| # Check for required environment variables | |
| required_vars = ['OPENAI_API_KEY', 'SECRET_KEY'] | |
| for var in required_vars: | |
| if os.environ.get(var): | |
| click.echo(f" β {var}: Set") | |
| else: | |
| click.echo(f" β οΈ {var}: Not set") | |
| except Exception as e: | |
| click.echo(f"β Error getting system status: {str(e)}") | |
| def cleanup_old_data(days): | |
| """Clean up old data from the database.""" | |
| click.echo(f"π§Ή Cleaning up data older than {days} days...") | |
| try: | |
| cutoff_date = datetime.utcnow() - timedelta(days=days) | |
| # Clean up old opportunities that are no longer active | |
| old_opportunities = GrantOpportunity.query.filter( | |
| GrantOpportunity.application_due_date < cutoff_date | |
| ).all() | |
| deleted_count = 0 | |
| for opp in old_opportunities: | |
| # Only delete if no applications exist | |
| if not opp.applications.count(): | |
| db.session.delete(opp) | |
| deleted_count += 1 | |
| db.session.commit() | |
| click.echo(f"β Cleanup completed!") | |
| click.echo(f" Deleted {deleted_count} old opportunities") | |
| except Exception as e: | |
| click.echo(f"β Error during cleanup: {str(e)}") | |
| db.session.rollback() | |
| def init_db(): | |
| """Initialize the database with tables.""" | |
| click.echo("ποΈ Initializing database...") | |
| try: | |
| db.create_all() | |
| click.echo("β Database initialized successfully!") | |
| except Exception as e: | |
| click.echo(f"β Error initializing database: {str(e)}") | |
| def test_system(test_gemini, test_scraping): | |
| """Test system components.""" | |
| click.echo("π§ͺ Testing system components...") | |
| if test_gemini: | |
| click.echo(" Testing Gemini CLI...") | |
| try: | |
| # Since we're using the CLI, we can just check for the executable | |
| import shutil | |
| if shutil.which(current_app.config['GEMINI_CLI_PATH']): | |
| click.echo(" β Gemini CLI: Executable found") | |
| else: | |
| click.echo(" β Gemini CLI: Executable not found") | |
| except Exception as e: | |
| click.echo(f" β Gemini CLI: {str(e)}") | |
| if test_scraping: | |
| click.echo(" Testing web scraping...") | |
| try: | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.options import Options | |
| options = Options() | |
| options.add_argument('--headless') | |
| options.add_argument('--no-sandbox') | |
| options.add_argument('--disable-dev-shm-usage') | |
| driver = webdriver.Chrome(options=options) | |
| driver.get('https://www.google.com') | |
| driver.quit() | |
| click.echo(" β Web scraping: Working") | |
| except Exception as e: | |
| click.echo(f" β Web scraping: {str(e)}") | |
| if __name__ == '__main__': | |
| with app.app_context(): | |
| app.cli() | |