From 0adaed0bfa66ded5995d7d57ab809e26b70bd968 Mon Sep 17 00:00:00 2001
From: Michael Beck
Date: Wed, 16 Apr 2025 16:32:52 +0200
Subject: [PATCH] fixes dummy paper processing

---
 Makefile                             | 10 ++-
 celery_worker.py                     |  2 +
 scipaperloader/blueprints/scraper.py | 96 ++++++++++++++++++++--------
 scipaperloader/models.py             | 40 ++++++++++++
 4 files changed, 117 insertions(+), 31 deletions(-)

diff --git a/Makefile b/Makefile
index 1a4728e..32530b4 100644
--- a/Makefile
+++ b/Makefile
@@ -143,11 +143,15 @@ celery-flower: venv
 	$(PIP) install flower
 	$(CELERY) -A celery_worker:celery flower --port=5555
 
+# Run Celery beat scheduler for periodic tasks
+celery-beat: venv
+	$(CELERY) -A celery_worker:celery beat --loglevel=info
+
 # Check if Redis is running, start if needed
 redis:
 	@redis-cli ping > /dev/null 2>&1 || (echo "Starting Redis server..." && redis-server --daemonize yes)
 
-# Run complete application stack (Flask app + Celery worker + Redis)
+# Run complete application stack (Flask app + Celery worker + Redis + Beat scheduler)
 run-all: redis
-	@echo "Starting Flask and Celery..."
-	@$(MAKE) -j2 run celery
+	@echo "Starting Flask, Celery worker and Beat scheduler..."
+	@$(MAKE) -j3 run celery celery-beat
diff --git a/celery_worker.py b/celery_worker.py
index f8b0fe8..dbae178 100644
--- a/celery_worker.py
+++ b/celery_worker.py
@@ -1,4 +1,6 @@
 from scipaperloader.celery import celery, configure_celery
+# Import all task modules to ensure they are registered with Celery
+import scipaperloader.blueprints.scraper  # Import the scraper module with our tasks
 
 # Configure celery with Flask app
 configure_celery()
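
The two-line change to celery_worker.py matters because Celery only discovers tasks whose defining module has actually been imported in the worker process: the @celery.task decorator registers each task as a side effect of that import. A quick way to see the effect (a sketch reusing the module paths from this patch; the print is illustrative only):

# registration_check.py -- show which tasks the worker process knows about
from scipaperloader.celery import celery, configure_celery
import scipaperloader.blueprints.scraper  # noqa: F401  (import side effect registers the tasks)

configure_celery()

# Without the import above, run_periodic_dummy_scraper and friends would be
# missing from this registry and beat's messages would go unhandled.
print([name for name in celery.tasks if not name.startswith("celery.")])
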
diff --git a/scipaperloader/blueprints/scraper.py b/scipaperloader/blueprints/scraper.py
index 8b8f746..8029d65 100644
--- a/scipaperloader/blueprints/scraper.py
+++ b/scipaperloader/blueprints/scraper.py
@@ -4,16 +4,27 @@ import time
 import math
 from datetime import datetime, timedelta
 from flask import Blueprint, jsonify, render_template, request, current_app, flash
-from ..models import VolumeConfig, ActivityLog, PaperMetadata, ActivityCategory, ScheduleConfig
+from ..models import VolumeConfig, ActivityLog, PaperMetadata, ActivityCategory, ScheduleConfig, ScraperState
 from ..db import db
 from ..celery import celery
 from ..defaults import MAX_VOLUME
+from celery.schedules import crontab
 
 bp = Blueprint("scraper", __name__, url_prefix="/scraper")
 
-# Global variables to track scraper state
-SCRAPER_ACTIVE = False
-SCRAPER_PAUSED = False
+# Setup periodic task to run every minute for testing purposes
+@celery.on_after_configure.connect
+def setup_periodic_tasks(sender, **kwargs):
+    # Run the dummy scraper every minute for testing purposes
+    sender.add_periodic_task(60.0, run_periodic_dummy_scraper.s(), name='run dummy scraper every minute')
+
+@celery.task
+def run_periodic_dummy_scraper():
+    """Periodic task to run the dummy scraper if it's active and not paused"""
+    if ScraperState.is_scraper_active():
+        dummy_scheduled_scraper.delay()
+        return True
+    return False
 
 @bp.route("/")
 def index():
@@ -26,22 +37,26 @@ def index():
         db.session.add(volume_config)
         db.session.commit()
 
+    # Get scraper state
+    scraper_state = ScraperState.get_current_state()
+
     return render_template(
         "scraper.html.jinja",
         volume_config=volume_config,
-        scraper_active=SCRAPER_ACTIVE,
-        scraper_paused=SCRAPER_PAUSED,
+        scraper_active=scraper_state.is_active,
+        scraper_paused=scraper_state.is_paused,
         max_volume=MAX_VOLUME
     )
 
 @bp.route("/start", methods=["POST"])
 def start_scraper():
     """Start the scraper."""
-    global SCRAPER_ACTIVE, SCRAPER_PAUSED
+    scraper_state = ScraperState.get_current_state()
 
-    if not SCRAPER_ACTIVE:
-        SCRAPER_ACTIVE = True
-        SCRAPER_PAUSED = False
+    if not scraper_state.is_active:
+        # Update scraper state
+        ScraperState.set_active(True)
+        ScraperState.set_paused(False)
 
         # Log the action
         ActivityLog.log_scraper_command(
@@ -66,11 +81,12 @@ def start_scraper():
 @bp.route("/stop", methods=["POST"])
 def stop_scraper():
     """Stop the scraper."""
-    global SCRAPER_ACTIVE, SCRAPER_PAUSED
+    scraper_state = ScraperState.get_current_state()
 
-    if SCRAPER_ACTIVE:
-        SCRAPER_ACTIVE = False
-        SCRAPER_PAUSED = False
+    if scraper_state.is_active:
+        # Update scraper state
+        ScraperState.set_active(False)
+        ScraperState.set_paused(False)
 
         ActivityLog.log_scraper_command(
             action="stop_scraper",
@@ -91,10 +107,11 @@ def stop_scraper():
 @bp.route("/pause", methods=["POST"])
 def pause_scraper():
     """Pause the scraper."""
-    global SCRAPER_ACTIVE, SCRAPER_PAUSED
+    scraper_state = ScraperState.get_current_state()
 
-    if SCRAPER_ACTIVE and not SCRAPER_PAUSED:
-        SCRAPER_PAUSED = True
+    if scraper_state.is_active and not scraper_state.is_paused:
+        # Update scraper state
+        ScraperState.set_paused(True)
 
         ActivityLog.log_scraper_command(
             action="pause_scraper",
@@ -106,8 +123,9 @@ def pause_scraper():
             "success": True,
             "message": "Scraper paused"
         })
-    elif SCRAPER_ACTIVE and SCRAPER_PAUSED:
-        SCRAPER_PAUSED = False
+    elif scraper_state.is_active and scraper_state.is_paused:
+        # Update scraper state
+        ScraperState.set_paused(False)
 
         ActivityLog.log_scraper_command(
             action="resume_scraper",
@@ -128,9 +146,11 @@ def pause_scraper():
 @bp.route("/status")
 def scraper_status():
     """Get the current status of the scraper."""
+    scraper_state = ScraperState.get_current_state()
+
     return jsonify({
-        "active": SCRAPER_ACTIVE,
-        "paused": SCRAPER_PAUSED,
+        "active": scraper_state.is_active,
+        "paused": scraper_state.is_paused,
         "current_hour": datetime.now().hour,
     })
 
@@ -348,13 +368,12 @@ def dummy_scheduled_scraper():
     The main scheduler task that runs every hour to process papers
     according to the configured schedule.
     """
-    global SCRAPER_ACTIVE, SCRAPER_PAUSED
-
-    if not SCRAPER_ACTIVE or SCRAPER_PAUSED:
+    # Check if scraper is active using ScraperState
+    if not ScraperState.is_scraper_active():
         ActivityLog.log_scraper_activity(
             action="scheduled_scraping",
             status="info",
-            description=f"Scheduled scraping skipped: active={SCRAPER_ACTIVE}, paused={SCRAPER_PAUSED}"
+            description="Scheduled scraping skipped: inactive or paused"
         )
         return False
 
@@ -382,9 +401,19 @@ def dummy_scheduled_scraper():
 
     # Create dummy pending papers
     for i in range(papers_to_download):
+        # Generate a unique DOI by checking if it exists in the database
+        while True:
+            random_id = random.randint(1000, 9999)
+            doi = f"10.1234/dummy-pending.{random_id}"
+
+            # Check if the DOI already exists
+            existing = PaperMetadata.query.filter_by(doi=doi).first()
+            if not existing:
+                break
+
         new_paper = PaperMetadata(
-            title=f"Dummy Pending Paper {random.randint(1000, 9999)}",
-            doi=f"10.1234/dummy-pending.{random.randint(1000, 9999)}",
+            title=f"Dummy Pending Paper {random_id}",
+            doi=doi,
             journal=random.choice([
                 "Nature", "Science", "PLOS ONE", "Journal of Dummy Research",
                 "Proceedings of the Dummy Society", "Cell", "Dummy Review Letters"
@@ -396,7 +425,18 @@ def dummy_scheduled_scraper():
         )
         db.session.add(new_paper)
 
-    db.session.commit()
+    # Commit all at once after creating all papers
+    try:
+        db.session.commit()
+    except Exception as e:
+        # Log the error and rollback
+        ActivityLog.log_error(
+            error_message="Failed to create dummy pending papers",
+            exception=e,
+            source="dummy_scheduled_scraper"
+        )
+        db.session.rollback()
+        return False
 
     # Get the newly created papers
     pending_papers = PaperMetadata.query.filter_by(status="Pending").all()
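
Taken together, the scraper.py changes wire up the standard Celery on_after_configure pattern: beat enqueues a cheap guard task on a fixed interval, and the guard checks persisted state before dispatching the real work. A minimal self-contained sketch of that pattern (standalone module; the Redis broker URL is an assumption, and guard/work/is_active are illustrative stand-ins, not names from this patch):

# beat_guard_sketch.py -- the periodic guard-task pattern used above
from celery import Celery

app = Celery("sketch", broker="redis://localhost:6379/0")  # assumed broker URL

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Beat enqueues the guard every 60 seconds, as in the patch.
    sender.add_periodic_task(60.0, guard.s(), name="run guard every minute")

@app.task
def guard():
    # Stand-in for run_periodic_dummy_scraper: check the flag, then dispatch.
    if is_active():
        work.delay()
        return True
    return False

@app.task
def work():
    # Stand-in for dummy_scheduled_scraper.
    print("processing papers")

def is_active() -> bool:
    # Stand-in for ScraperState.is_scraper_active(); always on in this sketch.
    return True

Running celery -A beat_guard_sketch worker alongside celery -A beat_guard_sketch beat exercises it, which is the same pairing the new celery-beat Makefile target adds to run-all.
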
""" - global SCRAPER_ACTIVE, SCRAPER_PAUSED - - if not SCRAPER_ACTIVE or SCRAPER_PAUSED: + # Check if scraper is active using ScraperState + if not ScraperState.is_scraper_active(): ActivityLog.log_scraper_activity( action="scheduled_scraping", status="info", - description=f"Scheduled scraping skipped: active={SCRAPER_ACTIVE}, paused={SCRAPER_PAUSED}" + description=f"Scheduled scraping skipped: inactive or paused" ) return False @@ -382,9 +401,19 @@ def dummy_scheduled_scraper(): # Create dummy pending papers for i in range(papers_to_download): + # Generate a unique DOI by checking if it exists in the database + while True: + random_id = random.randint(1000, 9999) + doi = f"10.1234/dummy-pending.{random_id}" + + # Check if the DOI already exists + existing = PaperMetadata.query.filter_by(doi=doi).first() + if not existing: + break + new_paper = PaperMetadata( - title=f"Dummy Pending Paper {random.randint(1000, 9999)}", - doi=f"10.1234/dummy-pending.{random.randint(1000, 9999)}", + title=f"Dummy Pending Paper {random_id}", + doi=doi, journal=random.choice([ "Nature", "Science", "PLOS ONE", "Journal of Dummy Research", "Proceedings of the Dummy Society", "Cell", "Dummy Review Letters" @@ -396,7 +425,18 @@ def dummy_scheduled_scraper(): ) db.session.add(new_paper) - db.session.commit() + # Commit all at once after creating all papers + try: + db.session.commit() + except Exception as e: + # Log the error and rollback + ActivityLog.log_error( + error_message="Failed to create dummy pending papers", + exception=e, + source="dummy_scheduled_scraper" + ) + db.session.rollback() + return False # Get the newly created papers pending_papers = PaperMetadata.query.filter_by(status="Pending").all() diff --git a/scipaperloader/models.py b/scipaperloader/models.py index 48ed970..159ed22 100644 --- a/scipaperloader/models.py +++ b/scipaperloader/models.py @@ -211,6 +211,46 @@ class VolumeConfig(db.Model): volume = db.Column(db.Float) # volume of papers to scrape per day +class ScraperState(db.Model): + """Model to store the current state of the scraper.""" + id = db.Column(db.Integer, primary_key=True) + is_active = db.Column(db.Boolean, default=False) + is_paused = db.Column(db.Boolean, default=False) + last_updated = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + @classmethod + def get_current_state(cls): + """Get the current scraper state, creating it if it doesn't exist.""" + state = cls.query.first() + if not state: + state = cls(is_active=False, is_paused=False) + db.session.add(state) + db.session.commit() + return state + + @classmethod + def set_active(cls, active): + """Set the active state of the scraper.""" + state = cls.get_current_state() + state.is_active = active + db.session.commit() + return state + + @classmethod + def set_paused(cls, paused): + """Set the paused state of the scraper.""" + state = cls.get_current_state() + state.is_paused = paused + db.session.commit() + return state + + @classmethod + def is_scraper_active(cls): + """Check if the scraper is active.""" + state = cls.get_current_state() + return state.is_active and not state.is_paused + + def init_schedule_config(): """Initialize ScheduleConfig with default values if empty""" if ScheduleConfig.query.count() == 0: