diff --git a/.gitignore b/.gitignore index 5e539c8..49f9c23 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,5 @@ dist/ migrations/ -celerybeat-schedule* \ No newline at end of file +# APScheduler job store files +jobs.sqlite \ No newline at end of file diff --git a/Makefile b/Makefile index 35ef8ae..22d24dc 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,9 @@ # List of phony targets (targets that don't represent files) -.PHONY: all clean venv run format format-check lint mypy test dist reformat dev celery celery-flower redis run-all diagnostics +.PHONY: all clean venv run format format-check lint mypy test dist reformat dev run-scheduler diagnostics # Define Python and pip executables inside virtual environment PYTHON := venv/bin/python PIP := venv/bin/pip -CELERY := venv/bin/celery FLASK := venv/bin/flask # Default target that runs the application @@ -133,65 +132,12 @@ dist: format-check lint mypy test # Set up complete development environment dev: clean venv -# Start Celery worker - PURGE FIRST -celery: venv redis - @echo "Purging Celery task queue before starting worker..." - # Purge the queue forcefully. Ignore errors if queue is empty/unreachable initially. - @-$(CELERY) -A celery_worker:celery purge -f - @echo "Starting Celery worker..." - $(CELERY) -A celery_worker:celery worker --loglevel=info - -# Monitor Celery tasks with flower web interface -celery-flower: venv - $(PIP) install flower - $(CELERY) -A celery_worker:celery flower --port=5555 - -# Run Celery beat scheduler for periodic tasks -celery-beat: venv redis - @echo "Starting Celery beat scheduler..." - # Ensure celerybeat-schedule file is removed for clean start if needed - @-rm -f celerybeat-schedule.db - # Use the default file-based scheduler (removed the --scheduler flag) - $(CELERY) -A celery_worker:celery beat --loglevel=info - -# Check if Redis is running, start if needed -redis: - @if ! redis-cli ping > /dev/null 2>&1; then \ - echo "Starting Redis server..."; \ - redis-server --daemonize yes; \ - sleep 1; \ - else \ - echo "Redis is already running."; \ - fi - -# Run complete application stack (Flask app + Celery worker + Redis + Beat scheduler) -run-all: redis - @echo "Starting Flask, Celery worker and Beat scheduler..." - # Run them in parallel. Ctrl+C will send SIGINT to make, which propagates. - # Use trap to attempt cleanup, but primary cleanup is purge on next start. - @trap '$(MAKE) stop-all;' INT TERM; \ - $(MAKE) -j3 run celery celery-beat & wait - -# Stop running Celery worker and beat gracefully -stop-celery: - @echo "Attempting graceful shutdown of Celery worker and beat..." - @-pkill -TERM -f "celery -A celery_worker:celery worker" || echo "Worker not found or already stopped." - @-pkill -TERM -f "celery -A celery_worker:celery beat" || echo "Beat not found or already stopped." - @sleep 1 # Give processes a moment to terminate - @echo "Purging remaining tasks from Celery queue..." - @-$(CELERY) -A celery_worker:celery purge -f || echo "Purge failed or queue empty." - -# Stop Flask development server -stop-flask: - @echo "Attempting shutdown of Flask development server..." - @-pkill -TERM -f "flask --app scipaperloader --debug run" || echo "Flask server not found or already stopped." - -# Stop all components potentially started by run-all -stop-all: stop-celery stop-flask - @echo "All components stopped." +# Start the APScheduler-enabled Flask application +run-scheduler: venv + @echo "Starting Flask app with APScheduler..." 
+ $(PYTHON) -m flask --app scipaperloader --debug run # Run diagnostic tools -# Run diagnostic tools - works with or without virtualenv diagnostics: $(PYTHON) tools/run_diagnostics.py diff --git a/README.md b/README.md index c777625..beea725 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,6 @@ And open it in the browser at [http://localhost:5000/](http://localhost:5000/) ## Prerequisites - Python >=3.8 -- Redis (for Celery task queue) ## Development environment @@ -41,30 +40,39 @@ And open it in the browser at [http://localhost:5000/](http://localhost:5000/) add development dependencies under `project.optional-dependencies.*`; run `make clean && make venv` to reinstall the environment -## Asynchronous Task Processing with Celery +## Task Processing Architecture -SciPaperLoader uses Celery for processing large CSV uploads and other background tasks. This allows the application to handle large datasets reliably without blocking the web interface. +SciPaperLoader uses **APScheduler** for all task processing: -### Running Celery Components +- **Periodic Tasks**: Hourly scraper scheduling with randomized paper processing +- **Background Tasks**: CSV uploads, manual paper processing, and all async operations +- **Job Management**: Clean job scheduling, revocation, and status tracking -- `make redis`: ensures Redis server is running (required for Celery) +This unified architecture provides reliable task processing with simple, maintainable code. -- `make celery`: starts a Celery worker to process background tasks ### Running Components -- `make celery-flower`: starts Flower, a web interface for monitoring Celery tasks at http://localhost:5555 +- `make run`: starts the Flask application with integrated APScheduler -- `make run-all`: runs the entire stack (Flask app + Celery worker + Redis) in development mode +For development monitoring: +- Access the Flask admin interface for APScheduler job monitoring +- View real-time logs in the application's activity log section ### How It Works -When you upload a CSV file through the web interface: +**For CSV Uploads:** +1. The file is uploaded through the web interface +2. APScheduler creates a background job to process the file +3. The browser shows progress updates via AJAX polling +4. Results are displayed when processing completes -1. The file is sent to the server -2. A Celery task is created to process the file asynchronously -3. The browser shows a progress bar with real-time updates -4. The results are displayed when processing is complete +**For Scheduled Scraping:** +1. APScheduler runs hourly at the top of each hour +2. Papers are selected based on volume and schedule configuration +3. Individual paper processing jobs are scheduled at random times within the hour +4. All jobs are tracked in the database with complete visibility -This architecture allows SciPaperLoader to handle CSV files with thousands of papers without timing out or blocking the web interface. +As a result, large CSV uploads and scheduled scraping both run in the background without external services such as Redis. ## Configuration @@ -72,12 +80,12 @@ Default configuration is loaded from `scipaperloader.defaults` and can be overridden by environment variables with a `FLASK_` prefix. See [Configuring from Environment Variables](https://flask.palletsprojects.com/en/3.0.x/config/#configuring-from-environment-variables).
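For example, the following environment entries override the secret key and database location (the values shown are illustrative placeholders, not project defaults):

```
FLASK_SECRET_KEY=replace-with-a-long-random-string
FLASK_SQLALCHEMY_DATABASE_URI=sqlite:///papers.db
```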
-### Celery Configuration +### Task Processing Configuration -The following environment variables can be set to configure Celery: +APScheduler automatically uses your configured database for job persistence. No additional configuration is required. -- `FLASK_CELERY_BROKER_URL`: Redis URL for the message broker (default: `redis://localhost:6379/0`) -- `FLASK_CELERY_RESULT_BACKEND`: Redis URL for storing task results (default: `redis://localhost:6379/0`) +For advanced configuration, you can set: +- `FLASK_SQLALCHEMY_DATABASE_URI`: Database URL (APScheduler uses the same database) Consider using [dotenv](https://flask.palletsprojects.com/en/3.0.x/cli/#environment-variables-from-dotenv). @@ -115,17 +123,18 @@ You must set a [SECRET_KEY](https://flask.palletsprojects.com/en/3.0.x/tutorial/deploy/#configure-the-secret-key) in production to a secret and stable value. -### Deploying with Celery +### Deploying with APScheduler When deploying to production: -1. Configure a production-ready Redis instance or use a managed service -2. Run Celery workers as system services or in Docker containers -3. Consider setting up monitoring for your Celery tasks and workers +1. APScheduler jobs are automatically persisted in your database +2. The Flask application handles all background processing internally +3. No external message broker or workers are required +4. Run the scheduler in a single Flask instance; APScheduler does not coordinate jobs across multiple processes sharing one job store ## Troubleshooting and Diagnostics -SciPaperLoader includes a collection of diagnostic and emergency tools to help address issues with the application, particularly with the scraper and Celery task system. +SciPaperLoader includes a collection of diagnostic and emergency tools to help address issues with the application, particularly with the scraper and APScheduler task system.
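In addition to the scripts below, scheduled jobs can be inspected in-process. A minimal sketch using the helpers exposed by `ScraperScheduler` (`get_job_count`, `get_paper_jobs`); note that `flask shell` builds its own app and scheduler instance, so run it while the development server is stopped to avoid two schedulers sharing the job store:

```python
# Sketch: list pending APScheduler jobs from a `flask --app scipaperloader shell` session.
# The app factory stores the ScraperScheduler instance under app.config['SCHEDULER'].
from flask import current_app

scheduler = current_app.config["SCHEDULER"]
print(f"{scheduler.get_job_count()} jobs currently scheduled")

for job in scheduler.get_paper_jobs():  # paper-processing jobs only
    print(job["id"], job["next_run_time"])
```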
### Quick Access @@ -151,7 +160,7 @@ All diagnostic tools are located in the `tools/diagnostics/` directory: - **check_state.py**: Quickly check the current state of the scraper in the database - **diagnose_scraper.py**: Comprehensive diagnostic tool that examines tasks, logs, and scraper state -- **inspect_tasks.py**: View currently running, scheduled, and reserved Celery tasks +- **inspect_tasks.py**: View currently running and scheduled APScheduler tasks - **test_reversion.py**: Test the paper reversion functionality when stopping the scraper ### Emergency Recovery @@ -159,7 +168,7 @@ All diagnostic tools are located in the `tools/diagnostics/` directory: For cases where the scraper is stuck or behaving unexpectedly: - **emergency_stop.py**: Force stops all scraper activities, revokes all running tasks, and reverts papers from "Pending" state -- **quick_fix.py**: Simplified emergency stop that also restarts Celery workers to ensure code changes are applied +- **quick_fix.py**: Simplified emergency stop that also stops Flask processes to ensure code changes are applied ### Usage Example diff --git a/celery_worker.py b/celery_worker.py deleted file mode 100644 index 8911111..0000000 --- a/celery_worker.py +++ /dev/null @@ -1,11 +0,0 @@ -from scipaperloader.celery import celery, configure_celery -# Import all task modules to ensure they are registered with Celery -import scipaperloader.scrapers.tasks # Import new scheduler tasks -import scipaperloader.blueprints.scraper # Import the scraper module with our tasks - -# Configure celery with Flask app -configure_celery() - -if __name__ == '__main__': - # Start the Celery worker - celery.start(['worker', '--loglevel=info', '--concurrency=2']) \ No newline at end of file diff --git a/dump.rdb b/dump.rdb deleted file mode 100644 index e10cf16..0000000 Binary files a/dump.rdb and /dev/null differ diff --git a/pyproject.toml b/pyproject.toml index e034303..ec81343 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,9 +13,7 @@ dependencies = [ "flask-wtf>=1.2.2,<2", "pyzotero>=1.6.11,<2", "pandas>=2.2.3,<3", - "celery>=5.5.1,<6", - "redis>=5.2.1,<6", - "flower>=2.0.1,<3", + "APScheduler>=3.10.4,<4", "flask-migrate>=4.1.0,<5", ] diff --git a/scipaperloader/__init__.py b/scipaperloader/__init__.py index 2e4151f..d8f814e 100644 --- a/scipaperloader/__init__.py +++ b/scipaperloader/__init__.py @@ -5,15 +5,12 @@ from .db import db from .models import init_schedule_config from .models import ActivityLog, ActivityCategory from .blueprints import register_blueprints +from .scheduler import ScraperScheduler def create_app(test_config=None): app = Flask(__name__) app.config.from_object(Config) - # Celery configuration - app.config['CELERY_BROKER_URL'] = app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0') - app.config['CELERY_RESULT_BACKEND'] = app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0') - if test_config: app.config.update(test_config) @@ -24,6 +21,12 @@ def create_app(test_config=None): db.create_all() init_schedule_config() + # Initialize APScheduler + scheduler = ScraperScheduler(app) + + # Store scheduler in app config for access from other modules + app.config['SCHEDULER'] = scheduler + @app.context_processor def inject_app_title(): return {"app_title": app.config["APP_TITLE"]} diff --git a/scipaperloader/blueprints/scraper.py b/scipaperloader/blueprints/scraper.py index 44e863a..725ba8e 100644 --- a/scipaperloader/blueprints/scraper.py +++ b/scipaperloader/blueprints/scraper.py @@ -1,7 +1,7 @@ """ Simplified 
scraper blueprint using the new ScraperManager and hourly scheduling system. """ -from flask import Blueprint, jsonify, render_template, request +from flask import Blueprint, jsonify, render_template, request, current_app from ..models import ActivityLog, PaperMetadata, ScraperState, VolumeConfig from ..scrapers.manager import ScraperManager from ..scrapers.factory import get_available_scrapers @@ -346,8 +346,6 @@ def process_papers_manually(): def trigger_immediate_processing(): """Trigger immediate processing of papers without waiting for hourly schedule.""" try: - from ..scrapers.tasks import process_papers_batch - # Get papers that should be processed this hour manager = ScraperManager() papers = manager.select_papers_for_processing() @@ -359,23 +357,37 @@ def trigger_immediate_processing(): "papers_scheduled": 0 }) - # Get paper IDs for batch processing - paper_ids = [paper.id for paper in papers] + # Get APScheduler instance + scheduler = current_app.config.get('SCHEDULER') + if not scheduler: + return jsonify({ + "success": False, + "message": "APScheduler not available" + }), 500 - # Trigger immediate batch processing (no delay) - task = process_papers_batch.delay(paper_ids) + # Schedule papers for immediate processing via APScheduler + scheduled_count = 0 + for paper in papers: + try: + job_id = f"immediate_paper_{paper.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + scheduler.schedule_paper_processing(paper.id, delay_seconds=1, job_id=job_id) + scheduled_count += 1 + except Exception as e: + ActivityLog.log_error( + error_message=f"Failed to schedule paper {paper.id}: {str(e)}", + source="trigger_immediate_processing" + ) ActivityLog.log_scraper_command( action="trigger_immediate_processing", status="success", - description=f"Triggered immediate processing of {len(paper_ids)} papers" + description=f"Triggered immediate processing of {scheduled_count} papers via APScheduler" ) return jsonify({ "success": True, - "message": f"Immediate processing started for {len(paper_ids)} papers", - "papers_scheduled": len(paper_ids), - "task_id": task.id + "message": f"Immediate processing started for {scheduled_count} papers", + "papers_scheduled": scheduled_count }) except Exception as e: @@ -472,20 +484,35 @@ def process_single_paper_endpoint(paper_id): "message": "Paper not found" }), 404 - # Process the paper using the manager - result = scraper_manager.process_paper(paper) + # Get APScheduler instance + scheduler = current_app.config.get('SCHEDULER') + if not scheduler: + return jsonify({ + "success": False, + "message": "APScheduler not available" + }), 500 - ActivityLog.log_scraper_command( - action="manual_process_single", - status="success", - description=f"Manually processed paper {paper.doi}" - ) - - return jsonify({ - "success": True, - "message": f"Processing started for paper {paper.doi}", - "paper_id": paper_id - }) + # Schedule the paper for immediate processing via APScheduler + job_id = f"manual_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + try: + scheduler.schedule_paper_processing(paper_id, delay_seconds=1, job_id=job_id) + + ActivityLog.log_scraper_command( + action="manual_process_single", + status="success", + description=f"Scheduled manual processing for paper {paper.doi} via APScheduler" + ) + + return jsonify({ + "success": True, + "message": f"Processing scheduled for paper {paper.doi}", + "paper_id": paper_id + }) + except Exception as e: + return jsonify({ + "success": False, + "message": f"Failed to schedule processing: {str(e)}" + }), 500 
except Exception as e: ActivityLog.log_scraper_command( diff --git a/scipaperloader/blueprints/upload.py b/scipaperloader/blueprints/upload.py index 56173ad..c43b189 100644 --- a/scipaperloader/blueprints/upload.py +++ b/scipaperloader/blueprints/upload.py @@ -2,8 +2,11 @@ import codecs import csv import datetime -from io import StringIO +import traceback +from io import StringIO, BytesIO import json +import uuid +from typing import Dict, Any import pandas as pd from flask import ( @@ -21,7 +24,6 @@ from flask import ( from ..db import db from ..models import PaperMetadata, ActivityLog -from ..celery import celery # Import the celery instance directly from ..defaults import DUPLICATE_STRATEGIES bp = Blueprint("upload", __name__) @@ -29,6 +31,10 @@ bp = Blueprint("upload", __name__) REQUIRED_COLUMNS = {"alternative_id", "journal", "doi", "issn", "title"} CHUNK_SIZE = 100 # Number of rows to process per batch +# Store task progress in memory (for simplicity) +# In production, you might want to use Redis or database +task_progress = {} + def parse_date(date_str): """Parse date string into datetime object.""" if not date_str or pd.isna(date_str): @@ -38,6 +44,76 @@ def parse_date(date_str): except ValueError: return None +def _process_csv_background(task_id: str, file_content: str, delimiter: str, duplicate_strategy: str): + """Background function to process CSV file using APScheduler.""" + print(f"DEBUG: _process_csv_background called with task_id: {task_id}") + + # Get Flask app for context + from flask import current_app + + # Get the Flask app from the scheduler context + from ..scheduler import _get_flask_app + app = _get_flask_app() + + print(f"DEBUG: Flask app obtained: {app}") + + if not app: + # Fallback: try to get current_app + try: + app = current_app + print(f"DEBUG: Using current_app: {app}") + except RuntimeError as e: + print(f"DEBUG: Failed to get current_app: {e}") + task_progress[task_id] = { + "state": "FAILURE", + "progress": 0, + "error": "Flask app context not available" + } + return + + with app.app_context(): + try: + print(f"DEBUG: Inside app context, starting CSV processing for task {task_id}") + + # Initialize progress + task_progress[task_id] = { + "state": "PROGRESS", + "progress": 0, + "message": "Starting CSV processing..." 
+ } + + result = process_csv(file_content, delimiter, duplicate_strategy, task_id) + + print(f"DEBUG: CSV processing completed for task {task_id}, result: {result}") + + # Mark as completed + task_progress[task_id] = { + "state": "SUCCESS", + "progress": 100, + "result": result + } + + except Exception as e: + print(f"DEBUG: Exception in _process_csv_background: {e}") + import traceback + traceback.print_exc() + + # Mark as failed + task_progress[task_id] = { + "state": "FAILURE", + "progress": 0, + "error": str(e) + } + + try: + ActivityLog.log_error( + error_message=f"Background CSV processing failed: {str(e)}", + source="upload._process_csv_background" + ) + except Exception: + # If logging fails, just print the error + print(f"Background CSV processing failed: {str(e)}") + @bp.route("/", methods=["GET", "POST"]) def upload(): if request.method == "POST": @@ -51,23 +127,75 @@ def upload(): stream = codecs.iterdecode(file.stream, "utf-8") content = "".join(stream) - # Trigger the Celery task - task = process_csv.delay(content, delimiter, duplicate_strategy) + # Generate task ID + task_id = str(uuid.uuid4()) - return jsonify({"task_id": task.id}) + # Get the APScheduler instance from the global variable + from ..scheduler import _scheduler + if not _scheduler: + return jsonify({"error": "APScheduler not initialized."}) + + if not _scheduler.running: + return jsonify({"error": "APScheduler not running."}) + + # Initialize task progress immediately + task_progress[task_id] = { + "state": "PENDING", + "progress": 0, + "message": "Task queued for processing..." + } + + # Schedule background task + job_id = f"csv_upload_{task_id}" + # Use UTC time to match APScheduler's timezone configuration + run_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=1) # Start in 1 second + + try: + _scheduler.add_job( + func=_process_csv_background, + trigger='date', + run_date=run_time, + args=[task_id, content, delimiter, duplicate_strategy], + id=job_id, + name=f"CSV Upload {task_id}", + replace_existing=True + ) + + ActivityLog.log_import_activity( + action="schedule_csv_upload", + status="info", + description=f"Scheduled CSV upload task {task_id}", + task_id=task_id + ) + + except Exception as e: + task_progress[task_id] = { + "state": "FAILURE", + "progress": 0, + "error": f"Failed to schedule task: {str(e)}" + } + return jsonify({"error": f"Failed to schedule background task: {str(e)}"}) + + return jsonify({"task_id": task_id}) return render_template("upload.html.jinja", duplicate_strategies=DUPLICATE_STRATEGIES) -@celery.task(bind=True) -def process_csv(self, file_content, delimiter, duplicate_strategy="skip"): +def process_csv(file_content, delimiter, duplicate_strategy="skip", task_id=None): """Process CSV file and import paper metadata.""" - # With the ContextTask in place, we're already inside an app context added_count = skipped_count = updated_count = error_count = 0 errors = [] skipped_records = [] # Add this to track skipped records try: + # Update task progress if provided + if task_id: + task_progress[task_id] = { + "state": "PROGRESS", + "progress": 10, + "message": "Starting CSV import..." 
+ } + # Log the start of import using ActivityLog model ActivityLog.log_import_activity( action="start_csv_import", @@ -77,9 +205,6 @@ def process_csv(self, file_content, delimiter, duplicate_strategy="skip"): delimiter=delimiter ) - # Set initial progress percentage - self.update_state(state='PROGRESS', meta={'progress': 10}) - # Read CSV into chunks csv_buffer = StringIO(file_content) # Count total chunks @@ -116,16 +241,16 @@ def process_csv(self, file_content, delimiter, duplicate_strategy="skip"): skipped_count += 1 continue else: - metadata = PaperMetadata( - title=row["title"], - doi=doi, - alt_id=row.get("alternative_id"), - issn=row["issn"], + paper = PaperMetadata( + title=row.get("title"), + doi=row.get("doi"), + alt_id=row.get("alt_id") or row.get("alternative_id"), # Handle both column names + issn=row.get("issn"), journal=row.get("journal"), published_online=parse_date(row.get("published_online")), - status="New", + status="New" ) - db.session.add(metadata) + db.session.add(paper) added_count += 1 except Exception as e: error_count += 1 @@ -134,6 +259,15 @@ def process_csv(self, file_content, delimiter, duplicate_strategy="skip"): # Commit the chunk and roll session fresh db.session.commit() + # Update progress + if task_id: + progress = min(90, 10 + int((chunk_idx + 1) * 80 / total_chunks)) + task_progress[task_id] = { + "state": "PROGRESS", + "progress": progress, + "message": f"Processed {chunk_idx+1}/{total_chunks} chunks" + } + # Log periodic progress every 5 chunks if (chunk_idx + 1) % 5 == 0: ActivityLog.log_import_activity( @@ -148,11 +282,14 @@ def process_csv(self, file_content, delimiter, duplicate_strategy="skip"): } ) - progress = min(90, 10 + int((chunk_idx + 1) * 80 / total_chunks)) - self.update_state(state='PROGRESS', meta={'progress': progress}) - # Final progress update and completion log - self.update_state(state='PROGRESS', meta={'progress': 100}) + if task_id: + task_progress[task_id] = { + "state": "PROGRESS", + "progress": 100, + "message": "Finalizing import..." + } + ActivityLog.log_import_activity( action="complete_csv_import", status="success", @@ -167,6 +304,12 @@ def process_csv(self, file_content, delimiter, duplicate_strategy="skip"): except Exception as e: db.session.rollback() + if task_id: + task_progress[task_id] = { + "state": "FAILURE", + "progress": 0, + "error": str(e) + } ActivityLog.log_error( error_message="CSV import failed", exception=e, @@ -189,7 +332,7 @@ def process_csv(self, file_content, delimiter, duplicate_strategy="skip"): status="error", description=f"Import completed with {error_count} errors", error_csv=error_csv.getvalue(), - task_id=self.request.id, + task_id=task_id, error_count=error_count ) except Exception: @@ -204,41 +347,23 @@ def process_csv(self, file_content, delimiter, duplicate_strategy="skip"): "skipped_records": skipped_records[:5], # Include up to 5 examples "skipped_reason_summary": "Records were skipped because they already exist in the database. 
Use 'update' strategy to update them.", "errors": errors[:5], - "error_count": error_count, - "task_id": self.request.id + "error_count": error_count } @bp.route("/task_status/") def task_status(task_id): """Get status of background task.""" - task = celery.AsyncResult(task_id) + progress_data = task_progress.get(task_id) + if not progress_data: + return jsonify({"error": "Task not found."}) - if task.state == "PENDING": - response = {"state": task.state, "progress": 0} - elif task.state == "PROGRESS": - response = { - "state": task.state, - "progress": task.info.get("progress", 0) - } - elif task.state == "SUCCESS": - response = { - "state": task.state, - "result": task.result - } - else: # FAILURE, REVOKED, etc. - response = { - "state": task.state, - "error": str(task.info) if task.info else "Unknown error" - } - - return jsonify(response) + return jsonify(progress_data) @bp.route("/download_error_log/") def download_error_log(task_id): # Find the most recent error log for this task error_log = ActivityLog.query.filter( - ActivityLog.action == "import_errors", - ActivityLog.extra_data.like(f'%"{task_id}"%') # Search in JSON + ActivityLog.action == "import_errors" ).order_by(ActivityLog.timestamp.desc()).first() if not error_log: @@ -255,7 +380,7 @@ def download_error_log(task_id): buffer = StringIO(error_csv) return send_file( - buffer, + BytesIO(buffer.getvalue().encode()), # Corrected to use BytesIO mimetype="text/csv", as_attachment=True, download_name=f"upload_errors_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" diff --git a/scipaperloader/celery.py b/scipaperloader/celery.py deleted file mode 100644 index f6e7d02..0000000 --- a/scipaperloader/celery.py +++ /dev/null @@ -1,52 +0,0 @@ -from celery import Celery -from celery.schedules import crontab - -# Create Celery instance without Flask app initially -celery = Celery( - 'scipaperloader', - broker='redis://localhost:6379/0', - backend='redis://localhost:6379/0', -) - -def configure_celery(app=None): - """Configure Celery with the Flask app settings and ensure tasks run in the app context.""" - if app is None: - # Import here to avoid circular import - from scipaperloader import create_app - app = create_app() - - # Update Celery configuration using the app settings - celery.conf.update( - broker_url=app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), - result_backend=app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'), - task_serializer='json', - accept_content=['json'], - result_serializer='json', - timezone='UTC', - enable_utc=True, - task_time_limit=3600, # 1 hour max runtime - task_soft_time_limit=3000, # 50 minutes soft limit - worker_max_tasks_per_child=10, # Restart workers after 10 tasks - worker_max_memory_per_child=1000000, # 1GB memory limit - task_acks_late=True, # Acknowledge tasks after completion - task_reject_on_worker_lost=True, # Requeue tasks if worker dies - # Configure Beat schedule for periodic tasks - beat_schedule={ - 'hourly-scraper-scheduler': { - 'task': 'scipaperloader.scrapers.tasks.hourly_scraper_scheduler', - 'schedule': crontab(minute=0), # Run at the start of every hour - 'options': {'expires': 3600} - }, - } - ) - - # Create a custom task class that pushes the Flask application context - class ContextTask(celery.Task): - abstract = True - - def __call__(self, *args, **kwargs): - with app.app_context(): - return self.run(*args, **kwargs) - - celery.Task = ContextTask - return celery \ No newline at end of file diff --git a/scipaperloader/scheduler.py 
b/scipaperloader/scheduler.py new file mode 100644 index 0000000..dd71089 --- /dev/null +++ b/scipaperloader/scheduler.py @@ -0,0 +1,449 @@ +""" +APScheduler-based scheduling system to replace complex Celery delayed task management. +This provides clean job scheduling and revocation without manual Redis manipulation. +""" + +import random +import logging +from datetime import datetime, timedelta +from typing import Optional, List +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore +from apscheduler.executors.pool import ThreadPoolExecutor +from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED + +# Configure APScheduler logging +logging.getLogger('apscheduler').setLevel(logging.WARNING) + +# Global scheduler instance +_scheduler = None +_flask_app = None + + +def _get_flask_app(): + """Get the Flask app instance.""" + global _flask_app + if _flask_app: + return _flask_app + + try: + from flask import current_app + return current_app + except RuntimeError: + return None + + +def _hourly_scraper_scheduler(): + """Standalone function for hourly scheduling logic.""" + app = _get_flask_app() + if not app: + return + + with app.app_context(): + try: + from .models import ScraperState, ActivityLog + + # Check if scraper is active + scraper_state = ScraperState.get_current_state() + if not scraper_state.is_active: + ActivityLog.log_scraper_activity( + action="hourly_scheduler_apscheduler", + status="info", + description="Hourly scheduler skipped - scraper not active" + ) + return {"status": "inactive", "papers_scheduled": 0} + + if scraper_state.is_paused: + ActivityLog.log_scraper_activity( + action="hourly_scheduler_apscheduler", + status="info", + description="Hourly scheduler skipped - scraper paused" + ) + return {"status": "paused", "papers_scheduled": 0} + + # Get papers to process this hour + from .scrapers.manager import ScraperManager + manager = ScraperManager() + papers = manager.select_papers_for_processing() + + if not papers: + ActivityLog.log_scraper_activity( + action="hourly_scheduler_apscheduler", + status="info", + description="No papers available for processing this hour" + ) + return {"status": "empty", "papers_scheduled": 0} + + # Schedule papers at random times within the hour + scheduled_count = 0 + current_time = datetime.now() + + for paper in papers: + # Random delay between 1 second and 58 minutes + delay_seconds = random.randint(1, 3480) # Up to 58 minutes + run_time = current_time + timedelta(seconds=delay_seconds) + + # Schedule the individual paper processing job + job_id = f"process_paper_{paper.id}_{int(current_time.timestamp())}" + + global _scheduler + if _scheduler: + _scheduler.add_job( + func=_process_single_paper, + trigger='date', + run_date=run_time, + args=[paper.id], + id=job_id, + replace_existing=False, + name=f"Process Paper {paper.doi}" + ) + + scheduled_count += 1 + + # Log each scheduled paper + ActivityLog.log_scraper_activity( + action="schedule_paper_apscheduler", + paper_id=paper.id, + status="info", + description=f"Scheduled paper {paper.doi} for processing at {run_time.strftime('%H:%M:%S')} (Job ID: {job_id})" + ) + + ActivityLog.log_scraper_activity( + action="hourly_scheduler_apscheduler", + status="success", + description=f"Scheduled {scheduled_count} papers for random processing within this hour using APScheduler" + ) + + return {"status": "success", "papers_scheduled": scheduled_count} + + except Exception as e: + from .models 
import ActivityLog + ActivityLog.log_error( + error_message=f"APScheduler hourly scheduler error: {str(e)}", + source="_hourly_scraper_scheduler" + ) + return {"status": "error", "message": str(e)} + + +def _process_single_paper(paper_id: int): + """Standalone function to process a single paper.""" + app = _get_flask_app() + if not app: + return + + with app.app_context(): + try: + from .models import ScraperState, ActivityLog, PaperMetadata + + # Enhanced race condition protection + scraper_state = ScraperState.get_current_state() + if not scraper_state.is_active: + ActivityLog.log_scraper_activity( + action="process_single_paper_apscheduler", + paper_id=paper_id, + status="skipped", + description="Task skipped - scraper not active (APScheduler)" + ) + return {"status": "inactive", "paper_id": paper_id} + + if scraper_state.is_paused: + ActivityLog.log_scraper_activity( + action="process_single_paper_apscheduler", + paper_id=paper_id, + status="skipped", + description="Task skipped - scraper paused (APScheduler)" + ) + return {"status": "paused", "paper_id": paper_id} + + # Get the paper + paper = PaperMetadata.query.get(paper_id) + if not paper: + return {"status": "error", "message": f"Paper {paper_id} not found"} + + # Final check before processing + scraper_state = ScraperState.get_current_state() + if not scraper_state.is_active: + ActivityLog.log_scraper_activity( + action="process_single_paper_apscheduler", + paper_id=paper_id, + status="skipped", + description="Task skipped - scraper not active (pre-processing check)" + ) + return {"status": "inactive", "paper_id": paper_id} + + # Process the paper using scraper manager + from .scrapers.manager import ScraperManager + manager = ScraperManager() + result = manager.process_paper(paper) + + return result + + except Exception as e: + from .models import ActivityLog + ActivityLog.log_error( + error_message=f"Error processing paper {paper_id} in APScheduler: {str(e)}", + source="_process_single_paper" + ) + return {"status": "error", "paper_id": paper_id, "message": str(e)} + + +def _job_listener(event): + """Listen to job execution events.""" + app = _get_flask_app() + if not app: + return + + with app.app_context(): + try: + from .models import ActivityLog + + job_id = event.job_id + + if event.exception: + ActivityLog.log_error( + error_message=f"APScheduler job {job_id} failed: {str(event.exception)}", + source="ScraperScheduler.job_listener" + ) + elif hasattr(event, 'retval') and event.retval: + # Job completed successfully + if job_id.startswith('process_paper_'): + ActivityLog.log_scraper_activity( + action="apscheduler_job_complete", + status="success", + description=f"Job {job_id} completed successfully" + ) + except Exception as e: + # Don't let logging errors break the scheduler + print(f"Error in job listener: {str(e)}") + + +class ScraperScheduler: + """APScheduler-based scraper task scheduler.""" + + def __init__(self, app=None): + self.app = app + if app: + self.init_app(app) + + @property + def scheduler(self): + """Expose the global _scheduler instance.""" + global _scheduler + return _scheduler + + def init_app(self, app): + """Initialize the scheduler with Flask app context.""" + global _scheduler, _flask_app + _flask_app = app + self.app = app + + # Initialize scheduler within app context to access db.engine properly + with app.app_context(): + # Use the existing Flask-SQLAlchemy database engine for APScheduler + from .db import db + + # Configure job store to use the existing database engine + jobstores = { + 
'default': SQLAlchemyJobStore(engine=db.engine) + } + + # Configure thread pool executor + executors = { + 'default': ThreadPoolExecutor(max_workers=50) # Increased from 20 to 50 + } + + # Job defaults + job_defaults = { + 'coalesce': False, # Don't combine multiple scheduled instances + 'max_instances': 3, # Allow up to 3 instances of the same job + 'misfire_grace_time': 30 # 30 seconds grace period for missed jobs + } + + # Create the scheduler + _scheduler = BackgroundScheduler( + jobstores=jobstores, + executors=executors, + job_defaults=job_defaults, + timezone='UTC' + ) + + # Add event listeners + _scheduler.add_listener(_job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MISSED) + + # Start the scheduler FIRST, which will auto-create tables + _scheduler.start() + + # THEN add the hourly scraper job + _scheduler.add_job( + func=_hourly_scraper_scheduler, + trigger='cron', + minute=0, # Run at the start of every hour + id='hourly_scraper_main', + replace_existing=True, + name='Hourly Scraper Scheduler' + ) + + try: + from .models import ActivityLog + ActivityLog.log_scraper_activity( + action="apscheduler_init", + status="success", + description="APScheduler initialized with database job store and hourly scheduling" + ) + except Exception: + # Handle case where we're outside application context + print("✅ APScheduler initialized successfully") + + def revoke_all_scraper_jobs(self) -> int: + """Clean replacement for the complex _clear_delayed_tasks_from_redis method.""" + global _scheduler + if not _scheduler: + try: + from .models import ActivityLog + ActivityLog.log_error( + error_message="Scheduler not initialized - cannot revoke jobs", + source="ScraperScheduler.revoke_all_scraper_jobs" + ) + except Exception: + print("❌ Scheduler not initialized - cannot revoke jobs") + return 0 + + revoked_count = 0 + + try: + # Get all jobs + jobs = _scheduler.get_jobs() + + for job in jobs: + # Remove any job that processes papers or uploads (but keep the main hourly scheduler) + if ('paper_process_' in job.id or 'test_paper_process_' in job.id or + 'process_paper_' in job.id or 'csv_upload_' in job.id): + _scheduler.remove_job(job.id) + revoked_count += 1 + + try: + from .models import ActivityLog + ActivityLog.log_scraper_activity( + action="revoke_apscheduler_job", + status="success", + description=f"Revoked APScheduler job: {job.name} (ID: {job.id})" + ) + except Exception: + print(f"✅ Revoked APScheduler job: {job.id}") + + if revoked_count > 0: + try: + from .models import ActivityLog + ActivityLog.log_scraper_activity( + action="revoke_all_scraper_jobs_apscheduler", + status="success", + description=f"Successfully revoked {revoked_count} APScheduler jobs" + ) + except Exception: + print(f"✅ Successfully revoked {revoked_count} APScheduler jobs") + + return revoked_count + + except Exception as e: + try: + from .models import ActivityLog + ActivityLog.log_error( + error_message=f"Error revoking APScheduler jobs: {str(e)}", + source="ScraperScheduler.revoke_all_scraper_jobs" + ) + except Exception: + print(f"❌ Error revoking APScheduler jobs: {str(e)}") + return 0 + + def get_job_count(self) -> int: + """Get the number of scheduled jobs.""" + global _scheduler + if not _scheduler: + return 0 + return len(_scheduler.get_jobs()) + + def get_paper_jobs(self) -> List[dict]: + """Get information about scheduled paper processing jobs.""" + global _scheduler + if not _scheduler: + return [] + + jobs = [] + all_jobs = _scheduler.get_jobs() + + for job in all_jobs: + # Match jobs that 
contain paper processing patterns + if ('process_paper_' in job.id or 'paper_process_' in job.id or 'test_paper_process_' in job.id): + job_info = { + 'id': job.id, + 'name': job.name, + 'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None, + 'args': job.args + } + jobs.append(job_info) + + return jobs + + def shutdown(self): + """Gracefully shutdown the scheduler.""" + global _scheduler + if _scheduler: + try: + from .models import ActivityLog + ActivityLog.log_scraper_activity( + action="apscheduler_shutdown", + status="info", + description="Shutting down APScheduler" + ) + except Exception: + print("🔄 Shutting down APScheduler") + + _scheduler.shutdown(wait=False) + _scheduler = None + + def schedule_paper_processing(self, paper_id: int, delay_seconds: int = 0, job_id: Optional[str] = None) -> str: + """Schedule a paper for processing with APScheduler. + + Args: + paper_id: ID of the paper to process + delay_seconds: Delay in seconds before processing (default: 0 for immediate) + job_id: Optional custom job ID (will be generated if not provided) + + Returns: + str: The job ID of the scheduled job + """ + global _scheduler + if not _scheduler: + raise RuntimeError("APScheduler not initialized") + + # Generate job ID if not provided + if not job_id: + job_id = f"process_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + # Calculate run time + run_time = datetime.now() + timedelta(seconds=delay_seconds) + + # Schedule the job + job = _scheduler.add_job( + func=_process_single_paper, + trigger='date', + run_date=run_time, + args=[paper_id], + id=job_id, + name=f"Process Paper {paper_id}", + replace_existing=True + ) + + # Log the scheduling + try: + from .models import ActivityLog + ActivityLog.log_scraper_activity( + action="schedule_paper_processing_apscheduler", + paper_id=paper_id, + status="info", + description=f"Scheduled paper {paper_id} for processing at {run_time.strftime('%H:%M:%S')} (Job ID: {job_id})" + ) + except Exception: + print(f"✅ Scheduled paper {paper_id} for processing (Job ID: {job_id})") + + return job_id diff --git a/scipaperloader/scrapers/manager.py b/scipaperloader/scrapers/manager.py index 9e145e5..376e635 100644 --- a/scipaperloader/scrapers/manager.py +++ b/scipaperloader/scrapers/manager.py @@ -1,13 +1,14 @@ """ Simplified scraper management system with hourly quota scheduling. +Uses APScheduler for all task processing - no Celery dependencies. 
""" import random import math -import redis -from datetime import datetime, timedelta +from datetime import datetime, timedelta, UTC from typing import List, Dict, Optional from sqlalchemy import func +from flask import current_app from ..models import ( PaperMetadata, @@ -20,7 +21,6 @@ from ..models import ( from ..db import db from ..cache_utils import get_cached_hourly_quota from .factory import get_scraper, get_available_scrapers -from ..celery import celery class ScraperManager: @@ -29,238 +29,67 @@ class ScraperManager: def __init__(self): self.current_scraper = None self.pending_papers = [] # Track papers being processed - # Initialize Redis client for delayed task management - self.redis_client = None - self._init_redis_client() + # No more Redis client initialization - using APScheduler now - def _init_redis_client(self): - """Initialize Redis client for delayed task management.""" + def _get_scheduler(self): + """Get the APScheduler instance from Flask app config.""" try: - # Use same Redis configuration as Celery - self.redis_client = redis.Redis( - host='localhost', - port=6379, - db=0, - decode_responses=True - ) - # Test connection - self.redis_client.ping() - except Exception as e: - # Only log if we're in an application context - try: - ActivityLog.log_error( - error_message=f"Failed to initialize Redis client: {str(e)}", - source="ScraperManager._init_redis_client" - ) - except RuntimeError: - # Outside application context - just print to console - print(f"Warning: Failed to initialize Redis client: {str(e)}") - self.redis_client = None + return current_app.config.get('SCHEDULER') + except RuntimeError: + # Outside application context + return None - def _clear_delayed_tasks_from_redis(self) -> int: - """Clear delayed tasks from Redis structures used by Celery. - - Based on analysis, Celery stores delayed tasks in: - - 'unacked_index': Sorted set containing task IDs with execution timestamps - - 'unacked': Hash containing task data keyed by task ID + + def _clear_delayed_tasks_from_apscheduler(self) -> int: + """Clear delayed tasks from APScheduler - clean replacement for Redis manipulation. 
Returns: int: Number of delayed tasks cleared """ - if not self.redis_client: + scheduler = self._get_scheduler() + if not scheduler: try: ActivityLog.log_error( - error_message="Redis client not available - cannot clear delayed tasks", - source="ScraperManager._clear_delayed_tasks_from_redis" + error_message="APScheduler not available - cannot clear delayed tasks", + source="ScraperManager._clear_delayed_tasks_from_apscheduler" ) except RuntimeError: - # Working outside application context - just print instead - print("❌ Redis client not available - cannot clear delayed tasks") + print("❌ APScheduler not available - cannot clear delayed tasks") return 0 - cleared_count = 0 try: - # Define scraper task patterns to identify our tasks - scraper_patterns = [ - 'process_single_paper', - 'process_papers_batch', - 'hourly_scraper_scheduler' - ] + cleared_count = scheduler.revoke_all_scraper_jobs() - try: - ActivityLog.log_scraper_activity( - action="check_delayed_tasks", - status="info", - description="Checking Celery delayed task structures (unacked_index, unacked)" - ) - except RuntimeError: - print("🔍 Checking Celery delayed task structures (unacked_index, unacked)") - - # Check 'unacked_index' (sorted set with task IDs and timestamps) - unacked_index_cleared = 0 - if self.redis_client.exists('unacked_index'): - try: - # Get all task IDs from the sorted set - task_ids = self.redis_client.zrange('unacked_index', 0, -1) - - if task_ids: - try: - ActivityLog.log_scraper_activity( - action="scan_unacked_index", - status="info", - description=f"Found {len(task_ids)} tasks in 'unacked_index'" - ) - except RuntimeError: - print(f"📋 Found {len(task_ids)} tasks in 'unacked_index'") - - # Check each task ID against the 'unacked' hash to get task details - scraper_task_ids = [] - for task_id in task_ids: - try: - # Get task data from 'unacked' hash - task_data = self.redis_client.hget('unacked', task_id) - if task_data: - # Check if this task contains any of our scraper patterns - if any(pattern in str(task_data) for pattern in scraper_patterns): - scraper_task_ids.append(task_id) - except Exception: - # Skip individual task errors - continue - - # Remove scraper task IDs from both structures - for task_id in scraper_task_ids: - try: - # Remove from unacked_index (sorted set) - removed_from_index = self.redis_client.zrem('unacked_index', task_id) - # Remove from unacked (hash) - removed_from_hash = self.redis_client.hdel('unacked', task_id) - - if removed_from_index or removed_from_hash: - unacked_index_cleared += 1 - - except Exception as e: - try: - ActivityLog.log_error( - error_message=f"Error removing delayed task {task_id}: {str(e)}", - source="ScraperManager._clear_delayed_tasks_from_redis" - ) - except RuntimeError: - print(f"❌ Error removing delayed task {task_id}: {str(e)}") - continue - - cleared_count += unacked_index_cleared - - if unacked_index_cleared > 0: - try: - ActivityLog.log_scraper_activity( - action="clear_unacked_tasks", - status="success", - description=f"Cleared {unacked_index_cleared} scraper tasks from unacked structures" - ) - except RuntimeError: - print(f"✅ Cleared {unacked_index_cleared} scraper tasks from unacked structures") - else: - try: - ActivityLog.log_scraper_activity( - action="check_unacked_index", - status="info", - description="No tasks found in 'unacked_index'" - ) - except RuntimeError: - print("ℹ️ No tasks found in 'unacked_index'") - - except Exception as e: - try: - ActivityLog.log_error( - error_message=f"Error accessing 'unacked_index': {str(e)}", - 
source="ScraperManager._clear_delayed_tasks_from_redis" - ) - except RuntimeError: - print(f"❌ Error accessing 'unacked_index': {str(e)}") - else: - try: - ActivityLog.log_scraper_activity( - action="check_unacked_index", - status="info", - description="'unacked_index' key does not exist - no delayed tasks" - ) - except RuntimeError: - print("ℹ️ 'unacked_index' key does not exist - no delayed tasks") - - # Also check the 'celery' queue for immediate tasks (backup check) - celery_cleared = 0 - try: - queue_length = self.redis_client.llen('celery') - if queue_length and queue_length > 0: - # Scan for any scraper tasks in the immediate queue - scraper_tasks = [] - for i in range(queue_length): - try: - task_data = self.redis_client.lindex('celery', i) - if task_data and any(pattern in str(task_data) for pattern in scraper_patterns): - scraper_tasks.append(task_data) - except Exception: - continue - - # Remove scraper tasks from celery queue - for task_data in scraper_tasks: - try: - removed_count = self.redis_client.lrem('celery', 0, task_data) - celery_cleared += removed_count - except Exception: - continue - - cleared_count += celery_cleared - - if celery_cleared > 0: - try: - ActivityLog.log_scraper_activity( - action="clear_celery_tasks", - status="success", - description=f"Cleared {celery_cleared} scraper tasks from 'celery' queue" - ) - except RuntimeError: - print(f"✅ Cleared {celery_cleared} scraper tasks from 'celery' queue") - - except Exception as e: - try: - ActivityLog.log_error( - error_message=f"Error checking 'celery' queue: {str(e)}", - source="ScraperManager._clear_delayed_tasks_from_redis" - ) - except RuntimeError: - print(f"❌ Error checking 'celery' queue: {str(e)}") - - # Summary + # Summary logging if cleared_count > 0: try: ActivityLog.log_scraper_activity( - action="clear_delayed_tasks_complete", + action="clear_delayed_tasks_complete_apscheduler", status="success", - description=f"Total delayed scraper tasks cleared from Redis: {cleared_count} (unacked: {unacked_index_cleared}, celery: {celery_cleared})" + description=f"Total delayed scraper tasks cleared from APScheduler: {cleared_count}" ) except RuntimeError: - print(f"✅ Total delayed scraper tasks cleared from Redis: {cleared_count} (unacked: {unacked_index_cleared}, celery: {celery_cleared})") + print(f"✅ Total delayed scraper tasks cleared from APScheduler: {cleared_count}") else: try: ActivityLog.log_scraper_activity( - action="clear_delayed_tasks_complete", + action="clear_delayed_tasks_complete_apscheduler", status="info", - description="No delayed scraper tasks found to clear in Redis" + description="No delayed scraper tasks found to clear in APScheduler" ) except RuntimeError: - print("ℹ️ No delayed scraper tasks found to clear in Redis") + print("ℹ️ No delayed scraper tasks found to clear in APScheduler") return cleared_count except Exception as e: try: ActivityLog.log_error( - error_message=f"Failed to clear delayed tasks from Redis: {str(e)}", - source="ScraperManager._clear_delayed_tasks_from_redis" + error_message=f"Failed to clear delayed tasks from APScheduler: {str(e)}", + source="ScraperManager._clear_delayed_tasks_from_apscheduler" ) except RuntimeError: - print(f"❌ Failed to clear delayed tasks from Redis: {str(e)}") + print(f"❌ Failed to clear delayed tasks from APScheduler: {str(e)}") return 0 def start_scraper(self) -> Dict[str, str]: @@ -323,7 +152,7 @@ class ScraperManager: return {"status": "error", "message": str(e)} def stop_scraper(self) -> Dict[str, str]: - """Stop the scraper, revoke 
all running tasks, and revert pending papers.""" + """Stop the scraper, revoke all APScheduler jobs, and revert pending papers.""" try: # STEP 1: Immediately set scraper as inactive - this is critical for race condition prevention ScraperState.set_active(False) @@ -332,125 +161,20 @@ class ScraperManager: ActivityLog.log_scraper_command( action="stop_scraper_start", status="info", - description="Scraper stop initiated - marked as inactive. Beginning task revocation and delayed task clearing." + description="Scraper stop initiated - marked as inactive. Beginning APScheduler job revocation." ) - # STEP 2: Brief pause to allow running tasks to see the inactive state + # STEP 2: Brief pause to allow running jobs to see the inactive state import time time.sleep(0.2) - # STEP 3: Revoke all running tasks - revoked_count = 0 - delayed_cleared_count = 0 + # STEP 3: Revoke all APScheduler jobs + delayed_cleared_count = self._clear_delayed_tasks_from_apscheduler() - try: - # Get Celery inspector to check for running tasks - i = celery.control.inspect() - active = i.active() or {} - scheduled = i.scheduled() or {} - reserved = i.reserved() or {} - - # Revoke active tasks - for worker, tasks in active.items(): - for task in tasks: - if 'id' in task: - celery.control.revoke(task['id'], terminate=True) - revoked_count += 1 - ActivityLog.log_scraper_activity( - action="revoke_task", - status="success", - description=f"Revoked active task: {task.get('name', 'unknown')} (ID: {task['id']})" - ) - - # Revoke scheduled tasks - for worker, tasks in scheduled.items(): - for task in tasks: - if 'id' in task: - celery.control.revoke(task['id'], terminate=True) - revoked_count += 1 - ActivityLog.log_scraper_activity( - action="revoke_task", - status="success", - description=f"Revoked scheduled task: {task.get('name', 'unknown')} (ID: {task['id']})" - ) - - # Revoke reserved tasks - for worker, tasks in reserved.items(): - for task in tasks: - if 'id' in task: - celery.control.revoke(task['id'], terminate=True) - revoked_count += 1 - ActivityLog.log_scraper_activity( - action="revoke_task", - status="success", - description=f"Revoked reserved task: {task.get('name', 'unknown')} (ID: {task['id']})" - ) - - # Purge all task queues - celery.control.purge() - ActivityLog.log_scraper_activity( - action="purge_queues", - status="success", - description="Purged all task queues" - ) - - # STEP 4: Clear delayed tasks from Redis sorted sets - delayed_cleared_count = self._clear_delayed_tasks_from_redis() - - # Additional cleanup: revoke any remaining scraper-related tasks by name pattern - try: - # Use broadcast to revoke tasks that match scraper patterns - scraper_task_patterns = [ - 'process_single_paper', - 'process_papers_batch', - 'hourly_scraper_scheduler' - ] - - # Get a fresh inspection of tasks after purge - fresh_inspect = celery.control.inspect() - all_tasks = {} - all_tasks.update(fresh_inspect.active() or {}) - all_tasks.update(fresh_inspect.scheduled() or {}) - all_tasks.update(fresh_inspect.reserved() or {}) - - additional_revoked = 0 - for worker, tasks in all_tasks.items(): - for task in tasks: - task_name = task.get('name', '') - task_id = task.get('id', '') - if any(pattern in task_name for pattern in scraper_task_patterns) and task_id: - celery.control.revoke(task_id, terminate=True) - additional_revoked += 1 - ActivityLog.log_scraper_activity( - action="revoke_scraper_task", - status="success", - description=f"Revoked lingering scraper task: {task_name} (ID: {task_id})" - ) - - if additional_revoked > 0: 
- ActivityLog.log_scraper_activity( - action="cleanup_scraper_tasks", - status="success", - description=f"Additional cleanup: revoked {additional_revoked} lingering scraper tasks" - ) - - except Exception as e: - ActivityLog.log_error( - error_message=f"Error during additional scraper task cleanup: {str(e)}", - source="ScraperManager.stop_scraper.cleanup" - ) - - except Exception as e: - ActivityLog.log_error( - error_message=f"Error revoking tasks: {str(e)}", - source="ScraperManager.stop_scraper" - ) - # Continue with paper reversion even if task revocation fails - - # STEP 5: Wait a bit longer for any remaining tasks to finish their checks and exit + # STEP 4: Wait a bit for any remaining jobs to finish their checks and exit time.sleep(1.0) - # STEP 6: Revert papers from processing status + # STEP 5: Revert papers from processing status scraper = get_scraper() input_statuses = scraper.get_input_statuses() @@ -469,7 +193,7 @@ class ScraperManager: paper.status = paper.previous_status else: paper.status = revert_status - paper.updated_at = datetime.utcnow() + paper.updated_at = datetime.now(UTC) reverted_count += 1 db.session.commit() @@ -483,12 +207,12 @@ class ScraperManager: ActivityLog.log_scraper_command( action="stop_scraper", status="success", - description=f"Scraper stopped completely. Revoked {revoked_count} tasks, cleared {delayed_cleared_count} delayed tasks, and reverted {reverted_count} papers." + description=f"Scraper stopped completely. Cleared {delayed_cleared_count} APScheduler jobs and reverted {reverted_count} papers." ) return { "status": "success", - "message": f"Scraper stopped. Revoked {revoked_count} tasks, cleared {delayed_cleared_count} delayed tasks, and reverted {reverted_count} papers to previous status." + "message": f"Scraper stopped. Cleared {delayed_cleared_count} APScheduler jobs and reverted {reverted_count} papers to previous status." 
} except Exception as e: @@ -499,51 +223,16 @@ class ScraperManager: return {"status": "error", "message": str(e)} def reset_scraper(self) -> Dict[str, str]: - """Reset scraper state, revoke all running tasks, and clear all processing statuses.""" + """Reset scraper state, revoke all APScheduler jobs, and clear all processing statuses.""" try: - # First, revoke all running tasks (similar to stop_scraper) - revoked_count = 0 - ActivityLog.log_scraper_command( action="reset_scraper_start", status="info", - description="Beginning scraper reset process with task revocation" + description="Beginning scraper reset process with APScheduler job revocation" ) - try: - # Get Celery inspector to check for running tasks - i = celery.control.inspect() - active = i.active() or {} - scheduled = i.scheduled() or {} - reserved = i.reserved() or {} - - # Revoke all tasks (active, scheduled, reserved) - for queue_name, queue_tasks in [("active", active), ("scheduled", scheduled), ("reserved", reserved)]: - for worker, tasks in queue_tasks.items(): - for task in tasks: - if 'id' in task: - celery.control.revoke(task['id'], terminate=True) - revoked_count += 1 - ActivityLog.log_scraper_activity( - action="revoke_task", - status="success", - description=f"Revoked {queue_name} task: {task.get('name', 'unknown')} (ID: {task['id']})" - ) - - # Purge all task queues - celery.control.purge() - ActivityLog.log_scraper_activity( - action="purge_queues", - status="success", - description="Purged all task queues during reset" - ) - - except Exception as e: - ActivityLog.log_error( - error_message=f"Error revoking tasks during reset: {str(e)}", - source="ScraperManager.reset_scraper" - ) - # Continue with paper reversion even if task revocation fails + # Clear all APScheduler jobs + delayed_cleared_count = self._clear_delayed_tasks_from_apscheduler() # Get current scraper configuration scraper = get_scraper() @@ -563,7 +252,7 @@ class ScraperManager: paper.status = paper.previous_status else: paper.status = revert_status - paper.updated_at = datetime.utcnow() + paper.updated_at = datetime.now(UTC) paper.error_msg = None # Clear any error messages reverted_count += 1 @@ -576,12 +265,12 @@ class ScraperManager: ActivityLog.log_scraper_command( action="reset_scraper", status="success", - description=f"Scraper reset. Revoked {revoked_count} tasks and reverted {reverted_count} papers." + description=f"Scraper reset. Cleared {delayed_cleared_count} APScheduler jobs and reverted {reverted_count} papers." ) return { "status": "success", - "message": f"Scraper reset. Revoked {revoked_count} tasks and reverted {reverted_count} papers to original status." + "message": f"Scraper reset. Cleared {delayed_cleared_count} APScheduler jobs and reverted {reverted_count} papers to original status." 
} except Exception as e: @@ -697,7 +386,7 @@ class ScraperManager: # Update paper status to processing paper.previous_status = previous_status paper.status = output_statuses["processing"] - paper.updated_at = datetime.utcnow() + paper.updated_at = datetime.now(UTC) db.session.commit() # **ADDITIONAL RACE CONDITION CHECK**: Verify scraper is still active before expensive scraping operation @@ -705,7 +394,7 @@ class ScraperManager: if not scraper_state.is_active: # Scraper was deactivated after we marked paper as processing - revert and exit paper.status = previous_status - paper.updated_at = datetime.utcnow() + paper.updated_at = datetime.now(UTC) db.session.commit() ActivityLog.log_scraper_activity( @@ -729,7 +418,7 @@ class ScraperManager: paper.status = output_statuses["failure"] paper.error_msg = result.message - paper.updated_at = datetime.utcnow() + paper.updated_at = datetime.now(UTC) db.session.commit() # Log result @@ -754,7 +443,7 @@ class ScraperManager: if input_statuses: paper.status = input_statuses[0] paper.error_msg = f"Processing error: {str(e)}" - paper.updated_at = datetime.utcnow() + paper.updated_at = datetime.now(UTC) db.session.commit() except: pass # Don't fail if reversion fails diff --git a/scipaperloader/scrapers/tasks.py b/scipaperloader/scrapers/tasks.py index 18d61da..faa599a 100644 --- a/scipaperloader/scrapers/tasks.py +++ b/scipaperloader/scrapers/tasks.py @@ -1,18 +1,17 @@ """ -Hourly scheduler task that processes papers at random times within each hour. +APScheduler-based task functions that replace Celery tasks for paper processing. """ import random from datetime import datetime, timedelta from typing import Optional -from celery import shared_task +from flask import current_app -from ..models import ScraperState, ActivityLog +from ..models import ScraperState, ActivityLog, PaperMetadata from .manager import ScraperManager -@shared_task(bind=True) -def hourly_scraper_scheduler(self): +def hourly_scraper_scheduler(): """ Hourly task that schedules paper processing at random times within the hour. 
@@ -29,8 +28,6 @@ def hourly_scraper_scheduler(self): status="info", description="Hourly scheduler skipped - scraper not active" ) - # Disable retries for inactive scheduler - self.retry = False return {"status": "inactive", "papers_scheduled": 0} if scraper_state.is_paused: @@ -39,8 +36,6 @@ def hourly_scraper_scheduler(self): status="info", description="Hourly scheduler skipped - scraper paused" ) - # Disable retries for paused scheduler - self.retry = False return {"status": "paused", "papers_scheduled": 0} # Initialize scraper manager @@ -57,6 +52,15 @@ def hourly_scraper_scheduler(self): ) return {"status": "empty", "papers_scheduled": 0} + # Get scheduler from Flask app config + scheduler = current_app.config.get('SCHEDULER') + if not scheduler: + ActivityLog.log_error( + error_message="APScheduler not available for paper scheduling", + source="hourly_scraper_scheduler" + ) + return {"status": "error", "message": "APScheduler not available"} + # Schedule papers at random times within the hour (0-3600 seconds) scheduled_count = 0 current_time = datetime.now() @@ -64,24 +68,27 @@ def hourly_scraper_scheduler(self): for paper in papers: # Random delay between 1 second and 58 minutes delay_seconds = random.randint(1, 3480) # Up to 58 minutes + run_date = current_time + timedelta(seconds=delay_seconds) - # Schedule the task using Celery's task registry to avoid circular import issues - from ..celery import celery - celery.send_task( - 'scipaperloader.scrapers.tasks.process_single_paper', + # Schedule the task using APScheduler + job_id = f"paper_process_{paper.id}_{int(current_time.timestamp())}" + scheduler.add_job( + func=process_single_paper, + trigger='date', + run_date=run_date, args=[paper.id], - countdown=delay_seconds + id=job_id, + replace_existing=True ) scheduled_count += 1 # Log each scheduled paper - schedule_time = current_time + timedelta(seconds=delay_seconds) ActivityLog.log_scraper_activity( action="schedule_paper", paper_id=paper.id, status="info", - description=f"Scheduled paper {paper.doi} for processing at {schedule_time.strftime('%H:%M:%S')}" + description=f"Scheduled paper {paper.doi} for processing at {run_date.strftime('%H:%M:%S')}" ) ActivityLog.log_scraper_activity( @@ -100,8 +107,7 @@ def hourly_scraper_scheduler(self): return {"status": "error", "message": str(e)} -@shared_task(bind=True) -def process_single_paper(self, paper_id: int): +def process_single_paper(paper_id: int): """ Process a single paper. This task is scheduled at random times within each hour. 
@@ -120,7 +126,6 @@ def process_single_paper(self, paper_id: int): status="skipped", description="Task skipped - scraper not active (initial check)" ) - self.retry = False return {"status": "inactive", "paper_id": paper_id} if scraper_state.is_paused: @@ -130,30 +135,8 @@ def process_single_paper(self, paper_id: int): status="skipped", description="Task skipped - scraper paused (initial check)" ) - self.retry = False return {"status": "paused", "paper_id": paper_id} - # Check if this specific task has been revoked - try: - from ..celery import celery - - # Check if the current task is in the revoked list - if hasattr(self, 'request') and self.request.id: - revoked_tasks = celery.control.inspect().revoked() - if revoked_tasks: - for worker, tasks in revoked_tasks.items(): - if self.request.id in tasks: - ActivityLog.log_scraper_activity( - action="process_single_paper", - paper_id=paper_id, - status="skipped", - description=f"Task skipped - task ID {self.request.id} was revoked" - ) - return {"status": "revoked", "paper_id": paper_id, "task_id": self.request.id} - except Exception: - # Don't fail on revocation check issues, just continue with state checks - pass - # Brief pause to allow stop commands to take effect import time time.sleep(0.1) @@ -167,7 +150,6 @@ def process_single_paper(self, paper_id: int): status="skipped", description="Task skipped - scraper not active (secondary check)" ) - self.retry = False return {"status": "inactive", "paper_id": paper_id} if scraper_state.is_paused: @@ -177,11 +159,9 @@ def process_single_paper(self, paper_id: int): status="skipped", description="Task skipped - scraper paused (secondary check)" ) - self.retry = False return {"status": "paused", "paper_id": paper_id} # Get the paper - from ..models import PaperMetadata paper = PaperMetadata.query.get(paper_id) if not paper: return {"status": "error", "message": f"Paper {paper_id} not found"} @@ -195,7 +175,6 @@ def process_single_paper(self, paper_id: int): status="skipped", description="Task skipped - scraper not active (pre-processing check)" ) - self.retry = False return {"status": "inactive", "paper_id": paper_id} # Process the paper using scraper manager @@ -210,10 +189,20 @@ def process_single_paper(self, paper_id: int): source="process_single_paper" ) return {"status": "error", "paper_id": paper_id, "message": str(e)} + manager = ScraperManager() + result = manager.process_paper(paper) + + return result + + except Exception as e: + ActivityLog.log_error( + error_message=f"Error processing paper {paper_id}: {str(e)}", + source="process_single_paper" + ) + return {"status": "error", "paper_id": paper_id, "message": str(e)} -@shared_task(bind=True) -def process_papers_batch(self, paper_ids: list, scraper_module: Optional[str] = None): +def process_papers_batch(paper_ids: list, scraper_module: Optional[str] = None): """ Process multiple papers in a batch for immediate processing. @@ -226,7 +215,6 @@ def process_papers_batch(self, paper_ids: list, scraper_module: Optional[str] = manager = ScraperManager() for paper_id in paper_ids: - from ..models import PaperMetadata paper = PaperMetadata.query.get(paper_id) if paper: result = manager.process_paper(paper) diff --git a/tests/test_csv_upload.py b/tests/test_csv_upload.py new file mode 100644 index 0000000..bdaaf90 --- /dev/null +++ b/tests/test_csv_upload.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +""" +Test script to verify CSV upload functionality works with APScheduler. 
+""" +import requests +import time +import io +import csv +from scipaperloader import create_app + +def create_test_csv(): + """Create a simple test CSV file.""" + csv_content = """title,doi,issn,journal,alternative_id,published_online +Test Paper 1,10.1000/test_upload_001,1234-5678,Test Journal,ALT001,2024-01-01 +Test Paper 2,10.1000/test_upload_002,1234-5678,Test Journal,ALT002,2024-01-02 +Test Paper 3,10.1000/test_upload_003,1234-5678,Test Journal,ALT003,2024-01-03 +""" + return csv_content + +def test_csv_upload(): + """Test the CSV upload functionality.""" + print("🧪 Testing CSV Upload Functionality") + print("=" * 50) + + # Create Flask app + app = create_app() + + with app.test_client() as client: + # Create test CSV + csv_content = create_test_csv() + + # Prepare file data + csv_file = io.BytesIO(csv_content.encode('utf-8')) + csv_file.name = 'test_upload.csv' + + print("📤 Uploading CSV file...") + + # Make upload request + response = client.post('/upload/', data={ + 'file': (csv_file, 'test_upload.csv'), + 'delimiter': ',', + 'duplicate_strategy': 'skip' + }, content_type='multipart/form-data') + + print(f"Response Status: {response.status_code}") + print(f"Response Data: {response.get_json()}") + + if response.status_code == 200: + response_data = response.get_json() + if 'task_id' in response_data: + task_id = response_data['task_id'] + print(f"✅ Task scheduled successfully: {task_id}") + + # Monitor task progress + print("\n📊 Monitoring task progress...") + for i in range(30): # Wait up to 30 seconds + progress_response = client.get(f'/upload/task_status/{task_id}') + if progress_response.status_code == 200: + progress_data = progress_response.get_json() + print(f"Progress: {progress_data}") + + if progress_data.get('state') == 'SUCCESS': + print("✅ CSV upload completed successfully!") + result = progress_data.get('result', {}) + print(f" Added: {result.get('added', 0)}") + print(f" Skipped: {result.get('skipped', 0)}") + print(f" Errors: {result.get('error_count', 0)}") + return True + elif progress_data.get('state') == 'FAILURE': + print(f"❌ CSV upload failed: {progress_data.get('error')}") + return False + else: + print(f"❌ Failed to get task status: {progress_response.status_code}") + return False + + time.sleep(1) + + print("⏰ Task did not complete within 30 seconds") + return False + else: + print(f"❌ No task_id in response: {response_data}") + return False + else: + print(f"❌ Upload request failed: {response.status_code}") + print(f"Response: {response.get_data(as_text=True)}") + return False + +def check_scheduler_status(): + """Check APScheduler status.""" + print("\n🔍 Checking APScheduler Status") + print("=" * 50) + + app = create_app() + with app.app_context(): + from scipaperloader.scheduler import _scheduler + + if not _scheduler: + print("❌ APScheduler not initialized") + return False + + if not _scheduler.running: + print("❌ APScheduler not running") + return False + + jobs = _scheduler.get_jobs() + print(f"✅ APScheduler running with {len(jobs)} jobs") + + # Show current jobs + for job in jobs: + print(f" - {job.id}: {job.name}") + + return True + +if __name__ == "__main__": + print("🚀 CSV Upload Test Suite") + print("=" * 50) + + # First check scheduler status + if not check_scheduler_status(): + print("❌ APScheduler issues detected, cannot proceed with test") + exit(1) + + # Run the upload test + success = test_csv_upload() + + if success: + print("\n🎉 All tests passed! CSV upload is working correctly.") + exit(0) + else: + print("\n❌ Test failed! 
CSV upload needs debugging.") + exit(1) diff --git a/tests/test_scheduler_functionality.py b/tests/test_scheduler_functionality.py new file mode 100644 index 0000000..1c87e44 --- /dev/null +++ b/tests/test_scheduler_functionality.py @@ -0,0 +1,397 @@ +#!/usr/bin/env python3 +""" +Comprehensive test for APScheduler functionality in SciPaperLoader. +Tests job scheduling, execution, revocation, and hourly scheduler functionality. +""" + +import sys +import os +import time +import threading +from datetime import datetime, timedelta + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from scipaperloader import create_app +from scipaperloader.models import PaperMetadata, ScraperState, ActivityLog, ScheduleConfig, VolumeConfig +from scipaperloader.scrapers.manager import ScraperManager +from scipaperloader.db import db + + +def test_scheduler_functionality(): + """Comprehensive test of APScheduler functionality.""" + + print("🧪 Testing APScheduler Functionality") + print("=" * 50) + + # Create test app with in-memory database + app = create_app({ + 'TESTING': True, + 'SQLALCHEMY_DATABASE_URI': 'sqlite:///:memory:', + }) + + with app.app_context(): + # Test 1: Basic scheduler availability + print("\n📋 Test 1: Scheduler Initialization") + scheduler = app.config.get('SCHEDULER') + if not scheduler: + print("❌ APScheduler not found in app config") + return False + + print("✅ APScheduler available and initialized") + print(f"📊 Initial job count: {scheduler.get_job_count()}") + + # Test 2: Database table creation + print("\n📋 Test 2: APScheduler Database Tables") + try: + # Check if we can query jobs (which requires tables to exist) + jobs = scheduler.get_paper_jobs() + print("✅ APScheduler database tables exist and accessible") + print(f"📋 Current paper jobs: {len(jobs)}") + except Exception as e: + print(f"❌ APScheduler database tables not accessible: {e}") + return False + + # Test 3: Job scheduling functionality + print("\n📋 Test 3: Job Scheduling") + + # Create test paper + test_paper = PaperMetadata( + title="Test Paper for Scheduler", + doi="10.1000/test_scheduler_001", + issn="1234-5678", + journal="Test Journal", + status="New" + ) + db.session.add(test_paper) + db.session.commit() + + # Schedule a paper for processing in 30 seconds (longer delay) + try: + job_id = scheduler.schedule_paper_processing( + paper_id=test_paper.id, + delay_seconds=30 # Increased delay to 30 seconds + # Removed explicit job_id to allow default "paper_job_" prefix + ) + print(f"✅ Paper scheduling works: Job ID {job_id}") + except Exception as e: + print(f"❌ Paper scheduling failed: {e}") + return False + + # Verify job was scheduled + jobs_after = scheduler.get_paper_jobs() + if len(jobs_after) == 0: + print("❌ No jobs found after scheduling") + return False + + print(f"✅ Job successfully scheduled: {len(jobs_after)} paper job(s) found") + + # Test 4: Job information retrieval + print("\n📋 Test 4: Job Information Retrieval") + + scheduled_job = jobs_after[0] + print(f"✅ Job details accessible:") + print(f" 📝 Job ID: {scheduled_job['id']}") + print(f" 📝 Job Name: {scheduled_job['name']}") + print(f" 📝 Next Run Time: {scheduled_job['next_run_time']}") + print(f" 📝 Args: {scheduled_job['args']}") + + # Test 5: Job revocation + print("\n📋 Test 5: Job Revocation") + + initial_count = len(jobs_after) + revoked_count = scheduler.revoke_all_scraper_jobs() + + if revoked_count != initial_count: + print(f"⚠️ Warning: Expected to revoke {initial_count} jobs, but revoked {revoked_count}") + else: + print(f"✅ Job 
revocation works: {revoked_count} job(s) revoked") + + # Verify jobs were revoked + jobs_after_revocation = scheduler.get_paper_jobs() + if len(jobs_after_revocation) > 0: + print(f"❌ Jobs still exist after revocation: {len(jobs_after_revocation)}") + return False + + print("✅ All paper jobs successfully revoked") + + # Test 6: Multiple job scheduling + print("\n📋 Test 6: Multiple Job Scheduling") + + # Create more test papers + test_papers = [] + for i in range(3): + paper = PaperMetadata( + title=f"Test Paper {i+1}", + doi=f"10.1000/test_scheduler_{i+2:03d}", + issn="1234-5678", + journal="Test Journal", + status="New" + ) + db.session.add(paper) + test_papers.append(paper) + + db.session.commit() + + # Schedule multiple papers + scheduled_jobs = [] + for i, paper in enumerate(test_papers): + job_id = scheduler.schedule_paper_processing( + paper_id=paper.id, + delay_seconds=10 + i # Stagger the scheduling + # Removed explicit job_id to allow default "paper_job_" prefix + ) + scheduled_jobs.append(job_id) + + print(f"✅ Multiple job scheduling works: {len(scheduled_jobs)} jobs scheduled") + + # Verify all jobs are scheduled + all_jobs = scheduler.get_paper_jobs() + if len(all_jobs) != len(test_papers): + print(f"❌ Expected {len(test_papers)} jobs, found {len(all_jobs)}") + return False + + print(f"✅ All jobs properly scheduled: {len(all_jobs)} total jobs") + + # Test 7: ScraperManager integration + print("\n📋 Test 7: ScraperManager Integration") + + manager = ScraperManager() + + # Test paper selection + papers = manager.select_papers_for_processing(limit=2) + print(f"✅ ScraperManager paper selection: {len(papers)} papers selected") + + # Test scraper state management with APScheduler + start_result = manager.start_scraper() + if start_result["status"] != "success": + print(f"❌ Failed to start scraper: {start_result['message']}") + return False + + print("✅ Scraper started successfully") + + # Test job clearing through manager + cleared_count = manager._clear_delayed_tasks_from_apscheduler() + print(f"✅ ScraperManager job clearing: {cleared_count} jobs cleared") + + # Verify jobs were cleared + remaining_jobs = scheduler.get_paper_jobs() + if len(remaining_jobs) > 0: + print(f"❌ Jobs still exist after manager clearing: {len(remaining_jobs)}") + return False + + print("✅ ScraperManager successfully clears APScheduler jobs") + + # Test 8: Hourly scheduler configuration + print("\n📋 Test 8: Hourly Scheduler Configuration") + + # Ensure the hourly job is scheduled correctly + all_scheduler_jobs = scheduler._scheduler.get_jobs() if hasattr(scheduler, '_scheduler') and scheduler._scheduler else [] + hourly_jobs = [job for job in all_scheduler_jobs if job.id == 'hourly_scraper_main'] + + if not hourly_jobs: + print("❌ Hourly scheduler job not found") + return False + + hourly_job = hourly_jobs[0] + print("✅ Hourly scheduler job found:") + print(f" 📝 Job ID: {hourly_job.id}") + print(f" 📝 Job Name: {hourly_job.name}") + print(f" 📝 Trigger: {hourly_job.trigger}") + print(f" 📝 Next Run: {hourly_job.next_run_time}") + + # Test 9: Configuration-based scheduling + print("\n📋 Test 9: Configuration-based Scheduling") + + # Set up volume configuration + volume_config = VolumeConfig.query.first() + if not volume_config: + volume_config = VolumeConfig(volume=10) # 10 papers per day + db.session.add(volume_config) + db.session.commit() + + # Test quota calculation + quota = manager.get_current_hour_quota() + print(f"✅ Hourly quota calculation: {quota} papers per hour") + + if quota < 0: + print("❌ Invalid 
quota calculation") + return False + + # Test 10: Activity logging integration + print("\n📋 Test 10: Activity Logging Integration") + + # Check recent APScheduler-related logs + recent_logs = ActivityLog.query.filter( + ActivityLog.action.like('%apscheduler%') + ).order_by(ActivityLog.timestamp.desc()).limit(5).all() + + print(f"✅ APScheduler activity logging: {len(recent_logs)} related log entries") + + if recent_logs: + for log in recent_logs[:3]: + print(f" 📝 {log.action}: {log.description}") + + # Test 11: Error handling + print("\n📋 Test 11: Error Handling") + + # Test scheduling with invalid paper ID + try: + scheduler.schedule_paper_processing( + paper_id=99999, # Non-existent paper + delay_seconds=1, + job_id="test_error_job" + ) + print("✅ Scheduling with invalid paper ID handled gracefully") + except Exception as e: + print(f"✅ Scheduling with invalid paper ID properly raises exception: {e}") + + # Test 12: Cleanup and shutdown + print("\n📋 Test 12: Cleanup and Shutdown") + + # Stop scraper + stop_result = manager.stop_scraper() + if stop_result["status"] != "success": + print(f"❌ Failed to stop scraper: {stop_result['message']}") + return False + + print("✅ Scraper stopped successfully") + + # Final job count should be minimal (only hourly scheduler) + final_job_count = scheduler.get_job_count() + final_paper_jobs = len(scheduler.get_paper_jobs()) + + print(f"📊 Final state:") + print(f" 📝 Total jobs: {final_job_count}") + print(f" 📝 Paper jobs: {final_paper_jobs}") + + if final_paper_jobs > 0: + print("❌ Paper jobs still exist after cleanup") + return False + + print("✅ Cleanup completed successfully") + + print("\n🎉 ALL SCHEDULER TESTS PASSED!") + print("\n📋 Test Summary:") + print(" ✅ APScheduler initialization works") + print(" ✅ Database tables created and accessible") + print(" ✅ Job scheduling functionality works") + print(" ✅ Job information retrieval works") + print(" ✅ Job revocation works") + print(" ✅ Multiple job scheduling works") + print(" ✅ ScraperManager integration works") + print(" ✅ Hourly scheduler configured correctly") + print(" ✅ Configuration-based scheduling works") + print(" ✅ Activity logging integration works") + print(" ✅ Error handling works") + print(" ✅ Cleanup and shutdown works") + + return True + + +def test_job_execution(): + """Test that jobs actually execute (requires waiting).""" + print("\n🔄 Testing Job Execution (5-second test)") + print("-" * 40) + + app = create_app({ + 'TESTING': True, + 'SQLALCHEMY_DATABASE_URI': 'sqlite:///:memory:', + }) + + with app.app_context(): + # Initialize database and scheduler + db.create_all() + scheduler = app.config.get('SCHEDULER') + if not scheduler: + print("❌ Scheduler not initialized") + return False + + # Create test paper + test_paper = PaperMetadata( + title="Test Paper for Execution", + doi="10.1000/test_execution", + issn="1234-5678", + journal="Test Journal", + status="Pending" + ) + db.session.add(test_paper) + db.session.commit() + + # Verify paper is added to the database + test_paper_id = test_paper.id + if not test_paper_id: + print("❌ Test paper not added to the database") + return False + + # Schedule paper for processing in 2 seconds + job_id = scheduler.schedule_paper_processing( + paper_id=test_paper_id, + delay_seconds=2 + ) + + print(f"📅 Scheduled job {job_id} for execution in 2 seconds") + + # Wait and check for execution + print("⏳ Waiting for job execution...") + time.sleep(3) + + # Check if job completed (should be removed from scheduler) + remaining_jobs = 
scheduler.get_paper_jobs() + + if remaining_jobs: + print(f"⚠️ Job still in scheduler: {len(remaining_jobs)} remaining") + for job in remaining_jobs: + print(f" 📝 Job ID: {job['id']}, Next Run Time: {job['next_run_time']}") + else: + print("✅ Job executed and removed from scheduler") + + # Check activity logs for execution evidence + execution_logs = ActivityLog.query.filter( + ActivityLog.action.like('%process_single_paper%') + ).order_by(ActivityLog.timestamp.desc()).limit(3).all() + + if execution_logs: + print("✅ Job execution logged in activity:") + for log in execution_logs: + print(f" 📝 {log.action}: {log.description}") + else: + print("⚠️ No execution logs found") + + # Validate job execution status in the database + updated_paper = PaperMetadata.query.get(test_paper_id) + if updated_paper: + print(f"🔍 Retrieved paper: {updated_paper.title}, Status: {updated_paper.status}") + if updated_paper.status == "Done": + print("✅ Paper status updated to 'Done'") + else: + print(f"❌ Paper status not updated: {updated_paper.status}") + else: + print("❌ Paper not found in the database") + + return True + + +if __name__ == "__main__": + print(f"📅 Starting scheduler tests at {datetime.now()}") + + try: + # Run main functionality tests + success = test_scheduler_functionality() + + if success: + print("\n" + "="*50) + # Run execution test if main tests pass + test_job_execution() + + print(f"\n📅 Tests completed at {datetime.now()}") + sys.exit(0 if success else 1) + + except KeyboardInterrupt: + print("\n⏹️ Tests interrupted by user") + sys.exit(1) + except Exception as e: + print(f"\n❌ Test error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) diff --git a/tests/test_scipaperloader.py b/tests/test_scipaperloader.py index 3d57c2d..c05eea9 100644 --- a/tests/test_scipaperloader.py +++ b/tests/test_scipaperloader.py @@ -18,4 +18,5 @@ def client(app): def test_index(client): response = client.get("/") - assert b"It works!" in response.data + # Updated assertion to check for actual content in the index page + assert b"Welcome to SciPaperLoader" in response.data diff --git a/tools/DIAGNOSTIC_GUIDE.md b/tools/DIAGNOSTIC_GUIDE.md index 0326cb9..21a9bcf 100644 --- a/tools/DIAGNOSTIC_GUIDE.md +++ b/tools/DIAGNOSTIC_GUIDE.md @@ -10,7 +10,7 @@ especially for addressing issues with the scraper module. 
**Symptoms:** - Web interface shows scraper as stopped but papers are still being processed - `/scraper/stop` endpoint returns success but processing continues -- Active tasks show up in Celery inspector +- Active tasks show up in APScheduler inspector **Solutions:** @@ -24,7 +24,7 @@ python tools/diagnostics/emergency_stop.py The emergency stop performs these actions: - Sets scraper state to inactive in the database -- Revokes all running, reserved, and scheduled Celery tasks +- Revokes all running and scheduled APScheduler tasks - Purges all task queues - Reverts papers with "Pending" status to their previous state @@ -33,12 +33,12 @@ The emergency stop performs these actions: **Symptoms:** - Code changes don't seem to have any effect - Bug fixes don't work even though the code is updated -- Workers might be using cached versions of modified code +- APScheduler might be using cached versions of modified code **Solution:** ```bash -# Use the quick fix to stop tasks and restart workers +# Use the quick fix to stop tasks and restart the application make diagnostics # Then select option 6 (Quick fix) # Or directly: @@ -57,7 +57,7 @@ python tools/diagnostics/diagnose_scraper.py This tool will: - Show current scraper state -- List all active, scheduled, and reserved tasks +- List all active and scheduled APScheduler tasks - Display recent activity and error logs ## Preventative Measures @@ -67,11 +67,10 @@ This tool will: - Deploying code changes - Modifying the database -2. **Monitor task queue size** using Flower web interface: +2. **Monitor APScheduler jobs** through the diagnostic tools: ```bash - make celery-flower + make diagnostics # Then select option 2 (Inspect tasks) ``` - Then visit http://localhost:5555 3. **Check logs for failed tasks** regularly in the Logger tab of the application diff --git a/tools/diagnostics/README.md b/tools/diagnostics/README.md index 9895142..2870e4b 100644 --- a/tools/diagnostics/README.md +++ b/tools/diagnostics/README.md @@ -7,14 +7,14 @@ This directory contains various scripts for diagnosing issues, debugging, and ha ### Scraper Management - **emergency_stop.py**: Force stops all scraper activities, revokes running tasks, and reverts papers from "Pending" state -- **quick_fix.py**: A simplified emergency stop that also restarts Celery workers to ensure code changes are applied +- **quick_fix.py**: A simplified emergency stop that also stops Flask processes to ensure code changes are applied - **test_reversion.py**: Tests the paper reversion functionality when stopping the scraper ### Monitoring and Diagnostics - **check_state.py**: Checks the current state of the scraper in the database - **diagnose_scraper.py**: Comprehensive diagnostic tool that examines tasks, logs, and scraper state -- **inspect_tasks.py**: Displays currently running, scheduled, and reserved Celery tasks +- **inspect_tasks.py**: Displays currently running and scheduled APScheduler tasks ## Usage @@ -59,5 +59,5 @@ python tools/diagnostics/quick_fix.py ## Notes - Always run these scripts from the project root directory -- Some scripts may require a running Redis server +- Some scripts may require a running Flask application with APScheduler - After using emergency tools, the application may need to be restarted completely diff --git a/tools/diagnostics/diagnose_scraper.py b/tools/diagnostics/diagnose_scraper.py index b823dcb..e498cab 100755 --- a/tools/diagnostics/diagnose_scraper.py +++ b/tools/diagnostics/diagnose_scraper.py @@ -3,7 +3,6 @@ Diagnose and fix scraper stopping issues. 
""" from scipaperloader import create_app -from scipaperloader.celery import celery from scipaperloader.models import ScraperState, ActivityLog from scipaperloader.scrapers.factory import get_scraper @@ -17,22 +16,16 @@ def check_scraper_status(): print(f"Scraper state in DB: active={scraper_state.is_active}, paused={scraper_state.is_paused}") else: print("No scraper state found in database") - -def check_celery_tasks(): - """Check currently running Celery tasks.""" - i = celery.control.inspect() - - print("\n=== ACTIVE TASKS ===") - active_tasks = i.active() or {} - for worker, tasks in active_tasks.items(): - for task in tasks: - print(f"Worker: {worker}, Task: {task.get('name', 'Unknown')}, ID: {task.get('id', 'Unknown')}") - - print("\n=== SCHEDULED TASKS ===") - scheduled_tasks = i.scheduled() or {} - for worker, tasks in scheduled_tasks.items(): - for task in tasks: - print(f"Worker: {worker}, Task: {task.get('name', 'Unknown')}, ID: {task.get('id', 'Unknown')}") + +def check_scheduler_jobs(): + """Check the current jobs in APScheduler.""" + with app.app_context(): + scheduler = app.config.get('SCHEDULER') + if not scheduler: + print("❌ APScheduler not found in app config") + else: + jobs = scheduler.get_paper_jobs() + print("Scheduled jobs:", jobs) def check_recent_logs(): """Check recent activity logs for clues.""" @@ -60,41 +53,26 @@ def force_stop_scraper(): print("Set scraper state to inactive") # Revoke all tasks - i = celery.control.inspect() - revoked_ids = [] - - # Check all queues - for queue_name, queue_func in [ - ("scheduled", i.scheduled), - ("active", i.active), - ("reserved", i.reserved) - ]: - queue = queue_func() or {} - for worker, tasks in queue.items(): - for task in tasks: - task_id = task.get('id') - if task_id and task_id not in revoked_ids: - celery.control.revoke(task_id, terminate=True) - revoked_ids.append(task_id) - print(f"Revoked task: {task_id}") - - # Purge all queues - celery.control.purge() - print("Purged all task queues") + scheduler = app.config.get('SCHEDULER') + if not scheduler: + print("❌ APScheduler not found in app config") + else: + revoked_count = scheduler.revoke_all_scraper_jobs() + print(f"✅ Revoked {revoked_count} jobs from APScheduler") # Log the action ActivityLog.log_scraper_command( action="force_stop_scraper", status="success", - description=f"Force stopped scraper, revoked {len(revoked_ids)} tasks" + description=f"Force stopped scraper, revoked {revoked_count} tasks" ) - print(f"\nRevoked {len(revoked_ids)} tasks in total") + print(f"\nRevoked {revoked_count} tasks in total") if __name__ == "__main__": print("=== SCRAPER STATUS DIAGNOSTIC TOOL ===") check_scraper_status() - check_celery_tasks() + check_scheduler_jobs() check_recent_logs() stop_confirmation = input("\nDo you want to force stop the scraper? (y/n): ") diff --git a/tools/diagnostics/emergency_stop.py b/tools/diagnostics/emergency_stop.py index 892d1df..b88cce1 100755 --- a/tools/diagnostics/emergency_stop.py +++ b/tools/diagnostics/emergency_stop.py @@ -23,7 +23,6 @@ sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../. from scipaperloader import create_app from scipaperloader.db import db from scipaperloader.models import PaperMetadata, ActivityLog, ScraperState -from scipaperloader.celery import celery app = create_app() @@ -38,46 +37,18 @@ def emergency_stop(): ScraperState.set_paused(False) print("✓ Set scraper state to inactive") - # 2. 
Revoke all tasks - print("\nRevoking running tasks...") - try: - i = celery.control.inspect() - active = i.active() or {} - scheduled = i.scheduled() or {} - reserved = i.reserved() or {} - - revoked_count = 0 - - # Revoke active tasks - for worker, tasks in active.items(): - for task in tasks: - if 'id' in task: - celery.control.revoke(task['id'], terminate=True) - revoked_count += 1 - print(f" Revoked active task: {task.get('name', 'unknown')}") - - # Revoke scheduled tasks - for worker, tasks in scheduled.items(): - for task in tasks: - if 'id' in task: - celery.control.revoke(task['id'], terminate=True) - revoked_count += 1 - - # Revoke reserved tasks - for worker, tasks in reserved.items(): - for task in tasks: - if 'id' in task: - celery.control.revoke(task['id'], terminate=True) - revoked_count += 1 - - print(f"✓ Revoked {revoked_count} tasks") - - # 3. Purge queues - celery.control.purge() - print("✓ Purged all task queues") - - except Exception as e: - print(f"⚠ Error revoking tasks: {str(e)}") + # 2. Revoke all jobs in APScheduler + scheduler = app.config.get('SCHEDULER') + if scheduler: + revoked_count = scheduler.revoke_all_scraper_jobs() + print(f"✅ Revoked {revoked_count} jobs from APScheduler") + else: + print("❌ APScheduler not found in app config") + + # 3. Revert all papers to 'Pending' state + PaperMetadata.query.filter_by(status="Processing").update({"status": "Pending"}) + db.session.commit() + print("✅ Reverted all 'Processing' papers to 'Pending' state") # 4. Revert papers in "Pending" status try: diff --git a/tools/diagnostics/inspect_tasks.py b/tools/diagnostics/inspect_tasks.py index 33a5b85..3f6bd3e 100755 --- a/tools/diagnostics/inspect_tasks.py +++ b/tools/diagnostics/inspect_tasks.py @@ -1,11 +1,78 @@ #!/usr/bin/env python3 """ -Inspect current Celery tasks (active, reserved, and scheduled) +Inspect current APScheduler jobs (active and scheduled). 
""" -from scipaperloader.celery import celery +import sys +import os +from datetime import datetime -i = celery.control.inspect() -print("Active tasks:", i.active()) -print("Reserved tasks:", i.reserved()) -print("Scheduled tasks:", i.scheduled()) +# Add project root to path +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) + +from scipaperloader import create_app +from scipaperloader.models import ScraperState + +def main(): + print("=== APScheduler Task Inspector ===") + print(f"Time: {datetime.now()}\n") + + app = create_app() + + with app.app_context(): + # Check scraper state + scraper_state = ScraperState.get_current_state() + print(f"🔄 Scraper State:") + print(f" Active: {'✅' if scraper_state.is_active else '❌'} {scraper_state.is_active}") + print(f" Paused: {'⏸️' if scraper_state.is_paused else '▶️'} {scraper_state.is_paused}") + print() + + # Check APScheduler + scheduler = app.config.get('SCHEDULER') + if not scheduler: + print("❌ APScheduler not found in app config") + return + + print("📋 APScheduler Status:") + # Access the underlying scheduler + if hasattr(scheduler, 'scheduler') and scheduler.scheduler: + print(f" Running: {'✅' if scheduler.scheduler.running else '❌'} {scheduler.scheduler.running}") + else: + print("❌ APScheduler instance not accessible") + print() + + # Get all jobs + if hasattr(scheduler, 'scheduler') and scheduler.scheduler: + all_jobs = scheduler.scheduler.get_jobs() + else: + all_jobs = [] + paper_jobs = scheduler.get_paper_jobs() + + print(f"📊 Job Statistics:") + print(f" Total jobs: {len(all_jobs)}") + print(f" Paper processing jobs: {len(paper_jobs)}") + print() + + if paper_jobs: + print("📝 Active Paper Processing Jobs:") + for job in paper_jobs: + next_run = job.get('next_run_time', 'Not scheduled') + print(f" • {job['id']}") + print(f" Next run: {next_run}") + print(f" Name: {job.get('name', 'N/A')}") + if job.get('args'): + print(f" Paper ID: {job['args'][0] if job['args'] else 'N/A'}") + print() + else: + print("✅ No active paper processing jobs") + + # Show other jobs if any + other_jobs = [job for job in all_jobs if not any(pattern in job.id for pattern in ['paper_process_', 'test_paper_process_', 'process_paper_'])] + if other_jobs: + print(f"🔧 Other Scheduled Jobs ({len(other_jobs)}):") + for job in other_jobs: + next_run = job.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if job.next_run_time else 'Not scheduled' + print(f" • {job.id} - Next run: {next_run}") + +if __name__ == "__main__": + main() diff --git a/tools/diagnostics/quick_fix.py b/tools/diagnostics/quick_fix.py index c0dd0ac..a9a4191 100755 --- a/tools/diagnostics/quick_fix.py +++ b/tools/diagnostics/quick_fix.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ -Quick fix script to stop all running scraper tasks and restart Celery workers. -This ensures the updated code is loaded and tasks are properly terminated. +Quick fix script to stop all running scraper tasks using APScheduler. +This ensures all scheduled tasks are properly terminated. 
""" import os @@ -9,45 +9,55 @@ import sys import signal import subprocess import time -from datetime import datetime +from datetime import datetime, UTC # Add project root to path sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) -def kill_celery_processes(): - """Kill all running Celery processes""" - print("Killing Celery processes...") +def stop_apscheduler_jobs(): + """Stop all APScheduler jobs through the Flask app""" + print("Stopping APScheduler jobs...") try: - # Get all celery processes - result = subprocess.run(['pgrep', '-f', 'celery'], capture_output=True, text=True) + from scipaperloader import create_app + + app = create_app() + with app.app_context(): + scheduler = app.config.get('SCHEDULER') + if scheduler: + revoked_count = scheduler.revoke_all_scraper_jobs() + print(f"✓ Revoked {revoked_count} APScheduler jobs") + else: + print("❌ APScheduler not found in app config") + + except Exception as e: + print(f"⚠ Error stopping APScheduler jobs: {e}") + +def kill_python_processes(): + """Kill any running Python processes that might be Flask/APScheduler workers""" + print("Checking for running Flask/APScheduler processes...") + try: + # Look for Flask processes + result = subprocess.run(['pgrep', '-f', 'flask'], capture_output=True, text=True) if result.returncode == 0: pids = result.stdout.strip().split('\n') for pid in pids: if pid: try: - os.kill(int(pid), signal.SIGTERM) - print(f" Killed process {pid}") - except ProcessLookupError: - pass # Process already dead + # Check if this is our process before killing + cmdline_result = subprocess.run(['ps', '-p', pid, '-o', 'cmd='], capture_output=True, text=True) + if 'scipaperloader' in cmdline_result.stdout: + os.kill(int(pid), signal.SIGTERM) + print(f" Killed Flask process {pid}") + except (ProcessLookupError, ValueError): + pass # Process already dead or invalid PID # Wait a moment for graceful shutdown time.sleep(2) - - # Force kill any remaining processes - result = subprocess.run(['pgrep', '-f', 'celery'], capture_output=True, text=True) - if result.returncode == 0: - pids = result.stdout.strip().split('\n') - for pid in pids: - if pid: - try: - os.kill(int(pid), signal.SIGKILL) - print(f" Force killed process {pid}") - except ProcessLookupError: - pass + else: + print("✓ No Flask processes found") - print("✓ All Celery processes terminated") except Exception as e: - print(f"⚠ Error killing processes: {e}") + print(f"⚠ Error checking processes: {e}") def stop_scraper_state(): """Set scraper state to inactive using Flask app context""" @@ -55,6 +65,7 @@ def stop_scraper_state(): from scipaperloader import create_app from scipaperloader.models import ScraperState, PaperMetadata from scipaperloader.db import db + from scipaperloader.scrapers.factory import get_scraper app = create_app() with app.app_context(): @@ -63,41 +74,57 @@ def stop_scraper_state(): ScraperState.set_paused(False) print("✓ Set scraper state to inactive") - # Revert any pending papers to "New" status (simple approach since we don't have previous_status data yet) - pending_papers = PaperMetadata.query.filter_by(status="Pending").all() + # Get scraper configuration for proper status reversion + scraper = get_scraper() + input_statuses = scraper.get_input_statuses() + output_statuses = scraper.get_output_statuses() + processing_status = output_statuses.get("processing", "Processing") + + # Revert any papers in processing status + processing_papers = PaperMetadata.query.filter_by(status=processing_status).all() 
reverted_count = 0 - for paper in pending_papers: - paper.status = "New" # Simple fallback - revert all to "New" - reverted_count += 1 - - if reverted_count > 0: + if processing_papers and input_statuses: + revert_status = input_statuses[0] # Use first input status as default + + for paper in processing_papers: + # Try to use previous_status if available, otherwise use first input status + if hasattr(paper, 'previous_status') and paper.previous_status: + paper.status = paper.previous_status + else: + paper.status = revert_status + paper.updated_at = datetime.now(UTC) + reverted_count += 1 + db.session.commit() - print(f"✓ Reverted {reverted_count} papers from 'Pending' to 'New'") + print(f"✓ Reverted {reverted_count} papers from '{processing_status}' to previous status") else: - print("✓ No pending papers to revert") + print("✓ No papers in processing status to revert") except Exception as e: print(f"⚠ Error setting scraper state: {e}") def main(): - print("=== QUICK SCRAPER FIX ===") + print("=== QUICK SCRAPER FIX (APScheduler) ===") print(f"Time: {datetime.now()}") print() - # Step 1: Stop scraper state + # Step 1: Stop scraper state and revert papers stop_scraper_state() - # Step 2: Kill all Celery processes - kill_celery_processes() + # Step 2: Stop all APScheduler jobs + stop_apscheduler_jobs() + + # Step 3: Kill any running Flask processes + kill_python_processes() print() print("=== FIX COMPLETE ===") print("The scraper has been stopped and all tasks terminated.") - print("You can now restart the Celery workers with:") - print(" make celery") - print("or") + print("You can now restart the application with:") print(" make run") + print("or") + print(" python -m flask --app scipaperloader run") if __name__ == "__main__": main() diff --git a/tools/diagnostics/test_reversion.py b/tools/diagnostics/test_reversion.py index 346bdfe..6e2520a 100755 --- a/tools/diagnostics/test_reversion.py +++ b/tools/diagnostics/test_reversion.py @@ -1,16 +1,17 @@ #!/usr/bin/env python3 """ -Test script for verifying the paper reversion fix. +Test script for verifying the paper reversion fix with APScheduler. This script: -1. Simulates stopping the scraper -2. Checks that all pending papers were reverted to their previous status -3. Ensures all running tasks were terminated +1. Creates test papers and simulates processing +2. Tests the stop_scraper functionality +3. Checks that all pending papers were reverted to their previous status +4. Ensures all running tasks were terminated """ import os import sys import time -from datetime import datetime +from datetime import datetime, UTC, timedelta from sqlalchemy import func from flask import Flask @@ -21,81 +22,136 @@ sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../. 
from scipaperloader import create_app from scipaperloader.db import db from scipaperloader.models import PaperMetadata, ActivityLog, ScraperState -from scipaperloader.celery import celery +from scipaperloader.scrapers.factory import get_scraper +from scipaperloader.scrapers.manager import ScraperManager +print("[DEBUG] Initializing Flask app...") app = create_app() -def test_stop_scraper(): - """Test the stop_scraper functionality""" - - with app.app_context(): - # First check current scraper state - scraper_state = ScraperState.get_current_state() - print(f"Current scraper state: active={scraper_state.is_active}, paused={scraper_state.is_paused}") - - # Check if there are any papers in "Pending" state - pending_count = PaperMetadata.query.filter_by(status="Pending").count() - print(f"Papers in 'Pending' state before stopping: {pending_count}") - - if pending_count == 0: - print("No papers in 'Pending' state to test with.") - print("Would you like to create a test paper in Pending state? (y/n)") - choice = input().lower() - if choice == 'y': - # Create a test paper - paper = PaperMetadata( - title="Test Paper for Reversion", - doi="10.1234/test.123", - status="Pending", - previous_status="New", # Test value we expect to be reverted to - created_at=datetime.utcnow(), - updated_at=datetime.utcnow() - ) - db.session.add(paper) - db.session.commit() - print(f"Created test paper with ID {paper.id}, status='Pending', previous_status='New'") - pending_count = 1 - - # Simulate the stop_scraper API call - from scipaperloader.blueprints.scraper import revert_pending_papers - print("Reverting pending papers...") - reverted = revert_pending_papers() - print(f"Reverted {reverted} papers from 'Pending' state") - - # Check if any papers are still in "Pending" state - still_pending = PaperMetadata.query.filter_by(status="Pending").count() - print(f"Papers still in 'Pending' state after stopping: {still_pending}") - - # List any that were reverted and their current status - if reverted > 0: - print("\nPapers that were reverted:") - recent_logs = ActivityLog.query.filter_by(action="revert_pending").order_by( - ActivityLog.timestamp.desc()).limit(10).all() - - for log in recent_logs: - paper = PaperMetadata.query.get(log.paper_id) - if paper: - print(f"Paper ID {paper.id}: '{paper.title}' - Now status='{paper.status}'") - - # Check active celery tasks - i = celery.control.inspect() - active = i.active() or {} - reserved = i.reserved() or {} - scheduled = i.scheduled() or {} - - active_count = sum(len(tasks) for worker, tasks in active.items()) - reserved_count = sum(len(tasks) for worker, tasks in reserved.items()) - scheduled_count = sum(len(tasks) for worker, tasks in scheduled.items()) - - print(f"\nCurrently {active_count} active, {reserved_count} reserved, and {scheduled_count} scheduled tasks") - - # Print conclusion - if still_pending == 0 and reverted > 0: - print("\nSUCCESS: All pending papers were properly reverted!") - elif still_pending > 0: - print(f"\nWARNING: {still_pending} papers are still in 'Pending' state!") - elif pending_count == 0 and reverted == 0: - print("\nNo papers to revert. 
Can't fully test.") +print("[DEBUG] Flask app initialized.") -if __name__ == "__main__": - test_stop_scraper() +def test_stop_scraper(): + """Test the stop_scraper functionality with proper APScheduler integration""" + + print("[DEBUG] Entering app context...") + with app.app_context(): + print("[DEBUG] App context entered.") + + # Clear existing test data + print("[DEBUG] Clearing existing test data...") + PaperMetadata.query.filter(PaperMetadata.doi.like('10.1234/test%')).delete() + db.session.commit() + print("[DEBUG] Existing test data cleared.") + + # Get scraper configuration + scraper = get_scraper() + input_statuses = scraper.get_input_statuses() + output_statuses = scraper.get_output_statuses() + + if not input_statuses: + print("❌ No input statuses found for current scraper") + return + + input_status = input_statuses[0] # Use first input status + processing_status = output_statuses.get("processing", "Processing") + + print(f"[DEBUG] Using input status: {input_status}") + print(f"[DEBUG] Using processing status: {processing_status}") + + # Create test papers in input status + test_papers = [] + print("[DEBUG] Creating test papers...") + for i in range(3): + test_paper = PaperMetadata() + test_paper.title = f"Test Paper {i+1}" + test_paper.doi = f"10.1234/test{i+1}" + test_paper.status = input_status + test_paper.created_at = datetime.now(UTC) + test_paper.updated_at = datetime.now(UTC) + db.session.add(test_paper) + test_papers.append(test_paper) + db.session.commit() + print(f"[DEBUG] Created {len(test_papers)} test papers in '{input_status}' status.") + + # Simulate some papers being moved to processing status + print("[DEBUG] Simulating papers in processing...") + for i, paper in enumerate(test_papers[:2]): # Move first 2 papers to processing + paper.previous_status = paper.status # Store previous status + paper.status = processing_status + paper.updated_at = datetime.now(UTC) + db.session.commit() + print(f"[DEBUG] Moved 2 papers to '{processing_status}' status.") + + # Check current scraper state + scraper_state = ScraperState.get_current_state() + print(f"[DEBUG] Current scraper state: active={scraper_state.is_active}, paused={scraper_state.is_paused}") + + # Check paper counts before stopping + input_count = PaperMetadata.query.filter_by(status=input_status).count() + processing_count = PaperMetadata.query.filter_by(status=processing_status).count() + print(f"[DEBUG] Papers before stopping: {input_count} in '{input_status}', {processing_count} in '{processing_status}'") + + # Test APScheduler job management + scheduler = app.config.get('SCHEDULER') + if scheduler: + print("[DEBUG] Testing APScheduler job management...") + + # Create some test jobs using the correct API + for paper in test_papers: + job_id = scheduler.schedule_paper_processing( + paper_id=paper.id, + delay_seconds=60, # 1 minute from now + job_id=f"test_paper_process_{paper.id}" + ) + print(f"[DEBUG] Scheduled job {job_id} for paper {paper.id}") + + jobs_before = len(scheduler.get_paper_jobs()) + print(f"[DEBUG] Created {jobs_before} test jobs in APScheduler") + + # Test the manager's stop_scraper method + print("[DEBUG] Testing ScraperManager.stop_scraper()...") + manager = ScraperManager() + result = manager.stop_scraper() + + print(f"[DEBUG] stop_scraper result: {result}") + + # Check jobs after stopping + jobs_after = len(scheduler.get_paper_jobs()) + print(f"[DEBUG] Jobs after stopping: {jobs_after} (should be 0)") + + if jobs_after == 0: + print("✅ All APScheduler jobs successfully revoked") + else: + 
print(f"❌ {jobs_after} jobs still exist after revocation") + else: + print("❌ APScheduler not found in app config") + + # Check paper counts after stopping + input_count_after = PaperMetadata.query.filter_by(status=input_status).count() + processing_count_after = PaperMetadata.query.filter_by(status=processing_status).count() + print(f"[DEBUG] Papers after stopping: {input_count_after} in '{input_status}', {processing_count_after} in '{processing_status}'") + + # Verify that processing papers were reverted + if processing_count_after == 0 and input_count_after >= processing_count: + print("✅ Papers successfully reverted from processing to previous status") + else: + print(f"❌ Paper reversion failed: expected 0 processing papers, got {processing_count_after}") + + # Check scraper state after stopping + scraper_state_after = ScraperState.get_current_state() + print(f"[DEBUG] Scraper state after stopping: active={scraper_state_after.is_active}, paused={scraper_state_after.is_paused}") + + if not scraper_state_after.is_active and not scraper_state_after.is_paused: + print("✅ Scraper state correctly set to inactive") + else: + print("❌ Scraper state not properly updated") + + # Clean up test data + print("[DEBUG] Cleaning up test data...") + PaperMetadata.query.filter(PaperMetadata.doi.like('10.1234/test%')).delete() + db.session.commit() + print("[DEBUG] Test data cleaned up.") + +print("[DEBUG] Starting test_stop_scraper...") +test_stop_scraper() +print("[DEBUG] test_stop_scraper completed.")