"""
|
||
APScheduler-based scheduling system to replace complex Celery delayed task management.
|
||
This provides clean job scheduling and revocation without manual Redis manipulation.
|
||
"""

import json
import logging
import random
import uuid
from datetime import datetime, timedelta
from typing import Optional, List

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED
from apscheduler.jobstores.base import JobLookupError

# Configure APScheduler logging
logging.getLogger('apscheduler').setLevel(logging.WARNING)

# Global scheduler instance
_scheduler = None
_flask_app = None


def _get_flask_app():
    """Get the Flask app instance."""
    global _flask_app
    if _flask_app:
        return _flask_app

    try:
        from flask import current_app
        return current_app
    except RuntimeError:
        return None


def _hourly_scraper_scheduler():
    """Standalone function for hourly scheduling logic."""
    app = _get_flask_app()
    if not app:
        return

    with app.app_context():
        try:
            from .models import ScraperState, ActivityLog

            # Check if scraper is active
            scraper_state = ScraperState.get_current_state()
            if not scraper_state.is_active:
                ActivityLog.log_scraper_activity(
                    action="hourly_scheduler_apscheduler",
                    status="info",
                    description="Hourly scheduler skipped - scraper not active"
                )
                return {"status": "inactive", "papers_scheduled": 0}

            if scraper_state.is_paused:
                ActivityLog.log_scraper_activity(
                    action="hourly_scheduler_apscheduler",
                    status="info",
                    description="Hourly scheduler skipped - scraper paused"
                )
                return {"status": "paused", "papers_scheduled": 0}

            # Get papers to process this hour
            from .scrapers.manager import ScraperManager
            manager = ScraperManager()
            papers = manager.select_papers_for_processing()

            if not papers:
                ActivityLog.log_scraper_activity(
                    action="hourly_scheduler_apscheduler",
                    status="info",
                    description="No papers available for processing this hour"
                )
                return {"status": "empty", "papers_scheduled": 0}

            # Schedule papers at random times within the hour
            scheduled_count = 0
            current_time = datetime.now()
            scheduled_papers = []

            global _scheduler
            for paper in papers:
                # Random delay between 1 second and 58 minutes
                delay_seconds = random.randint(1, 3480)  # Up to 58 minutes (58 * 60 = 3480 s)
                run_time = current_time + timedelta(seconds=delay_seconds)

                # Schedule the individual paper processing job with a unique ID.
                # Include the epoch timestamp and a random suffix to prevent collisions.
                job_id = f"process_paper_{paper.id}_{int(current_time.timestamp())}_{uuid.uuid4().hex[:8]}"

                if _scheduler:
                    _scheduler.add_job(
                        func=_process_single_paper,
                        trigger='date',
                        run_date=run_time,
                        args=[paper.id],
                        id=job_id,
                        replace_existing=True,  # Handle job ID conflicts gracefully
                        name=f"Process Paper {paper.doi}"
                    )

                    scheduled_count += 1

                    # Collect paper info for single log entry
                    paper_info = {
                        "paper_id": paper.id,
                        "paper_doi": paper.doi,
                        "job_id": job_id,
                        "scheduled_time": run_time.isoformat(),
                        "delay_seconds": delay_seconds
                    }
                    scheduled_papers.append(paper_info)

            # Create single comprehensive log entry with JSON data
            try:
                from .models import ActivityLog

                scheduling_data = {
                    "total_scheduled": scheduled_count,
                    "scheduled_papers": scheduled_papers,
                    "timestamp": datetime.now().isoformat(),
                    "hour_range": f"{current_time.strftime('%H:%M')} - {(current_time + timedelta(hours=1)).strftime('%H:%M')}"
                }

                ActivityLog.log_scraper_activity(
                    action="hourly_scheduler_apscheduler",
                    status="success",
                    description=f"Scheduled {scheduled_count} papers for random processing within this hour using APScheduler. See extra_data for details.",
                    **{"scheduling_details": json.dumps(scheduling_data)}
                )
            except Exception:
                # Fallback to simple logging
                ActivityLog.log_scraper_activity(
                    action="hourly_scheduler_apscheduler",
                    status="success",
                    description=f"Scheduled {scheduled_count} papers for random processing within this hour using APScheduler"
                )

            return {"status": "success", "papers_scheduled": scheduled_count}

        except Exception as e:
            from .models import ActivityLog
            ActivityLog.log_error(
                error_message=f"APScheduler hourly scheduler error: {str(e)}",
                source="_hourly_scraper_scheduler"
            )
            return {"status": "error", "message": str(e)}


def _process_single_paper(paper_id: int):
    """Standalone function to process a single paper."""
    app = _get_flask_app()
    if not app:
        return

    with app.app_context():
        try:
            from .models import ScraperState, ActivityLog, PaperMetadata

            # Enhanced race condition protection
            scraper_state = ScraperState.get_current_state()
            if not scraper_state.is_active:
                ActivityLog.log_scraper_activity(
                    action="process_single_paper_apscheduler",
                    paper_id=paper_id,
                    status="skipped",
                    description="Task skipped - scraper not active (APScheduler)"
                )
                return {"status": "inactive", "paper_id": paper_id}

            if scraper_state.is_paused:
                ActivityLog.log_scraper_activity(
                    action="process_single_paper_apscheduler",
                    paper_id=paper_id,
                    status="skipped",
                    description="Task skipped - scraper paused (APScheduler)"
                )
                return {"status": "paused", "paper_id": paper_id}

            # Get the paper
            paper = PaperMetadata.query.get(paper_id)
            if not paper:
                return {"status": "error", "message": f"Paper {paper_id} not found"}

            # Final check before processing
            scraper_state = ScraperState.get_current_state()
            if not scraper_state.is_active:
                ActivityLog.log_scraper_activity(
                    action="process_single_paper_apscheduler",
                    paper_id=paper_id,
                    status="skipped",
                    description="Task skipped - scraper not active (pre-processing check)"
                )
                return {"status": "inactive", "paper_id": paper_id}

            # Process the paper using scraper manager
            from .scrapers.manager import ScraperManager
            manager = ScraperManager()
            result = manager.process_paper(paper)

            return result

        except Exception as e:
            from .models import ActivityLog
            ActivityLog.log_error(
                error_message=f"Error processing paper {paper_id} in APScheduler: {str(e)}",
                source="_process_single_paper"
            )
            return {"status": "error", "paper_id": paper_id, "message": str(e)}


def _process_single_paper_manual(paper_id: int, scraper_name: Optional[str] = None):
    """Standalone function to process a single paper manually (bypasses scraper state checks)."""
    app = _get_flask_app()
    if not app:
        return

    with app.app_context():
        try:
            from .models import ActivityLog, PaperMetadata

            # Get the paper
            paper = PaperMetadata.query.get(paper_id)
            if not paper:
                return {"status": "error", "message": f"Paper {paper_id} not found"}

            # Process the paper using manual method (bypasses scraper state checks)
            from .scrapers.manager import ScraperManager
            manager = ScraperManager()
            result = manager.process_paper_manual(paper, scraper_name=scraper_name)

            return result

        except Exception as e:
            from .models import ActivityLog
            ActivityLog.log_error(
                error_message=f"Error manually processing paper {paper_id} in APScheduler: {str(e)}",
                source="_process_single_paper_manual"
            )
            return {"status": "error", "paper_id": paper_id, "message": str(e)}


def _job_listener(event):
    """Listen to job execution events."""
    app = _get_flask_app()
    if not app:
        return

    with app.app_context():
        try:
            from .models import ActivityLog

            job_id = event.job_id

            if event.exception:
                ActivityLog.log_error(
                    error_message=f"APScheduler job {job_id} failed: {str(event.exception)}",
                    source="ScraperScheduler.job_listener"
                )
            elif hasattr(event, 'retval') and event.retval:
                # Job completed successfully
                if job_id.startswith('process_paper_'):
                    ActivityLog.log_scraper_activity(
                        action="apscheduler_job_complete",
                        status="success",
                        description=f"Job {job_id} completed successfully"
                    )
        except Exception as e:
            # Don't let logging errors break the scheduler
            print(f"Error in job listener: {str(e)}")


class ScraperScheduler:
    """APScheduler-based scraper task scheduler."""

    def __init__(self, app=None):
        self.app = app
        if app:
            self.init_app(app)

    @property
    def scheduler(self):
        """Expose the global _scheduler instance."""
        global _scheduler
        return _scheduler

    def init_app(self, app):
        """Initialize the scheduler with Flask app context."""
        global _scheduler, _flask_app
        _flask_app = app
        self.app = app

        # Initialize scheduler within app context to access db.engine properly
        with app.app_context():
            # Use the existing Flask-SQLAlchemy database engine for APScheduler
            from .db import db

            # Configure job store to use the existing database engine
            jobstores = {
                'default': SQLAlchemyJobStore(engine=db.engine)
            }

            # Configure thread pool executor
            executors = {
                'default': ThreadPoolExecutor(max_workers=50)
            }

            # Job defaults
            job_defaults = {
                'coalesce': False,  # Don't combine multiple scheduled instances
                'max_instances': 3,  # Allow up to 3 instances of the same job
                'misfire_grace_time': 30  # 30 seconds grace period for missed jobs
            }

            # Get timezone from database configuration
            from .models import TimezoneConfig
            configured_timezone = TimezoneConfig.get_current_timezone()

            # Create the scheduler
            _scheduler = BackgroundScheduler(
                jobstores=jobstores,
                executors=executors,
                job_defaults=job_defaults,
                timezone=configured_timezone  # Use configurable timezone from database
            )

            # Add event listeners
            _scheduler.add_listener(_job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)

            # Start the scheduler FIRST, which will auto-create its tables
            _scheduler.start()

            # THEN add the hourly scraper job
            _scheduler.add_job(
                func=_hourly_scraper_scheduler,
                trigger='cron',
                minute=0,  # Run at the start of every hour
                id='hourly_scraper_main',
                replace_existing=True,
                name='Hourly Scraper Scheduler'
            )

        try:
            from .models import ActivityLog
            ActivityLog.log_scraper_activity(
                action="apscheduler_init",
                status="success",
                description="APScheduler initialized with database job store and hourly scheduling"
            )
        except Exception:
            # Handle case where we're outside the application context
            print("✅ APScheduler initialized successfully")

    def revoke_all_scraper_jobs(self) -> int:
        """Clean replacement for the complex _clear_delayed_tasks_from_redis method."""
        global _scheduler
        if not _scheduler:
            try:
                from .models import ActivityLog
                ActivityLog.log_error(
                    error_message="Scheduler not initialized - cannot revoke jobs",
                    source="ScraperScheduler.revoke_all_scraper_jobs"
                )
            except Exception:
                print("❌ Scheduler not initialized - cannot revoke jobs")
            return 0

        revoked_count = 0
        revoked_jobs = []
        already_gone_jobs = []
        failed_jobs = []

        try:
            # Get all jobs
            jobs = _scheduler.get_jobs()

            for job in jobs:
                # Remove any job that processes papers or uploads (but keep the main hourly scheduler)
                if ('paper_process_' in job.id or 'test_paper_process_' in job.id or
                        'process_paper_' in job.id or 'csv_upload_' in job.id or
                        'manual_paper_' in job.id or 'startup_paper_' in job.id):
                    try:
                        _scheduler.remove_job(job.id)
                        revoked_count += 1

                        # Collect job info for single log entry
                        job_info = {
                            "job_id": job.id,
                            "job_name": job.name,
                            "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
                            "args": job.args
                        }
                        revoked_jobs.append(job_info)

                        print(f"✅ Revoked APScheduler job: {job.id}")

                    except JobLookupError as e:
                        # Job already removed/completed - this is normal
                        already_gone_jobs.append({
                            "job_id": job.id,
                            "reason": str(e)
                        })
                        print(f"ℹ️ Job {job.id} was already completed or removed")

                    except Exception as e:
                        # Other error - log it but continue
                        failed_jobs.append({
                            "job_id": job.id,
                            "error": str(e)
                        })
                        print(f"❌ Error removing job {job.id}: {str(e)}")

            # Create single comprehensive log entry with JSON data
            if revoked_jobs or already_gone_jobs or failed_jobs:
                try:
                    from .models import ActivityLog

                    revocation_data = {
                        "total_revoked": revoked_count,
                        "revoked_jobs": revoked_jobs,
                        "already_gone_jobs": already_gone_jobs,
                        "failed_jobs": failed_jobs,
                        "timestamp": datetime.now().isoformat()
                    }

                    ActivityLog.log_scraper_activity(
                        action="revoke_all_scraper_jobs_apscheduler",
                        status="success",
                        description=f"Successfully revoked {revoked_count} APScheduler jobs. See extra_data for details.",
                        **{"revocation_details": json.dumps(revocation_data)}
                    )
                except Exception:
                    print(f"✅ Successfully revoked {revoked_count} APScheduler jobs")

            return revoked_count

        except Exception as e:
            try:
                from .models import ActivityLog
                ActivityLog.log_error(
                    error_message=f"Error revoking APScheduler jobs: {str(e)}",
                    source="ScraperScheduler.revoke_all_scraper_jobs"
                )
            except Exception:
                print(f"❌ Error revoking APScheduler jobs: {str(e)}")
            return 0

    def get_job_count(self) -> int:
        """Get the number of scheduled jobs."""
        global _scheduler
        if not _scheduler:
            return 0
        return len(_scheduler.get_jobs())

    def get_paper_jobs(self) -> List[dict]:
        """Get information about scheduled paper processing jobs."""
        global _scheduler
        if not _scheduler:
            return []

        jobs = []
        all_jobs = _scheduler.get_jobs()

        for job in all_jobs:
            # Match jobs that contain paper processing patterns
            if ('process_paper_' in job.id or 'paper_process_' in job.id or 'test_paper_process_' in job.id):
                job_info = {
                    'id': job.id,
                    'name': job.name,
                    'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None,
                    'args': job.args
                }
                jobs.append(job_info)

        return jobs

    def shutdown(self):
        """Gracefully shut down the scheduler."""
        global _scheduler
        if _scheduler:
            try:
                from .models import ActivityLog
                ActivityLog.log_scraper_activity(
                    action="apscheduler_shutdown",
                    status="info",
                    description="Shutting down APScheduler"
                )
            except Exception:
                print("🔄 Shutting down APScheduler")

            _scheduler.shutdown(wait=False)
            _scheduler = None

    def schedule_paper_processing(self, paper_id: int, delay_seconds: int = 0, job_id: Optional[str] = None) -> str:
        """Schedule a paper for processing with APScheduler.

        Args:
            paper_id: ID of the paper to process
            delay_seconds: Delay in seconds before processing (default: 0 for immediate)
            job_id: Optional custom job ID (will be generated if not provided)

        Returns:
            str: The job ID of the scheduled job
        """
        global _scheduler
        if not _scheduler:
            raise RuntimeError("APScheduler not initialized")

        # Generate job ID if not provided
        if not job_id:
            # Use microseconds and a UUID suffix to prevent collisions
            job_id = f"process_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}_{uuid.uuid4().hex[:8]}"

        # Calculate run time
        run_time = datetime.now() + timedelta(seconds=delay_seconds)

        # Schedule the job
        job = _scheduler.add_job(
            func=_process_single_paper,
            trigger='date',
            run_date=run_time,
            args=[paper_id],
            id=job_id,
            name=f"Process Paper {paper_id}",
            replace_existing=True
        )

        # Log the scheduling
        try:
            from .models import ActivityLog
            ActivityLog.log_scraper_activity(
                action="schedule_paper_processing_apscheduler",
                paper_id=paper_id,
                status="info",
                description=f"Scheduled paper {paper_id} for processing at {run_time.strftime('%H:%M:%S')} (Job ID: {job_id})"
            )
        except Exception:
            print(f"✅ Scheduled paper {paper_id} for processing (Job ID: {job_id})")

        return job_id

    def schedule_manual_paper_processing(self, paper_id: int, scraper_name: Optional[str] = None, delay_seconds: int = 0, job_id: Optional[str] = None) -> str:
        """
        Schedule manual paper processing that bypasses scraper state checks.

        Args:
            paper_id: ID of the paper to process
            scraper_name: Optional specific scraper module to use (defaults to system scraper)
            delay_seconds: Delay before processing starts (default: 0)
            job_id: Optional custom job ID (auto-generated if not provided)

        Returns:
            Job ID of the scheduled task
        """
        global _scheduler
        if not _scheduler:
            raise RuntimeError("APScheduler not initialized")

        if job_id is None:
            job_id = f"manual_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"

        run_time = datetime.now() + timedelta(seconds=delay_seconds)

        # Schedule the manual processing job
        job = _scheduler.add_job(
            func=_process_single_paper_manual,
            trigger='date',
            run_date=run_time,
            args=[paper_id, scraper_name],
            id=job_id,
            name=f"Manual Process Paper {paper_id}",
            replace_existing=True
        )

        # Log the scheduling
        try:
            from .models import ActivityLog
            ActivityLog.log_scraper_activity(
                action="schedule_manual_paper_processing",
                paper_id=paper_id,
                status="info",
                description=f"Scheduled manual processing for paper {paper_id} at {run_time.strftime('%H:%M:%S')} (Job ID: {job_id})"
            )
        except Exception:
            pass  # Don't fail if logging fails

        return job_id