"""
APScheduler-based scheduling system to replace complex Celery delayed task management.
This provides clean job scheduling and revocation without manual Redis manipulation.
"""
import random
import logging
from datetime import datetime, timedelta
from typing import Optional, List
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED
from apscheduler.jobstores.base import JobLookupError
# Configure APScheduler logging
logging.getLogger('apscheduler').setLevel(logging.WARNING)
# Global scheduler instance
_scheduler = None
_flask_app = None
def _get_flask_app():
    """Get the Flask app instance."""
    global _flask_app
    if _flask_app:
        return _flask_app
    try:
        from flask import current_app
        # Resolve the proxy to the real app object; this raises RuntimeError
        # when no application context is active.
        return current_app._get_current_object()
    except RuntimeError:
        return None

def _hourly_scraper_scheduler():
    """Standalone function for hourly scheduling logic."""
    app = _get_flask_app()
    if not app:
        return

    with app.app_context():
        try:
            from .models import ScraperState, ActivityLog

            # Check if the scraper is active
            scraper_state = ScraperState.get_current_state()
            if not scraper_state.is_active:
                ActivityLog.log_scraper_activity(
                    action="hourly_scheduler_apscheduler",
                    status="info",
                    description="Hourly scheduler skipped - scraper not active"
                )
                return {"status": "inactive", "papers_scheduled": 0}

            if scraper_state.is_paused:
                ActivityLog.log_scraper_activity(
                    action="hourly_scheduler_apscheduler",
                    status="info",
                    description="Hourly scheduler skipped - scraper paused"
                )
                return {"status": "paused", "papers_scheduled": 0}

            # Get the papers to process this hour
            from .scrapers.manager import ScraperManager
            manager = ScraperManager()
            papers = manager.select_papers_for_processing()
            if not papers:
                ActivityLog.log_scraper_activity(
                    action="hourly_scheduler_apscheduler",
                    status="info",
                    description="No papers available for processing this hour"
                )
                return {"status": "empty", "papers_scheduled": 0}

            # Schedule papers at random times within the hour
            import uuid
            scheduled_count = 0
            current_time = datetime.now()
            scheduled_papers = []
            for paper in papers:
                # Random delay between 1 second and 58 minutes (3480 seconds)
                delay_seconds = random.randint(1, 3480)
                run_time = current_time + timedelta(seconds=delay_seconds)

                # Give each paper-processing job a unique ID: the epoch timestamp
                # plus a random hex suffix prevents collisions across runs.
                job_id = f"process_paper_{paper.id}_{int(current_time.timestamp())}_{uuid.uuid4().hex[:8]}"
                if _scheduler:
                    _scheduler.add_job(
                        func=_process_single_paper,
                        trigger='date',
                        run_date=run_time,
                        args=[paper.id],
                        id=job_id,
                        replace_existing=True,  # Handle duplicate job IDs gracefully
                        name=f"Process Paper {paper.doi}"
                    )
                    scheduled_count += 1

                    # Collect paper info for a single log entry
                    scheduled_papers.append({
                        "paper_id": paper.id,
                        "paper_doi": paper.doi,
                        "job_id": job_id,
                        "scheduled_time": run_time.isoformat(),
                        "delay_seconds": delay_seconds
                    })

            # Create a single comprehensive log entry with JSON details
            try:
                import json
                scheduling_data = {
                    "total_scheduled": scheduled_count,
                    "scheduled_papers": scheduled_papers,
                    "timestamp": datetime.now().isoformat(),
                    "hour_range": f"{current_time.strftime('%H:%M')} - {(current_time + timedelta(hours=1)).strftime('%H:%M')}"
                }
                ActivityLog.log_scraper_activity(
                    action="hourly_scheduler_apscheduler",
                    status="success",
                    description=f"Scheduled {scheduled_count} papers for random processing within this hour using APScheduler. See extra_data for details.",
                    scheduling_details=json.dumps(scheduling_data)
                )
            except Exception:
                # Fall back to simple logging
                ActivityLog.log_scraper_activity(
                    action="hourly_scheduler_apscheduler",
                    status="success",
                    description=f"Scheduled {scheduled_count} papers for random processing within this hour using APScheduler"
                )

            return {"status": "success", "papers_scheduled": scheduled_count}
        except Exception as e:
            from .models import ActivityLog
            ActivityLog.log_error(
                error_message=f"APScheduler hourly scheduler error: {str(e)}",
                source="_hourly_scraper_scheduler"
            )
            return {"status": "error", "message": str(e)}
def _process_single_paper(paper_id: int):
    """Standalone function to process a single paper."""
    app = _get_flask_app()
    if not app:
        return

    with app.app_context():
        try:
            from .models import ScraperState, ActivityLog, PaperMetadata

            # Race-condition protection: re-check the scraper state at execution time
            scraper_state = ScraperState.get_current_state()
            if not scraper_state.is_active:
                ActivityLog.log_scraper_activity(
                    action="process_single_paper_apscheduler",
                    paper_id=paper_id,
                    status="skipped",
                    description="Task skipped - scraper not active (APScheduler)"
                )
                return {"status": "inactive", "paper_id": paper_id}

            if scraper_state.is_paused:
                ActivityLog.log_scraper_activity(
                    action="process_single_paper_apscheduler",
                    paper_id=paper_id,
                    status="skipped",
                    description="Task skipped - scraper paused (APScheduler)"
                )
                return {"status": "paused", "paper_id": paper_id}

            # Get the paper
            paper = PaperMetadata.query.get(paper_id)
            if not paper:
                return {"status": "error", "message": f"Paper {paper_id} not found"}

            # Final check before processing
            scraper_state = ScraperState.get_current_state()
            if not scraper_state.is_active:
                ActivityLog.log_scraper_activity(
                    action="process_single_paper_apscheduler",
                    paper_id=paper_id,
                    status="skipped",
                    description="Task skipped - scraper not active (pre-processing check)"
                )
                return {"status": "inactive", "paper_id": paper_id}

            # Process the paper using the scraper manager
            from .scrapers.manager import ScraperManager
            manager = ScraperManager()
            return manager.process_paper(paper)
        except Exception as e:
            from .models import ActivityLog
            ActivityLog.log_error(
                error_message=f"Error processing paper {paper_id} in APScheduler: {str(e)}",
                source="_process_single_paper"
            )
            return {"status": "error", "paper_id": paper_id, "message": str(e)}

def _process_single_paper_manual(paper_id: int, scraper_name: Optional[str] = None):
    """Standalone function to process a single paper manually (bypasses scraper state checks)."""
    app = _get_flask_app()
    if not app:
        return

    with app.app_context():
        try:
            from .models import PaperMetadata

            # Get the paper
            paper = PaperMetadata.query.get(paper_id)
            if not paper:
                return {"status": "error", "message": f"Paper {paper_id} not found"}

            # Process the paper using the manual method (bypasses scraper state checks)
            from .scrapers.manager import ScraperManager
            manager = ScraperManager()
            return manager.process_paper_manual(paper, scraper_name=scraper_name)
        except Exception as e:
            from .models import ActivityLog
            ActivityLog.log_error(
                error_message=f"Error manually processing paper {paper_id} in APScheduler: {str(e)}",
                source="_process_single_paper_manual"
            )
            return {"status": "error", "paper_id": paper_id, "message": str(e)}

def _job_listener(event):
    """Listen to job execution events."""
    app = _get_flask_app()
    if not app:
        return

    with app.app_context():
        try:
            from .models import ActivityLog

            job_id = event.job_id
            if event.exception:
                ActivityLog.log_error(
                    error_message=f"APScheduler job {job_id} failed: {str(event.exception)}",
                    source="ScraperScheduler.job_listener"
                )
            elif hasattr(event, 'retval') and event.retval:
                # Job completed successfully
                if job_id.startswith('process_paper_'):
                    ActivityLog.log_scraper_activity(
                        action="apscheduler_job_complete",
                        status="success",
                        description=f"Job {job_id} completed successfully"
                    )
        except Exception as e:
            # Don't let logging errors break the scheduler
            print(f"Error in job listener: {str(e)}")
class ScraperScheduler:
    """APScheduler-based scraper task scheduler."""

    def __init__(self, app=None):
        self.app = app
        if app:
            self.init_app(app)

    @property
    def scheduler(self):
        """Expose the global _scheduler instance."""
        return _scheduler

    def init_app(self, app):
        """Initialize the scheduler with the Flask app context."""
        global _scheduler, _flask_app
        _flask_app = app
        self.app = app

        # Initialize within the app context so db.engine is accessible
        with app.app_context():
            # Reuse the existing Flask-SQLAlchemy engine for APScheduler's job store
            from .db import db
            jobstores = {
                'default': SQLAlchemyJobStore(engine=db.engine)
            }

            # Configure the thread pool executor
            executors = {
                'default': ThreadPoolExecutor(max_workers=50)
            }

            # Job defaults
            job_defaults = {
                'coalesce': False,        # Don't combine multiple scheduled instances
                'max_instances': 3,       # Allow up to 3 instances of the same job
                'misfire_grace_time': 30  # 30-second grace period for missed jobs
            }

            # Get the timezone from the database configuration
            from .models import TimezoneConfig
            configured_timezone = TimezoneConfig.get_current_timezone()

            # Create the scheduler
            _scheduler = BackgroundScheduler(
                jobstores=jobstores,
                executors=executors,
                job_defaults=job_defaults,
                timezone=configured_timezone
            )

            # Add event listeners
            _scheduler.add_listener(_job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED)

            # Start the scheduler FIRST, which auto-creates the job store tables
            _scheduler.start()

            # THEN add the hourly scraper job
            _scheduler.add_job(
                func=_hourly_scraper_scheduler,
                trigger='cron',
                minute=0,  # Run at the start of every hour
                id='hourly_scraper_main',
                replace_existing=True,
                name='Hourly Scraper Scheduler'
            )

            try:
                from .models import ActivityLog
                ActivityLog.log_scraper_activity(
                    action="apscheduler_init",
                    status="success",
                    description="APScheduler initialized with database job store and hourly scheduling"
                )
            except Exception:
                # Fall back to stdout if activity logging is unavailable
                print("✅ APScheduler initialized successfully")
    def revoke_all_scraper_jobs(self) -> int:
        """Clean replacement for the complex _clear_delayed_tasks_from_redis method."""
        if not _scheduler:
            try:
                from .models import ActivityLog
                ActivityLog.log_error(
                    error_message="Scheduler not initialized - cannot revoke jobs",
                    source="ScraperScheduler.revoke_all_scraper_jobs"
                )
            except Exception:
                print("❌ Scheduler not initialized - cannot revoke jobs")
            return 0

        revoked_count = 0
        revoked_jobs = []
        already_gone_jobs = []
        failed_jobs = []
        try:
            # Remove any job that processes papers or uploads,
            # but keep the main hourly scheduler job.
            paper_job_markers = (
                'paper_process_', 'test_paper_process_', 'process_paper_',
                'csv_upload_', 'manual_paper_', 'startup_paper_'
            )
            for job in _scheduler.get_jobs():
                if any(marker in job.id for marker in paper_job_markers):
                    try:
                        _scheduler.remove_job(job.id)
                        revoked_count += 1
                        # Collect job info for a single log entry
                        revoked_jobs.append({
                            "job_id": job.id,
                            "job_name": job.name,
                            "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
                            "args": job.args
                        })
                        print(f"✅ Revoked APScheduler job: {job.id}")
                    except JobLookupError as e:
                        # Job already removed/completed - this is normal
                        already_gone_jobs.append({"job_id": job.id, "reason": str(e)})
                        print(f"Job {job.id} was already completed or removed")
                    except Exception as e:
                        # Other error - log it but continue
                        failed_jobs.append({"job_id": job.id, "error": str(e)})
                        print(f"❌ Error removing job {job.id}: {str(e)}")

            # Create a single comprehensive log entry with JSON details
            if revoked_jobs or already_gone_jobs or failed_jobs:
                try:
                    import json
                    from .models import ActivityLog
                    revocation_data = {
                        "total_revoked": revoked_count,
                        "revoked_jobs": revoked_jobs,
                        "already_gone_jobs": already_gone_jobs,
                        "failed_jobs": failed_jobs,
                        "timestamp": datetime.now().isoformat()
                    }
                    ActivityLog.log_scraper_activity(
                        action="revoke_all_scraper_jobs_apscheduler",
                        status="success",
                        description=f"Successfully revoked {revoked_count} APScheduler jobs. See extra_data for details.",
                        revocation_details=json.dumps(revocation_data)
                    )
                except Exception:
                    print(f"✅ Successfully revoked {revoked_count} APScheduler jobs")

            return revoked_count
        except Exception as e:
            try:
                from .models import ActivityLog
                ActivityLog.log_error(
                    error_message=f"Error revoking APScheduler jobs: {str(e)}",
                    source="ScraperScheduler.revoke_all_scraper_jobs"
                )
            except Exception:
                print(f"❌ Error revoking APScheduler jobs: {str(e)}")
            return 0
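
    # Usage sketch (illustrative): a "stop scraper" handler would typically flip
    # ScraperState to inactive and then call
    #     revoked = scheduler.revoke_all_scraper_jobs()
    # to drop all pending paper jobs while leaving 'hourly_scraper_main' intact.
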
    def get_job_count(self) -> int:
        """Get the number of scheduled jobs."""
        if not _scheduler:
            return 0
        return len(_scheduler.get_jobs())

    def get_paper_jobs(self) -> List[dict]:
        """Get information about scheduled paper processing jobs."""
        if not _scheduler:
            return []

        jobs = []
        for job in _scheduler.get_jobs():
            # Match jobs that contain paper processing patterns
            if ('process_paper_' in job.id or 'paper_process_' in job.id
                    or 'test_paper_process_' in job.id):
                jobs.append({
                    'id': job.id,
                    'name': job.name,
                    'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None,
                    'args': job.args
                })
        return jobs
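
    # Each entry returned by get_paper_jobs() has this shape (values illustrative):
    #     {'id': 'process_paper_42_1718000000_a1b2c3d4',
    #      'name': 'Process Paper 10.1000/xyz123',
    #      'next_run_time': '2024-06-10T14:37:02+00:00',
    #      'args': [42]}
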
    def shutdown(self):
        """Gracefully shut down the scheduler."""
        global _scheduler
        if _scheduler:
            try:
                from .models import ActivityLog
                ActivityLog.log_scraper_activity(
                    action="apscheduler_shutdown",
                    status="info",
                    description="Shutting down APScheduler"
                )
            except Exception:
                print("🔄 Shutting down APScheduler")
            _scheduler.shutdown(wait=False)
            _scheduler = None
    def schedule_paper_processing(self, paper_id: int, delay_seconds: int = 0, job_id: Optional[str] = None) -> str:
        """Schedule a paper for processing with APScheduler.

        Args:
            paper_id: ID of the paper to process
            delay_seconds: Delay in seconds before processing (default: 0 for immediate)
            job_id: Optional custom job ID (generated if not provided)

        Returns:
            str: The job ID of the scheduled job
        """
        if not _scheduler:
            raise RuntimeError("APScheduler not initialized")

        # Generate a job ID if not provided; microseconds plus a UUID suffix
        # prevent collisions.
        if not job_id:
            import uuid
            job_id = f"process_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}_{uuid.uuid4().hex[:8]}"

        # Calculate the run time and schedule the job
        run_time = datetime.now() + timedelta(seconds=delay_seconds)
        _scheduler.add_job(
            func=_process_single_paper,
            trigger='date',
            run_date=run_time,
            args=[paper_id],
            id=job_id,
            name=f"Process Paper {paper_id}",
            replace_existing=True
        )

        # Log the scheduling
        try:
            from .models import ActivityLog
            ActivityLog.log_scraper_activity(
                action="schedule_paper_processing_apscheduler",
                paper_id=paper_id,
                status="info",
                description=f"Scheduled paper {paper_id} for processing at {run_time.strftime('%H:%M:%S')} (Job ID: {job_id})"
            )
        except Exception:
            print(f"✅ Scheduled paper {paper_id} for processing (Job ID: {job_id})")

        return job_id
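
    # Usage sketch (illustrative): process paper 42 in two minutes.
    #     job_id = scheduler.schedule_paper_processing(paper_id=42, delay_seconds=120)
    #     # -> e.g. "process_paper_42_20240610_143702_123456_a1b2c3d4"
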
    def schedule_manual_paper_processing(self, paper_id: int, scraper_name: Optional[str] = None, delay_seconds: int = 0, job_id: Optional[str] = None) -> str:
        """Schedule manual paper processing that bypasses scraper state checks.

        Args:
            paper_id: ID of the paper to process
            scraper_name: Optional specific scraper module to use (defaults to the system scraper)
            delay_seconds: Delay before processing starts (default: 0)
            job_id: Optional custom job ID (auto-generated if not provided)

        Returns:
            str: The job ID of the scheduled task
        """
        if not _scheduler:
            raise RuntimeError("APScheduler not initialized")

        if job_id is None:
            job_id = f"manual_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"

        # Schedule the manual processing job
        run_time = datetime.now() + timedelta(seconds=delay_seconds)
        _scheduler.add_job(
            func=_process_single_paper_manual,
            trigger='date',
            run_date=run_time,
            args=[paper_id, scraper_name],
            id=job_id,
            name=f"Manual Process Paper {paper_id}",
            replace_existing=True
        )

        # Log the scheduling
        try:
            from .models import ActivityLog
            ActivityLog.log_scraper_activity(
                action="schedule_manual_paper_processing",
                paper_id=paper_id,
                status="info",
                description=f"Scheduled manual processing for paper {paper_id} at {run_time.strftime('%H:%M:%S')} (Job ID: {job_id})"
            )
        except Exception:
            pass  # Don't fail if logging fails

        return job_id
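
# Usage sketch (illustrative): run a specific scraper for one paper right away,
# even while the global scraper state is inactive. The scraper_name value below
# is hypothetical; valid names depend on the installed scraper modules.
#     job_id = scheduler.schedule_manual_paper_processing(42, scraper_name="arxiv")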