SciPaperLoader/tests/test_scheduler_functionality.py

398 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Comprehensive test for APScheduler functionality in SciPaperLoader.
Tests job scheduling, execution, revocation, and hourly scheduler functionality.
"""
import sys
import os
import time
import threading
from datetime import datetime, timedelta
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from scipaperloader import create_app
from scipaperloader.models import PaperMetadata, ScraperState, ActivityLog, ScheduleConfig, VolumeConfig
from scipaperloader.scrapers.manager import ScraperManager
from scipaperloader.db import db
def _check_database_tables(scheduler):
    """Test 2: verify that paper jobs can be queried from the scheduler."""
    print("\n📋 Test 2: APScheduler Database Tables")
    try:
        # Querying jobs only works when the job-store tables exist.
        jobs = scheduler.get_paper_jobs()
        print("✅ APScheduler database tables exist and accessible")
        print(f"📋 Current paper jobs: {len(jobs)}")
    except Exception as e:
        print(f"❌ APScheduler database tables not accessible: {e}")
        return False
    return True


def _check_single_job_scheduling(scheduler):
    """Test 3: schedule one paper; return the paper-job list, or None on failure."""
    print("\n📋 Test 3: Job Scheduling")
    test_paper = PaperMetadata(
        title="Test Paper for Scheduler",
        doi="10.1000/test_scheduler_001",
        issn="1234-5678",
        journal="Test Journal",
        status="New",
    )
    db.session.add(test_paper)
    db.session.commit()

    # A 30-second delay keeps the job pending while the following checks
    # inspect and revoke it. No explicit job_id is passed so the scheduler
    # applies its default "paper_job_" prefix.
    try:
        job_id = scheduler.schedule_paper_processing(
            paper_id=test_paper.id,
            delay_seconds=30,
        )
        print(f"✅ Paper scheduling works: Job ID {job_id}")
    except Exception as e:
        print(f"❌ Paper scheduling failed: {e}")
        return None

    jobs_after = scheduler.get_paper_jobs()
    if len(jobs_after) == 0:
        print("❌ No jobs found after scheduling")
        return None
    print(f"✅ Job successfully scheduled: {len(jobs_after)} paper job(s) found")
    return jobs_after


def _report_job_details(scheduled_job):
    """Test 4: print the fields exposed for one scheduled paper job."""
    print("\n📋 Test 4: Job Information Retrieval")
    print("✅ Job details accessible:")
    print(f" 📝 Job ID: {scheduled_job['id']}")
    print(f" 📝 Job Name: {scheduled_job['name']}")
    print(f" 📝 Next Run Time: {scheduled_job['next_run_time']}")
    print(f" 📝 Args: {scheduled_job['args']}")


def _check_job_revocation(scheduler, initial_count):
    """Test 5: revoke all scraper jobs and confirm no paper jobs remain."""
    print("\n📋 Test 5: Job Revocation")
    revoked_count = scheduler.revoke_all_scraper_jobs()
    if revoked_count == initial_count:
        print(f"✅ Job revocation works: {revoked_count} job(s) revoked")
    else:
        # A count mismatch is only reported; the hard check is below.
        print(f"⚠️ Warning: Expected to revoke {initial_count} jobs, but revoked {revoked_count}")

    jobs_after_revocation = scheduler.get_paper_jobs()
    if len(jobs_after_revocation) > 0:
        print(f"❌ Jobs still exist after revocation: {len(jobs_after_revocation)}")
        return False
    print("✅ All paper jobs successfully revoked")
    return True


def _check_multiple_job_scheduling(scheduler):
    """Test 6: schedule three papers and confirm three paper jobs exist."""
    print("\n📋 Test 6: Multiple Job Scheduling")
    test_papers = []
    for i in range(3):
        paper = PaperMetadata(
            title=f"Test Paper {i+1}",
            doi=f"10.1000/test_scheduler_{i+2:03d}",
            issn="1234-5678",
            journal="Test Journal",
            status="New",
        )
        db.session.add(paper)
        test_papers.append(paper)
    db.session.commit()

    scheduled_jobs = []
    for offset, paper in enumerate(test_papers):
        # Stagger the delays (10s, 11s, 12s); default job ids are used.
        job_id = scheduler.schedule_paper_processing(
            paper_id=paper.id,
            delay_seconds=10 + offset,
        )
        scheduled_jobs.append(job_id)
    print(f"✅ Multiple job scheduling works: {len(scheduled_jobs)} jobs scheduled")

    all_jobs = scheduler.get_paper_jobs()
    if len(all_jobs) != len(test_papers):
        print(f"❌ Expected {len(test_papers)} jobs, found {len(all_jobs)}")
        return False
    print(f"✅ All jobs properly scheduled: {len(all_jobs)} total jobs")
    return True


def _check_manager_integration(scheduler):
    """Test 7: start the scraper via ScraperManager and clear jobs through it.

    Returns the ScraperManager instance on success, or None on failure.
    """
    print("\n📋 Test 7: ScraperManager Integration")
    manager = ScraperManager()

    papers = manager.select_papers_for_processing(limit=2)
    print(f"✅ ScraperManager paper selection: {len(papers)} papers selected")

    start_result = manager.start_scraper()
    if start_result["status"] != "success":
        print(f"❌ Failed to start scraper: {start_result['message']}")
        return None
    print("✅ Scraper started successfully")

    cleared_count = manager._clear_delayed_tasks_from_apscheduler()
    print(f"✅ ScraperManager job clearing: {cleared_count} jobs cleared")

    remaining_jobs = scheduler.get_paper_jobs()
    if len(remaining_jobs) > 0:
        print(f"❌ Jobs still exist after manager clearing: {len(remaining_jobs)}")
        return None
    print("✅ ScraperManager successfully clears APScheduler jobs")
    return manager


def _check_hourly_job(scheduler):
    """Test 8: confirm a job with id 'hourly_scraper_main' is registered."""
    print("\n📋 Test 8: Hourly Scheduler Configuration")
    # The wrapper may not expose (or may not have initialised) the
    # underlying scheduler; treat that the same as "no jobs".
    inner = getattr(scheduler, '_scheduler', None)
    all_scheduler_jobs = inner.get_jobs() if inner else []
    hourly_jobs = [job for job in all_scheduler_jobs if job.id == 'hourly_scraper_main']
    if not hourly_jobs:
        print("❌ Hourly scheduler job not found")
        return False

    hourly_job = hourly_jobs[0]
    print("✅ Hourly scheduler job found:")
    print(f" 📝 Job ID: {hourly_job.id}")
    print(f" 📝 Job Name: {hourly_job.name}")
    print(f" 📝 Trigger: {hourly_job.trigger}")
    print(f" 📝 Next Run: {hourly_job.next_run_time}")
    return True


def _check_quota(manager):
    """Test 9: ensure a VolumeConfig row exists and the hourly quota is >= 0."""
    print("\n📋 Test 9: Configuration-based Scheduling")
    volume_config = VolumeConfig.query.first()
    if not volume_config:
        volume_config = VolumeConfig(volume=10)  # 10 papers per day
        db.session.add(volume_config)
        db.session.commit()

    quota = manager.get_current_hour_quota()
    print(f"✅ Hourly quota calculation: {quota} papers per hour")
    if quota < 0:
        print("❌ Invalid quota calculation")
        return False
    return True


def _report_activity_logs():
    """Test 10: print up to three recent activity logs mentioning apscheduler."""
    print("\n📋 Test 10: Activity Logging Integration")
    recent_logs = ActivityLog.query.filter(
        ActivityLog.action.like('%apscheduler%')
    ).order_by(ActivityLog.timestamp.desc()).limit(5).all()
    print(f"✅ APScheduler activity logging: {len(recent_logs)} related log entries")
    for log in recent_logs[:3]:
        print(f" 📝 {log.action}: {log.description}")


def _check_error_handling(scheduler):
    """Test 11: scheduling a non-existent paper must either succeed or raise."""
    print("\n📋 Test 11: Error Handling")
    # Both outcomes are accepted; this check only reports which one occurred.
    # NOTE(review): if scheduling succeeds, the "test_error_job" job is never
    # removed here and may fire about one second later - confirm whether it
    # should be cleaned up.
    try:
        scheduler.schedule_paper_processing(
            paper_id=99999,  # Non-existent paper
            delay_seconds=1,
            job_id="test_error_job",
        )
        print("✅ Scheduling with invalid paper ID handled gracefully")
    except Exception as e:
        print(f"✅ Scheduling with invalid paper ID properly raises exception: {e}")


def _check_cleanup(scheduler, manager):
    """Test 12: stop the scraper and confirm no paper jobs are left."""
    print("\n📋 Test 12: Cleanup and Shutdown")
    stop_result = manager.stop_scraper()
    if stop_result["status"] != "success":
        print(f"❌ Failed to stop scraper: {stop_result['message']}")
        return False
    print("✅ Scraper stopped successfully")

    # Non-paper jobs (e.g. the hourly job) may remain; only paper jobs
    # are required to be gone.
    final_job_count = scheduler.get_job_count()
    final_paper_jobs = len(scheduler.get_paper_jobs())
    print("📊 Final state:")
    print(f" 📝 Total jobs: {final_job_count}")
    print(f" 📝 Paper jobs: {final_paper_jobs}")
    if final_paper_jobs > 0:
        print("❌ Paper jobs still exist after cleanup")
        return False
    print("✅ Cleanup completed successfully")
    return True


def _print_summary():
    """Print the closing banner listing every check that passed."""
    print("\n🎉 ALL SCHEDULER TESTS PASSED!")
    print("\n📋 Test Summary:")
    summary_lines = (
        " ✅ APScheduler initialization works",
        " ✅ Database tables created and accessible",
        " ✅ Job scheduling functionality works",
        " ✅ Job information retrieval works",
        " ✅ Job revocation works",
        " ✅ Multiple job scheduling works",
        " ✅ ScraperManager integration works",
        " ✅ Hourly scheduler configured correctly",
        " ✅ Configuration-based scheduling works",
        " ✅ Activity logging integration works",
        " ✅ Error handling works",
        " ✅ Cleanup and shutdown works",
    )
    for line in summary_lines:
        print(line)


def test_scheduler_functionality():
    """Run the twelve scheduler checks in order against a fresh test app.

    The app is created with ``TESTING`` enabled and an in-memory SQLite URI,
    and every check runs inside its application context. Progress is printed
    to stdout.

    Returns:
        bool: ``True`` when every check passes; ``False`` as soon as one
        check fails (later checks are then skipped).
    """
    print("🧪 Testing APScheduler Functionality")
    print("=" * 50)

    app = create_app({
        'TESTING': True,
        'SQLALCHEMY_DATABASE_URI': 'sqlite:///:memory:',
    })

    with app.app_context():
        # Test 1: the app factory is expected to publish the scheduler
        # under the 'SCHEDULER' config key.
        print("\n📋 Test 1: Scheduler Initialization")
        scheduler = app.config.get('SCHEDULER')
        if not scheduler:
            print("❌ APScheduler not found in app config")
            return False
        print("✅ APScheduler available and initialized")
        print(f"📊 Initial job count: {scheduler.get_job_count()}")

        if not _check_database_tables(scheduler):
            return False

        jobs_after = _check_single_job_scheduling(scheduler)
        if jobs_after is None:
            return False

        _report_job_details(jobs_after[0])

        if not _check_job_revocation(scheduler, len(jobs_after)):
            return False

        if not _check_multiple_job_scheduling(scheduler):
            return False

        manager = _check_manager_integration(scheduler)
        if manager is None:
            return False

        if not _check_hourly_job(scheduler):
            return False

        if not _check_quota(manager):
            return False

        _report_activity_logs()
        _check_error_handling(scheduler)

        if not _check_cleanup(scheduler, manager):
            return False

        _print_summary()
        return True
def test_job_execution():
    """Schedule one paper job with a 2-second delay and verify it ran.

    A fresh test app with an in-memory SQLite URI is created, a paper with
    status ``"Pending"`` is stored, and a processing job is scheduled for it.
    After sleeping 3 seconds the scheduler, the activity log and the paper
    row are inspected.

    Returns:
        bool: ``False`` when the scheduler is missing from the app config,
        the stored paper has no id, the paper can no longer be loaded, or
        its status is not ``"Done"`` after the wait; ``True`` otherwise.
        A job that is still listed in the scheduler and missing activity-log
        entries are reported as warnings only.
    """
    print("\n🔄 Testing Job Execution (5-second test)")
    print("-" * 40)

    app = create_app({
        'TESTING': True,
        'SQLALCHEMY_DATABASE_URI': 'sqlite:///:memory:',
    })

    with app.app_context():
        # Initialize database and scheduler
        db.create_all()
        scheduler = app.config.get('SCHEDULER')
        if not scheduler:
            print("❌ Scheduler not initialized")
            return False

        # Create test paper
        test_paper = PaperMetadata(
            title="Test Paper for Execution",
            doi="10.1000/test_execution",
            issn="1234-5678",
            journal="Test Journal",
            status="Pending",
        )
        db.session.add(test_paper)
        db.session.commit()

        # A populated primary key shows the row was actually inserted.
        test_paper_id = test_paper.id
        if not test_paper_id:
            print("❌ Test paper not added to the database")
            return False

        job_id = scheduler.schedule_paper_processing(
            paper_id=test_paper_id,
            delay_seconds=2,
        )
        print(f"📅 Scheduled job {job_id} for execution in 2 seconds")

        # Sleep one second longer than the job delay so it has time to fire.
        print("⏳ Waiting for job execution...")
        time.sleep(3)

        # A one-shot job that has run should no longer be listed.
        remaining_jobs = scheduler.get_paper_jobs()
        if remaining_jobs:
            print(f"⚠️ Job still in scheduler: {len(remaining_jobs)} remaining")
            for job in remaining_jobs:
                print(f" 📝 Job ID: {job['id']}, Next Run Time: {job['next_run_time']}")
        else:
            print("✅ Job executed and removed from scheduler")

        # Check activity logs for execution evidence
        execution_logs = ActivityLog.query.filter(
            ActivityLog.action.like('%process_single_paper%')
        ).order_by(ActivityLog.timestamp.desc()).limit(3).all()
        if execution_logs:
            print("✅ Job execution logged in activity:")
            for log in execution_logs:
                print(f" 📝 {log.action}: {log.description}")
        else:
            print("⚠️ No execution logs found")

        # Reading test_paper.id above reloaded the row into this session's
        # identity map, and query.get() would return that cached copy
        # without touching the database. Expire it so the status is re-read
        # and changes committed elsewhere during the wait become visible.
        db.session.expire_all()
        updated_paper = PaperMetadata.query.get(test_paper_id)
        if updated_paper is None:
            print("❌ Paper not found in the database")
            return False

        print(f"🔍 Retrieved paper: {updated_paper.title}, Status: {updated_paper.status}")
        if updated_paper.status != "Done":
            print(f"❌ Paper status not updated: {updated_paper.status}")
            return False

        print("✅ Paper status updated to 'Done'")
        return True
if __name__ == "__main__":
    # Script entry point: run the functional checks, then the timed execution
    # check, and exit with 0 only when everything that ran succeeded.
    print(f"📅 Starting scheduler tests at {datetime.now()}")
    try:
        # Run main functionality tests
        success = test_scheduler_functionality()
        if success:
            print("\n" + "="*50)
            # The execution test only runs when the main tests pass. Its
            # result replaces `success` so that a False return is reflected
            # in the exit status instead of being discarded.
            success = test_job_execution()
        print(f"\n📅 Tests completed at {datetime.now()}")
        # SystemExit is not an Exception subclass, so the handlers below
        # do not intercept this exit.
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\n⏹️ Tests interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Test error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)