""" Simplified scraper management system with hourly quota scheduling. Uses APScheduler for all task processing - no Celery dependencies. """ import random import math from datetime import datetime, timedelta, UTC from typing import List, Dict, Optional from sqlalchemy import func from flask import current_app from ..models import ( PaperMetadata, ScheduleConfig, VolumeConfig, ScraperState, ActivityLog, ScraperModuleConfig ) from ..db import db from ..cache_utils import get_cached_hourly_quota from .factory import get_scraper, get_available_scrapers class ScraperManager: """Manages scraper operations with hourly quota-based scheduling.""" def __init__(self): self.current_scraper = None self.pending_papers = [] # Track papers being processed # No more Redis client initialization - using APScheduler now def _get_scheduler(self): """Get the APScheduler instance from Flask app config.""" try: return current_app.config.get('SCHEDULER') except RuntimeError: # Outside application context return None def _clear_delayed_tasks_from_apscheduler(self) -> int: """Clear delayed tasks from APScheduler - clean replacement for Redis manipulation. Returns: int: Number of delayed tasks cleared """ scheduler = self._get_scheduler() if not scheduler: try: ActivityLog.log_error( error_message="APScheduler not available - cannot clear delayed tasks", source="ScraperManager._clear_delayed_tasks_from_apscheduler" ) except RuntimeError: print("❌ APScheduler not available - cannot clear delayed tasks") return 0 try: cleared_count = scheduler.revoke_all_scraper_jobs() # Summary logging if cleared_count > 0: try: ActivityLog.log_scraper_activity( action="clear_delayed_tasks_complete_apscheduler", status="success", description=f"Total delayed scraper tasks cleared from APScheduler: {cleared_count}" ) except RuntimeError: print(f"✅ Total delayed scraper tasks cleared from APScheduler: {cleared_count}") else: try: ActivityLog.log_scraper_activity( action="clear_delayed_tasks_complete_apscheduler", status="info", description="No delayed scraper tasks found to clear in APScheduler" ) except RuntimeError: print("ℹ️ No delayed scraper tasks found to clear in APScheduler") return cleared_count except Exception as e: try: ActivityLog.log_error( error_message=f"Failed to clear delayed tasks from APScheduler: {str(e)}", source="ScraperManager._clear_delayed_tasks_from_apscheduler" ) except RuntimeError: print(f"❌ Failed to clear delayed tasks from APScheduler: {str(e)}") return 0 def start_scraper(self) -> Dict[str, str]: """Start the scraper system.""" try: # Get current scraper self.current_scraper = get_scraper() # Activate scraper state ScraperState.set_active(True) ScraperState.set_paused(False) scraper_name = self.current_scraper.get_name() ActivityLog.log_scraper_command( action="start_scraper", status="success", description=f"Started scraper: {scraper_name}. Use /trigger-immediate endpoint to immediately schedule papers instead of waiting for the next hourly boundary." ) return {"status": "success", "message": "Scraper started successfully. 
    def start_scraper(self) -> Dict[str, str]:
        """Start the scraper system."""
        try:
            # Get current scraper
            self.current_scraper = get_scraper()

            # Activate scraper state
            ScraperState.set_active(True)
            ScraperState.set_paused(False)

            scraper_name = self.current_scraper.get_name()

            ActivityLog.log_scraper_command(
                action="start_scraper",
                status="success",
                description=f"Started scraper: {scraper_name}. Use /trigger-immediate endpoint to immediately schedule papers instead of waiting for the next hourly boundary."
            )

            return {
                "status": "success",
                "message": "Scraper started successfully. Papers will be scheduled at the next hourly boundary, or use /trigger-immediate to schedule immediately."
            }
        except Exception as e:
            ActivityLog.log_error(
                error_message=f"Failed to start scraper: {str(e)}",
                source="ScraperManager.start_scraper"
            )
            return {"status": "error", "message": str(e)}

    def pause_scraper(self) -> Dict[str, str]:
        """Pause the scraper system."""
        try:
            ScraperState.set_paused(True)

            ActivityLog.log_scraper_command(
                action="pause_scraper",
                status="success",
                description="Scraper paused - processing will halt"
            )

            return {"status": "success", "message": "Scraper paused"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def resume_scraper(self) -> Dict[str, str]:
        """Resume the scraper system."""
        try:
            ScraperState.set_paused(False)

            ActivityLog.log_scraper_command(
                action="resume_scraper",
                status="success",
                description="Scraper resumed - processing will continue"
            )

            return {"status": "success", "message": "Scraper resumed"}
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def stop_scraper(self) -> Dict[str, str]:
        """Stop the scraper, revoke all APScheduler jobs, and revert pending papers."""
        try:
            # STEP 1: Immediately set scraper as inactive - this is critical for race condition prevention
            ScraperState.set_active(False)
            ScraperState.set_paused(False)

            ActivityLog.log_scraper_command(
                action="stop_scraper_start",
                status="info",
                description="Scraper stop initiated - marked as inactive. Beginning APScheduler job revocation."
            )

            # STEP 2: Brief pause to allow running jobs to see the inactive state
            import time
            time.sleep(0.2)

            # STEP 3: Revoke all APScheduler jobs
            delayed_cleared_count = self._clear_delayed_tasks_from_apscheduler()

            # STEP 4: Wait a bit for any remaining jobs to finish their checks and exit
            time.sleep(1.0)

            # STEP 5: Revert papers from processing status
            scraper = get_scraper()
            input_statuses = scraper.get_input_statuses()

            # Find papers that are currently being processed
            processing_status = scraper.get_output_statuses()["processing"]
            pending_papers = PaperMetadata.query.filter_by(status=processing_status).all()

            # Revert their status to the first input status
            reverted_count = 0
            if pending_papers and input_statuses:
                revert_status = input_statuses[0]  # Use first input status as default

                for paper in pending_papers:
                    # Try to use previous_status if available, otherwise use first input status
                    if hasattr(paper, 'previous_status') and paper.previous_status:
                        paper.status = paper.previous_status
                    else:
                        paper.status = revert_status
                    paper.updated_at = datetime.now(UTC)
                    reverted_count += 1

                db.session.commit()

                ActivityLog.log_scraper_activity(
                    action="revert_pending_papers",
                    status="success",
                    description=f"Reverted {reverted_count} papers from '{processing_status}' to previous status"
                )

            ActivityLog.log_scraper_command(
                action="stop_scraper",
                status="success",
                description=f"Scraper stopped completely. Cleared {delayed_cleared_count} APScheduler jobs and reverted {reverted_count} papers."
            )

            return {
                "status": "success",
                "message": f"Scraper stopped. Cleared {delayed_cleared_count} APScheduler jobs and reverted {reverted_count} papers to previous status."
            }
        except Exception as e:
            ActivityLog.log_error(
                error_message=f"Failed to stop scraper: {str(e)}",
                source="ScraperManager.stop_scraper"
            )
            return {"status": "error", "message": str(e)}
    def reset_scraper(self) -> Dict[str, str]:
        """Reset scraper state, revoke all APScheduler jobs, and clear all processing statuses."""
        try:
            ActivityLog.log_scraper_command(
                action="reset_scraper_start",
                status="info",
                description="Beginning scraper reset process with APScheduler job revocation"
            )

            # Clear all APScheduler jobs
            delayed_cleared_count = self._clear_delayed_tasks_from_apscheduler()

            # Get current scraper configuration
            scraper = get_scraper()
            input_statuses = scraper.get_input_statuses()
            processing_status = scraper.get_output_statuses()["processing"]

            # Reset all papers in processing status
            pending_papers = PaperMetadata.query.filter_by(status=processing_status).all()

            reverted_count = 0
            if pending_papers and input_statuses:
                revert_status = input_statuses[0]

                for paper in pending_papers:
                    # Try to use previous_status if available, otherwise use first input status
                    if hasattr(paper, 'previous_status') and paper.previous_status:
                        paper.status = paper.previous_status
                    else:
                        paper.status = revert_status
                    paper.updated_at = datetime.now(UTC)
                    paper.error_msg = None  # Clear any error messages
                    reverted_count += 1

                db.session.commit()

            # Reset scraper state
            ScraperState.set_active(False)
            ScraperState.set_paused(False)

            ActivityLog.log_scraper_command(
                action="reset_scraper",
                status="success",
                description=f"Scraper reset. Cleared {delayed_cleared_count} APScheduler jobs and reverted {reverted_count} papers."
            )

            return {
                "status": "success",
                "message": f"Scraper reset. Cleared {delayed_cleared_count} APScheduler jobs and reverted {reverted_count} papers to original status."
            }
        except Exception as e:
            return {"status": "error", "message": str(e)}

    def get_current_hour_quota(self) -> int:
        """Calculate papers to process in current hour based on schedule."""
        try:
            return get_cached_hourly_quota(self._calculate_papers_for_current_hour)
        except Exception as e:
            ActivityLog.log_error(
                error_message=f"Error calculating hourly quota: {str(e)}",
                source="ScraperManager.get_current_hour_quota"
            )
            return 0

    def _calculate_papers_for_current_hour(self) -> int:
        """Internal method to calculate hourly quota."""
        try:
            # Get current hour and volume config
            current_hour = datetime.now().hour
            volume_config = VolumeConfig.get_current_volume()
            daily_volume = volume_config if volume_config else 100

            # Get schedule config for current hour
            schedule_config = ScheduleConfig.query.filter_by(hour=current_hour).first()
            current_weight = schedule_config.weight if schedule_config else 1.0

            # Get total weight across all hours
            total_weight = db.session.query(func.sum(ScheduleConfig.weight)).scalar() or 24.0

            # Calculate quota: (current_weight / total_weight) * daily_volume
            quota = math.ceil((current_weight / total_weight) * daily_volume)

            ActivityLog.log_scraper_activity(
                action="calculate_hourly_quota",
                status="info",
                description=f"Hour {current_hour}: quota={quota} (weight={current_weight}, total_weight={total_weight}, daily_volume={daily_volume})"
            )

            return max(1, quota)  # Ensure at least 1 paper per hour
        except Exception as e:
            ActivityLog.log_error(
                error_message=f"Error in quota calculation: {str(e)}",
                source="ScraperManager._calculate_papers_for_current_hour"
            )
            return 1  # Fallback to 1 paper per hour
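    # Worked example of the quota formula above (illustrative numbers, not defaults):
    # with a daily volume of 240 papers, a weight of 2.0 for the current hour, and a
    # total weight of 48.0 summed across all 24 hours, the hourly quota is
    #     ceil((2.0 / 48.0) * 240) = ceil(10.0) = 10
    # i.e. this hour receives its weighted share of the daily volume, rounded up and
    # floored at 1 paper per hour.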
    def select_papers_for_processing(self, limit: Optional[int] = None) -> List[PaperMetadata]:
        """Select papers for processing based on current scraper configuration."""
        try:
            scraper = get_scraper()
            input_statuses = scraper.get_input_statuses()

            if not input_statuses:
                return []

            # Use provided limit or calculate from hourly quota
            papers_needed = limit if limit is not None else self.get_current_hour_quota()

            # Query papers with input statuses, randomize selection
            papers = (PaperMetadata.query
                      .filter(PaperMetadata.status.in_(input_statuses))
                      .order_by(func.random())
                      .limit(papers_needed)
                      .all())

            ActivityLog.log_scraper_activity(
                action="select_papers",
                status="info",
                description=f"Selected {len(papers)} papers from statuses {input_statuses} (requested: {papers_needed})"
            )

            return papers
        except Exception as e:
            ActivityLog.log_error(
                error_message=f"Error selecting papers: {str(e)}",
                source="ScraperManager.select_papers_for_processing"
            )
            return []

    def process_paper(self, paper: PaperMetadata) -> Dict:
        """Process a single paper using the current scraper."""
        try:
            # **RACE CONDITION FIX**: Double-check scraper state before proceeding
            scraper_state = ScraperState.get_current_state()
            if not scraper_state.is_active:
                ActivityLog.log_scraper_activity(
                    action="process_paper",
                    paper_id=paper.id,
                    status="skipped",
                    description="Skipped processing - scraper deactivated during task execution"
                )
                return {"paper_id": paper.id, "status": "skipped", "message": "Scraper not active"}

            if scraper_state.is_paused:
                ActivityLog.log_scraper_activity(
                    action="process_paper",
                    paper_id=paper.id,
                    status="skipped",
                    description="Skipped processing - scraper paused during task execution"
                )
                return {"paper_id": paper.id, "status": "skipped", "message": "Scraper paused"}

            scraper = get_scraper()
            output_statuses = scraper.get_output_statuses()

            # Store the previous status before changing it
            previous_status = paper.status

            # Update paper status to processing
            paper.previous_status = previous_status
            paper.status = output_statuses["processing"]
            paper.updated_at = datetime.now(UTC)
            db.session.commit()

            # **ADDITIONAL RACE CONDITION CHECK**: Verify scraper is still active before expensive scraping operation
            scraper_state = ScraperState.get_current_state()
            if not scraper_state.is_active:
                # Scraper was deactivated after we marked paper as processing - revert and exit
                paper.status = previous_status
                paper.updated_at = datetime.now(UTC)
                db.session.commit()

                ActivityLog.log_scraper_activity(
                    action="process_paper",
                    paper_id=paper.id,
                    status="cancelled",
                    description="Cancelled processing - scraper deactivated after paper marked as processing"
                )
                return {"paper_id": paper.id, "status": "cancelled", "message": "Scraper deactivated during processing"}

            # Perform scraping
            result = scraper.scrape(paper.doi)

            # Update paper status based on result
            if result.status == "success":
                paper.status = output_statuses["success"]
                paper.error_msg = None
                if result.data and "file_path" in result.data:
                    paper.file_path = result.data["file_path"]
            else:
                paper.status = output_statuses["failure"]
                paper.error_msg = result.message

            paper.updated_at = datetime.now(UTC)
            db.session.commit()

            # Log result
            ActivityLog.log_scraper_activity(
                action="process_paper",
                paper_id=paper.id,
                status=result.status,
                description=f"Processed {paper.doi}: {result.message}"
            )

            return {
                "paper_id": paper.id,
                "status": result.status,
                "message": result.message,
                "duration": result.duration
            }
        except Exception as e:
            # Revert paper status on error
            try:
                input_statuses = get_scraper().get_input_statuses()
                if input_statuses:
                    paper.status = input_statuses[0]
                    paper.error_msg = f"Processing error: {str(e)}"
                    paper.updated_at = datetime.now(UTC)
                    db.session.commit()
            except Exception:
                pass  # Don't fail if reversion fails

            ActivityLog.log_error(
                error_message=f"Error processing paper {paper.id}: {str(e)}",
                source="ScraperManager.process_paper"
            )
            return {"paper_id": paper.id, "status": "error", "message": str(e)}
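    # The result object returned by scraper.scrape() above is assumed to expose
    # .status ("success" or "failure"), .message, .duration, and an optional .data dict
    # (e.g. containing "file_path"). A minimal sketch of that assumed shape, for
    # reference only - the actual definition lives in the scraper base/factory modules:
    #
    #     @dataclass
    #     class ScrapeResult:          # hypothetical name
    #         status: str              # "success" | "failure"
    #         message: str = ""
    #         duration: float | None = None
    #         data: dict | None = None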
    def process_paper_manual(self, paper: PaperMetadata, scraper_name: Optional[str] = None) -> Dict:
        """Process a single paper manually, bypassing scraper state checks."""
        try:
            # Get scraper configuration but skip state validation for manual processing
            if scraper_name:
                # Use the specified scraper
                import importlib
                from .base import BaseScraper

                try:
                    module = importlib.import_module(f"scipaperloader.scrapers.{scraper_name}")
                    scraper_cls = getattr(module, "Scraper")
                    if not issubclass(scraper_cls, BaseScraper):
                        raise TypeError(f"Scraper class in module '{scraper_name}' does not inherit from BaseScraper")
                    scraper = scraper_cls()
                except (ImportError, AttributeError, TypeError) as e:
                    ActivityLog.log_error(
                        error_message=f"Failed to load specified scraper '{scraper_name}': {str(e)}. Falling back to system default.",
                        source="ScraperManager.process_paper_manual"
                    )
                    scraper = get_scraper()
            else:
                # Use system default scraper
                scraper = get_scraper()

            output_statuses = scraper.get_output_statuses()

            # Store the previous status before changing it
            previous_status = paper.status

            # Update paper status to processing
            paper.previous_status = previous_status
            paper.status = output_statuses["processing"]
            paper.updated_at = datetime.now(UTC)
            db.session.commit()

            # Perform scraping (no state checks for manual processing)
            result = scraper.scrape(paper.doi)

            # Update paper status based on result
            if result.status == "success":
                paper.status = output_statuses["success"]
                paper.error_msg = None
                if result.data and "file_path" in result.data:
                    paper.file_path = result.data["file_path"]
            else:
                paper.status = output_statuses["failure"]
                paper.error_msg = result.message

            paper.updated_at = datetime.now(UTC)
            db.session.commit()

            # Log result
            ActivityLog.log_scraper_activity(
                action="process_paper_manual",
                paper_id=paper.id,
                status=result.status,
                description=f"Manually processed {paper.doi}: {result.message}"
            )

            return {
                "paper_id": paper.id,
                "status": result.status,
                "message": result.message,
                "duration": result.duration
            }
        except Exception as e:
            # Revert paper status on error
            try:
                input_statuses = get_scraper().get_input_statuses()
                if input_statuses:
                    paper.status = input_statuses[0]
                    paper.error_msg = f"Manual processing error: {str(e)}"
                    paper.updated_at = datetime.now(UTC)
                    db.session.commit()
            except Exception:
                pass  # Don't fail if reversion fails

            ActivityLog.log_error(
                error_message=f"Error manually processing paper {paper.id}: {str(e)}",
                source="ScraperManager.process_paper_manual"
            )
            return {"paper_id": paper.id, "status": "error", "message": str(e)}

    def get_status(self) -> Dict:
        """Get current scraper status."""
        scraper_state = ScraperState.get_current_state()
        scraper = get_scraper()

        # Count papers by status
        input_statuses = scraper.get_input_statuses()
        output_statuses = scraper.get_output_statuses()

        available_count = (PaperMetadata.query
                           .filter(PaperMetadata.status.in_(input_statuses))
                           .count())
        processing_count = (PaperMetadata.query
                            .filter_by(status=output_statuses["processing"])
                            .count())

        return {
            "active": scraper_state.is_active,
            "paused": scraper_state.is_paused,
            "current_scraper": scraper.get_name(),
            "input_statuses": input_statuses,
            "output_statuses": output_statuses,
            "available_papers": available_count,
            "processing_papers": processing_count,
            "current_hour_quota": self.get_current_hour_quota()
        }
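
# Illustrative usage sketch (assumes an active Flask application context; the
# /trigger-immediate endpoint mentioned in the messages above lives elsewhere):
#
#     manager = ScraperManager()
#     manager.start_scraper()                          # activate state; hourly jobs pick it up
#     papers = manager.select_papers_for_processing()  # respects the current hourly quota
#     for paper in papers:
#         manager.process_paper(paper)
#     manager.stop_scraper()                           # revoke jobs and revert in-flight papers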