diff --git a/scipaperloader/blueprints/scraper.py b/scipaperloader/blueprints/scraper.py
index b8cb66b..fd81d49 100644
--- a/scipaperloader/blueprints/scraper.py
+++ b/scipaperloader/blueprints/scraper.py
@@ -1,8 +1,10 @@
 import random
 import json
+import time
+import math
 from datetime import datetime
 from flask import Blueprint, jsonify, render_template, request, current_app, flash
-from ..models import VolumeConfig, ActivityLog, PaperMetadata, ActivityCategory
+from ..models import VolumeConfig, ActivityLog, PaperMetadata, ActivityCategory, ScheduleConfig
 from ..db import db
 from ..celery import celery
 
@@ -43,13 +45,15 @@ def start_scraper():
         ActivityLog.log_scraper_command(
             action="start_scraper",
             status="success",
-            description="Scraper started manually"
+            description="Scheduled scraper started - will follow hourly configuration"
         )
 
-        # Trigger the schedule.py to start actual scheduling
+        # Immediately trigger a task to test the scheduler and provide feedback
+        dummy_scheduled_scraper.delay()
+
         return jsonify({
             "success": True,
-            "message": "Scraper started"
+            "message": "Scraper started - following hourly schedule configuration"
         })
     else:
         return jsonify({
@@ -282,4 +286,210 @@ def dummy_scrape_paper(self):
         return {
             "success": False,
             "error": error_message
-        }
\ No newline at end of file
+        }
+
+@celery.task
+def calculate_papers_for_current_hour():
+    """
+    Calculate how many papers should be downloaded in the current hour
+    based on schedule configuration.
+
+    Returns:
+        int: Number of papers to download this hour
+    """
+    current_hour = datetime.now().hour
+
+    # Get volume configuration
+    volume_config = VolumeConfig.query.first()
+    if not volume_config:
+        volume_config = VolumeConfig(volume=100)  # Default to 100 papers per day
+        db.session.add(volume_config)
+        db.session.commit()
+
+    # Get all schedule configurations to calculate total weight
+    schedule_configs = ScheduleConfig.query.all()
+    if not schedule_configs:
+        # If no schedule configs, create default with equal weights
+        for hour in range(24):
+            config = ScheduleConfig(hour=hour, weight=1.0)
+            db.session.add(config)
+        db.session.commit()
+        schedule_configs = ScheduleConfig.query.all()
+
+    # Calculate total weight across all hours
+    total_weight = sum(config.weight for config in schedule_configs)
+
+    # Find the weight for the current hour
+    current_hour_config = ScheduleConfig.query.get(current_hour)
+    if not current_hour_config:
+        # Create config for current hour if it doesn't exist
+        current_hour_config = ScheduleConfig(hour=current_hour, weight=1.0)
+        db.session.add(current_hour_config)
+        db.session.commit()
+
+    # Calculate papers for current hour: (hour_weight / total_weight) * daily_volume
+    if total_weight > 0:
+        weight_ratio = current_hour_config.weight / total_weight
+        papers_this_hour = math.floor(weight_ratio * volume_config.volume)
+    else:
+        papers_this_hour = 0
+
+    return papers_this_hour
+
+
+@celery.task
+def dummy_scheduled_scraper():
+    """
+    The main scheduler task that runs every hour to process papers
+    according to the configured schedule.
+    """
+    global SCRAPER_ACTIVE, SCRAPER_PAUSED
+
+    if not SCRAPER_ACTIVE or SCRAPER_PAUSED:
+        ActivityLog.log_scraper_activity(
+            action="scheduled_scraping",
+            status="info",
+            description=f"Scheduled scraping skipped: active={SCRAPER_ACTIVE}, paused={SCRAPER_PAUSED}"
+        )
+        return False
+
+    # Calculate how many papers to download this hour
+    papers_to_download = calculate_papers_for_current_hour()
+
+    if papers_to_download <= 0:
+        ActivityLog.log_scraper_activity(
+            action="scheduled_scraping",
+            status="info",
+            description=f"No papers scheduled for current hour"
+        )
+        return True
+
+    # Get all pending papers
+    pending_papers = PaperMetadata.query.filter_by(status="Pending").all()
+
+    # If no pending papers available, create some dummy pending papers
+    if not pending_papers:
+        ActivityLog.log_scraper_activity(
+            action="scheduled_scraping",
+            status="info",
+            description=f"No pending papers found - creating {papers_to_download} dummy pending papers"
+        )
+
+        # Create dummy pending papers
+        for i in range(papers_to_download):
+            new_paper = PaperMetadata(
+                title=f"Dummy Pending Paper {random.randint(1000, 9999)}",
+                doi=f"10.1234/dummy-pending.{random.randint(1000, 9999)}",
+                journal=random.choice([
+                    "Nature", "Science", "PLOS ONE", "Journal of Dummy Research",
+                    "Proceedings of the Dummy Society", "Cell", "Dummy Review Letters"
+                ]),
+                type="article",
+                language="en",
+                published_online=datetime.now().date(),
+                status="Pending"
+            )
+            db.session.add(new_paper)
+
+        db.session.commit()
+
+        # Get the newly created papers
+        pending_papers = PaperMetadata.query.filter_by(status="Pending").all()
+
+    # Select papers_to_download random papers from pending_papers
+    selected_papers = random.sample(
+        pending_papers,
+        min(papers_to_download, len(pending_papers))
+    )
+
+    ActivityLog.log_scraper_activity(
+        action="scheduled_scraping",
+        status="info",
+        description=f"Starting scheduled scraping of {len(selected_papers)} papers for hour {datetime.now().hour}"
+    )
+
+    # For each paper, schedule it to run at a random time within the hour
+    current_time = time.time()
+    one_hour_in_seconds = 3600
+
+    for paper in selected_papers:
+        # Random delay within this hour (0 to 60 minutes)
+        random_delay = random.randint(0, one_hour_in_seconds)
+
+        # Schedule the dummy_process_paper task with the random delay
+        dummy_process_paper.apply_async(
+            args=[paper.id],
+            countdown=random_delay
+        )
+
+    return True
+
+
+@celery.task(bind=True)
+def dummy_process_paper(self, paper_id):
+    """
+    Process a single paper for the dummy scraper.
+
+    Args:
+        paper_id (int): ID of the paper to process
+    """
+    # Get the paper from database
+    paper = PaperMetadata.query.get(paper_id)
+    if not paper:
+        # Log error if paper not found
+        ActivityLog.log_scraper_activity(
+            action="process_paper",
+            status="error",
+            description=f"Paper with ID {paper_id} not found"
+        )
+        return False
+
+    # Simulate random success/failure (70% success rate)
+    success = random.random() < 0.7
+
+    # Simulate processing time (1-5 seconds)
+    process_time = random.uniform(1, 5)
+    time.sleep(process_time)
+
+    if success:
+        # Update paper status to "Done"
+        paper.status = "Done"
+        paper.file_path = f"/path/to/dummy/papers/{paper.doi.replace('/', '_')}.pdf"
+
+        # Log success
+        ActivityLog.log_scraper_activity(
+            action="process_paper",
+            paper_id=paper.id,
+            status="success",
+            description=f"Successfully processed paper: {paper.title}"
+        )
+    else:
+        # Update paper status to "Failed"
+        paper.status = "Failed"
+
+        # Generate random error message
+        error_message = random.choice([
+            "Publisher website unavailable",
+            "No PDF download link found",
+            "Access restricted",
+            "Download timeout",
+            "Invalid DOI",
+            "Rate limited by publisher"
+        ])
+        paper.error_msg = error_message
+
+        # Log failure
+        ActivityLog.log_scraper_activity(
+            action="process_paper",
+            paper_id=paper.id,
+            status="error",
+            description=f"Failed to process paper: {error_message}"
+        )
+
+    # Update the timestamp
+    paper.updated_at = datetime.utcnow()
+
+    # Commit changes to database
+    db.session.commit()
+
+    return success
\ No newline at end of file
diff --git a/scipaperloader/celery.py b/scipaperloader/celery.py
index e2e7474..5828e47 100644
--- a/scipaperloader/celery.py
+++ b/scipaperloader/celery.py
@@ -1,4 +1,5 @@
 from celery import Celery
+from celery.schedules import crontab
 
 # Create Celery instance without Flask app initially
 celery = Celery(
@@ -29,6 +30,14 @@ def configure_celery(app=None):
         worker_max_memory_per_child=1000000,  # 1GB memory limit
         task_acks_late=True,  # Acknowledge tasks after completion
         task_reject_on_worker_lost=True,  # Requeue tasks if worker dies
+        # Configure Beat schedule for periodic tasks
+        beat_schedule={
+            'scheduled-scraper-hourly': {
+                'task': 'scipaperloader.blueprints.scraper.dummy_scheduled_scraper',
+                'schedule': crontab(minute=0),  # Run at the start of every hour
+                'options': {'expires': 3600}
+            },
+        }
     )
 
 # Create a custom task class that pushes the Flask application context