""" Simplified scraper management system with hourly quota scheduling. """ import random import math import redis from datetime import datetime, timedelta from typing import List, Dict, Optional from sqlalchemy import func from ..models import ( PaperMetadata, ScheduleConfig, VolumeConfig, ScraperState, ActivityLog, ScraperModuleConfig ) from ..db import db from ..cache_utils import get_cached_hourly_quota from .factory import get_scraper, get_available_scrapers from ..celery import celery class ScraperManager: """Manages scraper operations with hourly quota-based scheduling.""" def __init__(self): self.current_scraper = None self.pending_papers = [] # Track papers being processed # Initialize Redis client for delayed task management self.redis_client = None self._init_redis_client() def _init_redis_client(self): """Initialize Redis client for delayed task management.""" try: # Use same Redis configuration as Celery self.redis_client = redis.Redis( host='localhost', port=6379, db=0, decode_responses=True ) # Test connection self.redis_client.ping() except Exception as e: ActivityLog.log_error( error_message=f"Failed to initialize Redis client: {str(e)}", source="ScraperManager._init_redis_client" ) self.redis_client = None def _clear_delayed_tasks_from_redis(self) -> int: """Clear delayed tasks from Redis structures used by Celery. Based on analysis, Celery stores delayed tasks in: - 'unacked_index': Sorted set containing task IDs with execution timestamps - 'unacked': Hash containing task data keyed by task ID Returns: int: Number of delayed tasks cleared """ if not self.redis_client: try: ActivityLog.log_error( error_message="Redis client not available - cannot clear delayed tasks", source="ScraperManager._clear_delayed_tasks_from_redis" ) except RuntimeError: # Working outside application context - just print instead print("❌ Redis client not available - cannot clear delayed tasks") return 0 cleared_count = 0 try: # Define scraper task patterns to identify our tasks scraper_patterns = [ 'process_single_paper', 'process_papers_batch', 'hourly_scraper_scheduler' ] try: ActivityLog.log_scraper_activity( action="check_delayed_tasks", status="info", description="Checking Celery delayed task structures (unacked_index, unacked)" ) except RuntimeError: print("🔍 Checking Celery delayed task structures (unacked_index, unacked)") # Check 'unacked_index' (sorted set with task IDs and timestamps) unacked_index_cleared = 0 if self.redis_client.exists('unacked_index'): try: # Get all task IDs from the sorted set task_ids = self.redis_client.zrange('unacked_index', 0, -1) if task_ids: try: ActivityLog.log_scraper_activity( action="scan_unacked_index", status="info", description=f"Found {len(task_ids)} tasks in 'unacked_index'" ) except RuntimeError: print(f"📋 Found {len(task_ids)} tasks in 'unacked_index'") # Check each task ID against the 'unacked' hash to get task details scraper_task_ids = [] for task_id in task_ids: try: # Get task data from 'unacked' hash task_data = self.redis_client.hget('unacked', task_id) if task_data: # Check if this task contains any of our scraper patterns if any(pattern in str(task_data) for pattern in scraper_patterns): scraper_task_ids.append(task_id) except Exception: # Skip individual task errors continue # Remove scraper task IDs from both structures for task_id in scraper_task_ids: try: # Remove from unacked_index (sorted set) removed_from_index = self.redis_client.zrem('unacked_index', task_id) # Remove from unacked (hash) removed_from_hash = self.redis_client.hdel('unacked', task_id) if removed_from_index or removed_from_hash: unacked_index_cleared += 1 except Exception as e: try: ActivityLog.log_error( error_message=f"Error removing delayed task {task_id}: {str(e)}", source="ScraperManager._clear_delayed_tasks_from_redis" ) except RuntimeError: print(f"❌ Error removing delayed task {task_id}: {str(e)}") continue cleared_count += unacked_index_cleared if unacked_index_cleared > 0: try: ActivityLog.log_scraper_activity( action="clear_unacked_tasks", status="success", description=f"Cleared {unacked_index_cleared} scraper tasks from unacked structures" ) except RuntimeError: print(f"✅ Cleared {unacked_index_cleared} scraper tasks from unacked structures") else: try: ActivityLog.log_scraper_activity( action="check_unacked_index", status="info", description="No tasks found in 'unacked_index'" ) except RuntimeError: print("â„šī¸ No tasks found in 'unacked_index'") except Exception as e: try: ActivityLog.log_error( error_message=f"Error accessing 'unacked_index': {str(e)}", source="ScraperManager._clear_delayed_tasks_from_redis" ) except RuntimeError: print(f"❌ Error accessing 'unacked_index': {str(e)}") else: try: ActivityLog.log_scraper_activity( action="check_unacked_index", status="info", description="'unacked_index' key does not exist - no delayed tasks" ) except RuntimeError: print("â„šī¸ 'unacked_index' key does not exist - no delayed tasks") # Also check the 'celery' queue for immediate tasks (backup check) celery_cleared = 0 try: queue_length = self.redis_client.llen('celery') if queue_length and queue_length > 0: # Scan for any scraper tasks in the immediate queue scraper_tasks = [] for i in range(queue_length): try: task_data = self.redis_client.lindex('celery', i) if task_data and any(pattern in str(task_data) for pattern in scraper_patterns): scraper_tasks.append(task_data) except Exception: continue # Remove scraper tasks from celery queue for task_data in scraper_tasks: try: removed_count = self.redis_client.lrem('celery', 0, task_data) celery_cleared += removed_count except Exception: continue cleared_count += celery_cleared if celery_cleared > 0: try: ActivityLog.log_scraper_activity( action="clear_celery_tasks", status="success", description=f"Cleared {celery_cleared} scraper tasks from 'celery' queue" ) except RuntimeError: print(f"✅ Cleared {celery_cleared} scraper tasks from 'celery' queue") except Exception as e: try: ActivityLog.log_error( error_message=f"Error checking 'celery' queue: {str(e)}", source="ScraperManager._clear_delayed_tasks_from_redis" ) except RuntimeError: print(f"❌ Error checking 'celery' queue: {str(e)}") # Summary if cleared_count > 0: try: ActivityLog.log_scraper_activity( action="clear_delayed_tasks_complete", status="success", description=f"Total delayed scraper tasks cleared from Redis: {cleared_count} (unacked: {unacked_index_cleared}, celery: {celery_cleared})" ) except RuntimeError: print(f"✅ Total delayed scraper tasks cleared from Redis: {cleared_count} (unacked: {unacked_index_cleared}, celery: {celery_cleared})") else: try: ActivityLog.log_scraper_activity( action="clear_delayed_tasks_complete", status="info", description="No delayed scraper tasks found to clear in Redis" ) except RuntimeError: print("â„šī¸ No delayed scraper tasks found to clear in Redis") return cleared_count except Exception as e: try: ActivityLog.log_error( error_message=f"Failed to clear delayed tasks from Redis: {str(e)}", source="ScraperManager._clear_delayed_tasks_from_redis" ) except RuntimeError: print(f"❌ Failed to clear delayed tasks from Redis: {str(e)}") return 0 def start_scraper(self) -> Dict[str, str]: """Start the scraper system.""" try: # Get current scraper self.current_scraper = get_scraper() # Activate scraper state ScraperState.set_active(True) ScraperState.set_paused(False) scraper_name = self.current_scraper.get_name() ActivityLog.log_scraper_command( action="start_scraper", status="success", description=f"Started scraper: {scraper_name}. Use /trigger-immediate endpoint to immediately schedule papers instead of waiting for the next hourly boundary." ) return {"status": "success", "message": "Scraper started successfully. Papers will be scheduled at the next hourly boundary, or use /trigger-immediate to schedule immediately."} except Exception as e: ActivityLog.log_error( error_message=f"Failed to start scraper: {str(e)}", source="ScraperManager.start_scraper" ) return {"status": "error", "message": str(e)} def pause_scraper(self) -> Dict[str, str]: """Pause the scraper system.""" try: ScraperState.set_paused(True) ActivityLog.log_scraper_command( action="pause_scraper", status="success", description="Scraper paused - processing will halt" ) return {"status": "success", "message": "Scraper paused"} except Exception as e: return {"status": "error", "message": str(e)} def resume_scraper(self) -> Dict[str, str]: """Resume the scraper system.""" try: ScraperState.set_paused(False) ActivityLog.log_scraper_command( action="resume_scraper", status="success", description="Scraper resumed - processing will continue" ) return {"status": "success", "message": "Scraper resumed"} except Exception as e: return {"status": "error", "message": str(e)} def stop_scraper(self) -> Dict[str, str]: """Stop the scraper, revoke all running tasks, and revert pending papers.""" try: # First, revoke all running tasks revoked_count = 0 delayed_cleared_count = 0 ActivityLog.log_scraper_command( action="stop_scraper_start", status="info", description="Beginning scraper stop process with task revocation and delayed task clearing" ) try: # Get Celery inspector to check for running tasks i = celery.control.inspect() active = i.active() or {} scheduled = i.scheduled() or {} reserved = i.reserved() or {} # Revoke active tasks for worker, tasks in active.items(): for task in tasks: if 'id' in task: celery.control.revoke(task['id'], terminate=True) revoked_count += 1 ActivityLog.log_scraper_activity( action="revoke_task", status="success", description=f"Revoked active task: {task.get('name', 'unknown')} (ID: {task['id']})" ) # Revoke scheduled tasks for worker, tasks in scheduled.items(): for task in tasks: if 'id' in task: celery.control.revoke(task['id'], terminate=True) revoked_count += 1 ActivityLog.log_scraper_activity( action="revoke_task", status="success", description=f"Revoked scheduled task: {task.get('name', 'unknown')} (ID: {task['id']})" ) # Revoke reserved tasks for worker, tasks in reserved.items(): for task in tasks: if 'id' in task: celery.control.revoke(task['id'], terminate=True) revoked_count += 1 ActivityLog.log_scraper_activity( action="revoke_task", status="success", description=f"Revoked reserved task: {task.get('name', 'unknown')} (ID: {task['id']})" ) # Purge all task queues celery.control.purge() ActivityLog.log_scraper_activity( action="purge_queues", status="success", description="Purged all task queues" ) # **NEW: Clear delayed tasks from Redis sorted sets** delayed_cleared_count = self._clear_delayed_tasks_from_redis() # Additional cleanup: revoke any remaining scraper-related tasks by name pattern try: # Use broadcast to revoke tasks that match scraper patterns scraper_task_patterns = [ 'process_single_paper', 'process_papers_batch', 'hourly_scraper_scheduler' ] # Get a fresh inspection of tasks after purge fresh_inspect = celery.control.inspect() all_tasks = {} all_tasks.update(fresh_inspect.active() or {}) all_tasks.update(fresh_inspect.scheduled() or {}) all_tasks.update(fresh_inspect.reserved() or {}) additional_revoked = 0 for worker, tasks in all_tasks.items(): for task in tasks: task_name = task.get('name', '') task_id = task.get('id', '') if any(pattern in task_name for pattern in scraper_task_patterns) and task_id: celery.control.revoke(task_id, terminate=True) additional_revoked += 1 ActivityLog.log_scraper_activity( action="revoke_scraper_task", status="success", description=f"Revoked lingering scraper task: {task_name} (ID: {task_id})" ) if additional_revoked > 0: ActivityLog.log_scraper_activity( action="cleanup_scraper_tasks", status="success", description=f"Additional cleanup: revoked {additional_revoked} lingering scraper tasks" ) except Exception as e: ActivityLog.log_error( error_message=f"Error during additional scraper task cleanup: {str(e)}", source="ScraperManager.stop_scraper.cleanup" ) except Exception as e: ActivityLog.log_error( error_message=f"Error revoking tasks: {str(e)}", source="ScraperManager.stop_scraper" ) # Continue with paper reversion even if task revocation fails # Get current scraper to know what status to revert to scraper = get_scraper() input_statuses = scraper.get_input_statuses() # Find papers that are currently being processed processing_status = scraper.get_output_statuses()["processing"] pending_papers = PaperMetadata.query.filter_by(status=processing_status).all() # Revert their status to the first input status reverted_count = 0 if pending_papers and input_statuses: revert_status = input_statuses[0] # Use first input status as default for paper in pending_papers: # Try to use previous_status if available, otherwise use first input status if hasattr(paper, 'previous_status') and paper.previous_status: paper.status = paper.previous_status else: paper.status = revert_status paper.updated_at = datetime.utcnow() reverted_count += 1 db.session.commit() ActivityLog.log_scraper_activity( action="revert_pending_papers", status="success", description=f"Reverted {reverted_count} papers from '{processing_status}' to previous status" ) # Deactivate scraper ScraperState.set_active(False) ScraperState.set_paused(False) ActivityLog.log_scraper_command( action="stop_scraper", status="success", description=f"Scraper stopped. Revoked {revoked_count} tasks, cleared {delayed_cleared_count} delayed tasks, and reverted {reverted_count} papers." ) return { "status": "success", "message": f"Scraper stopped. Revoked {revoked_count} tasks, cleared {delayed_cleared_count} delayed tasks, and reverted {reverted_count} papers to previous status." } except Exception as e: ActivityLog.log_error( error_message=f"Failed to stop scraper: {str(e)}", source="ScraperManager.stop_scraper" ) return {"status": "error", "message": str(e)} def reset_scraper(self) -> Dict[str, str]: """Reset scraper state, revoke all running tasks, and clear all processing statuses.""" try: # First, revoke all running tasks (similar to stop_scraper) revoked_count = 0 ActivityLog.log_scraper_command( action="reset_scraper_start", status="info", description="Beginning scraper reset process with task revocation" ) try: # Get Celery inspector to check for running tasks i = celery.control.inspect() active = i.active() or {} scheduled = i.scheduled() or {} reserved = i.reserved() or {} # Revoke all tasks (active, scheduled, reserved) for queue_name, queue_tasks in [("active", active), ("scheduled", scheduled), ("reserved", reserved)]: for worker, tasks in queue_tasks.items(): for task in tasks: if 'id' in task: celery.control.revoke(task['id'], terminate=True) revoked_count += 1 ActivityLog.log_scraper_activity( action="revoke_task", status="success", description=f"Revoked {queue_name} task: {task.get('name', 'unknown')} (ID: {task['id']})" ) # Purge all task queues celery.control.purge() ActivityLog.log_scraper_activity( action="purge_queues", status="success", description="Purged all task queues during reset" ) except Exception as e: ActivityLog.log_error( error_message=f"Error revoking tasks during reset: {str(e)}", source="ScraperManager.reset_scraper" ) # Continue with paper reversion even if task revocation fails # Get current scraper configuration scraper = get_scraper() input_statuses = scraper.get_input_statuses() processing_status = scraper.get_output_statuses()["processing"] # Reset all papers in processing status pending_papers = PaperMetadata.query.filter_by(status=processing_status).all() reverted_count = 0 if pending_papers and input_statuses: revert_status = input_statuses[0] for paper in pending_papers: # Try to use previous_status if available, otherwise use first input status if hasattr(paper, 'previous_status') and paper.previous_status: paper.status = paper.previous_status else: paper.status = revert_status paper.updated_at = datetime.utcnow() paper.error_msg = None # Clear any error messages reverted_count += 1 db.session.commit() # Reset scraper state ScraperState.set_active(False) ScraperState.set_paused(False) ActivityLog.log_scraper_command( action="reset_scraper", status="success", description=f"Scraper reset. Revoked {revoked_count} tasks and reverted {reverted_count} papers." ) return { "status": "success", "message": f"Scraper reset. Revoked {revoked_count} tasks and reverted {reverted_count} papers to original status." } except Exception as e: return {"status": "error", "message": str(e)} def get_current_hour_quota(self) -> int: """Calculate papers to process in current hour based on schedule.""" try: return get_cached_hourly_quota(self._calculate_papers_for_current_hour) except Exception as e: ActivityLog.log_error( error_message=f"Error calculating hourly quota: {str(e)}", source="ScraperManager.get_current_hour_quota" ) return 0 def _calculate_papers_for_current_hour(self) -> int: """Internal method to calculate hourly quota.""" try: # Get current hour and volume config current_hour = datetime.now().hour volume_config = VolumeConfig.get_current_volume() daily_volume = volume_config if volume_config else 100 # Get schedule config for current hour schedule_config = ScheduleConfig.query.filter_by(hour=current_hour).first() current_weight = schedule_config.weight if schedule_config else 1.0 # Get total weight across all hours total_weight = db.session.query(func.sum(ScheduleConfig.weight)).scalar() or 24.0 # Calculate quota: (current_weight / total_weight) * daily_volume quota = math.ceil((current_weight / total_weight) * daily_volume) ActivityLog.log_scraper_activity( action="calculate_hourly_quota", status="info", description=f"Hour {current_hour}: quota={quota} (weight={current_weight}, total_weight={total_weight}, daily_volume={daily_volume})" ) return max(1, quota) # Ensure at least 1 paper per hour except Exception as e: ActivityLog.log_error( error_message=f"Error in quota calculation: {str(e)}", source="ScraperManager._calculate_papers_for_current_hour" ) return 1 # Fallback to 1 paper per hour def select_papers_for_processing(self, limit: Optional[int] = None) -> List[PaperMetadata]: """Select papers for processing based on current scraper configuration.""" try: scraper = get_scraper() input_statuses = scraper.get_input_statuses() if not input_statuses: return [] # Use provided limit or calculate from hourly quota papers_needed = limit if limit is not None else self.get_current_hour_quota() # Query papers with input statuses, randomize selection papers = (PaperMetadata.query .filter(PaperMetadata.status.in_(input_statuses)) .order_by(func.random()) .limit(papers_needed) .all()) ActivityLog.log_scraper_activity( action="select_papers", status="info", description=f"Selected {len(papers)} papers from statuses {input_statuses} (requested: {papers_needed})" ) return papers except Exception as e: ActivityLog.log_error( error_message=f"Error selecting papers: {str(e)}", source="ScraperManager.select_papers_for_processing" ) return [] def process_paper(self, paper: PaperMetadata) -> Dict: """Process a single paper using the current scraper.""" try: scraper = get_scraper() output_statuses = scraper.get_output_statuses() # Store the previous status before changing it previous_status = paper.status # Update paper status to processing paper.previous_status = previous_status paper.status = output_statuses["processing"] paper.updated_at = datetime.utcnow() db.session.commit() # Perform scraping result = scraper.scrape(paper.doi) # Update paper status based on result if result.status == "success": paper.status = output_statuses["success"] paper.error_msg = None if result.data and "file_path" in result.data: paper.file_path = result.data["file_path"] else: paper.status = output_statuses["failure"] paper.error_msg = result.message paper.updated_at = datetime.utcnow() db.session.commit() # Log result ActivityLog.log_scraper_activity( action="process_paper", paper_id=paper.id, status=result.status, description=f"Processed {paper.doi}: {result.message}" ) return { "paper_id": paper.id, "status": result.status, "message": result.message, "duration": result.duration } except Exception as e: # Revert paper status on error try: input_statuses = get_scraper().get_input_statuses() if input_statuses: paper.status = input_statuses[0] paper.error_msg = f"Processing error: {str(e)}" paper.updated_at = datetime.utcnow() db.session.commit() except: pass # Don't fail if reversion fails ActivityLog.log_error( error_message=f"Error processing paper {paper.id}: {str(e)}", source="ScraperManager.process_paper" ) return {"paper_id": paper.id, "status": "error", "message": str(e)} def get_status(self) -> Dict: """Get current scraper status.""" scraper_state = ScraperState.get_current_state() scraper = get_scraper() # Count papers by status input_statuses = scraper.get_input_statuses() output_statuses = scraper.get_output_statuses() available_count = (PaperMetadata.query .filter(PaperMetadata.status.in_(input_statuses)) .count()) processing_count = (PaperMetadata.query .filter_by(status=output_statuses["processing"]) .count()) return { "active": scraper_state.is_active, "paused": scraper_state.is_paused, "current_scraper": scraper.get_name(), "input_statuses": input_statuses, "output_statuses": output_statuses, "available_papers": available_count, "processing_papers": processing_count, "current_hour_quota": self.get_current_hour_quota() }