import random import json import time import math import os # Import os for path joining from datetime import datetime, timedelta from flask import Blueprint, jsonify, render_template, request, current_app, flash # Import the new model from ..models import VolumeConfig, ActivityLog, PaperMetadata, ActivityCategory, ScheduleConfig, ScraperState, DownloadPathConfig from ..db import db from ..celery import celery from ..defaults import MAX_VOLUME from celery.schedules import crontab from sqlalchemy import func bp = Blueprint("scraper", __name__, url_prefix="/scraper") # Track the periodic task ID for proper revocation PERIODIC_TASK_ID = None # Setup periodic task to run every minute for testing purposes @celery.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): global PERIODIC_TASK_ID # Run the dummy scraper every minute for testing purposes with a unique task name # Using a constant name allows us to revoke this specific periodic task when stopping result = sender.add_periodic_task(60.0, run_periodic_dummy_scraper.s(), name='run_dummy_scraper_every_minute') PERIODIC_TASK_ID = result.id # Log that we've registered the periodic task with its ID ActivityLog.log_scraper_command( action="register_periodic_task", status="success", description=f"Registered periodic scraper task with ID: {PERIODIC_TASK_ID}" ) # Function to revoke the periodic task properly def revoke_periodic_task(): global PERIODIC_TASK_ID if PERIODIC_TASK_ID: try: # Revoke by ID is more reliable than by name celery.control.revoke(PERIODIC_TASK_ID, terminate=True) # Attempt to revoke by name as a fallback (standard Celery method) # Note: We're removing the non-standard revoke_from_control method # and replacing it with a more reliable approach i = celery.control.inspect() scheduled = i.scheduled() or {} # Look for our periodic task by name in the scheduled tasks for worker, tasks in scheduled.items(): for task in tasks: if task.get('name') == 'run_dummy_scraper_every_minute': celery.control.revoke(task['id'], terminate=True) # Log the action ActivityLog.log_scraper_command( action="revoke_periodic_task", status="success", description=f"Revoked periodic task with ID: {PERIODIC_TASK_ID}" ) return True except Exception as e: ActivityLog.log_error( error_message=f"Failed to revoke periodic task: {str(e)}", source="revoke_periodic_task" ) return False return False @celery.task def run_periodic_dummy_scraper(): """Periodic task to run the dummy scraper if it's active and not paused""" scraper_state = ScraperState.get_current_state() # Log every time this runs to track execution ActivityLog.log_scraper_activity( action="periodic_check", status="info", description=f"Periodic check running. Scraper active: {scraper_state.is_active}, paused: {scraper_state.is_paused}" ) if scraper_state.is_active and not scraper_state.is_paused: dummy_scheduled_scraper.delay() return True return False @bp.route("/") def index(): """Render the scraper control panel.""" volume_config = VolumeConfig.query.first() # Ensure we have volume config if not volume_config: volume_config = VolumeConfig(volume=100) # Default value db.session.add(volume_config) db.session.commit() # Get scraper state scraper_state = ScraperState.get_current_state() return render_template( "scraper.html.jinja", volume_config=volume_config, scraper_active=scraper_state.is_active, scraper_paused=scraper_state.is_paused, max_volume=MAX_VOLUME ) @bp.route("/start", methods=["POST"]) def start_scraper(): """Start the scraper.""" scraper_state = ScraperState.get_current_state() if not scraper_state.is_active: # Update scraper state ScraperState.set_active(True) ScraperState.set_paused(False) # Log the action ActivityLog.log_scraper_command( action="start_scraper", status="success", description="Scheduled scraper started - will follow hourly configuration" ) # Immediately trigger a task to test the scheduler and provide feedback dummy_scheduled_scraper.delay() return jsonify({ "success": True, "message": "Scraper started - following hourly schedule configuration" }) else: return jsonify({ "success": False, "message": "Scraper is already running" }) @bp.route("/stop", methods=["POST"]) def stop_scraper(): """Stop the scraper completely.""" scraper_state = ScraperState.get_current_state() if scraper_state.is_active: # Update scraper state first ScraperState.set_active(False) ScraperState.set_paused(False) # Stop any running tasks task_types_to_revoke = [ 'scipaperloader.blueprints.scraper.dummy_process_paper', 'scipaperloader.blueprints.scraper.dummy_scheduled_scraper', 'scipaperloader.blueprints.scraper.run_periodic_dummy_scraper' ] # Use our utility function to revoke tasks revoked_count = revoke_tasks_by_type(task_types_to_revoke, terminate=True) # Revoke the periodic task specifically revoke_periodic_task() # Clear all pending tasks from the queue more aggressively try: # This purges all tasks in all queues celery.control.purge() ActivityLog.log_scraper_command( action="purge_queue", status="success", description="Purged all task queues" ) except Exception as e: ActivityLog.log_error( error_message=f"Failed to purge task queues: {str(e)}", source="stop_scraper" ) # Fallback to discard_all if purge fails celery.control.discard_all() # Restart the worker to ensure clean state try: celery.control.broadcast('pool_restart', arguments={'reload': True}) ActivityLog.log_scraper_command( action="restart_worker", status="success", description="Worker pool restart requested" ) except Exception as e: ActivityLog.log_error( error_message=f"Failed to restart worker pool: {str(e)}", source="stop_scraper" ) ActivityLog.log_scraper_command( action="stop_scraper", status="success", description=f"Scraper stopped manually. Revoked {revoked_count} pending tasks. Worker pool restart requested." ) return jsonify({ "success": True, "message": f"Scraper stopped. Revoked {revoked_count} pending tasks and requested worker restart." }) else: return jsonify({ "success": False, "message": "Scraper is not running" }) @bp.route("/pause", methods=["POST"]) def pause_scraper(): """Pause the scraper.""" scraper_state = ScraperState.get_current_state() if scraper_state.is_active and not scraper_state.is_paused: # Update scraper state ScraperState.set_paused(True) # Just revoke processing tasks, but leave the periodic tasks running # so it can continue to check the state (which is now paused) task_types_to_revoke = [ 'scipaperloader.blueprints.scraper.dummy_process_paper', 'scipaperloader.blueprints.scraper.dummy_scheduled_scraper' ] # Use our utility function to revoke tasks revoked_count = revoke_tasks_by_type(task_types_to_revoke, terminate=True) # Also clear the queue celery.control.discard_all() ActivityLog.log_scraper_command( action="pause_scraper", status="success", description=f"Scraper paused manually. Revoked {revoked_count} pending tasks." ) return jsonify({ "success": True, "message": f"Scraper paused. Revoked {revoked_count} pending tasks." }) elif scraper_state.is_active and scraper_state.is_paused: # Update scraper state ScraperState.set_paused(False) ActivityLog.log_scraper_command( action="resume_scraper", status="success", description="Scraper resumed manually" ) return jsonify({ "success": True, "message": "Scraper resumed" }) else: return jsonify({ "success": False, "message": "Scraper is not running" }) @bp.route("/status") def scraper_status(): """Get the current status of the scraper.""" scraper_state = ScraperState.get_current_state() return jsonify({ "active": scraper_state.is_active, "paused": scraper_state.is_paused, "current_hour": datetime.now().hour, }) @bp.route("/stats") def scraper_stats(): """Get scraper statistics for the dashboard.""" # Get the last 24 hours of activity hours = 24 if request.args.get('hours'): try: hours = int(request.args.get('hours')) except ValueError: pass current_time = datetime.utcnow() # Use timedelta for proper date calculation instead of simple hour subtraction cutoff_time = (current_time - timedelta(hours=hours)).replace( minute=0, second=0, microsecond=0 ) # Get activity logs for scraper actions logs = ActivityLog.query.filter( ActivityLog.category == ActivityCategory.SCRAPER_ACTIVITY.value, ActivityLog.timestamp >= cutoff_time ).all() # Group by hour and status stats = {} for hour in range(hours): # Calculate the hour as offset from current time target_hour = (current_time.hour - hour) % 24 stats[target_hour] = { "success": 0, "error": 0, "pending": 0, "hour": target_hour, } for log in logs: hour = log.timestamp.hour if hour in stats: if log.status == "success": stats[hour]["success"] += 1 elif log.status == "error": stats[hour]["error"] += 1 elif log.status == "pending": stats[hour]["pending"] += 1 # Convert to list for easier consumption by JavaScript result = [stats[hour] for hour in sorted(stats.keys())] return jsonify(result) @bp.route("/update_config", methods=["POST"]) def update_config(): """Update scraper configuration.""" data = request.json try: if "volume" in data: try: new_volume = float(data["volume"]) # Validate volume value if new_volume <= 0 or new_volume > MAX_VOLUME: return jsonify({ "success": False, "message": f"Volume must be between 1 and {MAX_VOLUME}" }) volume_config = VolumeConfig.query.first() if not volume_config: volume_config = VolumeConfig(volume=new_volume) db.session.add(volume_config) else: old_value = volume_config.volume volume_config.volume = new_volume ActivityLog.log_config_change( config_key="scraper_volume", old_value=old_value, new_value=new_volume, description="Updated scraper volume" ) db.session.commit() except (ValueError, TypeError): return jsonify({ "success": False, "message": "Invalid volume value" }) return jsonify({"success": True, "message": "Configuration updated"}) except Exception as e: db.session.rollback() return jsonify({"success": False, "message": f"Unexpected error: {str(e)}"}) @celery.task(bind=True) def dummy_scrape_paper(self): """Simulate scraping a single paper.""" # Simulate success or failure success = random.random() > 0.3 # 70% success rate # Simulate processing time import time time.sleep(random.randint(2, 5)) # 2-5 seconds if success: # Create a dummy paper new_paper = PaperMetadata( title=f"Dummy Paper {random.randint(1000, 9999)}", doi=f"10.1234/dummy.{random.randint(1000, 9999)}", journal=random.choice([ "Nature", "Science", "PLOS ONE", "Journal of Dummy Research", "Proceedings of the Dummy Society", "Cell", "Dummy Review Letters" ]), type="article", language="en", published_online=datetime.now().date(), status="Done", file_path="/path/to/dummy/paper.pdf" ) db.session.add(new_paper) db.session.commit() # Log the successful scrape ActivityLog.log_scraper_activity( action="scrape_paper", paper_id=new_paper.id, status="success", description=f"Successfully scraped paper {new_paper.doi}" ) return { "success": True, "paper_id": new_paper.id, "title": new_paper.title, "doi": new_paper.doi } else: # Log the failed scrape error_message = random.choice([ "Connection timeout", "404 Not Found", "Access denied", "Invalid DOI format", "PDF download failed", "Rate limited by publisher" ]) ActivityLog.log_scraper_activity( action="scrape_paper", status="error", description=f"Failed to scrape paper: {error_message}" ) return { "success": False, "error": error_message } @celery.task def calculate_papers_for_current_hour(): """ Calculate how many papers should be downloaded in the current hour based on schedule configuration. Returns: int: Number of papers to download this hour """ current_hour = datetime.now().hour # Get volume configuration volume_config = VolumeConfig.query.first() if not volume_config: volume_config = VolumeConfig(volume=100) # Default to 100 papers per day db.session.add(volume_config) db.session.commit() # Get all schedule configurations to calculate total weight schedule_configs = ScheduleConfig.query.all() if not schedule_configs: # If no schedule configs, create default with equal weights for hour in range(24): config = ScheduleConfig(hour=hour, weight=1.0) db.session.add(config) db.session.commit() schedule_configs = ScheduleConfig.query.all() # Calculate total weight across all hours total_weight = sum(config.weight for config in schedule_configs) # Find the weight for the current hour current_hour_config = ScheduleConfig.query.get(current_hour) if not current_hour_config: # Create config for current hour if it doesn't exist current_hour_config = ScheduleConfig(hour=current_hour, weight=1.0) db.session.add(current_hour_config) db.session.commit() # Calculate papers for current hour: (hour_weight / total_weight) * daily_volume if total_weight > 0: weight_ratio = current_hour_config.weight / total_weight papers_this_hour = math.floor(weight_ratio * volume_config.volume) else: papers_this_hour = 0 return papers_this_hour @celery.task def dummy_scheduled_scraper(): """ Selects new papers based on the hourly schedule and marks them as Pending. Then schedules their processing randomly within the hour. """ scraper_state = ScraperState.get_current_state() if not scraper_state.is_active or scraper_state.is_paused: ActivityLog.log_scraper_activity( action="dummy_scheduled_scraper_skip", status="info", description="Skipping run because scraper is inactive or paused." ) return False # Stop if not active/paused papers_to_select = calculate_papers_for_current_hour() if papers_to_select <= 0: ActivityLog.log_scraper_activity( action="dummy_scheduled_scraper_info", status="info", description=f"Hourly quota is {papers_to_select}. No papers to select this hour." ) return True # Nothing to do this hour based on schedule # --- Core Logic Change: Select NEW papers --- try: # Find "New" papers, select randomly up to the calculated limit new_papers = PaperMetadata.query.filter_by(status="New") \ .order_by(func.random()) \ .limit(papers_to_select) \ .all() if not new_papers: ActivityLog.log_scraper_activity( action="dummy_scheduled_scraper_info", status="info", description="No 'New' papers found in the database to select." ) # Optional: Depending on requirements, you might want to check later # or handle this case differently. For now, we just log and exit. return True selected_paper_ids = [p.id for p in new_papers] # Update status to "Pending" in bulk for efficiency PaperMetadata.query.filter(PaperMetadata.id.in_(selected_paper_ids)) \ .update({"status": "Pending", "updated_at": datetime.utcnow()}, synchronize_session=False) db.session.commit() ActivityLog.log_scraper_activity( action="select_new_papers", status="success", description=f"Selected {len(selected_paper_ids)} 'New' papers and marked as 'Pending'. IDs: {selected_paper_ids}" ) # --- Now schedule processing for the newly selected "Pending" papers --- # (Assuming dummy_process_paper takes a paper_id) # Add random delays for processing within the hour (e.g., up to 3600 seconds) for paper_id in selected_paper_ids: delay = random.uniform(1, 3500) # Random delay up to ~58 minutes dummy_process_paper.apply_async(args=[paper_id], countdown=delay) ActivityLog.log_scraper_activity( action="schedule_processing", status="success", description=f"Scheduled processing for {len(selected_paper_ids)} papers with random delays." ) return True except Exception as e: db.session.rollback() # Rollback DB changes on error ActivityLog.log_error( error_message=f"Error in dummy_scheduled_scraper: {str(e)}", source="dummy_scheduled_scraper" ) return False @celery.task(bind=True) def dummy_process_paper(self, paper_id): """ Process a single paper for the dummy scraper. Args: paper_id (int): ID of the paper to process """ # First check if the scraper is still active and not paused scraper_state = ScraperState.get_current_state() if not scraper_state.is_active or scraper_state.is_paused: # Log that task was skipped due to scraper being stopped or paused ActivityLog.log_scraper_activity( action="process_paper", status="info", description=f"Skipped processing paper ID {paper_id} because scraper is {'paused' if scraper_state.is_paused else 'stopped'}" ) return False # Get the paper from database paper = PaperMetadata.query.get(paper_id) if not paper: # Log error if paper not found ActivityLog.log_scraper_activity( action="process_paper", status="error", description=f"Paper with ID {paper_id} not found" ) return False # Simulate random success/failure (70% success rate) success = random.random() < 0.7 # Simulate processing time (1-5 seconds) process_time = random.uniform(1, 5) time.sleep(process_time) # Check again if scraper is still active and not paused after the time delay # This ensures we don't process papers if the scraper was stopped during the delay scraper_state = ScraperState.get_current_state() if not scraper_state.is_active or scraper_state.is_paused: ActivityLog.log_scraper_activity( action="process_paper", status="info", description=f"Cancelled processing paper ID {paper_id} because scraper is {'paused' if scraper_state.is_paused else 'stopped'}" ) return False if success: # --- Get configured download path --- download_base_path = DownloadPathConfig.get_path() # Ensure the base path exists (optional, but good practice) # os.makedirs(download_base_path, exist_ok=True) # --- Construct the file path --- # Sanitize DOI for use in filename safe_doi = paper.doi.replace('/', '_').replace(':', '_') filename = f"{safe_doi}.pdf" full_path = os.path.join(download_base_path, filename) # Update paper status to "Done" and set the file path paper.status = "Done" paper.file_path = full_path # Use the constructed path # Log success ActivityLog.log_scraper_activity( action="process_paper", paper_id=paper.id, status="success", description=f"Successfully processed paper: {paper.doi}. File at: {full_path}" # Log path ) else: # Update paper status to "Failed" paper.status = "Failed" # Generate random error message error_message = random.choice([ "Publisher website unavailable", "No PDF download link found", "Access restricted", "Download timeout", "Invalid DOI", "Rate limited by publisher" ]) paper.error_msg = error_message # Log failure ActivityLog.log_scraper_activity( action="process_paper", paper_id=paper.id, status="error", description=f"Failed to process paper: {error_message}" ) # Update the timestamp paper.updated_at = datetime.utcnow() # Commit changes to database db.session.commit() return success @celery.task(bind=True) def process_paper_batch(self, paper_ids): """ Process a batch of papers to improve throughput and reduce overhead. Args: paper_ids (list): List of paper IDs to process in this batch """ # Check if scraper is still active scraper_state = ScraperState.get_current_state() if not scraper_state.is_active or scraper_state.is_paused: ActivityLog.log_scraper_activity( action="process_paper_batch", status="info", description=f"Skipped batch of {len(paper_ids)} papers because scraper is {'paused' if scraper_state.is_paused else 'stopped'}" ) return False # Log the batch starting ActivityLog.log_scraper_activity( action="process_paper_batch", status="info", description=f"Started processing batch of {len(paper_ids)} papers" ) # Process each paper in the batch results = { "success": 0, "failure": 0, "skipped": 0 } # Begin a transaction for the entire batch try: for paper_id in paper_ids: # Double-check scraper state before each paper scraper_state = ScraperState.get_current_state() if not scraper_state.is_active or scraper_state.is_paused: results["skipped"] += 1 continue # Get the paper from database paper = PaperMetadata.query.get(paper_id) if not paper: results["skipped"] += 1 continue # Simulate random success/failure (70% success rate) success = random.random() < 0.7 # Simulate some processing time (0.5-2 seconds per paper in batch) time.sleep(random.uniform(0.5, 2)) if success: # Update paper status to "Done" paper.status = "Done" paper.file_path = f"/path/to/dummy/papers/{paper.doi.replace('/', '_')}.pdf" results["success"] += 1 # Log individual paper success with minimal overhead ActivityLog.log_scraper_activity( action="process_paper", paper_id=paper.id, status="success", description=f"Processed in batch: {paper.title}" ) else: # Update paper status to "Failed" paper.status = "Failed" # Generate random error message error_message = random.choice([ "Publisher website unavailable", "No PDF download link found", "Access restricted", "Download timeout", "Invalid DOI", "Rate limited by publisher" ]) paper.error_msg = error_message results["failure"] += 1 # Log individual paper failure with minimal overhead ActivityLog.log_scraper_activity( action="process_paper", paper_id=paper.id, status="error", description=f"Failed in batch: {error_message}" ) # Update the timestamp paper.updated_at = datetime.utcnow() # Commit the entire batch at once db.session.commit() except Exception as e: # If any error occurs, roll back the entire batch db.session.rollback() ActivityLog.log_error( error_message=f"Error processing paper batch: {str(e)}", source="process_paper_batch" ) return False # Log batch completion ActivityLog.log_scraper_activity( action="process_paper_batch", status="success", description=f"Completed batch processing: {results['success']} succeeded, {results['failure']} failed, {results['skipped']} skipped" ) return results @bp.route("/reset", methods=["POST"]) def reset_scraper(): """ Reset the scraper completely: 1. Stop all running tasks 2. Optionally purge all papers except those with 'Pending' status 3. Reset scraper state to active and unpaused 4. Trigger a new scraping cycle """ # First stop everything stop_scraper() # Check if we should clear papers clear_papers = request.json.get('clear_papers', True) if request.is_json else True if clear_papers: try: # Get all papers that aren't in Pending status papers = PaperMetadata.query.filter(PaperMetadata.status != "Pending").all() count = len(papers) # Delete them all for paper in papers: db.session.delete(paper) db.session.commit() ActivityLog.log_scraper_command( action="reset_scraper", status="success", description=f"Reset scraper and cleared {count} non-pending papers" ) except Exception as e: db.session.rollback() ActivityLog.log_error( error_message=f"Failed to reset papers: {str(e)}", source="reset_scraper" ) return jsonify({ "success": False, "message": f"Error clearing papers: {str(e)}" }) # Set state to active and unpaused ScraperState.set_active(True) ScraperState.set_paused(False) # Re-register the periodic task if needed setup_periodic_tasks(celery) # Kick off a fresh scraping cycle dummy_scheduled_scraper.delay() return jsonify({ "success": True, "message": "Scraper has been completely reset and restarted" }) # Common utility function to revoke tasks by type def revoke_tasks_by_type(task_types, terminate=True): """ Revokes all tasks of specified types across scheduled, reserved and active queues. Args: task_types (list): List of task name strings to revoke terminate (bool): Whether to terminate running tasks Returns: int: Count of revoked tasks """ # Get all tasks of all types i = celery.control.inspect() scheduled = i.scheduled() or {} reserved = i.reserved() or {} active = i.active() or {} revoked_count = 0 # Revoke all scheduled tasks for worker, tasks in scheduled.items(): for task in tasks: if task['name'] in task_types: celery.control.revoke(task['id'], terminate=terminate) revoked_count += 1 # Revoke all reserved tasks for worker, tasks in reserved.items(): for task in tasks: if task['name'] in task_types: celery.control.revoke(task['id'], terminate=terminate) revoked_count += 1 # Revoke all active tasks for worker, tasks in active.items(): for task in tasks: if task['name'] in task_types: celery.control.revoke(task['id'], terminate=terminate) revoked_count += 1 return revoked_count import math from datetime import datetime from ..models import VolumeConfig, ScheduleConfig, PaperMetadata # Ensure imports from ..db import db # Ensure import def calculate_papers_for_current_hour(): """Calculates the target number of papers for the current hour using w*N/24.""" current_hour = datetime.now().hour volume_config = VolumeConfig.query.first() # Use default if not set volume = volume_config.volume if volume_config else 100 current_hour_config = ScheduleConfig.query.filter_by(hour=current_hour).first() # Use default weight 1.0 if not set for the hour weight = current_hour_config.weight if current_hour_config else 1.0 # Calculate papers using the formula: w * N / 24 papers_this_hour = math.floor(weight * volume / 24) # Log the calculation for debugging ActivityLog.log_scraper_activity( action="calculate_hourly_quota", status="info", description=f"Hour {current_hour}: weight={weight:.2f}, total_volume={volume}, target_papers={papers_this_hour}" ) return papers_this_hour