398 lines
15 KiB
Python
398 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Comprehensive test for APScheduler functionality in SciPaperLoader.
|
|
Tests job scheduling, execution, revocation, and hourly scheduler functionality.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import threading
|
|
from datetime import datetime, timedelta
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from scipaperloader import create_app
|
|
from scipaperloader.models import PaperMetadata, ScraperState, ActivityLog, ScheduleConfig, VolumeConfig
|
|
from scipaperloader.scrapers.manager import ScraperManager
|
|
from scipaperloader.db import db
|
|
|
|
|
|
def test_scheduler_functionality():
    """Run the staged APScheduler checks against a throwaway app.

    An app is created on an in-memory SQLite database and the object stored
    under ``app.config['SCHEDULER']`` is walked through twelve numbered
    stages: availability, job-table access, scheduling a single paper, job
    detail lookup, revocation, scheduling several papers, ScraperManager
    integration, presence of the ``hourly_scraper_main`` job, quota
    calculation, activity-log lookup, scheduling with an unknown paper ID,
    and final cleanup. Progress is reported on stdout.

    Returns:
        bool: ``True`` when every stage passes, ``False`` as soon as one
        stage reports a failure.
    """
    print("🧪 Testing APScheduler Functionality")
    print("=" * 50)

    # Create test app with in-memory database
    app = create_app({
        'TESTING': True,
        'SQLALCHEMY_DATABASE_URI': 'sqlite:///:memory:',
    })

    with app.app_context():
        # Test 1: Basic scheduler availability
        print("\n📋 Test 1: Scheduler Initialization")
        scheduler = app.config.get('SCHEDULER')
        if not scheduler:
            print("❌ APScheduler not found in app config")
            return False

        print("✅ APScheduler available and initialized")
        print(f"📊 Initial job count: {scheduler.get_job_count()}")

        # Test 2: Database table creation
        print("\n📋 Test 2: APScheduler Database Tables")
        try:
            # Querying jobs only works when the backing tables exist.
            jobs = scheduler.get_paper_jobs()
            print("✅ APScheduler database tables exist and accessible")
            print(f"📋 Current paper jobs: {len(jobs)}")
        except Exception as e:
            print(f"❌ APScheduler database tables not accessible: {e}")
            return False

        # Test 3: Job scheduling functionality
        print("\n📋 Test 3: Job Scheduling")

        test_paper = PaperMetadata(
            title="Test Paper for Scheduler",
            doi="10.1000/test_scheduler_001",
            issn="1234-5678",
            journal="Test Journal",
            status="New",
        )
        db.session.add(test_paper)
        db.session.commit()

        # A 30 second delay keeps the job pending while the stages below
        # inspect and revoke it. No explicit job_id is passed, so the
        # scheduler's default ID is used.
        try:
            job_id = scheduler.schedule_paper_processing(
                paper_id=test_paper.id,
                delay_seconds=30,
            )
            print(f"✅ Paper scheduling works: Job ID {job_id}")
        except Exception as e:
            print(f"❌ Paper scheduling failed: {e}")
            return False

        # Verify job was scheduled
        jobs_after = scheduler.get_paper_jobs()
        if not jobs_after:
            print("❌ No jobs found after scheduling")
            return False

        print(f"✅ Job successfully scheduled: {len(jobs_after)} paper job(s) found")

        # Test 4: Job information retrieval
        print("\n📋 Test 4: Job Information Retrieval")

        scheduled_job = jobs_after[0]
        print("✅ Job details accessible:")
        print(f" 📝 Job ID: {scheduled_job['id']}")
        print(f" 📝 Job Name: {scheduled_job['name']}")
        print(f" 📝 Next Run Time: {scheduled_job['next_run_time']}")
        print(f" 📝 Args: {scheduled_job['args']}")

        # Test 5: Job revocation
        print("\n📋 Test 5: Job Revocation")

        initial_count = len(jobs_after)
        revoked_count = scheduler.revoke_all_scraper_jobs()

        # A count mismatch is only a warning; the hard check is that no
        # paper jobs remain afterwards.
        if revoked_count != initial_count:
            print(f"⚠️ Warning: Expected to revoke {initial_count} jobs, but revoked {revoked_count}")
        else:
            print(f"✅ Job revocation works: {revoked_count} job(s) revoked")

        jobs_after_revocation = scheduler.get_paper_jobs()
        if jobs_after_revocation:
            print(f"❌ Jobs still exist after revocation: {len(jobs_after_revocation)}")
            return False

        print("✅ All paper jobs successfully revoked")

        # Test 6: Multiple job scheduling
        print("\n📋 Test 6: Multiple Job Scheduling")

        # DOI suffixes start at 002 because 001 belongs to the first paper.
        test_papers = [
            PaperMetadata(
                title=f"Test Paper {i+1}",
                doi=f"10.1000/test_scheduler_{i+2:03d}",
                issn="1234-5678",
                journal="Test Journal",
                status="New",
            )
            for i in range(3)
        ]
        for paper in test_papers:
            db.session.add(paper)
        db.session.commit()

        # Schedule multiple papers, one second apart.
        scheduled_jobs = []
        for i, paper in enumerate(test_papers):
            job_id = scheduler.schedule_paper_processing(
                paper_id=paper.id,
                delay_seconds=10 + i,
            )
            scheduled_jobs.append(job_id)

        print(f"✅ Multiple job scheduling works: {len(scheduled_jobs)} jobs scheduled")

        # Verify all jobs are scheduled
        all_jobs = scheduler.get_paper_jobs()
        if len(all_jobs) != len(test_papers):
            print(f"❌ Expected {len(test_papers)} jobs, found {len(all_jobs)}")
            return False

        print(f"✅ All jobs properly scheduled: {len(all_jobs)} total jobs")

        # Test 7: ScraperManager integration
        print("\n📋 Test 7: ScraperManager Integration")

        manager = ScraperManager()

        # Test paper selection
        papers = manager.select_papers_for_processing(limit=2)
        print(f"✅ ScraperManager paper selection: {len(papers)} papers selected")

        # Test scraper state management with APScheduler
        start_result = manager.start_scraper()
        if start_result["status"] != "success":
            print(f"❌ Failed to start scraper: {start_result['message']}")
            return False

        print("✅ Scraper started successfully")

        # From here on the scraper is running. The finally clause below
        # stops it again if a later stage fails or raises before Test 12.
        scraper_running = True
        try:
            # Test job clearing through manager
            cleared_count = manager._clear_delayed_tasks_from_apscheduler()
            print(f"✅ ScraperManager job clearing: {cleared_count} jobs cleared")

            # Verify jobs were cleared
            remaining_jobs = scheduler.get_paper_jobs()
            if remaining_jobs:
                print(f"❌ Jobs still exist after manager clearing: {len(remaining_jobs)}")
                return False

            print("✅ ScraperManager successfully clears APScheduler jobs")

            # Test 8: Hourly scheduler configuration
            print("\n📋 Test 8: Hourly Scheduler Configuration")

            # The hourly job is looked up on the underlying scheduler
            # object; a missing or falsy one yields an empty job list.
            backend = getattr(scheduler, '_scheduler', None)
            all_scheduler_jobs = backend.get_jobs() if backend else []
            hourly_jobs = [job for job in all_scheduler_jobs if job.id == 'hourly_scraper_main']

            if not hourly_jobs:
                print("❌ Hourly scheduler job not found")
                return False

            hourly_job = hourly_jobs[0]
            print("✅ Hourly scheduler job found:")
            print(f" 📝 Job ID: {hourly_job.id}")
            print(f" 📝 Job Name: {hourly_job.name}")
            print(f" 📝 Trigger: {hourly_job.trigger}")
            print(f" 📝 Next Run: {hourly_job.next_run_time}")

            # Test 9: Configuration-based scheduling
            print("\n📋 Test 9: Configuration-based Scheduling")

            # Set up volume configuration if the database has none yet.
            volume_config = VolumeConfig.query.first()
            if not volume_config:
                volume_config = VolumeConfig(volume=10)  # 10 papers per day
                db.session.add(volume_config)
                db.session.commit()

            # Test quota calculation
            quota = manager.get_current_hour_quota()
            print(f"✅ Hourly quota calculation: {quota} papers per hour")

            if quota < 0:
                print("❌ Invalid quota calculation")
                return False

            # Test 10: Activity logging integration
            print("\n📋 Test 10: Activity Logging Integration")

            # Informational only: the number of matching entries is
            # reported but never treated as a failure.
            recent_logs = ActivityLog.query.filter(
                ActivityLog.action.like('%apscheduler%')
            ).order_by(ActivityLog.timestamp.desc()).limit(5).all()

            print(f"✅ APScheduler activity logging: {len(recent_logs)} related log entries")

            for log in recent_logs[:3]:
                print(f" 📝 {log.action}: {log.description}")

            # Test 11: Error handling
            print("\n📋 Test 11: Error Handling")

            # Either outcome passes: the call may accept the unknown paper
            # ID or raise for it.
            try:
                scheduler.schedule_paper_processing(
                    paper_id=99999,  # Non-existent paper
                    delay_seconds=1,
                    job_id="test_error_job",
                )
                print("✅ Scheduling with invalid paper ID handled gracefully")
            except Exception as e:
                print(f"✅ Scheduling with invalid paper ID properly raises exception: {e}")

            # Test 12: Cleanup and shutdown
            print("\n📋 Test 12: Cleanup and Shutdown")

            # Clear the flag first so the finally clause never issues a
            # second stop after this explicit one.
            scraper_running = False
            stop_result = manager.stop_scraper()
            if stop_result["status"] != "success":
                print(f"❌ Failed to stop scraper: {stop_result['message']}")
                return False

            print("✅ Scraper stopped successfully")
        finally:
            if scraper_running:
                # Best-effort cleanup on an early exit; a failure here must
                # not hide the original result or exception.
                try:
                    manager.stop_scraper()
                except Exception as e:
                    print(f"⚠️ Could not stop scraper during cleanup: {e}")

        # Only paper jobs are checked; the total is printed for reference.
        final_job_count = scheduler.get_job_count()
        final_paper_jobs = len(scheduler.get_paper_jobs())

        print("📊 Final state:")
        print(f" 📝 Total jobs: {final_job_count}")
        print(f" 📝 Paper jobs: {final_paper_jobs}")

        if final_paper_jobs > 0:
            print("❌ Paper jobs still exist after cleanup")
            return False

        print("✅ Cleanup completed successfully")

        print("\n🎉 ALL SCHEDULER TESTS PASSED!")
        print("\n📋 Test Summary:")
        for summary_line in (
            " ✅ APScheduler initialization works",
            " ✅ Database tables created and accessible",
            " ✅ Job scheduling functionality works",
            " ✅ Job information retrieval works",
            " ✅ Job revocation works",
            " ✅ Multiple job scheduling works",
            " ✅ ScraperManager integration works",
            " ✅ Hourly scheduler configured correctly",
            " ✅ Configuration-based scheduling works",
            " ✅ Activity logging integration works",
            " ✅ Error handling works",
            " ✅ Cleanup and shutdown works",
        ):
            print(summary_line)

        return True
|
|
|
|
|
|
def test_job_execution():
    """Check that a scheduled paper job actually runs.

    One paper is stored, scheduled two seconds out, and the function then
    sleeps three seconds before looking for evidence of execution: the job
    no longer being listed by the scheduler, activity-log entries whose
    action contains ``process_single_paper``, and the paper's status being
    ``"Done"``. The first two are reported as warnings only.

    Returns:
        bool: ``False`` when the scheduler is missing, the paper could not
        be stored or re-read, or its status is not ``"Done"``; ``True``
        otherwise.
    """
    print("\n🔄 Testing Job Execution (5-second test)")
    print("-" * 40)

    app = create_app({
        'TESTING': True,
        'SQLALCHEMY_DATABASE_URI': 'sqlite:///:memory:',
    })

    with app.app_context():
        # Initialize database and scheduler
        db.create_all()
        scheduler = app.config.get('SCHEDULER')
        if not scheduler:
            print("❌ Scheduler not initialized")
            return False

        # Create test paper
        test_paper = PaperMetadata(
            title="Test Paper for Execution",
            doi="10.1000/test_execution",
            issn="1234-5678",
            journal="Test Journal",
            status="Pending",
        )
        db.session.add(test_paper)
        db.session.commit()

        # A missing primary key means the paper was never stored.
        test_paper_id = test_paper.id
        if not test_paper_id:
            print("❌ Test paper not added to the database")
            return False

        # Schedule paper for processing in 2 seconds
        job_id = scheduler.schedule_paper_processing(
            paper_id=test_paper_id,
            delay_seconds=2,
        )

        print(f"📅 Scheduled job {job_id} for execution in 2 seconds")

        # Wait one second longer than the scheduling delay.
        print("⏳ Waiting for job execution...")
        time.sleep(3)

        # Reading test_paper.id above reloaded the instance into this
        # session. Without expiring it, query.get() below would return that
        # cached copy and report the pre-job status.
        # NOTE(review): presumably the job writes through a different
        # session on a scheduler worker — confirm against the scheduler.
        db.session.expire_all()

        # A job still listed is only a warning; the status check below
        # decides the result.
        remaining_jobs = scheduler.get_paper_jobs()

        if remaining_jobs:
            print(f"⚠️ Job still in scheduler: {len(remaining_jobs)} remaining")
            for job in remaining_jobs:
                print(f" 📝 Job ID: {job['id']}, Next Run Time: {job['next_run_time']}")
        else:
            print("✅ Job executed and removed from scheduler")

        # Check activity logs for execution evidence (informational).
        execution_logs = ActivityLog.query.filter(
            ActivityLog.action.like('%process_single_paper%')
        ).order_by(ActivityLog.timestamp.desc()).limit(3).all()

        if execution_logs:
            print("✅ Job execution logged in activity:")
            for log in execution_logs:
                print(f" 📝 {log.action}: {log.description}")
        else:
            print("⚠️ No execution logs found")

        # Validate job execution status in the database. Both ❌ outcomes
        # fail the test instead of being printed and then ignored.
        updated_paper = PaperMetadata.query.get(test_paper_id)
        if not updated_paper:
            print("❌ Paper not found in the database")
            return False

        print(f"🔍 Retrieved paper: {updated_paper.title}, Status: {updated_paper.status}")
        if updated_paper.status != "Done":
            print(f"❌ Paper status not updated: {updated_paper.status}")
            return False

        print("✅ Paper status updated to 'Done'")
        return True
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the functionality suite, then the execution
    # test, and exit 0 only when both succeed.
    print(f"📅 Starting scheduler tests at {datetime.now()}")

    try:
        # Run main functionality tests
        success = test_scheduler_functionality()

        if success:
            print("\n" + "="*50)
            # The execution test runs only after the main suite passes.
            # Its verdict now feeds the exit code instead of being dropped.
            success = bool(test_job_execution())

        print(f"\n📅 Tests completed at {datetime.now()}")
        # SystemExit is not an Exception subclass, so the handlers below
        # do not intercept this exit.
        sys.exit(0 if success else 1)

    except KeyboardInterrupt:
        print("\n⏹️ Tests interrupted by user")
        sys.exit(1)
    except Exception as e:
        # Any unexpected error is reported with its traceback and fails
        # the run.
        print(f"\n❌ Test error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
|