From a0fa541de3b65a1a5a17bdc4fdc5a65a13411b2f Mon Sep 17 00:00:00 2001 From: Michael Beck Date: Wed, 16 Apr 2025 21:39:59 +0200 Subject: [PATCH] fixes dummy scraper --- scipaperloader/blueprints/scraper.py | 709 +++++++++++++++----- scipaperloader/templates/scraper.html.jinja | 45 ++ 2 files changed, 581 insertions(+), 173 deletions(-) diff --git a/scipaperloader/blueprints/scraper.py b/scipaperloader/blueprints/scraper.py index 8029d65..9372013 100644 --- a/scipaperloader/blueprints/scraper.py +++ b/scipaperloader/blueprints/scraper.py @@ -9,151 +9,272 @@ from ..db import db from ..celery import celery from ..defaults import MAX_VOLUME from celery.schedules import crontab +from sqlalchemy import func bp = Blueprint("scraper", __name__, url_prefix="/scraper") +# Track the periodic task ID for proper revocation +PERIODIC_TASK_ID = None + # Setup periodic task to run every minute for testing purposes @celery.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): - # Run the dummy scraper every minute for testing purposes - sender.add_periodic_task(60.0, run_periodic_dummy_scraper.s(), name='run dummy scraper every minute') + global PERIODIC_TASK_ID + # Run the dummy scraper every minute for testing purposes with a unique task name + # Using a constant name allows us to revoke this specific periodic task when stopping + result = sender.add_periodic_task(60.0, run_periodic_dummy_scraper.s(), + name='run_dummy_scraper_every_minute') + PERIODIC_TASK_ID = result.id + # Log that we've registered the periodic task with its ID + ActivityLog.log_scraper_command( + action="register_periodic_task", + status="success", + description=f"Registered periodic scraper task with ID: {PERIODIC_TASK_ID}" + ) + +# Function to revoke the periodic task properly +def revoke_periodic_task(): + global PERIODIC_TASK_ID + if PERIODIC_TASK_ID: + try: + # Revoke by ID is more reliable than by name + celery.control.revoke(PERIODIC_TASK_ID, terminate=True) + + # Attempt to revoke by name as a fallback (standard Celery method) + # Note: We're removing the non-standard revoke_from_control method + # and replacing it with a more reliable approach + i = celery.control.inspect() + scheduled = i.scheduled() or {} + + # Look for our periodic task by name in the scheduled tasks + for worker, tasks in scheduled.items(): + for task in tasks: + if task.get('name') == 'run_dummy_scraper_every_minute': + celery.control.revoke(task['id'], terminate=True) + + # Log the action + ActivityLog.log_scraper_command( + action="revoke_periodic_task", + status="success", + description=f"Revoked periodic task with ID: {PERIODIC_TASK_ID}" + ) + return True + except Exception as e: + ActivityLog.log_error( + error_message=f"Failed to revoke periodic task: {str(e)}", + source="revoke_periodic_task" + ) + return False + return False + @celery.task def run_periodic_dummy_scraper(): """Periodic task to run the dummy scraper if it's active and not paused""" - if ScraperState.is_scraper_active(): + scraper_state = ScraperState.get_current_state() + # Log every time this runs to track execution + ActivityLog.log_scraper_activity( + action="periodic_check", + status="info", + description=f"Periodic check running. 
Scraper active: {scraper_state.is_active}, paused: {scraper_state.is_paused}" + ) + + if scraper_state.is_active and not scraper_state.is_paused: dummy_scheduled_scraper.delay() return True return False + @bp.route("/") def index(): """Render the scraper control panel.""" volume_config = VolumeConfig.query.first() - + # Ensure we have volume config if not volume_config: volume_config = VolumeConfig(volume=100) # Default value db.session.add(volume_config) db.session.commit() - + # Get scraper state scraper_state = ScraperState.get_current_state() - + return render_template( - "scraper.html.jinja", + "scraper.html.jinja", volume_config=volume_config, scraper_active=scraper_state.is_active, scraper_paused=scraper_state.is_paused, max_volume=MAX_VOLUME ) + @bp.route("/start", methods=["POST"]) def start_scraper(): """Start the scraper.""" scraper_state = ScraperState.get_current_state() - + if not scraper_state.is_active: # Update scraper state ScraperState.set_active(True) ScraperState.set_paused(False) - + # Log the action ActivityLog.log_scraper_command( action="start_scraper", status="success", description="Scheduled scraper started - will follow hourly configuration" ) - + # Immediately trigger a task to test the scheduler and provide feedback dummy_scheduled_scraper.delay() - + return jsonify({ - "success": True, + "success": True, "message": "Scraper started - following hourly schedule configuration" }) else: return jsonify({ - "success": False, + "success": False, "message": "Scraper is already running" }) + @bp.route("/stop", methods=["POST"]) def stop_scraper(): - """Stop the scraper.""" + """Stop the scraper completely.""" scraper_state = ScraperState.get_current_state() - + if scraper_state.is_active: - # Update scraper state + # Update scraper state first ScraperState.set_active(False) ScraperState.set_paused(False) + + # Stop any running tasks + task_types_to_revoke = [ + 'scipaperloader.blueprints.scraper.dummy_process_paper', + 'scipaperloader.blueprints.scraper.dummy_scheduled_scraper', + 'scipaperloader.blueprints.scraper.run_periodic_dummy_scraper' + ] + # Use our utility function to revoke tasks + revoked_count = revoke_tasks_by_type(task_types_to_revoke, terminate=True) + + # Revoke the periodic task specifically + revoke_periodic_task() + + # Clear all pending tasks from the queue more aggressively + try: + # This purges all tasks in all queues + celery.control.purge() + ActivityLog.log_scraper_command( + action="purge_queue", + status="success", + description="Purged all task queues" + ) + except Exception as e: + ActivityLog.log_error( + error_message=f"Failed to purge task queues: {str(e)}", + source="stop_scraper" + ) + # Fallback to discard_all if purge fails + celery.control.discard_all() + + # Restart the worker to ensure clean state + try: + celery.control.broadcast('pool_restart', arguments={'reload': True}) + ActivityLog.log_scraper_command( + action="restart_worker", + status="success", + description="Worker pool restart requested" + ) + except Exception as e: + ActivityLog.log_error( + error_message=f"Failed to restart worker pool: {str(e)}", + source="stop_scraper" + ) + ActivityLog.log_scraper_command( action="stop_scraper", status="success", - description="Scraper stopped manually" + description=f"Scraper stopped manually. Revoked {revoked_count} pending tasks. Worker pool restart requested." ) - + return jsonify({ - "success": True, - "message": "Scraper stopped" + "success": True, + "message": f"Scraper stopped. 
Revoked {revoked_count} pending tasks and requested worker restart." }) else: return jsonify({ - "success": False, + "success": False, "message": "Scraper is not running" }) + @bp.route("/pause", methods=["POST"]) def pause_scraper(): """Pause the scraper.""" scraper_state = ScraperState.get_current_state() - + if scraper_state.is_active and not scraper_state.is_paused: # Update scraper state ScraperState.set_paused(True) - + + # Just revoke processing tasks, but leave the periodic tasks running + # so it can continue to check the state (which is now paused) + task_types_to_revoke = [ + 'scipaperloader.blueprints.scraper.dummy_process_paper', + 'scipaperloader.blueprints.scraper.dummy_scheduled_scraper' + ] + + # Use our utility function to revoke tasks + revoked_count = revoke_tasks_by_type(task_types_to_revoke, terminate=True) + + # Also clear the queue + celery.control.discard_all() + ActivityLog.log_scraper_command( action="pause_scraper", status="success", - description="Scraper paused manually" + description=f"Scraper paused manually. Revoked {revoked_count} pending tasks." ) - + return jsonify({ - "success": True, - "message": "Scraper paused" + "success": True, + "message": f"Scraper paused. Revoked {revoked_count} pending tasks." }) elif scraper_state.is_active and scraper_state.is_paused: # Update scraper state ScraperState.set_paused(False) - + ActivityLog.log_scraper_command( action="resume_scraper", status="success", description="Scraper resumed manually" ) - + return jsonify({ - "success": True, + "success": True, "message": "Scraper resumed" }) else: return jsonify({ - "success": False, + "success": False, "message": "Scraper is not running" }) + @bp.route("/status") def scraper_status(): """Get the current status of the scraper.""" scraper_state = ScraperState.get_current_state() - + return jsonify({ "active": scraper_state.is_active, "paused": scraper_state.is_paused, "current_hour": datetime.now().hour, }) + @bp.route("/stats") def scraper_stats(): """Get scraper statistics for the dashboard.""" @@ -164,19 +285,19 @@ def scraper_stats(): hours = int(request.args.get('hours')) except ValueError: pass - + current_time = datetime.utcnow() # Use timedelta for proper date calculation instead of simple hour subtraction cutoff_time = (current_time - timedelta(hours=hours)).replace( minute=0, second=0, microsecond=0 ) - + # Get activity logs for scraper actions logs = ActivityLog.query.filter( ActivityLog.category == ActivityCategory.SCRAPER_ACTIVITY.value, ActivityLog.timestamp >= cutoff_time ).all() - + # Group by hour and status stats = {} for hour in range(hours): @@ -188,7 +309,7 @@ def scraper_stats(): "pending": 0, "hour": target_hour, } - + for log in logs: hour = log.timestamp.hour if hour in stats: @@ -198,29 +319,30 @@ def scraper_stats(): stats[hour]["error"] += 1 elif log.status == "pending": stats[hour]["pending"] += 1 - + # Convert to list for easier consumption by JavaScript result = [stats[hour] for hour in sorted(stats.keys())] - + return jsonify(result) + @bp.route("/update_config", methods=["POST"]) def update_config(): """Update scraper configuration.""" data = request.json - + try: if "volume" in data: try: new_volume = float(data["volume"]) - + # Validate volume value if new_volume <= 0 or new_volume > MAX_VOLUME: return jsonify({ - "success": False, + "success": False, "message": f"Volume must be between 1 and {MAX_VOLUME}" }) - + volume_config = VolumeConfig.query.first() if not volume_config: volume_config = VolumeConfig(volume=new_volume) @@ -234,30 
+356,31 @@ def update_config(): new_value=new_volume, description="Updated scraper volume" ) - + db.session.commit() except (ValueError, TypeError): return jsonify({ - "success": False, + "success": False, "message": "Invalid volume value" }) - + return jsonify({"success": True, "message": "Configuration updated"}) - + except Exception as e: db.session.rollback() return jsonify({"success": False, "message": f"Unexpected error: {str(e)}"}) + @celery.task(bind=True) def dummy_scrape_paper(self): """Simulate scraping a single paper.""" # Simulate success or failure success = random.random() > 0.3 # 70% success rate - + # Simulate processing time import time time.sleep(random.randint(2, 5)) # 2-5 seconds - + if success: # Create a dummy paper new_paper = PaperMetadata( @@ -273,10 +396,10 @@ def dummy_scrape_paper(self): status="Done", file_path="/path/to/dummy/paper.pdf" ) - + db.session.add(new_paper) db.session.commit() - + # Log the successful scrape ActivityLog.log_scraper_activity( action="scrape_paper", @@ -284,7 +407,7 @@ def dummy_scrape_paper(self): status="success", description=f"Successfully scraped paper {new_paper.doi}" ) - + return { "success": True, "paper_id": new_paper.id, @@ -301,36 +424,37 @@ def dummy_scrape_paper(self): "PDF download failed", "Rate limited by publisher" ]) - + ActivityLog.log_scraper_activity( action="scrape_paper", status="error", description=f"Failed to scrape paper: {error_message}" ) - + return { "success": False, "error": error_message } + @celery.task def calculate_papers_for_current_hour(): """ Calculate how many papers should be downloaded in the current hour based on schedule configuration. - + Returns: int: Number of papers to download this hour """ current_hour = datetime.now().hour - + # Get volume configuration volume_config = VolumeConfig.query.first() if not volume_config: volume_config = VolumeConfig(volume=100) # Default to 100 papers per day db.session.add(volume_config) db.session.commit() - + # Get all schedule configurations to calculate total weight schedule_configs = ScheduleConfig.query.all() if not schedule_configs: @@ -340,10 +464,10 @@ def calculate_papers_for_current_hour(): db.session.add(config) db.session.commit() schedule_configs = ScheduleConfig.query.all() - + # Calculate total weight across all hours total_weight = sum(config.weight for config in schedule_configs) - + # Find the weight for the current hour current_hour_config = ScheduleConfig.query.get(current_hour) if not current_hour_config: @@ -351,167 +475,161 @@ def calculate_papers_for_current_hour(): current_hour_config = ScheduleConfig(hour=current_hour, weight=1.0) db.session.add(current_hour_config) db.session.commit() - + # Calculate papers for current hour: (hour_weight / total_weight) * daily_volume if total_weight > 0: weight_ratio = current_hour_config.weight / total_weight papers_this_hour = math.floor(weight_ratio * volume_config.volume) else: papers_this_hour = 0 - + return papers_this_hour @celery.task def dummy_scheduled_scraper(): """ - The main scheduler task that runs every hour to process papers - according to the configured schedule. + Selects new papers based on the hourly schedule and marks them as Pending. + Then schedules their processing randomly within the hour. 
""" - # Check if scraper is active using ScraperState - if not ScraperState.is_scraper_active(): + scraper_state = ScraperState.get_current_state() + if not scraper_state.is_active or scraper_state.is_paused: ActivityLog.log_scraper_activity( - action="scheduled_scraping", - status="info", - description=f"Scheduled scraping skipped: inactive or paused" + action="dummy_scheduled_scraper_skip", + status="info", + description="Skipping run because scraper is inactive or paused." + ) + return False # Stop if not active/paused + + papers_to_select = calculate_papers_for_current_hour() + + if papers_to_select <= 0: + ActivityLog.log_scraper_activity( + action="dummy_scheduled_scraper_info", + status="info", + description=f"Hourly quota is {papers_to_select}. No papers to select this hour." + ) + return True # Nothing to do this hour based on schedule + + # --- Core Logic Change: Select NEW papers --- + try: + # Find "New" papers, select randomly up to the calculated limit + new_papers = PaperMetadata.query.filter_by(status="New") \ + .order_by(func.random()) \ + .limit(papers_to_select) \ + .all() + + if not new_papers: + ActivityLog.log_scraper_activity( + action="dummy_scheduled_scraper_info", + status="info", + description="No 'New' papers found in the database to select." + ) + # Optional: Depending on requirements, you might want to check later + # or handle this case differently. For now, we just log and exit. + return True + + selected_paper_ids = [p.id for p in new_papers] + + # Update status to "Pending" in bulk for efficiency + PaperMetadata.query.filter(PaperMetadata.id.in_(selected_paper_ids)) \ + .update({"status": "Pending", "updated_at": datetime.utcnow()}, synchronize_session=False) + db.session.commit() + + ActivityLog.log_scraper_activity( + action="select_new_papers", + status="success", + description=f"Selected {len(selected_paper_ids)} 'New' papers and marked as 'Pending'. IDs: {selected_paper_ids}" + ) + + # --- Now schedule processing for the newly selected "Pending" papers --- + # (Assuming dummy_process_paper takes a paper_id) + # Add random delays for processing within the hour (e.g., up to 3600 seconds) + for paper_id in selected_paper_ids: + delay = random.uniform(1, 3500) # Random delay up to ~58 minutes + dummy_process_paper.apply_async(args=[paper_id], countdown=delay) + + ActivityLog.log_scraper_activity( + action="schedule_processing", + status="success", + description=f"Scheduled processing for {len(selected_paper_ids)} papers with random delays." 
+ ) + + return True + + except Exception as e: + db.session.rollback() # Rollback DB changes on error + ActivityLog.log_error( + error_message=f"Error in dummy_scheduled_scraper: {str(e)}", + source="dummy_scheduled_scraper" ) return False - - # Calculate how many papers to download this hour - papers_to_download = calculate_papers_for_current_hour() - - if papers_to_download <= 0: - ActivityLog.log_scraper_activity( - action="scheduled_scraping", - status="info", - description=f"No papers scheduled for current hour" - ) - return True - - # Get all pending papers - pending_papers = PaperMetadata.query.filter_by(status="Pending").all() - - # If no pending papers available, create some dummy pending papers - if not pending_papers: - ActivityLog.log_scraper_activity( - action="scheduled_scraping", - status="info", - description=f"No pending papers found - creating {papers_to_download} dummy pending papers" - ) - - # Create dummy pending papers - for i in range(papers_to_download): - # Generate a unique DOI by checking if it exists in the database - while True: - random_id = random.randint(1000, 9999) - doi = f"10.1234/dummy-pending.{random_id}" - - # Check if the DOI already exists - existing = PaperMetadata.query.filter_by(doi=doi).first() - if not existing: - break - - new_paper = PaperMetadata( - title=f"Dummy Pending Paper {random_id}", - doi=doi, - journal=random.choice([ - "Nature", "Science", "PLOS ONE", "Journal of Dummy Research", - "Proceedings of the Dummy Society", "Cell", "Dummy Review Letters" - ]), - type="article", - language="en", - published_online=datetime.now().date(), - status="Pending" - ) - db.session.add(new_paper) - - # Commit all at once after creating all papers - try: - db.session.commit() - except Exception as e: - # Log the error and rollback - ActivityLog.log_error( - error_message="Failed to create dummy pending papers", - exception=e, - source="dummy_scheduled_scraper" - ) - db.session.rollback() - return False - - # Get the newly created papers - pending_papers = PaperMetadata.query.filter_by(status="Pending").all() - - # Select papers_to_download random papers from pending_papers - selected_papers = random.sample( - pending_papers, - min(papers_to_download, len(pending_papers)) - ) - - ActivityLog.log_scraper_activity( - action="scheduled_scraping", - status="info", - description=f"Starting scheduled scraping of {len(selected_papers)} papers for hour {datetime.now().hour}" - ) - - # For each paper, schedule it to run at a random time within the hour - current_time = time.time() - one_hour_in_seconds = 3600 - - for paper in selected_papers: - # Random delay within this hour (0 to 60 minutes) - random_delay = random.randint(0, one_hour_in_seconds) - - # Schedule the dummy_process_paper task with the random delay - dummy_process_paper.apply_async( - args=[paper.id], - countdown=random_delay - ) - - return True @celery.task(bind=True) def dummy_process_paper(self, paper_id): """ Process a single paper for the dummy scraper. 
- + Args: paper_id (int): ID of the paper to process """ + # First check if the scraper is still active and not paused + scraper_state = ScraperState.get_current_state() + if not scraper_state.is_active or scraper_state.is_paused: + # Log that task was skipped due to scraper being stopped or paused + ActivityLog.log_scraper_activity( + action="process_paper", + status="info", + description=f"Skipped processing paper ID {paper_id} because scraper is {'paused' if scraper_state.is_paused else 'stopped'}" + ) + return False + # Get the paper from database paper = PaperMetadata.query.get(paper_id) if not paper: # Log error if paper not found ActivityLog.log_scraper_activity( action="process_paper", - status="error", + status="error", description=f"Paper with ID {paper_id} not found" ) return False - + # Simulate random success/failure (70% success rate) success = random.random() < 0.7 - + # Simulate processing time (1-5 seconds) process_time = random.uniform(1, 5) time.sleep(process_time) - + + # Check again if scraper is still active and not paused after the time delay + # This ensures we don't process papers if the scraper was stopped during the delay + scraper_state = ScraperState.get_current_state() + if not scraper_state.is_active or scraper_state.is_paused: + ActivityLog.log_scraper_activity( + action="process_paper", + status="info", + description=f"Cancelled processing paper ID {paper_id} because scraper is {'paused' if scraper_state.is_paused else 'stopped'}" + ) + return False + if success: # Update paper status to "Done" paper.status = "Done" paper.file_path = f"/path/to/dummy/papers/{paper.doi.replace('/', '_')}.pdf" - + # Log success ActivityLog.log_scraper_activity( action="process_paper", paper_id=paper.id, - status="success", - description=f"Successfully processed paper: {paper.title}" + status="success", + description=f"Successfully processed paper: {paper.doi}" ) else: # Update paper status to "Failed" paper.status = "Failed" - + # Generate random error message error_message = random.choice([ "Publisher website unavailable", @@ -522,19 +640,264 @@ def dummy_process_paper(self, paper_id): "Rate limited by publisher" ]) paper.error_msg = error_message - + # Log failure ActivityLog.log_scraper_activity( action="process_paper", paper_id=paper.id, - status="error", + status="error", description=f"Failed to process paper: {error_message}" ) - + # Update the timestamp paper.updated_at = datetime.utcnow() - + # Commit changes to database db.session.commit() + + return success + + +@celery.task(bind=True) +def process_paper_batch(self, paper_ids): + """ + Process a batch of papers to improve throughput and reduce overhead. 
- return success \ No newline at end of file + Args: + paper_ids (list): List of paper IDs to process in this batch + """ + # Check if scraper is still active + scraper_state = ScraperState.get_current_state() + if not scraper_state.is_active or scraper_state.is_paused: + ActivityLog.log_scraper_activity( + action="process_paper_batch", + status="info", + description=f"Skipped batch of {len(paper_ids)} papers because scraper is {'paused' if scraper_state.is_paused else 'stopped'}" + ) + return False + + # Log the batch starting + ActivityLog.log_scraper_activity( + action="process_paper_batch", + status="info", + description=f"Started processing batch of {len(paper_ids)} papers" + ) + + # Process each paper in the batch + results = { + "success": 0, + "failure": 0, + "skipped": 0 + } + + # Begin a transaction for the entire batch + try: + for paper_id in paper_ids: + # Double-check scraper state before each paper + scraper_state = ScraperState.get_current_state() + if not scraper_state.is_active or scraper_state.is_paused: + results["skipped"] += 1 + continue + + # Get the paper from database + paper = PaperMetadata.query.get(paper_id) + if not paper: + results["skipped"] += 1 + continue + + # Simulate random success/failure (70% success rate) + success = random.random() < 0.7 + + # Simulate some processing time (0.5-2 seconds per paper in batch) + time.sleep(random.uniform(0.5, 2)) + + if success: + # Update paper status to "Done" + paper.status = "Done" + paper.file_path = f"/path/to/dummy/papers/{paper.doi.replace('/', '_')}.pdf" + results["success"] += 1 + + # Log individual paper success with minimal overhead + ActivityLog.log_scraper_activity( + action="process_paper", + paper_id=paper.id, + status="success", + description=f"Processed in batch: {paper.title}" + ) + else: + # Update paper status to "Failed" + paper.status = "Failed" + + # Generate random error message + error_message = random.choice([ + "Publisher website unavailable", + "No PDF download link found", + "Access restricted", + "Download timeout", + "Invalid DOI", + "Rate limited by publisher" + ]) + paper.error_msg = error_message + results["failure"] += 1 + + # Log individual paper failure with minimal overhead + ActivityLog.log_scraper_activity( + action="process_paper", + paper_id=paper.id, + status="error", + description=f"Failed in batch: {error_message}" + ) + + # Update the timestamp + paper.updated_at = datetime.utcnow() + + # Commit the entire batch at once + db.session.commit() + + except Exception as e: + # If any error occurs, roll back the entire batch + db.session.rollback() + ActivityLog.log_error( + error_message=f"Error processing paper batch: {str(e)}", + source="process_paper_batch" + ) + return False + + # Log batch completion + ActivityLog.log_scraper_activity( + action="process_paper_batch", + status="success", + description=f"Completed batch processing: {results['success']} succeeded, {results['failure']} failed, {results['skipped']} skipped" + ) + + return results + + +@bp.route("/reset", methods=["POST"]) +def reset_scraper(): + """ + Reset the scraper completely: + 1. Stop all running tasks + 2. Optionally purge all papers except those with 'Pending' status + 3. Reset scraper state to active and unpaused + 4. 
Trigger a new scraping cycle + """ + # First stop everything + stop_scraper() + + # Check if we should clear papers + clear_papers = request.json.get('clear_papers', True) if request.is_json else True + + if clear_papers: + try: + # Get all papers that aren't in Pending status + papers = PaperMetadata.query.filter(PaperMetadata.status != "Pending").all() + count = len(papers) + + # Delete them all + for paper in papers: + db.session.delete(paper) + + db.session.commit() + + ActivityLog.log_scraper_command( + action="reset_scraper", + status="success", + description=f"Reset scraper and cleared {count} non-pending papers" + ) + except Exception as e: + db.session.rollback() + ActivityLog.log_error( + error_message=f"Failed to reset papers: {str(e)}", + source="reset_scraper" + ) + return jsonify({ + "success": False, + "message": f"Error clearing papers: {str(e)}" + }) + + # Set state to active and unpaused + ScraperState.set_active(True) + ScraperState.set_paused(False) + + # Re-register the periodic task if needed + setup_periodic_tasks(celery) + + # Kick off a fresh scraping cycle + dummy_scheduled_scraper.delay() + + return jsonify({ + "success": True, + "message": "Scraper has been completely reset and restarted" + }) + +# Common utility function to revoke tasks by type +def revoke_tasks_by_type(task_types, terminate=True): + """ + Revokes all tasks of specified types across scheduled, reserved and active queues. + + Args: + task_types (list): List of task name strings to revoke + terminate (bool): Whether to terminate running tasks + + Returns: + int: Count of revoked tasks + """ + # Get all tasks of all types + i = celery.control.inspect() + scheduled = i.scheduled() or {} + reserved = i.reserved() or {} + active = i.active() or {} + + revoked_count = 0 + + # Revoke all scheduled tasks + for worker, tasks in scheduled.items(): + for task in tasks: + if task['name'] in task_types: + celery.control.revoke(task['id'], terminate=terminate) + revoked_count += 1 + + # Revoke all reserved tasks + for worker, tasks in reserved.items(): + for task in tasks: + if task['name'] in task_types: + celery.control.revoke(task['id'], terminate=terminate) + revoked_count += 1 + + # Revoke all active tasks + for worker, tasks in active.items(): + for task in tasks: + if task['name'] in task_types: + celery.control.revoke(task['id'], terminate=terminate) + revoked_count += 1 + + return revoked_count + +import math +from datetime import datetime +from ..models import VolumeConfig, ScheduleConfig, PaperMetadata # Ensure imports +from ..db import db # Ensure import + +def calculate_papers_for_current_hour(): + """Calculates the target number of papers for the current hour using w*N/24.""" + current_hour = datetime.now().hour + volume_config = VolumeConfig.query.first() + # Use default if not set + volume = volume_config.volume if volume_config else 100 + + current_hour_config = ScheduleConfig.query.filter_by(hour=current_hour).first() + # Use default weight 1.0 if not set for the hour + weight = current_hour_config.weight if current_hour_config else 1.0 + + # Calculate papers using the formula: w * N / 24 + papers_this_hour = math.floor(weight * volume / 24) + + # Log the calculation for debugging + ActivityLog.log_scraper_activity( + action="calculate_hourly_quota", + status="info", + description=f"Hour {current_hour}: weight={weight:.2f}, total_volume={volume}, target_papers={papers_this_hour}" + ) + + return papers_this_hour diff --git a/scipaperloader/templates/scraper.html.jinja 
b/scipaperloader/templates/scraper.html.jinja index b142ae9..750b15f 100644 --- a/scipaperloader/templates/scraper.html.jinja +++ b/scipaperloader/templates/scraper.html.jinja @@ -62,6 +62,7 @@ + @@ -160,6 +161,7 @@ const startButton = document.getElementById('startButton'); const pauseButton = document.getElementById('pauseButton'); const stopButton = document.getElementById('stopButton'); + const resetButton = document.getElementById('resetButton'); const notificationsToggle = document.getElementById('notificationsToggle'); const activityLog = document.getElementById('activityLog'); @@ -173,6 +175,7 @@ startButton.addEventListener('click', startScraper); pauseButton.addEventListener('click', togglePauseScraper); stopButton.addEventListener('click', stopScraper); + resetButton.addEventListener('click', resetScraper); notificationsToggle.addEventListener('click', toggleNotifications); document.getElementById('volumeForm').addEventListener('submit', function (e) { @@ -213,12 +216,14 @@ startButton.disabled = true; pauseButton.disabled = false; stopButton.disabled = false; + resetButton.disabled = false; // Enable reset when active } else { statusIndicator.className = 'status-indicator status-inactive'; statusText.textContent = 'Inactive'; startButton.disabled = false; pauseButton.disabled = true; stopButton.disabled = true; + resetButton.disabled = false; // Enable reset when inactive too } }); } @@ -276,6 +281,46 @@ }); } + function resetScraper() { + if (confirm("Are you sure you want to reset the scraper? This will stop all current tasks, optionally clear non-pending papers, and restart the scraper.")) { + // Disable button to prevent multiple clicks + resetButton.disabled = true; + + // Show a loading message + showFlashMessage('Resetting scraper, please wait...', 'info'); + + fetch('/scraper/reset', { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ + clear_papers: true // You could make this configurable with a checkbox + }) + }) + .then(response => response.json()) + .then(data => { + if (data.success) { + showFlashMessage('Scraper has been completely reset and restarted', 'success'); + // Update everything + updateStatus(); + loadActivityStats(currentTimeRange); + setTimeout(() => { loadRecentActivity(); }, 1000); + } else { + showFlashMessage(data.message || 'Error resetting scraper', 'error'); + } + // Re-enable button + resetButton.disabled = false; + }) + .catch(error => { + console.error("Error resetting scraper:", error); + showFlashMessage('Error resetting scraper: ' + error.message, 'error'); + // Re-enable button + resetButton.disabled = false; + }); + } + } + function updateVolume() { const volume = document.getElementById('volumeInput').value;
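For reference, the hourly quota introduced at the bottom of scraper.py in this patch follows floor(weight * volume / 24), with a default weight of 1.0 for hours that have no ScheduleConfig row. Below is a minimal standalone sketch of that calculation; the weight tables are hypothetical example values, not data from the patch, and the helper name papers_for_hour is illustrative only.

# Illustrative sketch (separate from the patch) of the hourly quota rule used by the
# final calculate_papers_for_current_hour() in scraper.py:
#   papers_this_hour = floor(hour_weight * daily_volume / 24)
# The weight dictionaries below are hypothetical examples, not values from the patch.
import math


def papers_for_hour(hour, daily_volume, weights):
    """Target number of papers for `hour`, defaulting to weight 1.0 as the patch does."""
    weight = weights.get(hour, 1.0)
    return math.floor(weight * daily_volume / 24)


uniform = {h: 1.0 for h in range(24)}
assert papers_for_hour(14, 240, uniform) == 10   # uniform weights reduce to volume / 24

peaked = dict(uniform, **{14: 3.0})
assert papers_for_hour(14, 240, peaked) == 30    # a weight of 3.0 triples that hour's share

Unlike the earlier weight-ratio variant that also appears in the hunk (hour_weight / total_weight * volume), this form only sums to the full daily volume when the 24 hourly weights average 1.0, so heavier hours draw additional papers rather than redistributing a fixed daily total.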