""" APScheduler-based scheduling system to replace complex Celery delayed task management. This provides clean job scheduling and revocation without manual Redis manipulation. """ import random import logging from datetime import datetime, timedelta from typing import Optional, List from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED from apscheduler.jobstores.base import JobLookupError # Configure APScheduler logging logging.getLogger('apscheduler').setLevel(logging.WARNING) # Global scheduler instance _scheduler = None _flask_app = None def _get_flask_app(): """Get the Flask app instance.""" global _flask_app if _flask_app: return _flask_app try: from flask import current_app return current_app except RuntimeError: return None def _hourly_scraper_scheduler(): """Standalone function for hourly scheduling logic.""" app = _get_flask_app() if not app: return with app.app_context(): try: from .models import ScraperState, ActivityLog # Check if scraper is active scraper_state = ScraperState.get_current_state() if not scraper_state.is_active: ActivityLog.log_scraper_activity( action="hourly_scheduler_apscheduler", status="info", description="Hourly scheduler skipped - scraper not active" ) return {"status": "inactive", "papers_scheduled": 0} if scraper_state.is_paused: ActivityLog.log_scraper_activity( action="hourly_scheduler_apscheduler", status="info", description="Hourly scheduler skipped - scraper paused" ) return {"status": "paused", "papers_scheduled": 0} # Get papers to process this hour from .scrapers.manager import ScraperManager manager = ScraperManager() papers = manager.select_papers_for_processing() if not papers: ActivityLog.log_scraper_activity( action="hourly_scheduler_apscheduler", status="info", description="No papers available for processing this hour" ) return {"status": "empty", "papers_scheduled": 0} # Schedule papers at random times within the hour scheduled_count = 0 current_time = datetime.now() scheduled_papers = [] for paper in papers: # Random delay between 1 second and 58 minutes delay_seconds = random.randint(1, 3480) # Up to 58 minutes run_time = current_time + timedelta(seconds=delay_seconds) # Schedule the individual paper processing job with unique ID # Include microseconds and random suffix to prevent collisions import uuid job_id = f"process_paper_{paper.id}_{int(current_time.timestamp())}_{uuid.uuid4().hex[:8]}" global _scheduler if _scheduler: _scheduler.add_job( func=_process_single_paper, trigger='date', run_date=run_time, args=[paper.id], id=job_id, replace_existing=True, # Changed to True to handle conflicts gracefully name=f"Process Paper {paper.doi}" ) scheduled_count += 1 # Collect paper info for single log entry paper_info = { "paper_id": paper.id, "paper_doi": paper.doi, "job_id": job_id, "scheduled_time": run_time.isoformat(), "delay_seconds": delay_seconds } scheduled_papers.append(paper_info) # Create single comprehensive log entry with JSON data try: import json from .models import ActivityLog scheduling_data = { "total_scheduled": scheduled_count, "scheduled_papers": scheduled_papers, "timestamp": datetime.now().isoformat(), "hour_range": f"{current_time.strftime('%H:%M')} - {(current_time + timedelta(hours=1)).strftime('%H:%M')}" } ActivityLog.log_scraper_activity( action="hourly_scheduler_apscheduler", status="success", 
description=f"Scheduled {scheduled_count} papers for random processing within this hour using APScheduler. See extra_data for details.", **{"scheduling_details": json.dumps(scheduling_data)} ) except Exception: # Fallback to simple logging ActivityLog.log_scraper_activity( action="hourly_scheduler_apscheduler", status="success", description=f"Scheduled {scheduled_count} papers for random processing within this hour using APScheduler" ) return {"status": "success", "papers_scheduled": scheduled_count} except Exception as e: from .models import ActivityLog ActivityLog.log_error( error_message=f"APScheduler hourly scheduler error: {str(e)}", source="_hourly_scraper_scheduler" ) return {"status": "error", "message": str(e)} def _process_single_paper(paper_id: int): """Standalone function to process a single paper.""" app = _get_flask_app() if not app: return with app.app_context(): try: from .models import ScraperState, ActivityLog, PaperMetadata # Enhanced race condition protection scraper_state = ScraperState.get_current_state() if not scraper_state.is_active: ActivityLog.log_scraper_activity( action="process_single_paper_apscheduler", paper_id=paper_id, status="skipped", description="Task skipped - scraper not active (APScheduler)" ) return {"status": "inactive", "paper_id": paper_id} if scraper_state.is_paused: ActivityLog.log_scraper_activity( action="process_single_paper_apscheduler", paper_id=paper_id, status="skipped", description="Task skipped - scraper paused (APScheduler)" ) return {"status": "paused", "paper_id": paper_id} # Get the paper paper = PaperMetadata.query.get(paper_id) if not paper: return {"status": "error", "message": f"Paper {paper_id} not found"} # Final check before processing scraper_state = ScraperState.get_current_state() if not scraper_state.is_active: ActivityLog.log_scraper_activity( action="process_single_paper_apscheduler", paper_id=paper_id, status="skipped", description="Task skipped - scraper not active (pre-processing check)" ) return {"status": "inactive", "paper_id": paper_id} # Process the paper using scraper manager from .scrapers.manager import ScraperManager manager = ScraperManager() result = manager.process_paper(paper) return result except Exception as e: from .models import ActivityLog ActivityLog.log_error( error_message=f"Error processing paper {paper_id} in APScheduler: {str(e)}", source="_process_single_paper" ) return {"status": "error", "paper_id": paper_id, "message": str(e)} def _process_single_paper_manual(paper_id: int, scraper_name: Optional[str] = None): """Standalone function to process a single paper manually (bypasses scraper state checks).""" app = _get_flask_app() if not app: return with app.app_context(): try: from .models import ActivityLog, PaperMetadata # Get the paper paper = PaperMetadata.query.get(paper_id) if not paper: return {"status": "error", "message": f"Paper {paper_id} not found"} # Process the paper using manual method (bypasses scraper state checks) from .scrapers.manager import ScraperManager manager = ScraperManager() result = manager.process_paper_manual(paper, scraper_name=scraper_name) return result except Exception as e: from .models import ActivityLog ActivityLog.log_error( error_message=f"Error manually processing paper {paper_id} in APScheduler: {str(e)}", source="_process_single_paper_manual" ) return {"status": "error", "paper_id": paper_id, "message": str(e)} def _job_listener(event): """Listen to job execution events.""" app = _get_flask_app() if not app: return with app.app_context(): try: from 
            job_id = event.job_id
            if event.exception:
                ActivityLog.log_error(
                    error_message=f"APScheduler job {job_id} failed: {str(event.exception)}",
                    source="ScraperScheduler.job_listener"
                )
            elif hasattr(event, 'retval') and event.retval:
                # Job completed successfully
                if job_id.startswith('process_paper_'):
                    ActivityLog.log_scraper_activity(
                        action="apscheduler_job_complete",
                        status="success",
                        description=f"Job {job_id} completed successfully"
                    )
        except Exception as e:
            # Don't let logging errors break the scheduler
            print(f"Error in job listener: {str(e)}")


class ScraperScheduler:
    """APScheduler-based scraper task scheduler."""

    def __init__(self, app=None):
        self.app = app
        if app:
            self.init_app(app)

    @property
    def scheduler(self):
        """Expose the global _scheduler instance."""
        return _scheduler

    def init_app(self, app):
        """Initialize the scheduler with the Flask app context."""
        global _scheduler, _flask_app
        _flask_app = app
        self.app = app

        # Initialize within the app context so db.engine is accessible
        with app.app_context():
            # Reuse the existing Flask-SQLAlchemy engine for APScheduler
            from .db import db

            # Configure the job store to use the existing database engine
            jobstores = {
                'default': SQLAlchemyJobStore(engine=db.engine)
            }

            # Configure the thread pool executor
            executors = {
                'default': ThreadPoolExecutor(max_workers=50)  # Increased from 20 to 50
            }

            # Job defaults
            job_defaults = {
                'coalesce': False,        # Don't combine multiple scheduled instances
                'max_instances': 3,       # Allow up to 3 instances of the same job
                'misfire_grace_time': 30  # 30-second grace period for missed jobs
            }

            # Get the timezone from the database configuration
            from .models import TimezoneConfig
            configured_timezone = TimezoneConfig.get_current_timezone()

            # Create the scheduler with the configurable timezone
            _scheduler = BackgroundScheduler(
                jobstores=jobstores,
                executors=executors,
                job_defaults=job_defaults,
                timezone=configured_timezone
            )

            # Add event listeners
            _scheduler.add_listener(
                _job_listener,
                EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED
            )

            # Start the scheduler FIRST, which auto-creates the job store tables
            _scheduler.start()

            # THEN add the hourly scraper job
            _scheduler.add_job(
                func=_hourly_scraper_scheduler,
                trigger='cron',
                minute=0,  # Run at the start of every hour
                id='hourly_scraper_main',
                replace_existing=True,
                name='Hourly Scraper Scheduler'
            )

            try:
                from .models import ActivityLog
                ActivityLog.log_scraper_activity(
                    action="apscheduler_init",
                    status="success",
                    description="APScheduler initialized with database job store and hourly scheduling"
                )
            except Exception:
                # Handle the case where we're outside the application context
                print("✅ APScheduler initialized successfully")
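
    # Example wiring (a minimal sketch; `create_app` is a hypothetical Flask
    # application factory, not defined in this module):
    #
    #     app = create_app()
    #     scheduler = ScraperScheduler(app)  # init_app() runs here and starts
    #                                        # the hourly cron job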
    def revoke_all_scraper_jobs(self) -> int:
        """Clean replacement for the complex _clear_delayed_tasks_from_redis method."""
        if not _scheduler:
            try:
                from .models import ActivityLog
                ActivityLog.log_error(
                    error_message="Scheduler not initialized - cannot revoke jobs",
                    source="ScraperScheduler.revoke_all_scraper_jobs"
                )
            except Exception:
                print("❌ Scheduler not initialized - cannot revoke jobs")
            return 0

        revoked_count = 0
        revoked_jobs = []
        already_gone_jobs = []
        failed_jobs = []

        try:
            # Remove any job that processes papers or uploads
            # (but keep the main hourly scheduler)
            for job in _scheduler.get_jobs():
                if ('paper_process_' in job.id or
                        'test_paper_process_' in job.id or
                        'process_paper_' in job.id or
                        'csv_upload_' in job.id or
                        'manual_paper_' in job.id or
                        'startup_paper_' in job.id):
                    try:
                        _scheduler.remove_job(job.id)
                        revoked_count += 1

                        # Collect job info for a single consolidated log entry
                        revoked_jobs.append({
                            "job_id": job.id,
                            "job_name": job.name,
                            "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
                            "args": job.args
                        })
                        print(f"✅ Revoked APScheduler job: {job.id}")

                    except JobLookupError as e:
                        # Job already removed/completed - this is normal
                        already_gone_jobs.append({
                            "job_id": job.id,
                            "reason": str(e)
                        })
                        print(f"ℹ️ Job {job.id} was already completed or removed")

                    except Exception as e:
                        # Other error - log it but continue
                        failed_jobs.append({
                            "job_id": job.id,
                            "error": str(e)
                        })
                        print(f"❌ Error removing job {job.id}: {str(e)}")

            # Create a single comprehensive log entry with JSON data
            if revoked_jobs or already_gone_jobs or failed_jobs:
                try:
                    from .models import ActivityLog
                    revocation_data = {
                        "total_revoked": revoked_count,
                        "revoked_jobs": revoked_jobs,
                        "already_gone_jobs": already_gone_jobs,
                        "failed_jobs": failed_jobs,
                        "timestamp": datetime.now().isoformat()
                    }
                    ActivityLog.log_scraper_activity(
                        action="revoke_all_scraper_jobs_apscheduler",
                        status="success",
                        description=(
                            f"Successfully revoked {revoked_count} APScheduler jobs. "
                            "See extra_data for details."
                        ),
                        revocation_details=json.dumps(revocation_data)
                    )
                except Exception:
                    print(f"✅ Successfully revoked {revoked_count} APScheduler jobs")

            return revoked_count

        except Exception as e:
            try:
                from .models import ActivityLog
                ActivityLog.log_error(
                    error_message=f"Error revoking APScheduler jobs: {str(e)}",
                    source="ScraperScheduler.revoke_all_scraper_jobs"
                )
            except Exception:
                print(f"❌ Error revoking APScheduler jobs: {str(e)}")
            return 0

    def get_job_count(self) -> int:
        """Get the number of scheduled jobs."""
        if not _scheduler:
            return 0
        return len(_scheduler.get_jobs())

    def get_paper_jobs(self) -> List[dict]:
        """Get information about scheduled paper processing jobs."""
        if not _scheduler:
            return []

        jobs = []
        for job in _scheduler.get_jobs():
            # Match jobs whose IDs contain paper-processing patterns
            if ('process_paper_' in job.id or
                    'paper_process_' in job.id or
                    'test_paper_process_' in job.id):
                jobs.append({
                    'id': job.id,
                    'name': job.name,
                    'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None,
                    'args': job.args
                })
        return jobs

    def shutdown(self):
        """Gracefully shut down the scheduler."""
        global _scheduler
        if _scheduler:
            try:
                from .models import ActivityLog
                ActivityLog.log_scraper_activity(
                    action="apscheduler_shutdown",
                    status="info",
                    description="Shutting down APScheduler"
                )
            except Exception:
                print("🔄 Shutting down APScheduler")

            _scheduler.shutdown(wait=False)
            _scheduler = None

    def schedule_paper_processing(self, paper_id: int, delay_seconds: int = 0,
                                  job_id: Optional[str] = None) -> str:
        """Schedule a paper for processing with APScheduler.
        Args:
            paper_id: ID of the paper to process
            delay_seconds: Delay in seconds before processing (default: 0 for immediate)
            job_id: Optional custom job ID (generated if not provided)

        Returns:
            str: The job ID of the scheduled job
        """
        if not _scheduler:
            raise RuntimeError("APScheduler not initialized")

        # Generate a job ID if not provided; microseconds plus a UUID suffix
        # prevent collisions
        if not job_id:
            job_id = (
                f"process_paper_{paper_id}_"
                f"{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}_{uuid.uuid4().hex[:8]}"
            )

        # Calculate the run time
        run_time = datetime.now() + timedelta(seconds=delay_seconds)

        # Schedule the job
        _scheduler.add_job(
            func=_process_single_paper,
            trigger='date',
            run_date=run_time,
            args=[paper_id],
            id=job_id,
            name=f"Process Paper {paper_id}",
            replace_existing=True
        )

        # Log the scheduling
        try:
            from .models import ActivityLog
            ActivityLog.log_scraper_activity(
                action="schedule_paper_processing_apscheduler",
                paper_id=paper_id,
                status="info",
                description=(
                    f"Scheduled paper {paper_id} for processing at "
                    f"{run_time.strftime('%H:%M:%S')} (Job ID: {job_id})"
                )
            )
        except Exception:
            print(f"✅ Scheduled paper {paper_id} for processing (Job ID: {job_id})")

        return job_id

    def schedule_manual_paper_processing(self, paper_id: int,
                                         scraper_name: Optional[str] = None,
                                         delay_seconds: int = 0,
                                         job_id: Optional[str] = None) -> str:
        """Schedule manual paper processing that bypasses scraper state checks.

        Args:
            paper_id: ID of the paper to process
            scraper_name: Optional specific scraper module to use (defaults to the system scraper)
            delay_seconds: Delay before processing starts (default: 0)
            job_id: Optional custom job ID (auto-generated if not provided)

        Returns:
            Job ID of the scheduled task
        """
        if not _scheduler:
            raise RuntimeError("APScheduler not initialized")

        if job_id is None:
            job_id = f"manual_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"

        run_time = datetime.now() + timedelta(seconds=delay_seconds)

        # Schedule the manual processing job
        _scheduler.add_job(
            func=_process_single_paper_manual,
            trigger='date',
            run_date=run_time,
            args=[paper_id, scraper_name],
            id=job_id,
            name=f"Manual Process Paper {paper_id}",
            replace_existing=True
        )

        # Log the scheduling
        try:
            from .models import ActivityLog
            ActivityLog.log_scraper_activity(
                action="schedule_manual_paper_processing",
                paper_id=paper_id,
                status="info",
                description=(
                    f"Scheduled manual processing for paper {paper_id} at "
                    f"{run_time.strftime('%H:%M:%S')} (Job ID: {job_id})"
                )
            )
        except Exception:
            pass  # Don't fail if logging fails

        return job_id
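

# Usage sketch (illustrative only; assumes `scheduler` is a ScraperScheduler
# instance already initialized with a Flask app, and that a scraper module
# named "arxiv" exists; both are assumptions, not part of this module):
#
#     # Process paper 42 roughly five minutes from now
#     job_id = scheduler.schedule_paper_processing(paper_id=42, delay_seconds=300)
#
#     # Manually process with a specific scraper, bypassing state checks
#     scheduler.schedule_manual_paper_processing(paper_id=42, scraper_name="arxiv")
#
#     # Revoke all pending paper jobs (the hourly cron job is preserved)
#     cancelled = scheduler.revoke_all_scraper_jobs()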