diff --git a/celery_worker.py b/celery_worker.py
index dbae178..8911111 100644
--- a/celery_worker.py
+++ b/celery_worker.py
@@ -1,9 +1,11 @@
 from scipaperloader.celery import celery, configure_celery
 
 # Import all task modules to ensure they are registered with Celery
+import scipaperloader.scrapers.tasks # Import new scheduler tasks
 import scipaperloader.blueprints.scraper # Import the scraper module with our tasks
 
 # Configure celery with Flask app
 configure_celery()
 if __name__ == '__main__':
-    celery.start()
\ No newline at end of file
+    # Start the Celery worker
+    celery.start(['worker', '--loglevel=info', '--concurrency=2'])
\ No newline at end of file
diff --git a/scipaperloader/blueprints/api.py b/scipaperloader/blueprints/api.py
index 435793a..ced4829 100644
--- a/scipaperloader/blueprints/api.py
+++ b/scipaperloader/blueprints/api.py
@@ -9,7 +9,7 @@ bp = Blueprint("api", __name__, url_prefix="/api")
 def get_activity_logs():
     """Get activity logs with filtering options."""
     # Get query parameters
-    category = request.args.get("category")
+    categories = request.args.getlist("category") # Changed to getlist for multiple values
     action = request.args.get("action")
     after = request.args.get("after")
     limit = request.args.get("limit", 20, type=int)
@@ -17,8 +17,9 @@ def get_activity_logs():
     # Build query
     query = ActivityLog.query
 
-    if category:
-        query = query.filter(ActivityLog.category == category)
+    if categories:
+        # Filter by multiple categories using in_() for SQL IN clause
+        query = query.filter(ActivityLog.category.in_(categories))
 
     if action:
         query = query.filter(ActivityLog.action == action)
diff --git a/scipaperloader/blueprints/config.py b/scipaperloader/blueprints/config.py
index ca805df..8fdf4a8 100644
--- a/scipaperloader/blueprints/config.py
+++ b/scipaperloader/blueprints/config.py
@@ -34,21 +34,8 @@ def _update_volume(new_volume):
     if new_volume <= 0 or new_volume > MAX_VOLUME:
         return False, f"Volume must be between 1 and {MAX_VOLUME}", None
 
-    volume_config = VolumeConfig.query.first()
-    if not volume_config:
-        volume_config = VolumeConfig(volume=new_volume)
-        db.session.add(volume_config)
-    else:
-        old_value = volume_config.volume
-        volume_config.volume = new_volume
-        ActivityLog.log_config_change(
-            config_key="scraper_volume",
-            old_value=old_value,
-            new_value=new_volume,
-            description="Updated scraper volume"
-        )
-
-    db.session.commit()
+    # Use the new class method to set the volume
+    volume_config = VolumeConfig.set_volume(new_volume)
 
     # Invalidate and recalculate the hourly quota cache
     try:
diff --git a/scipaperloader/blueprints/scraper.py b/scipaperloader/blueprints/scraper.py
index 07b955a..44e863a 100644
--- a/scipaperloader/blueprints/scraper.py
+++ b/scipaperloader/blueprints/scraper.py
@@ -1,928 +1,549 @@
-import random
-import json
-import time
-import math
-import os # Import os for path joining
-from datetime import datetime, timedelta
-from flask import Blueprint, jsonify, render_template, request, current_app, flash
-# Import the new model
-from ..models import VolumeConfig, ActivityLog, PaperMetadata, ActivityCategory, ScheduleConfig, ScraperState, DownloadPathConfig, ScraperModuleConfig
+"""
+Simplified scraper blueprint using the new ScraperManager and hourly scheduling system.
+""" +from flask import Blueprint, jsonify, render_template, request +from ..models import ActivityLog, PaperMetadata, ScraperState, VolumeConfig +from ..scrapers.manager import ScraperManager +from ..scrapers.factory import get_available_scrapers from ..db import db -from ..celery import celery from ..defaults import MAX_VOLUME -from ..cache_utils import get_cached_hourly_quota, invalidate_hourly_quota_cache -from celery.schedules import crontab -from sqlalchemy import func -from scipaperloader.scrapers.factory import get_scraper, get_available_scrapers +from datetime import datetime, timedelta bp = Blueprint("scraper", __name__, url_prefix="/scraper") -# Track the periodic task ID for proper revocation -PERIODIC_TASK_ID = None - -# Setup periodic task to run every minute for testing purposes -@celery.on_after_configure.connect -def setup_periodic_tasks(sender, **kwargs): - global PERIODIC_TASK_ID - # Run the dummy scraper every minute for testing purposes with a unique task name - # Using a constant name allows us to revoke this specific periodic task when stopping - result = sender.add_periodic_task(60.0, run_periodic_dummy_scraper.s(), - name='run_dummy_scraper_every_minute') - PERIODIC_TASK_ID = result.id - # Log that we've registered the periodic task with its ID - ActivityLog.log_scraper_command( - action="register_periodic_task", - status="success", - description=f"Registered periodic scraper task with ID: {PERIODIC_TASK_ID}" - ) - -# Function to revoke the periodic task properly -def revoke_periodic_task(): - global PERIODIC_TASK_ID - if PERIODIC_TASK_ID: - try: - # Revoke by ID is more reliable than by name - celery.control.revoke(PERIODIC_TASK_ID, terminate=True) - - # Attempt to revoke by name as a fallback (standard Celery method) - # Note: We're removing the non-standard revoke_from_control method - # and replacing it with a more reliable approach - i = celery.control.inspect() - scheduled = i.scheduled() or {} - - # Look for our periodic task by name in the scheduled tasks - for worker, tasks in scheduled.items(): - for task in tasks: - if task.get('name') == 'run_dummy_scraper_every_minute': - celery.control.revoke(task['id'], terminate=True) - - # Log the action - ActivityLog.log_scraper_command( - action="revoke_periodic_task", - status="success", - description=f"Revoked periodic task with ID: {PERIODIC_TASK_ID}" - ) - return True - except Exception as e: - ActivityLog.log_error( - error_message=f"Failed to revoke periodic task: {str(e)}", - source="revoke_periodic_task" - ) - return False - return False - - -@celery.task -def run_periodic_dummy_scraper(): - """Periodic task to run the dummy scraper if it's active and not paused""" - scraper_state = ScraperState.get_current_state() - # Log every time this runs to track execution - ActivityLog.log_scraper_activity( - action="periodic_check", - status="info", - description=f"Periodic check running. 
Scraper active: {scraper_state.is_active}, paused: {scraper_state.is_paused}" - ) - - if scraper_state.is_active and not scraper_state.is_paused: - dummy_scheduled_scraper.delay() - return True - return False - +# Initialize the scraper manager +scraper_manager = ScraperManager() @bp.route("/") def index(): - """Render the scraper control panel.""" - volume_config = VolumeConfig.query.first() - - # Ensure we have volume config - if not volume_config: - volume_config = VolumeConfig(volume=100) # Default value - db.session.add(volume_config) - db.session.commit() - - # Get scraper state + """Main scraper page.""" + # Get current scraper state scraper_state = ScraperState.get_current_state() - + + # Get available scrapers + available_scrapers = get_available_scrapers() + + # Get recent activity logs + recent_logs = ActivityLog.query.order_by(ActivityLog.timestamp.desc()).limit(50).all() + + # Get volume configuration + volume_config = VolumeConfig.get_current_volume() + + # Get paper counts by status + paper_counts = { + 'new': PaperMetadata.query.filter_by(status='New').count(), + 'processing': PaperMetadata.query.filter_by(status='Processing').count(), + 'done': PaperMetadata.query.filter_by(status='Done').count(), + 'failed': PaperMetadata.query.filter_by(status='Failed').count(), + 'pending': PaperMetadata.query.filter_by(status='Pending').count(), + 'retrying': PaperMetadata.query.filter_by(status='Retrying').count(), + } + return render_template( "scraper.html.jinja", + scraper_state=scraper_state, + available_scrapers=available_scrapers, + recent_logs=recent_logs, + paper_counts=paper_counts, volume_config=volume_config, - scraper_active=scraper_state.is_active, - scraper_paused=scraper_state.is_paused, max_volume=MAX_VOLUME ) - @bp.route("/start", methods=["POST"]) def start_scraper(): - """Start the scraper.""" - scraper_state = ScraperState.get_current_state() - - if not scraper_state.is_active: - # Update scraper state - ScraperState.set_active(True) - ScraperState.set_paused(False) - - # Log the action + """Start the hourly scraper scheduling.""" + try: + # Handle both JSON and form data + if request.is_json: + data = request.get_json() or {} + else: + data = request.form.to_dict() + + scraper_name = data.get('scraper_name', 'dummy') + + # Start the scraper using manager + result = scraper_manager.start_scraper() + + if result["status"] == "success": + ActivityLog.log_scraper_command( + action="start_scraper", + status="success", + description="Started scraper with hourly scheduling" + ) + + return jsonify({ + "success": True, + "message": result["message"] + }) + else: + return jsonify({ + "success": False, + "message": result["message"] + }), 400 + + except Exception as e: ActivityLog.log_scraper_command( action="start_scraper", - status="success", - description="Scheduled scraper started - will follow hourly configuration" + status="error", + description=f"Failed to start scraper: {str(e)}" ) - - # Immediately trigger a task to test the scheduler and provide feedback - dummy_scheduled_scraper.delay() - - return jsonify({ - "success": True, - "message": "Scraper started - following hourly schedule configuration" - }) - else: return jsonify({ "success": False, - "message": "Scraper is already running" - }) - - -@bp.route("/stop", methods=["POST"]) -def stop_scraper(): - """Stop the scraper completely.""" - scraper_state = ScraperState.get_current_state() - - if scraper_state.is_active: - # Update scraper state first - ScraperState.set_active(False) - 
ScraperState.set_paused(False) - - # Stop any running tasks - task_types_to_revoke = [ - 'scipaperloader.blueprints.scraper.process_paper', - 'scipaperloader.blueprints.scraper.dummy_scheduled_scraper', - 'scipaperloader.blueprints.scraper.run_periodic_dummy_scraper' - ] - - # Use our utility function to revoke tasks - revoked_count = revoke_tasks_by_type(task_types_to_revoke, terminate=True) - - # Revoke the periodic task specifically - revoke_periodic_task() - - # Clear all pending tasks from the queue more aggressively - try: - # This purges all tasks in all queues - celery.control.purge() - ActivityLog.log_scraper_command( - action="purge_queue", - status="success", - description="Purged all task queues" - ) - except Exception as e: - ActivityLog.log_error( - error_message=f"Failed to purge task queues: {str(e)}", - source="stop_scraper" - ) - # Fallback to discard_all if purge fails - celery.control.discard_all() - - # Restart the worker to ensure clean state - try: - celery.control.broadcast('pool_restart', arguments={'reload': True}) - ActivityLog.log_scraper_command( - action="restart_worker", - status="success", - description="Worker pool restart requested" - ) - except Exception as e: - ActivityLog.log_error( - error_message=f"Failed to restart worker pool: {str(e)}", - source="stop_scraper" - ) - - ActivityLog.log_scraper_command( - action="stop_scraper", - status="success", - description=f"Scraper stopped manually. Revoked {revoked_count} pending tasks. Worker pool restart requested." - ) - - return jsonify({ - "success": True, - "message": f"Scraper stopped. Revoked {revoked_count} pending tasks and requested worker restart." - }) - else: - return jsonify({ - "success": False, - "message": "Scraper is not running" - }) - + "message": f"Error starting scraper: {str(e)}" + }), 500 @bp.route("/pause", methods=["POST"]) def pause_scraper(): """Pause the scraper.""" - scraper_state = ScraperState.get_current_state() - - if scraper_state.is_active and not scraper_state.is_paused: - # Update scraper state - ScraperState.set_paused(True) - - # Just revoke processing tasks, but leave the periodic tasks running - # so it can continue to check the state (which is now paused) - task_types_to_revoke = [ - 'scipaperloader.blueprints.scraper.process_paper', - 'scipaperloader.blueprints.scraper.dummy_scheduled_scraper' - ] - - # Use our utility function to revoke tasks - revoked_count = revoke_tasks_by_type(task_types_to_revoke, terminate=True) - - # Also clear the queue - celery.control.discard_all() - + try: + result = scraper_manager.pause_scraper() + + if result["status"] == "success": + ActivityLog.log_scraper_command( + action="pause_scraper", + status="success", + description="Scraper paused successfully" + ) + + return jsonify({ + "success": True, + "message": result["message"] + }) + else: + return jsonify({ + "success": False, + "message": result["message"] + }), 400 + + except Exception as e: ActivityLog.log_scraper_command( action="pause_scraper", - status="success", - description=f"Scraper paused manually. Revoked {revoked_count} pending tasks." + status="error", + description=f"Failed to pause scraper: {str(e)}" ) - - return jsonify({ - "success": True, - "message": f"Scraper paused. Revoked {revoked_count} pending tasks." 
- }) - elif scraper_state.is_active and scraper_state.is_paused: - # Update scraper state - ScraperState.set_paused(False) - - ActivityLog.log_scraper_command( - action="resume_scraper", - status="success", - description="Scraper resumed manually" - ) - - return jsonify({ - "success": True, - "message": "Scraper resumed" - }) - else: return jsonify({ "success": False, - "message": "Scraper is not running" - }) - - -@bp.route("/status") -def scraper_status(): - """Get the current status of the scraper.""" - scraper_state = ScraperState.get_current_state() - - return jsonify({ - "active": scraper_state.is_active, - "paused": scraper_state.is_paused, - "current_hour": datetime.now().hour, - }) - - -@bp.route("/stats") -def scraper_stats(): - """Get scraper statistics for the dashboard.""" - # Get the last 24 hours of activity - hours = 24 - if request.args.get('hours'): - try: - hours = int(request.args.get('hours')) - except ValueError: - pass - - current_time = datetime.utcnow() - # Use timedelta for proper date calculation instead of simple hour subtraction - cutoff_time = (current_time - timedelta(hours=hours)).replace( - minute=0, second=0, microsecond=0 - ) - - # Get activity logs for scraper actions - logs = ActivityLog.query.filter( - ActivityLog.category == ActivityCategory.SCRAPER_ACTIVITY.value, - ActivityLog.timestamp >= cutoff_time - ).all() - - # Group by hour and status - stats = {} - for hour in range(hours): - # Calculate the hour as offset from current time - target_hour = (current_time.hour - hour) % 24 - stats[target_hour] = { - "success": 0, - "error": 0, - "pending": 0, - "hour": target_hour, - } - - for log in logs: - hour = log.timestamp.hour - if hour in stats: - if log.status == "success": - stats[hour]["success"] += 1 - elif log.status == "error": - stats[hour]["error"] += 1 - elif log.status == "pending": - stats[hour]["pending"] += 1 - - # Convert to list for easier consumption by JavaScript - result = [stats[hour] for hour in sorted(stats.keys())] - - return jsonify(result) - - -@bp.route("/update_config", methods=["POST"]) -def update_config(): - """Update scraper configuration.""" - data = request.json + "message": f"Error pausing scraper: {str(e)}" + }), 500 +@bp.route("/stop", methods=["POST"]) +def stop_scraper(): + """Stop the scraper and revert processing papers.""" try: - if "volume" in data: - try: - new_volume = float(data["volume"]) - - # Validate volume value - if new_volume <= 0 or new_volume > MAX_VOLUME: - return jsonify({ - "success": False, - "message": f"Volume must be between 1 and {MAX_VOLUME}" - }) - - volume_config = VolumeConfig.query.first() - if not volume_config: - volume_config = VolumeConfig(volume=new_volume) - db.session.add(volume_config) - else: - old_value = volume_config.volume - volume_config.volume = new_volume - ActivityLog.log_config_change( - config_key="scraper_volume", - old_value=old_value, - new_value=new_volume, - description="Updated scraper volume" - ) - - # Invalidate hourly quota cache when volume changes - invalidate_hourly_quota_cache() - - db.session.commit() - except (ValueError, TypeError): - return jsonify({ - "success": False, - "message": "Invalid volume value" - }) - - return jsonify({"success": True, "message": "Configuration updated"}) - - except Exception as e: - db.session.rollback() - return jsonify({"success": False, "message": f"Unexpected error: {str(e)}"}) - - - - - -@celery.task -def calculate_papers_for_current_hour(): - """ - Calculate how many papers should be downloaded in the current hour - 
based on schedule configuration. - - Returns: - int: Number of papers to download this hour - """ - current_hour = datetime.now().hour - - # Get volume configuration - volume_config = VolumeConfig.query.first() - if not volume_config: - volume_config = VolumeConfig(volume=100) # Default to 100 papers per day - db.session.add(volume_config) - db.session.commit() - - # Get all schedule configurations to calculate total weight - schedule_configs = ScheduleConfig.query.all() - if not schedule_configs: - # If no schedule configs, create default with equal weights - for hour in range(24): - config = ScheduleConfig(hour=hour, weight=1.0) - db.session.add(config) - db.session.commit() - schedule_configs = ScheduleConfig.query.all() - - # Calculate total weight across all hours - total_weight = sum(config.weight for config in schedule_configs) - - # Find the weight for the current hour - current_hour_config = ScheduleConfig.query.get(current_hour) - if not current_hour_config: - # Create config for current hour if it doesn't exist - current_hour_config = ScheduleConfig(hour=current_hour, weight=1.0) - db.session.add(current_hour_config) - db.session.commit() - - # Calculate papers for current hour: (hour_weight / total_weight) * daily_volume - if total_weight > 0: - weight_ratio = current_hour_config.weight / total_weight - papers_this_hour = math.floor(weight_ratio * volume_config.volume) - else: - papers_this_hour = 0 - - return papers_this_hour - - -@celery.task -def dummy_scheduled_scraper(): - """ - Selects new papers based on the hourly schedule and marks them as Pending. - Then schedules their processing randomly within the hour. - """ - scraper_state = ScraperState.get_current_state() - if not scraper_state.is_active or scraper_state.is_paused: - ActivityLog.log_scraper_activity( - action="dummy_scheduled_scraper_skip", - status="info", - description="Skipping run because scraper is inactive or paused." + result = scraper_manager.stop_scraper() + + # Add debugging to see what the manager returns + print(f"DEBUG: stop_scraper result: {result}") + + # Always log the stop attempt regardless of result + ActivityLog.log_scraper_command( + action="stop_scraper_attempt", + status=result.get("status", "unknown"), + description=f"Stop scraper called - result: {result}" ) - return False # Stop if not active/paused - - # Use cached hourly quota instead of calculating each time - papers_to_select = get_cached_hourly_quota(calculate_papers_for_current_hour) - - if papers_to_select <= 0: - ActivityLog.log_scraper_activity( - action="dummy_scheduled_scraper_info", - status="info", - description=f"Hourly quota is {papers_to_select}. No papers to select this hour." - ) - return True # Nothing to do this hour based on schedule - - # --- Core Logic Change: Select NEW papers --- - try: - # Find "New" papers, select randomly up to the calculated limit - new_papers = PaperMetadata.query.filter_by(status="New") \ - .order_by(func.random()) \ - .limit(papers_to_select) \ - .all() - - if not new_papers: - ActivityLog.log_scraper_activity( - action="dummy_scheduled_scraper_info", - status="info", - description="No 'New' papers found in the database. Stopping scraper." - ) - - # Stop the scraper since there are no more papers to process - ScraperState.set_active(False) + + if result["status"] == "success": ActivityLog.log_scraper_command( - action="auto_stop_scraper", + action="stop_scraper", status="success", - description="Scraper automatically stopped due to no 'New' papers left to process." 
+ description="Scraper stopped and papers reverted to original status" ) - return True - - selected_paper_ids = [p.id for p in new_papers] - - # Update status to "Pending" in bulk for efficiency - PaperMetadata.query.filter(PaperMetadata.id.in_(selected_paper_ids)) \ - .update({"status": "Pending", "updated_at": datetime.utcnow()}, synchronize_session=False) - db.session.commit() - - ActivityLog.log_scraper_activity( - action="select_new_papers", - status="success", - description=f"Selected {len(selected_paper_ids)} 'New' papers and marked as 'Pending'. IDs: {selected_paper_ids}" - ) - - # --- Now schedule processing for the newly selected "Pending" papers --- - # (Using the new modular process_paper task) - # Add random delays for processing within the hour (e.g., up to 3600 seconds) - for paper_id in selected_paper_ids: - delay = random.uniform(1, 3500) # Random delay up to ~58 minutes - process_paper.apply_async(args=[paper_id], countdown=delay) + return jsonify({ + "success": True, + "message": result["message"] + }) + else: + return jsonify({ + "success": False, + "message": result["message"] + }), 400 - ActivityLog.log_scraper_activity( - action="schedule_processing", - status="success", - description=f"Scheduled processing for {len(selected_paper_ids)} papers with random delays." - ) - - return True - except Exception as e: - db.session.rollback() # Rollback DB changes on error - ActivityLog.log_error( - error_message=f"Error in dummy_scheduled_scraper: {str(e)}", - source="dummy_scheduled_scraper" + ActivityLog.log_scraper_command( + action="stop_scraper", + status="error", + description=f"Failed to stop scraper: {str(e)}" ) - return False - - -@celery.task(bind=True) -def process_paper_batch(self, paper_ids): - """ - Process a batch of papers to improve throughput and reduce overhead. 
- - Args: - paper_ids (list): List of paper IDs to process in this batch - """ - # Check if scraper is still active - scraper_state = ScraperState.get_current_state() - if not scraper_state.is_active or scraper_state.is_paused: - ActivityLog.log_scraper_activity( - action="process_paper_batch", - status="info", - description=f"Skipped batch of {len(paper_ids)} papers because scraper is {'paused' if scraper_state.is_paused else 'stopped'}" - ) - return False - - # Log the batch starting - ActivityLog.log_scraper_activity( - action="process_paper_batch", - status="info", - description=f"Started processing batch of {len(paper_ids)} papers" - ) - - # Process each paper in the batch - results = { - "success": 0, - "failure": 0, - "skipped": 0 - } - - # Begin a transaction for the entire batch - try: - for paper_id in paper_ids: - # Double-check scraper state before each paper - scraper_state = ScraperState.get_current_state() - if not scraper_state.is_active or scraper_state.is_paused: - results["skipped"] += 1 - continue - - # Get the paper from database - paper = PaperMetadata.query.get(paper_id) - if not paper: - results["skipped"] += 1 - continue - - # Simulate random success/failure (70% success rate) - success = random.random() < 0.7 - - # Simulate some processing time (0.5-2 seconds per paper in batch) - time.sleep(random.uniform(0.5, 2)) - - if success: - # Update paper status to "Done" - paper.status = "Done" - paper.file_path = f"/path/to/dummy/papers/{paper.doi.replace('/', '_')}.pdf" - results["success"] += 1 - - # Log individual paper success with minimal overhead - ActivityLog.log_scraper_activity( - action="process_paper", - paper_id=paper.id, - status="success", - description=f"Processed in batch: {paper.title}" - ) - else: - # Update paper status to "Failed" - paper.status = "Failed" - - # Generate random error message - error_message = random.choice([ - "Publisher website unavailable", - "No PDF download link found", - "Access restricted", - "Download timeout", - "Invalid DOI", - "Rate limited by publisher" - ]) - paper.error_msg = error_message - results["failure"] += 1 - - # Log individual paper failure with minimal overhead - ActivityLog.log_scraper_activity( - action="process_paper", - paper_id=paper.id, - status="error", - description=f"Failed in batch: {error_message}" - ) - - # Update the timestamp - paper.updated_at = datetime.utcnow() - - # Commit the entire batch at once - db.session.commit() - - except Exception as e: - # If any error occurs, roll back the entire batch - db.session.rollback() - ActivityLog.log_error( - error_message=f"Error processing paper batch: {str(e)}", - source="process_paper_batch" - ) - return False - - # Log batch completion - ActivityLog.log_scraper_activity( - action="process_paper_batch", - status="success", - description=f"Completed batch processing: {results['success']} succeeded, {results['failure']} failed, {results['skipped']} skipped" - ) - - return results - + return jsonify({ + "success": False, + "message": f"Error stopping scraper: {str(e)}" + }), 500 @bp.route("/reset", methods=["POST"]) def reset_scraper(): - """ - Reset the scraper completely: - 1. Stop all running tasks - 2. Optionally purge all papers except those with 'Pending' status - 3. Reset scraper state to active and unpaused - 4. 
Trigger a new scraping cycle - """ - # First stop everything - stop_scraper() - - # Check if we should clear papers - clear_papers = request.json.get('clear_papers', True) if request.is_json else True - - if clear_papers: - try: - # Get all papers that aren't in Pending status - papers = PaperMetadata.query.filter(PaperMetadata.status != "Pending").all() - count = len(papers) - - # Delete them all - for paper in papers: - db.session.delete(paper) - - db.session.commit() - + """Reset the scraper state and revert all processing papers.""" + try: + result = scraper_manager.reset_scraper() + + if result["status"] == "success": ActivityLog.log_scraper_command( action="reset_scraper", status="success", - description=f"Reset scraper and cleared {count} non-pending papers" - ) - except Exception as e: - db.session.rollback() - ActivityLog.log_error( - error_message=f"Failed to reset papers: {str(e)}", - source="reset_scraper" + description="Scraper reset and all processing papers reverted" ) + + return jsonify({ + "success": True, + "message": result["message"] + }) + else: return jsonify({ "success": False, - "message": f"Error clearing papers: {str(e)}" - }) - - # Set state to active and unpaused - ScraperState.set_active(True) - ScraperState.set_paused(False) - - # Re-register the periodic task if needed - setup_periodic_tasks(celery) - - # Kick off a fresh scraping cycle - dummy_scheduled_scraper.delay() - - return jsonify({ - "success": True, - "message": "Scraper has been completely reset and restarted" - }) - -# Common utility function to revoke tasks by type -def revoke_tasks_by_type(task_types, terminate=True): - """ - Revokes all tasks of specified types across scheduled, reserved and active queues. - - Args: - task_types (list): List of task name strings to revoke - terminate (bool): Whether to terminate running tasks - - Returns: - int: Count of revoked tasks - """ - # Get all tasks of all types - i = celery.control.inspect() - scheduled = i.scheduled() or {} - reserved = i.reserved() or {} - active = i.active() or {} - - revoked_count = 0 - - # Revoke all scheduled tasks - for worker, tasks in scheduled.items(): - for task in tasks: - if task['name'] in task_types: - celery.control.revoke(task['id'], terminate=terminate) - revoked_count += 1 - - # Revoke all reserved tasks - for worker, tasks in reserved.items(): - for task in tasks: - if task['name'] in task_types: - celery.control.revoke(task['id'], terminate=terminate) - revoked_count += 1 - - # Revoke all active tasks - for worker, tasks in active.items(): - for task in tasks: - if task['name'] in task_types: - celery.control.revoke(task['id'], terminate=terminate) - revoked_count += 1 - - return revoked_count - -import math -from datetime import datetime -from ..models import VolumeConfig, ScheduleConfig, PaperMetadata # Ensure imports -from ..db import db # Ensure import - -def calculate_papers_for_current_hour(): - """Calculates the target number of papers for the current hour using w*N/24.""" - current_hour = datetime.now().hour - volume_config = VolumeConfig.query.first() - # Use default if not set - volume = volume_config.volume if volume_config else 100 - - current_hour_config = ScheduleConfig.query.filter_by(hour=current_hour).first() - # Use default weight 1.0 if not set for the hour - weight = current_hour_config.weight if current_hour_config else 1.0 - - # Calculate papers using the formula: w * N / 24 - papers_this_hour = math.floor(weight * volume / 24) - - # Log the calculation for debugging - 
ActivityLog.log_scraper_activity( - action="calculate_hourly_quota", - status="info", - description=f"Hour {current_hour}: weight={weight:.2f}, total_volume={volume}, target_papers={papers_this_hour}" - ) - - return papers_this_hour - - -@celery.task(bind=True) -def process_paper(self, paper_id): - """Process a paper using the configured scraper.""" - from scipaperloader.models import PaperMetadata - paper = PaperMetadata.query.get(paper_id) - if not paper: - return {"status": "error", "message": f"Paper with ID {paper_id} not found"} - - scraper = get_scraper() - result = scraper.scrape(paper.doi) - - return { - "paper_id": paper_id, - "status": result.status, - "message": result.message - } - - -@celery.task(bind=True) -@celery.task(bind=True) -def process_paper_with_scraper(self, paper_id, scraper_module): - """Process a paper using a specific scraper module.""" - from scipaperloader.models import PaperMetadata - import importlib - from ..scrapers.base import BaseScraper - - paper = PaperMetadata.query.get(paper_id) - if not paper: - return {"status": "error", "message": f"Paper with ID {paper_id} not found"} - - try: - # Import the specified scraper module - module = importlib.import_module(f"scipaperloader.scrapers.{scraper_module}") - cls = getattr(module, "Scraper") - - # Validate that it's a BaseScraper - if not issubclass(cls, BaseScraper): - error_msg = f"Scraper class in module '{scraper_module}' does not inherit from BaseScraper" - ActivityLog.log_error( - error_message=error_msg, - source="process_paper_with_scraper" - ) - return {"status": "error", "message": error_msg} + "message": result["message"] + }), 400 - # Instantiate and use the scraper - scraper = cls() - result = scraper.scrape(paper.doi) - - return { - "paper_id": paper_id, - "status": result.status, - "message": result.message, - "scraper": scraper_module - } - - except (ImportError, AttributeError) as e: - error_msg = f"Failed to load scraper module '{scraper_module}': {str(e)}" - ActivityLog.log_error( - error_message=error_msg, - source="process_paper_with_scraper" - ) - return {"status": "error", "message": error_msg} except Exception as e: - error_msg = f"Error processing paper with scraper '{scraper_module}': {str(e)}" - ActivityLog.log_error( - error_message=error_msg, - source="process_paper_with_scraper", - exception=e + ActivityLog.log_scraper_command( + action="reset_scraper", + status="error", + description=f"Failed to reset scraper: {str(e)}" ) - return {"status": "error", "message": error_msg} + return jsonify({ + "success": False, + "message": f"Error resetting scraper: {str(e)}" + }), 500 +@bp.route("/status") +def get_status(): + """Get current scraper status and statistics.""" + try: + scraper_state = ScraperState.get_current_state() + + # Get paper counts by status + paper_counts = { + 'new': PaperMetadata.query.filter_by(status='New').count(), + 'processing': PaperMetadata.query.filter_by(status='Processing').count(), + 'done': PaperMetadata.query.filter_by(status='Done').count(), + 'failed': PaperMetadata.query.filter_by(status='Failed').count(), + 'pending': PaperMetadata.query.filter_by(status='Pending').count(), + 'retrying': PaperMetadata.query.filter_by(status='Retrying').count(), + } + + # Get current hour quota info + current_quota = scraper_manager.get_current_hour_quota() + + return jsonify({ + "success": True, + "scraper_state": { + "active": scraper_state.is_active, + "paused": scraper_state.is_paused, + "last_updated": scraper_state.last_updated.isoformat() if 
scraper_state.last_updated else None + }, + "paper_counts": paper_counts, + "current_quota": current_quota + }) + + except Exception as e: + return jsonify({ + "success": False, + "message": f"Error getting status: {str(e)}" + }), 500 + +@bp.route("/logs") +def get_logs(): + """Get recent activity logs.""" + try: + limit = request.args.get('limit', 50, type=int) + logs = ActivityLog.query.order_by(ActivityLog.timestamp.desc()).limit(limit).all() + + return jsonify({ + "success": True, + "logs": [{ + "id": log.id, + "timestamp": log.timestamp.isoformat(), + "action": log.action, + "status": log.status, + "description": log.description, + "category": log.category.name if log.category else None + } for log in logs] + }) + + except Exception as e: + return jsonify({ + "success": False, + "message": f"Error getting logs: {str(e)}" + }), 500 + +@bp.route("/scrapers") +def get_scrapers(): + """Get available scrapers and their configurations.""" + try: + available_scrapers = get_available_scrapers() + scraper_info = [] + + for scraper_dict in available_scrapers: + try: + scraper_class = scraper_dict["class"] + scraper_info.append({ + "name": scraper_dict["name"], + "description": scraper_dict["description"], + "input_statuses": list(scraper_class.INPUT_STATUSES), + "output_status_success": scraper_class.OUTPUT_STATUS_SUCCESS, + "output_status_failure": scraper_class.OUTPUT_STATUS_FAILURE, + "output_status_processing": scraper_class.OUTPUT_STATUS_PROCESSING + }) + except Exception as e: + scraper_info.append({ + "name": scraper_dict.get("name", "unknown"), + "error": f"Failed to load scraper info: {str(e)}" + }) + + return jsonify({ + "success": True, + "scrapers": scraper_info + }) + + except Exception as e: + return jsonify({ + "success": False, + "message": f"Error getting scrapers: {str(e)}" + }), 500 + +@bp.route("/process-papers", methods=["POST"]) +def process_papers_manually(): + """Manually trigger paper processing for current hour.""" + try: + data = request.get_json() or {} + scraper_name = data.get('scraper_name') + + if not scraper_name: + return jsonify({ + "success": False, + "message": "Scraper name is required" + }), 400 + + # Process papers for current hour + papers = scraper_manager.select_papers_for_processing() + processed_count = len(papers) if papers else 0 + + result_msg = f"Manual processing triggered - {processed_count} papers selected for processing" + + ActivityLog.log_scraper_command( + action="manual_process", + status="success", + description=result_msg + ) + + return jsonify({ + "success": True, + "message": result_msg, + "processed_count": processed_count + }) + + except Exception as e: + ActivityLog.log_scraper_command( + action="manual_process", + status="error", + description=f"Failed to manually process papers: {str(e)}" + ) + return jsonify({ + "success": False, + "message": f"Error processing papers: {str(e)}" + }), 500 + +@bp.route("/trigger-immediate", methods=["POST"]) +def trigger_immediate_processing(): + """Trigger immediate processing of papers without waiting for hourly schedule.""" + try: + from ..scrapers.tasks import process_papers_batch + + # Get papers that should be processed this hour + manager = ScraperManager() + papers = manager.select_papers_for_processing() + + if not papers: + return jsonify({ + "success": True, + "message": "No papers available for immediate processing", + "papers_scheduled": 0 + }) + + # Get paper IDs for batch processing + paper_ids = [paper.id for paper in papers] + + # Trigger immediate batch processing (no delay) + 
task = process_papers_batch.delay(paper_ids) + + ActivityLog.log_scraper_command( + action="trigger_immediate_processing", + status="success", + description=f"Triggered immediate processing of {len(paper_ids)} papers" + ) + + return jsonify({ + "success": True, + "message": f"Immediate processing started for {len(paper_ids)} papers", + "papers_scheduled": len(paper_ids), + "task_id": task.id + }) + + except Exception as e: + ActivityLog.log_scraper_command( + action="trigger_immediate_processing", + status="error", + description=f"Failed to trigger immediate processing: {str(e)}" + ) + return jsonify({ + "success": False, + "message": f"Error triggering immediate processing: {str(e)}" + }), 500 + +@bp.route("/available_scrapers") +def get_available_scrapers_endpoint(): + """Get available scrapers for the UI dropdown.""" + try: + available_scrapers = get_available_scrapers() + + return jsonify({ + "success": True, + "scrapers": [{ + "name": scraper["name"], + "description": scraper["description"], + "is_current": False # Could implement current scraper detection + } for scraper in available_scrapers] + }) + + except Exception as e: + return jsonify({ + "success": False, + "message": f"Error getting scrapers: {str(e)}" + }), 500 + +@bp.route("/stats") +def get_stats(): + """Get scraper statistics for the dashboard.""" + try: + hours = int(request.args.get('hours', 24)) + current_time = datetime.utcnow() + cutoff_time = current_time.replace(minute=0, second=0, microsecond=0) + + # Get activity logs for scraper actions in the last N hours + from ..models import ActivityCategory + start_time = cutoff_time - timedelta(hours=hours) + logs = ActivityLog.query.filter( + ActivityLog.category == ActivityCategory.SCRAPER_ACTIVITY.value, + ActivityLog.timestamp >= start_time + ).all() + + # Group by hour and status + stats = {} + for hour_offset in range(hours): + target_hour = (current_time.hour - hour_offset) % 24 + stats[target_hour] = { + "success": 0, + "error": 0, + "pending": 0, + "hour": target_hour, + } + + for log in logs: + hour = log.timestamp.hour + if hour in stats: + if log.status == "success": + stats[hour]["success"] += 1 + elif log.status == "error": + stats[hour]["error"] += 1 + elif log.status in ("pending", "info"): + stats[hour]["pending"] += 1 + + # Convert to list for easier consumption by JavaScript + result = [stats[hour] for hour in sorted(stats.keys())] + return jsonify(result) + + except Exception as e: + return jsonify({ + "success": False, + "message": f"Error getting stats: {str(e)}" + }), 500 @bp.route("/process_single/", methods=["POST"]) -def process_single_paper(paper_id): +def process_single_paper_endpoint(paper_id): """Process a single paper by ID.""" try: - # Check if paper exists + data = request.get_json() or {} + scraper_name = data.get('scraper_module') + + # Get the paper paper = PaperMetadata.query.get(paper_id) if not paper: return jsonify({ "success": False, - "message": f"Paper with ID {paper_id} not found" - }) + "message": "Paper not found" + }), 404 - # Get the scraper module name from the request - scraper_module = None - if request.is_json and request.json: - scraper_module = request.json.get('scraper_module') + # Process the paper using the manager + result = scraper_manager.process_paper(paper) - # Update status to Pending - old_status = paper.status - paper.status = "Pending" - paper.updated_at = datetime.utcnow() - db.session.commit() - - # Log that we're processing this paper - ActivityLog.log_scraper_activity( - action="manual_process_paper", - 
paper_id=paper_id, - status="pending", - description=f"Manual processing initiated for paper: {paper.title}" + - (f" using {scraper_module} scraper" if scraper_module else "") + ActivityLog.log_scraper_command( + action="manual_process_single", + status="success", + description=f"Manually processed paper {paper.doi}" ) - # Start the task (without delay since it's manual) - if scraper_module: - task = process_paper_with_scraper.delay(paper_id, scraper_module) - else: - task = process_paper.delay(paper_id) - return jsonify({ "success": True, - "task_id": task.id, - "message": f"Processing paper '{paper.title}' (ID: {paper_id})" + - (f" using {scraper_module} scraper" if scraper_module else "") + - f". Previous status: {old_status}" + "message": f"Processing started for paper {paper.doi}", + "paper_id": paper_id }) except Exception as e: - db.session.rollback() - ActivityLog.log_error( - error_message=f"Failed to process paper {paper_id}: {str(e)}", - exception=e, - source="process_single_paper" + ActivityLog.log_scraper_command( + action="manual_process_single", + status="error", + description=f"Failed to process paper {paper_id}: {str(e)}" ) return jsonify({ "success": False, - "message": f"Error: {str(e)}" - }) + "message": f"Error processing paper: {str(e)}" + }), 500 -@bp.route("/available_scrapers") -def available_scrapers(): - """Get list of available scraper modules.""" - from scipaperloader.scrapers.factory import get_available_scrapers - from ..models import ScraperModuleConfig - +@bp.route("/update_config", methods=["POST"]) +def update_scraper_config(): + """Update scraper configuration.""" try: - scrapers = get_available_scrapers() - current_module = ScraperModuleConfig.get_current_module() + data = request.get_json() or {} + + # Handle volume configuration updates for daily quota + if "volume" in data: + # Import the helper function from config module + from .config import _update_volume + + new_volume = data["volume"] + success, message, volume_config = _update_volume(new_volume) + + if success: + ActivityLog.log_scraper_command( + action="update_volume_config", + status="success", + description=f"Updated daily volume to {new_volume} papers per day" + ) + + return jsonify({ + "success": True, + "message": message + }) + else: + return jsonify({ + "success": False, + "message": message + }), 400 + + # Handle other configuration updates here if needed in the future return jsonify({ "success": True, - "scrapers": [ - { - "name": s["name"], - "description": s["description"], - "is_current": s["name"] == current_module - } for s in scrapers - ], - "current": current_module + "message": "Configuration updated successfully" }) except Exception as e: - ActivityLog.log_error( - error_message=f"Failed to get available scrapers: {str(e)}", - source="available_scrapers" + ActivityLog.log_scraper_command( + action="update_scraper_config", + status="error", + description=f"Failed to update scraper config: {str(e)}" ) return jsonify({ "success": False, - "message": f"Error: {str(e)}", - "scrapers": [] - }) + "message": f"Error updating scraper config: {str(e)}" + }), 500 \ No newline at end of file diff --git a/scipaperloader/celery.py b/scipaperloader/celery.py index 5828e47..f6e7d02 100644 --- a/scipaperloader/celery.py +++ b/scipaperloader/celery.py @@ -32,8 +32,8 @@ def configure_celery(app=None): task_reject_on_worker_lost=True, # Requeue tasks if worker dies # Configure Beat schedule for periodic tasks beat_schedule={ - 'scheduled-scraper-hourly': { - 'task': 
'scipaperloader.blueprints.scraper.dummy_scheduled_scraper', + 'hourly-scraper-scheduler': { + 'task': 'scipaperloader.scrapers.tasks.hourly_scraper_scheduler', 'schedule': crontab(minute=0), # Run at the start of every hour 'options': {'expires': 3600} }, diff --git a/scipaperloader/models.py b/scipaperloader/models.py index 7b3c25d..8ddb780 100644 --- a/scipaperloader/models.py +++ b/scipaperloader/models.py @@ -91,12 +91,13 @@ class ActivityLog(db.Model): return log @classmethod - def log_scraper_command(cls, action, status=None, user_id=None, **extra): + def log_scraper_command(cls, action, status=None, description=None, user_id=None, **extra): """Log a scraper command (start/stop/pause).""" log = cls( category=ActivityCategory.SCRAPER_COMMAND.value, action=action, status=status, + description=description, user_id=user_id ) log.set_extra_data(extra) @@ -191,6 +192,7 @@ class PaperMetadata(db.Model): language = db.Column(db.String(50)) published_online = db.Column(db.Date) # or DateTime/String status = db.Column(db.String(10)) # 'Pending','Done','Failed' + previous_status = db.Column(db.String(10), nullable=True) # Store previous status for reversion file_path = db.Column(db.Text) error_msg = db.Column(db.Text) created_at = db.Column(db.DateTime, default=db.func.current_timestamp()) @@ -209,6 +211,35 @@ class ScheduleConfig(db.Model): class VolumeConfig(db.Model): id = db.Column(db.Integer, primary_key=True) volume = db.Column(db.Float) # volume of papers to scrape per day + + @classmethod + def get_current_volume(cls): + """Get the current volume configuration, creating default if needed.""" + config = cls.query.first() + if not config: + config = cls(volume=100) + db.session.add(config) + db.session.commit() + return config.volume + + @classmethod + def set_volume(cls, new_volume): + """Set the volume configuration.""" + config = cls.query.first() + if not config: + config = cls(volume=new_volume) + db.session.add(config) + else: + old_value = config.volume + config.volume = new_volume + ActivityLog.log_config_change( + config_key="scraper_volume", + old_value=old_value, + new_value=new_volume, + description="Updated scraper volume configuration" + ) + db.session.commit() + return config class DownloadPathConfig(db.Model): """Model to store the base path for downloaded files.""" @@ -220,7 +251,7 @@ class DownloadPathConfig(db.Model): """Get the configured download path, creating default if needed.""" config = cls.query.first() if not config: - config = cls(path="/path/to/dummy/papers") # Ensure default exists + config = cls(path="/tmp/") # Ensure default exists db.session.add(config) db.session.commit() return config.path @@ -341,6 +372,7 @@ def init_schedule_config(): default_volume = VolumeConfig(volume=100) db.session.add(default_volume) db.session.commit() + # Initialize DownloadPathConfig if it doesn't exist if DownloadPathConfig.query.count() == 0: diff --git a/scipaperloader/scrapers/__init__.py b/scipaperloader/scrapers/__init__.py index 0e80737..160b631 100644 --- a/scipaperloader/scrapers/__init__.py +++ b/scipaperloader/scrapers/__init__.py @@ -1,2 +1,18 @@ # This package contains all scraper modules. # Each scraper should implement the BaseScraper interface from base.py. 
+ +from .base import BaseScraper, ScrapeResult +from .factory import get_scraper, get_available_scrapers +from .manager import ScraperManager +from .dummy import Scraper as DummyScraper +from .failed_retry import Scraper as FailedRetryScraper + +__all__ = [ + 'BaseScraper', + 'ScrapeResult', + 'get_scraper', + 'get_available_scrapers', + 'ScraperManager', + 'DummyScraper', + 'FailedRetryScraper' +] diff --git a/scipaperloader/scrapers/base.py b/scipaperloader/scrapers/base.py index 5cf443c..341be94 100644 --- a/scipaperloader/scrapers/base.py +++ b/scipaperloader/scrapers/base.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import NamedTuple, Optional, Dict +from typing import NamedTuple, Optional, Dict, List from datetime import datetime class ScrapeResult(NamedTuple): @@ -12,6 +12,12 @@ class ScrapeResult(NamedTuple): class BaseScraper(ABC): """Base class for all scraper implementations.""" + # Default input/output statuses - can be overridden by subclasses + INPUT_STATUSES = ["New"] # Which paper statuses this scraper will process + OUTPUT_STATUS_SUCCESS = "Done" # Status to set on successful scraping + OUTPUT_STATUS_FAILURE = "Failed" # Status to set on failed scraping + OUTPUT_STATUS_PROCESSING = "Pending" # Status to set while processing + @abstractmethod def scrape(self, doi: str) -> ScrapeResult: """ @@ -32,3 +38,15 @@ class BaseScraper(ABC): def get_description(self) -> str: """Return a description of this scraper.""" return getattr(self.__class__, "__doc__", "No description available") + + def get_input_statuses(self) -> List[str]: + """Return list of paper statuses this scraper can process.""" + return self.INPUT_STATUSES + + def get_output_statuses(self) -> Dict[str, str]: + """Return mapping of result types to output statuses.""" + return { + "success": self.OUTPUT_STATUS_SUCCESS, + "failure": self.OUTPUT_STATUS_FAILURE, + "processing": self.OUTPUT_STATUS_PROCESSING + } diff --git a/scipaperloader/scrapers/dummy.py b/scipaperloader/scrapers/dummy.py index 2192bda..f77f193 100644 --- a/scipaperloader/scrapers/dummy.py +++ b/scipaperloader/scrapers/dummy.py @@ -10,6 +10,12 @@ from ..db import db class Scraper(BaseScraper): """Dummy scraper for testing purposes that simulates paper downloading.""" + # This scraper processes "New" papers and outputs "Done"/"Failed" + INPUT_STATUSES = ["New"] + OUTPUT_STATUS_SUCCESS = "Done" + OUTPUT_STATUS_FAILURE = "Failed" + OUTPUT_STATUS_PROCESSING = "Pending" + def scrape(self, doi: str) -> ScrapeResult: """Simulate scraping a paper with realistic timing and random success/failure.""" start_time = time.time() diff --git a/scipaperloader/scrapers/factory.py b/scipaperloader/scrapers/factory.py index 080612f..efbdb65 100644 --- a/scipaperloader/scrapers/factory.py +++ b/scipaperloader/scrapers/factory.py @@ -1,5 +1,4 @@ import importlib -from flask import current_app from .base import BaseScraper def get_scraper() -> BaseScraper: @@ -7,10 +6,16 @@ def get_scraper() -> BaseScraper: from ..models import ScraperModuleConfig, ActivityLog try: - # Get module name from database first, fallback to config + # Get module name from database first, fallback to dummy name = ScraperModuleConfig.get_current_module() if not name: - name = current_app.config.get("SCRAPER_MODULE", "dummy") + # Only try to access Flask config if we're in app context + try: + from flask import current_app + name = current_app.config.get("SCRAPER_MODULE", "dummy") + except RuntimeError: + # No app context, use dummy + name = "dummy" module = 
importlib.import_module(f"scipaperloader.scrapers.{name}") cls = getattr(module, "Scraper") diff --git a/scipaperloader/scrapers/failed_retry.py b/scipaperloader/scrapers/failed_retry.py new file mode 100644 index 0000000..916eb16 --- /dev/null +++ b/scipaperloader/scrapers/failed_retry.py @@ -0,0 +1,123 @@ +import time +import random +import os +from datetime import datetime +from .base import BaseScraper, ScrapeResult +from flask import current_app +from ..models import PaperMetadata, ActivityLog, DownloadPathConfig +from ..db import db + +class Scraper(BaseScraper): + """Retry scraper that attempts to re-process failed papers with different strategies.""" + + # This scraper specifically targets "Failed" papers and retries them + INPUT_STATUSES = ["Failed"] + OUTPUT_STATUS_SUCCESS = "Done" + OUTPUT_STATUS_FAILURE = "Failed" + OUTPUT_STATUS_PROCESSING = "Retrying" + + def scrape(self, doi: str) -> ScrapeResult: + """Retry scraping a failed paper with enhanced error handling.""" + start_time = time.time() + + paper = PaperMetadata.query.filter_by(doi=doi).first() + if not paper: + return ScrapeResult( + status="error", + message=f"No paper found for DOI {doi}", + data=None, + duration=time.time() - start_time, + timestamp=datetime.utcnow() + ) + + # Log retry attempt + ActivityLog.log_scraper_activity( + action="retry_failed_paper", + status="info", + description=f"Retrying failed paper: {paper.title}", + paper_id=paper.id + ) + + # Simulate longer processing time for retry (2-5 seconds) + processing_time = random.uniform(2, 5) + time.sleep(processing_time) + + # Simulate 60% success rate on retry (lower than initial attempt) + success = random.random() < 0.6 + + result_data = {} + + if success: + # Get download path and create dummy file + download_path = DownloadPathConfig.get_path() + file_name = f"{doi.replace('/', '_')}_retry.pdf" + file_path = f"{download_path}/{file_name}" + + try: + # Ensure directory exists + os.makedirs(download_path, exist_ok=True) + + # Create a dummy PDF file + with open(file_path, 'w') as f: + f.write(f"Dummy PDF content for retry of {doi}") + + result_data = {"file_path": file_path} + + # Log success + ActivityLog.log_scraper_activity( + action="retry_scrape_success", + status="success", + description=f"Successfully retried {doi} on second attempt", + paper_id=paper.id + ) + + result = ScrapeResult( + status="success", + message=f"Successfully retried paper {doi}", + data=result_data, + duration=time.time() - start_time, + timestamp=datetime.utcnow() + ) + + except Exception as e: + error_msg = f"Failed to save retry file: {str(e)}" + ActivityLog.log_scraper_activity( + action="retry_scrape_file_error", + status="error", + description=error_msg, + paper_id=paper.id + ) + + result = ScrapeResult( + status="error", + message=error_msg, + data=None, + duration=time.time() - start_time, + timestamp=datetime.utcnow() + ) + else: + # Retry failed - generate different error message + error_messages = [ + "Retry failed: Still no access to publisher", + "Retry failed: Alternative download methods exhausted", + "Retry failed: DOI appears permanently inaccessible", + "Retry failed: Network timeout persists" + ] + error_msg = random.choice(error_messages) + + ActivityLog.log_scraper_activity( + action="retry_scrape_failure", + status="error", + description=f"Retry failed for {doi}: {error_msg}", + paper_id=paper.id + ) + + result = ScrapeResult( + status="error", + message=error_msg, + data=None, + duration=time.time() - start_time, + timestamp=datetime.utcnow() + ) + + 
return result diff --git a/scipaperloader/scrapers/manager.py b/scipaperloader/scrapers/manager.py new file mode 100644 index 0000000..e07f80c --- /dev/null +++ b/scipaperloader/scrapers/manager.py @@ -0,0 +1,747 @@ +""" +Simplified scraper management system with hourly quota scheduling. +""" + +import random +import math +import redis +from datetime import datetime, timedelta +from typing import List, Dict, Optional +from sqlalchemy import func + +from ..models import ( + PaperMetadata, + ScheduleConfig, + VolumeConfig, + ScraperState, + ActivityLog, + ScraperModuleConfig +) +from ..db import db +from ..cache_utils import get_cached_hourly_quota +from .factory import get_scraper, get_available_scrapers +from ..celery import celery + + +class ScraperManager: + """Manages scraper operations with hourly quota-based scheduling.""" + + def __init__(self): + self.current_scraper = None + self.pending_papers = [] # Track papers being processed + # Initialize Redis client for delayed task management + self.redis_client = None + self._init_redis_client() + + def _init_redis_client(self): + """Initialize Redis client for delayed task management.""" + try: + # Use same Redis configuration as Celery + self.redis_client = redis.Redis( + host='localhost', + port=6379, + db=0, + decode_responses=True + ) + # Test connection + self.redis_client.ping() + except Exception as e: + ActivityLog.log_error( + error_message=f"Failed to initialize Redis client: {str(e)}", + source="ScraperManager._init_redis_client" + ) + self.redis_client = None + + def _clear_delayed_tasks_from_redis(self) -> int: + """Clear delayed tasks from Redis structures used by Celery. + + Based on analysis, Celery stores delayed tasks in: + - 'unacked_index': Sorted set containing task IDs with execution timestamps + - 'unacked': Hash containing task data keyed by task ID + + Returns: + int: Number of delayed tasks cleared + """ + if not self.redis_client: + try: + ActivityLog.log_error( + error_message="Redis client not available - cannot clear delayed tasks", + source="ScraperManager._clear_delayed_tasks_from_redis" + ) + except RuntimeError: + # Working outside application context - just print instead + print("❌ Redis client not available - cannot clear delayed tasks") + return 0 + + cleared_count = 0 + try: + # Define scraper task patterns to identify our tasks + scraper_patterns = [ + 'process_single_paper', + 'process_papers_batch', + 'hourly_scraper_scheduler' + ] + + try: + ActivityLog.log_scraper_activity( + action="check_delayed_tasks", + status="info", + description="Checking Celery delayed task structures (unacked_index, unacked)" + ) + except RuntimeError: + print("🔍 Checking Celery delayed task structures (unacked_index, unacked)") + + # Check 'unacked_index' (sorted set with task IDs and timestamps) + unacked_index_cleared = 0 + if self.redis_client.exists('unacked_index'): + try: + # Get all task IDs from the sorted set + task_ids = self.redis_client.zrange('unacked_index', 0, -1) + + if task_ids: + try: + ActivityLog.log_scraper_activity( + action="scan_unacked_index", + status="info", + description=f"Found {len(task_ids)} tasks in 'unacked_index'" + ) + except RuntimeError: + print(f"📋 Found {len(task_ids)} tasks in 'unacked_index'") + + # Check each task ID against the 'unacked' hash to get task details + scraper_task_ids = [] + for task_id in task_ids: + try: + # Get task data from 'unacked' hash + task_data = self.redis_client.hget('unacked', task_id) + if task_data: + # Check if this task contains any of our 
scraper patterns + if any(pattern in str(task_data) for pattern in scraper_patterns): + scraper_task_ids.append(task_id) + except Exception: + # Skip individual task errors + continue + + # Remove scraper task IDs from both structures + for task_id in scraper_task_ids: + try: + # Remove from unacked_index (sorted set) + removed_from_index = self.redis_client.zrem('unacked_index', task_id) + # Remove from unacked (hash) + removed_from_hash = self.redis_client.hdel('unacked', task_id) + + if removed_from_index or removed_from_hash: + unacked_index_cleared += 1 + + except Exception as e: + try: + ActivityLog.log_error( + error_message=f"Error removing delayed task {task_id}: {str(e)}", + source="ScraperManager._clear_delayed_tasks_from_redis" + ) + except RuntimeError: + print(f"❌ Error removing delayed task {task_id}: {str(e)}") + continue + + cleared_count += unacked_index_cleared + + if unacked_index_cleared > 0: + try: + ActivityLog.log_scraper_activity( + action="clear_unacked_tasks", + status="success", + description=f"Cleared {unacked_index_cleared} scraper tasks from unacked structures" + ) + except RuntimeError: + print(f"✅ Cleared {unacked_index_cleared} scraper tasks from unacked structures") + else: + try: + ActivityLog.log_scraper_activity( + action="check_unacked_index", + status="info", + description="No tasks found in 'unacked_index'" + ) + except RuntimeError: + print("â„šī¸ No tasks found in 'unacked_index'") + + except Exception as e: + try: + ActivityLog.log_error( + error_message=f"Error accessing 'unacked_index': {str(e)}", + source="ScraperManager._clear_delayed_tasks_from_redis" + ) + except RuntimeError: + print(f"❌ Error accessing 'unacked_index': {str(e)}") + else: + try: + ActivityLog.log_scraper_activity( + action="check_unacked_index", + status="info", + description="'unacked_index' key does not exist - no delayed tasks" + ) + except RuntimeError: + print("â„šī¸ 'unacked_index' key does not exist - no delayed tasks") + + # Also check the 'celery' queue for immediate tasks (backup check) + celery_cleared = 0 + try: + queue_length = self.redis_client.llen('celery') + if queue_length and queue_length > 0: + # Scan for any scraper tasks in the immediate queue + scraper_tasks = [] + for i in range(queue_length): + try: + task_data = self.redis_client.lindex('celery', i) + if task_data and any(pattern in str(task_data) for pattern in scraper_patterns): + scraper_tasks.append(task_data) + except Exception: + continue + + # Remove scraper tasks from celery queue + for task_data in scraper_tasks: + try: + removed_count = self.redis_client.lrem('celery', 0, task_data) + celery_cleared += removed_count + except Exception: + continue + + cleared_count += celery_cleared + + if celery_cleared > 0: + try: + ActivityLog.log_scraper_activity( + action="clear_celery_tasks", + status="success", + description=f"Cleared {celery_cleared} scraper tasks from 'celery' queue" + ) + except RuntimeError: + print(f"✅ Cleared {celery_cleared} scraper tasks from 'celery' queue") + + except Exception as e: + try: + ActivityLog.log_error( + error_message=f"Error checking 'celery' queue: {str(e)}", + source="ScraperManager._clear_delayed_tasks_from_redis" + ) + except RuntimeError: + print(f"❌ Error checking 'celery' queue: {str(e)}") + + # Summary + if cleared_count > 0: + try: + ActivityLog.log_scraper_activity( + action="clear_delayed_tasks_complete", + status="success", + description=f"Total delayed scraper tasks cleared from Redis: {cleared_count} (unacked: {unacked_index_cleared}, celery: 
{celery_cleared})" + ) + except RuntimeError: + print(f"✅ Total delayed scraper tasks cleared from Redis: {cleared_count} (unacked: {unacked_index_cleared}, celery: {celery_cleared})") + else: + try: + ActivityLog.log_scraper_activity( + action="clear_delayed_tasks_complete", + status="info", + description="No delayed scraper tasks found to clear in Redis" + ) + except RuntimeError: + print("â„šī¸ No delayed scraper tasks found to clear in Redis") + + return cleared_count + + except Exception as e: + try: + ActivityLog.log_error( + error_message=f"Failed to clear delayed tasks from Redis: {str(e)}", + source="ScraperManager._clear_delayed_tasks_from_redis" + ) + except RuntimeError: + print(f"❌ Failed to clear delayed tasks from Redis: {str(e)}") + return 0 + + def start_scraper(self) -> Dict[str, str]: + """Start the scraper system.""" + try: + # Get current scraper + self.current_scraper = get_scraper() + + # Activate scraper state + ScraperState.set_active(True) + ScraperState.set_paused(False) + + scraper_name = self.current_scraper.get_name() + + ActivityLog.log_scraper_command( + action="start_scraper", + status="success", + description=f"Started scraper: {scraper_name}. Use /trigger-immediate endpoint to immediately schedule papers instead of waiting for the next hourly boundary." + ) + + return {"status": "success", "message": "Scraper started successfully. Papers will be scheduled at the next hourly boundary, or use /trigger-immediate to schedule immediately."} + + except Exception as e: + ActivityLog.log_error( + error_message=f"Failed to start scraper: {str(e)}", + source="ScraperManager.start_scraper" + ) + return {"status": "error", "message": str(e)} + + def pause_scraper(self) -> Dict[str, str]: + """Pause the scraper system.""" + try: + ScraperState.set_paused(True) + + ActivityLog.log_scraper_command( + action="pause_scraper", + status="success", + description="Scraper paused - processing will halt" + ) + + return {"status": "success", "message": "Scraper paused"} + + except Exception as e: + return {"status": "error", "message": str(e)} + + def resume_scraper(self) -> Dict[str, str]: + """Resume the scraper system.""" + try: + ScraperState.set_paused(False) + + ActivityLog.log_scraper_command( + action="resume_scraper", + status="success", + description="Scraper resumed - processing will continue" + ) + + return {"status": "success", "message": "Scraper resumed"} + + except Exception as e: + return {"status": "error", "message": str(e)} + + def stop_scraper(self) -> Dict[str, str]: + """Stop the scraper, revoke all running tasks, and revert pending papers.""" + try: + # First, revoke all running tasks + revoked_count = 0 + delayed_cleared_count = 0 + + ActivityLog.log_scraper_command( + action="stop_scraper_start", + status="info", + description="Beginning scraper stop process with task revocation and delayed task clearing" + ) + + try: + # Get Celery inspector to check for running tasks + i = celery.control.inspect() + active = i.active() or {} + scheduled = i.scheduled() or {} + reserved = i.reserved() or {} + + # Revoke active tasks + for worker, tasks in active.items(): + for task in tasks: + if 'id' in task: + celery.control.revoke(task['id'], terminate=True) + revoked_count += 1 + ActivityLog.log_scraper_activity( + action="revoke_task", + status="success", + description=f"Revoked active task: {task.get('name', 'unknown')} (ID: {task['id']})" + ) + + # Revoke scheduled tasks + for worker, tasks in scheduled.items(): + for task in tasks: + if 'id' in task: + 
celery.control.revoke(task['id'], terminate=True) + revoked_count += 1 + ActivityLog.log_scraper_activity( + action="revoke_task", + status="success", + description=f"Revoked scheduled task: {task.get('name', 'unknown')} (ID: {task['id']})" + ) + + # Revoke reserved tasks + for worker, tasks in reserved.items(): + for task in tasks: + if 'id' in task: + celery.control.revoke(task['id'], terminate=True) + revoked_count += 1 + ActivityLog.log_scraper_activity( + action="revoke_task", + status="success", + description=f"Revoked reserved task: {task.get('name', 'unknown')} (ID: {task['id']})" + ) + + # Purge all task queues + celery.control.purge() + ActivityLog.log_scraper_activity( + action="purge_queues", + status="success", + description="Purged all task queues" + ) + + # **NEW: Clear delayed tasks from Redis sorted sets** + delayed_cleared_count = self._clear_delayed_tasks_from_redis() + + # Additional cleanup: revoke any remaining scraper-related tasks by name pattern + try: + # Use broadcast to revoke tasks that match scraper patterns + scraper_task_patterns = [ + 'process_single_paper', + 'process_papers_batch', + 'hourly_scraper_scheduler' + ] + + # Get a fresh inspection of tasks after purge + fresh_inspect = celery.control.inspect() + all_tasks = {} + all_tasks.update(fresh_inspect.active() or {}) + all_tasks.update(fresh_inspect.scheduled() or {}) + all_tasks.update(fresh_inspect.reserved() or {}) + + additional_revoked = 0 + for worker, tasks in all_tasks.items(): + for task in tasks: + task_name = task.get('name', '') + task_id = task.get('id', '') + if any(pattern in task_name for pattern in scraper_task_patterns) and task_id: + celery.control.revoke(task_id, terminate=True) + additional_revoked += 1 + ActivityLog.log_scraper_activity( + action="revoke_scraper_task", + status="success", + description=f"Revoked lingering scraper task: {task_name} (ID: {task_id})" + ) + + if additional_revoked > 0: + ActivityLog.log_scraper_activity( + action="cleanup_scraper_tasks", + status="success", + description=f"Additional cleanup: revoked {additional_revoked} lingering scraper tasks" + ) + + except Exception as e: + ActivityLog.log_error( + error_message=f"Error during additional scraper task cleanup: {str(e)}", + source="ScraperManager.stop_scraper.cleanup" + ) + + except Exception as e: + ActivityLog.log_error( + error_message=f"Error revoking tasks: {str(e)}", + source="ScraperManager.stop_scraper" + ) + # Continue with paper reversion even if task revocation fails + + # Get current scraper to know what status to revert to + scraper = get_scraper() + input_statuses = scraper.get_input_statuses() + + # Find papers that are currently being processed + processing_status = scraper.get_output_statuses()["processing"] + pending_papers = PaperMetadata.query.filter_by(status=processing_status).all() + + # Revert their status to the first input status + reverted_count = 0 + if pending_papers and input_statuses: + revert_status = input_statuses[0] # Use first input status as default + + for paper in pending_papers: + # Try to use previous_status if available, otherwise use first input status + if hasattr(paper, 'previous_status') and paper.previous_status: + paper.status = paper.previous_status + else: + paper.status = revert_status + paper.updated_at = datetime.utcnow() + reverted_count += 1 + + db.session.commit() + + ActivityLog.log_scraper_activity( + action="revert_pending_papers", + status="success", + description=f"Reverted {reverted_count} papers from '{processing_status}' to previous 
status" + ) + + # Deactivate scraper + ScraperState.set_active(False) + ScraperState.set_paused(False) + + ActivityLog.log_scraper_command( + action="stop_scraper", + status="success", + description=f"Scraper stopped. Revoked {revoked_count} tasks, cleared {delayed_cleared_count} delayed tasks, and reverted {reverted_count} papers." + ) + + return { + "status": "success", + "message": f"Scraper stopped. Revoked {revoked_count} tasks, cleared {delayed_cleared_count} delayed tasks, and reverted {reverted_count} papers to previous status." + } + + except Exception as e: + ActivityLog.log_error( + error_message=f"Failed to stop scraper: {str(e)}", + source="ScraperManager.stop_scraper" + ) + return {"status": "error", "message": str(e)} + + def reset_scraper(self) -> Dict[str, str]: + """Reset scraper state, revoke all running tasks, and clear all processing statuses.""" + try: + # First, revoke all running tasks (similar to stop_scraper) + revoked_count = 0 + + ActivityLog.log_scraper_command( + action="reset_scraper_start", + status="info", + description="Beginning scraper reset process with task revocation" + ) + + try: + # Get Celery inspector to check for running tasks + i = celery.control.inspect() + active = i.active() or {} + scheduled = i.scheduled() or {} + reserved = i.reserved() or {} + + # Revoke all tasks (active, scheduled, reserved) + for queue_name, queue_tasks in [("active", active), ("scheduled", scheduled), ("reserved", reserved)]: + for worker, tasks in queue_tasks.items(): + for task in tasks: + if 'id' in task: + celery.control.revoke(task['id'], terminate=True) + revoked_count += 1 + ActivityLog.log_scraper_activity( + action="revoke_task", + status="success", + description=f"Revoked {queue_name} task: {task.get('name', 'unknown')} (ID: {task['id']})" + ) + + # Purge all task queues + celery.control.purge() + ActivityLog.log_scraper_activity( + action="purge_queues", + status="success", + description="Purged all task queues during reset" + ) + + except Exception as e: + ActivityLog.log_error( + error_message=f"Error revoking tasks during reset: {str(e)}", + source="ScraperManager.reset_scraper" + ) + # Continue with paper reversion even if task revocation fails + + # Get current scraper configuration + scraper = get_scraper() + input_statuses = scraper.get_input_statuses() + processing_status = scraper.get_output_statuses()["processing"] + + # Reset all papers in processing status + pending_papers = PaperMetadata.query.filter_by(status=processing_status).all() + reverted_count = 0 + + if pending_papers and input_statuses: + revert_status = input_statuses[0] + + for paper in pending_papers: + # Try to use previous_status if available, otherwise use first input status + if hasattr(paper, 'previous_status') and paper.previous_status: + paper.status = paper.previous_status + else: + paper.status = revert_status + paper.updated_at = datetime.utcnow() + paper.error_msg = None # Clear any error messages + reverted_count += 1 + + db.session.commit() + + # Reset scraper state + ScraperState.set_active(False) + ScraperState.set_paused(False) + + ActivityLog.log_scraper_command( + action="reset_scraper", + status="success", + description=f"Scraper reset. Revoked {revoked_count} tasks and reverted {reverted_count} papers." + ) + + return { + "status": "success", + "message": f"Scraper reset. Revoked {revoked_count} tasks and reverted {reverted_count} papers to original status." 
+ } + + except Exception as e: + return {"status": "error", "message": str(e)} + + def get_current_hour_quota(self) -> int: + """Calculate papers to process in current hour based on schedule.""" + try: + return get_cached_hourly_quota(self._calculate_papers_for_current_hour) + except Exception as e: + ActivityLog.log_error( + error_message=f"Error calculating hourly quota: {str(e)}", + source="ScraperManager.get_current_hour_quota" + ) + return 0 + + def _calculate_papers_for_current_hour(self) -> int: + """Internal method to calculate hourly quota.""" + try: + # Get current hour and volume config + current_hour = datetime.now().hour + volume_config = VolumeConfig.get_current_volume() + daily_volume = volume_config if volume_config else 100 + + # Get schedule config for current hour + schedule_config = ScheduleConfig.query.filter_by(hour=current_hour).first() + current_weight = schedule_config.weight if schedule_config else 1.0 + + # Get total weight across all hours + total_weight = db.session.query(func.sum(ScheduleConfig.weight)).scalar() or 24.0 + + # Calculate quota: (current_weight / total_weight) * daily_volume + quota = math.ceil((current_weight / total_weight) * daily_volume) + + ActivityLog.log_scraper_activity( + action="calculate_hourly_quota", + status="info", + description=f"Hour {current_hour}: quota={quota} (weight={current_weight}, total_weight={total_weight}, daily_volume={daily_volume})" + ) + + return max(1, quota) # Ensure at least 1 paper per hour + + except Exception as e: + ActivityLog.log_error( + error_message=f"Error in quota calculation: {str(e)}", + source="ScraperManager._calculate_papers_for_current_hour" + ) + return 1 # Fallback to 1 paper per hour + + def select_papers_for_processing(self, limit: Optional[int] = None) -> List[PaperMetadata]: + """Select papers for processing based on current scraper configuration.""" + try: + scraper = get_scraper() + input_statuses = scraper.get_input_statuses() + + if not input_statuses: + return [] + + # Use provided limit or calculate from hourly quota + papers_needed = limit if limit is not None else self.get_current_hour_quota() + + # Query papers with input statuses, randomize selection + papers = (PaperMetadata.query + .filter(PaperMetadata.status.in_(input_statuses)) + .order_by(func.random()) + .limit(papers_needed) + .all()) + + ActivityLog.log_scraper_activity( + action="select_papers", + status="info", + description=f"Selected {len(papers)} papers from statuses {input_statuses} (requested: {papers_needed})" + ) + + return papers + + except Exception as e: + ActivityLog.log_error( + error_message=f"Error selecting papers: {str(e)}", + source="ScraperManager.select_papers_for_processing" + ) + return [] + + def process_paper(self, paper: PaperMetadata) -> Dict: + """Process a single paper using the current scraper.""" + try: + scraper = get_scraper() + output_statuses = scraper.get_output_statuses() + + # Store the previous status before changing it + previous_status = paper.status + + # Update paper status to processing + paper.previous_status = previous_status + paper.status = output_statuses["processing"] + paper.updated_at = datetime.utcnow() + db.session.commit() + + # Perform scraping + result = scraper.scrape(paper.doi) + + # Update paper status based on result + if result.status == "success": + paper.status = output_statuses["success"] + paper.error_msg = None + if result.data and "file_path" in result.data: + paper.file_path = result.data["file_path"] + else: + paper.status = output_statuses["failure"] 
+ paper.error_msg = result.message + + paper.updated_at = datetime.utcnow() + db.session.commit() + + # Log result + ActivityLog.log_scraper_activity( + action="process_paper", + paper_id=paper.id, + status=result.status, + description=f"Processed {paper.doi}: {result.message}" + ) + + return { + "paper_id": paper.id, + "status": result.status, + "message": result.message, + "duration": result.duration + } + + except Exception as e: + # Revert paper status on error + try: + input_statuses = get_scraper().get_input_statuses() + if input_statuses: + paper.status = input_statuses[0] + paper.error_msg = f"Processing error: {str(e)}" + paper.updated_at = datetime.utcnow() + db.session.commit() + except: + pass # Don't fail if reversion fails + + ActivityLog.log_error( + error_message=f"Error processing paper {paper.id}: {str(e)}", + source="ScraperManager.process_paper" + ) + + return {"paper_id": paper.id, "status": "error", "message": str(e)} + + def get_status(self) -> Dict: + """Get current scraper status.""" + scraper_state = ScraperState.get_current_state() + scraper = get_scraper() + + # Count papers by status + input_statuses = scraper.get_input_statuses() + output_statuses = scraper.get_output_statuses() + + available_count = (PaperMetadata.query + .filter(PaperMetadata.status.in_(input_statuses)) + .count()) + + processing_count = (PaperMetadata.query + .filter_by(status=output_statuses["processing"]) + .count()) + + return { + "active": scraper_state.is_active, + "paused": scraper_state.is_paused, + "current_scraper": scraper.get_name(), + "input_statuses": input_statuses, + "output_statuses": output_statuses, + "available_papers": available_count, + "processing_papers": processing_count, + "current_hour_quota": self.get_current_hour_quota() + } diff --git a/scipaperloader/scrapers/tasks.py b/scipaperloader/scrapers/tasks.py new file mode 100644 index 0000000..295d99b --- /dev/null +++ b/scipaperloader/scrapers/tasks.py @@ -0,0 +1,189 @@ +""" +Hourly scheduler task that processes papers at random times within each hour. +""" + +import random +from datetime import datetime, timedelta +from typing import Optional +from celery import shared_task + +from ..models import ScraperState, ActivityLog +from .manager import ScraperManager + + +@shared_task(bind=True) +def hourly_scraper_scheduler(self): + """ + Hourly task that schedules paper processing at random times within the hour. + + This task runs at the beginning of each hour and: + 1. Calculates how many papers to process this hour + 2. 
Schedules individual paper processing tasks at random times within the hour + """ + try: + # Check if scraper is active + scraper_state = ScraperState.get_current_state() + if not scraper_state.is_active: + ActivityLog.log_scraper_activity( + action="hourly_scheduler", + status="info", + description="Hourly scheduler skipped - scraper not active" + ) + # Disable retries for inactive scheduler + self.retry = False + return {"status": "inactive", "papers_scheduled": 0} + + if scraper_state.is_paused: + ActivityLog.log_scraper_activity( + action="hourly_scheduler", + status="info", + description="Hourly scheduler skipped - scraper paused" + ) + # Disable retries for paused scheduler + self.retry = False + return {"status": "paused", "papers_scheduled": 0} + + # Initialize scraper manager + manager = ScraperManager() + + # Get papers to process this hour + papers = manager.select_papers_for_processing() + + if not papers: + ActivityLog.log_scraper_activity( + action="hourly_scheduler", + status="info", + description="No papers available for processing this hour" + ) + return {"status": "empty", "papers_scheduled": 0} + + # Schedule papers at random times within the hour (0-3600 seconds) + scheduled_count = 0 + current_time = datetime.now() + + for paper in papers: + # Random delay between 1 second and 58 minutes + delay_seconds = random.randint(1, 3480) # Up to 58 minutes + + # Schedule the task using Celery's task registry to avoid circular import issues + from ..celery import celery + celery.send_task( + 'scipaperloader.scrapers.tasks.process_single_paper', + args=[paper.id], + countdown=delay_seconds + ) + + scheduled_count += 1 + + # Log each scheduled paper + schedule_time = current_time + timedelta(seconds=delay_seconds) + ActivityLog.log_scraper_activity( + action="schedule_paper", + paper_id=paper.id, + status="info", + description=f"Scheduled paper {paper.doi} for processing at {schedule_time.strftime('%H:%M:%S')}" + ) + + ActivityLog.log_scraper_activity( + action="hourly_scheduler", + status="success", + description=f"Scheduled {scheduled_count} papers for random processing within this hour" + ) + + return {"status": "success", "papers_scheduled": scheduled_count} + + except Exception as e: + ActivityLog.log_error( + error_message=f"Hourly scheduler error: {str(e)}", + source="hourly_scraper_scheduler" + ) + return {"status": "error", "message": str(e)} + + +@shared_task(bind=True) +def process_single_paper(self, paper_id: int): + """ + Process a single paper. This task is scheduled at random times within each hour. 
+ + Args: + paper_id: ID of the paper to process + """ + try: + # Double-check scraper state before processing + scraper_state = ScraperState.get_current_state() + if not scraper_state.is_active: + ActivityLog.log_scraper_activity( + action="process_single_paper", + paper_id=paper_id, + status="skipped", + description="Skipped processing - scraper not active" + ) + # Use Celery's ignore to mark this task as completed without error + self.retry = False + return {"status": "inactive", "paper_id": paper_id} + + if scraper_state.is_paused: + ActivityLog.log_scraper_activity( + action="process_single_paper", + paper_id=paper_id, + status="skipped", + description="Skipped processing - scraper paused" + ) + # Use Celery's ignore for paused state too + self.retry = False + return {"status": "paused", "paper_id": paper_id} + + # Get the paper + from ..models import PaperMetadata + paper = PaperMetadata.query.get(paper_id) + if not paper: + return {"status": "error", "message": f"Paper {paper_id} not found"} + + # Process the paper using scraper manager + manager = ScraperManager() + result = manager.process_paper(paper) + + return result + + except Exception as e: + ActivityLog.log_error( + error_message=f"Error processing paper {paper_id}: {str(e)}", + source="process_single_paper" + ) + return {"status": "error", "paper_id": paper_id, "message": str(e)} + + +@shared_task(bind=True) +def process_papers_batch(self, paper_ids: list, scraper_module: Optional[str] = None): + """ + Process multiple papers in a batch for immediate processing. + + Args: + paper_ids: List of paper IDs to process + scraper_module: Optional specific scraper module to use + """ + try: + results = [] + manager = ScraperManager() + + for paper_id in paper_ids: + from ..models import PaperMetadata + paper = PaperMetadata.query.get(paper_id) + if paper: + result = manager.process_paper(paper) + results.append(result) + else: + results.append({ + "paper_id": paper_id, + "status": "error", + "message": "Paper not found" + }) + + return {"results": results, "total_processed": len(results)} + + except Exception as e: + ActivityLog.log_error( + error_message=f"Error processing batch: {str(e)}", + source="process_papers_batch" + ) + return {"status": "error", "message": str(e)} diff --git a/scipaperloader/templates/scraper.html.jinja b/scipaperloader/templates/scraper.html.jinja index a457c4e..596cfee 100644 --- a/scipaperloader/templates/scraper.html.jinja +++ b/scipaperloader/templates/scraper.html.jinja @@ -29,6 +29,11 @@ height: 400px; } + .chart-wrapper { + position: relative; + height: 400px; + } + .notification { position: fixed; bottom: 20px; @@ -100,132 +105,137 @@
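Before the template changes summarized below, the hourly quota arithmetic used by ScraperManager._calculate_papers_for_current_hour is worth making concrete. A minimal standalone sketch — the weight, total weight, and daily volume here are made-up example numbers; only the formula (ceiling of the weighted share, floored at one) comes from the code above:

    import math

    def hourly_quota(current_weight: float, total_weight: float, daily_volume: int) -> int:
        # Same formula as _calculate_papers_for_current_hour:
        # ceil((current_weight / total_weight) * daily_volume), never less than 1.
        return max(1, math.ceil((current_weight / total_weight) * daily_volume))

    # Example: 24 equally weighted hours and a daily volume of 100 papers
    # -> ceil(100 / 24) = 5 papers scheduled for the current hour.
    print(hourly_quota(current_weight=1.0, total_weight=24.0, daily_volume=100))  # 5

Because the result is floored at one, hours with very small weights still trickle papers through, which is consistent with the fallback of one paper per hour in the error path.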
[scraper.html.jinja, body hunk: the volume input now takes its value from volume_config (falling back to 100) with min/max bounds and keeps the "Enter a value between 1 and {{ max_volume }}" hint; a new "Process Single Paper" card adds a DOI field and a scraper-module selector ("Select which scraper to use for processing the paper"); a paper results table with columns ID, Title, DOI, Status, Actions and a "Loading..." placeholder; a "Scraping Activity" panel hosting the chart inside the new .chart-wrapper element; and a "Recent Activity" table with columns Time, Action, Status, Description and a "Loading activities..." placeholder row.]
{% endblock content %} {% block scripts %} {{ super() }} +
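Finally, the Redis clean-up performed by ScraperManager._clear_delayed_tasks_from_redis can be sanity-checked from outside the application by reading the same broker keys it manipulates. A small read-only sketch with redis-py — the broker URL is an assumption; the key names ('unacked_index', 'unacked', 'celery') come from the code above:

    import redis

    # Assumed local broker; point this at the configured Celery broker URL.
    r = redis.Redis.from_url("redis://localhost:6379/0")

    print("unacked_index entries:", r.zcard("unacked_index"))  # sorted set of un-acked task ids
    print("unacked entries:", r.hlen("unacked"))                # hash of un-acked task payloads
    print("celery queue length:", r.llen("celery"))             # immediate (default) queue

Immediately after stop_scraper() these structures should contain no scraper task entries; unrelated tasks may legitimately remain, since the clean-up only removes entries matching the scraper task name patterns.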