diff --git a/scipaperloader/scrapers/manager.py b/scipaperloader/scrapers/manager.py
index e07f80c..9e145e5 100644
--- a/scipaperloader/scrapers/manager.py
+++ b/scipaperloader/scrapers/manager.py
@@ -46,10 +46,15 @@ class ScraperManager:
             # Test connection
             self.redis_client.ping()
         except Exception as e:
-            ActivityLog.log_error(
-                error_message=f"Failed to initialize Redis client: {str(e)}",
-                source="ScraperManager._init_redis_client"
-            )
+            # Only log if we're in an application context
+            try:
+                ActivityLog.log_error(
+                    error_message=f"Failed to initialize Redis client: {str(e)}",
+                    source="ScraperManager._init_redis_client"
+                )
+            except RuntimeError:
+                # Outside application context - just print to console
+                print(f"Warning: Failed to initialize Redis client: {str(e)}")
             self.redis_client = None
 
     def _clear_delayed_tasks_from_redis(self) -> int:
@@ -320,16 +325,24 @@ class ScraperManager:
     def stop_scraper(self) -> Dict[str, str]:
         """Stop the scraper, revoke all running tasks, and revert pending papers."""
         try:
-            # First, revoke all running tasks
-            revoked_count = 0
-            delayed_cleared_count = 0
+            # STEP 1: Immediately set scraper as inactive - this is critical for race condition prevention
+            ScraperState.set_active(False)
+            ScraperState.set_paused(False)
 
             ActivityLog.log_scraper_command(
                 action="stop_scraper_start",
                 status="info",
-                description="Beginning scraper stop process with task revocation and delayed task clearing"
+                description="Scraper stop initiated - marked as inactive. Beginning task revocation and delayed task clearing."
             )
 
+            # STEP 2: Brief pause to allow running tasks to see the inactive state
+            import time
+            time.sleep(0.2)
+
+            # STEP 3: Revoke all running tasks
+            revoked_count = 0
+            delayed_cleared_count = 0
+
             try:
                 # Get Celery inspector to check for running tasks
                 i = celery.control.inspect()
@@ -381,7 +394,7 @@ class ScraperManager:
                         description="Purged all task queues"
                     )
 
-                # **NEW: Clear delayed tasks from Redis sorted sets**
+                # STEP 4: Clear delayed tasks from Redis sorted sets
                 delayed_cleared_count = self._clear_delayed_tasks_from_redis()
 
                 # Additional cleanup: revoke any remaining scraper-related tasks by name pattern
@@ -434,7 +447,10 @@ class ScraperManager:
                 )
                 # Continue with paper reversion even if task revocation fails
 
-            # Get current scraper to know what status to revert to
+            # STEP 5: Wait a bit longer for any remaining tasks to finish their checks and exit
+            time.sleep(1.0)
+
+            # STEP 6: Revert papers from processing status
             scraper = get_scraper()
             input_statuses = scraper.get_input_statuses()
 
@@ -464,14 +480,10 @@ class ScraperManager:
                 description=f"Reverted {reverted_count} papers from '{processing_status}' to previous status"
             )
 
-            # Deactivate scraper
-            ScraperState.set_active(False)
-            ScraperState.set_paused(False)
-
             ActivityLog.log_scraper_command(
                 action="stop_scraper",
                 status="success",
-                description=f"Scraper stopped. Revoked {revoked_count} tasks, cleared {delayed_cleared_count} delayed tasks, and reverted {reverted_count} papers."
+                description=f"Scraper stopped completely. Revoked {revoked_count} tasks, cleared {delayed_cleared_count} delayed tasks, and reverted {reverted_count} papers."
             )
 
             return {
@@ -656,6 +668,26 @@ class ScraperManager:
     def process_paper(self, paper: PaperMetadata) -> Dict:
         """Process a single paper using the current scraper."""
         try:
+            # **RACE CONDITION FIX**: Double-check scraper state before proceeding
+            scraper_state = ScraperState.get_current_state()
+            if not scraper_state.is_active:
+                ActivityLog.log_scraper_activity(
+                    action="process_paper",
+                    paper_id=paper.id,
+                    status="skipped",
+                    description="Skipped processing - scraper deactivated during task execution"
+                )
+                return {"paper_id": paper.id, "status": "skipped", "message": "Scraper not active"}
+
+            if scraper_state.is_paused:
+                ActivityLog.log_scraper_activity(
+                    action="process_paper",
+                    paper_id=paper.id,
+                    status="skipped",
+                    description="Skipped processing - scraper paused during task execution"
+                )
+                return {"paper_id": paper.id, "status": "skipped", "message": "Scraper paused"}
+
             scraper = get_scraper()
             output_statuses = scraper.get_output_statuses()
 
@@ -668,6 +700,22 @@ class ScraperManager:
             paper.updated_at = datetime.utcnow()
             db.session.commit()
 
+            # **ADDITIONAL RACE CONDITION CHECK**: Verify scraper is still active before expensive scraping operation
+            scraper_state = ScraperState.get_current_state()
+            if not scraper_state.is_active:
+                # Scraper was deactivated after we marked paper as processing - revert and exit
+                paper.status = previous_status
+                paper.updated_at = datetime.utcnow()
+                db.session.commit()
+
+                ActivityLog.log_scraper_activity(
+                    action="process_paper",
+                    paper_id=paper.id,
+                    status="cancelled",
+                    description="Cancelled processing - scraper deactivated after paper marked as processing"
+                )
+                return {"paper_id": paper.id, "status": "cancelled", "message": "Scraper deactivated during processing"}
+
             # Perform scraping
             result = scraper.scrape(paper.doi)
 
diff --git a/scipaperloader/scrapers/tasks.py b/scipaperloader/scrapers/tasks.py
index 295d99b..18d61da 100644
--- a/scipaperloader/scrapers/tasks.py
+++ b/scipaperloader/scrapers/tasks.py
@@ -109,16 +109,17 @@ def process_single_paper(self, paper_id: int):
         paper_id: ID of the paper to process
     """
     try:
-        # Double-check scraper state before processing
+        # ENHANCED RACE CONDITION PROTECTION: Check scraper state multiple times
+
+        # Initial check before any processing
         scraper_state = ScraperState.get_current_state()
         if not scraper_state.is_active:
             ActivityLog.log_scraper_activity(
                 action="process_single_paper",
                 paper_id=paper_id,
                 status="skipped",
-                description="Skipped processing - scraper not active"
+                description="Task skipped - scraper not active (initial check)"
             )
-            # Use Celery's ignore to mark this task as completed without error
             self.retry = False
             return {"status": "inactive", "paper_id": paper_id}
 
@@ -127,9 +128,55 @@ def process_single_paper(self, paper_id: int):
                 action="process_single_paper",
                 paper_id=paper_id,
                 status="skipped",
-                description="Skipped processing - scraper paused"
+                description="Task skipped - scraper paused (initial check)"
+            )
+            self.retry = False
+            return {"status": "paused", "paper_id": paper_id}
+
+        # Check if this specific task has been revoked
+        try:
+            from ..celery import celery
+
+            # Check if the current task is in the revoked list
+            if hasattr(self, 'request') and self.request.id:
+                revoked_tasks = celery.control.inspect().revoked()
+                if revoked_tasks:
+                    for worker, tasks in revoked_tasks.items():
+                        if self.request.id in tasks:
+                            ActivityLog.log_scraper_activity(
+                                action="process_single_paper",
+                                paper_id=paper_id,
+                                status="skipped",
+                                description=f"Task skipped - task ID {self.request.id} was revoked"
+                            )
+                            return {"status": "revoked", "paper_id": paper_id, "task_id": self.request.id}
+        except Exception:
+            # Don't fail on revocation check issues, just continue with state checks
+            pass
+
+        # Brief pause to allow stop commands to take effect
+        import time
+        time.sleep(0.1)
+
+        # Second check after brief delay
+        scraper_state = ScraperState.get_current_state()
+        if not scraper_state.is_active:
+            ActivityLog.log_scraper_activity(
+                action="process_single_paper",
+                paper_id=paper_id,
+                status="skipped",
+                description="Task skipped - scraper not active (secondary check)"
+            )
+            self.retry = False
+            return {"status": "inactive", "paper_id": paper_id}
+
+        if scraper_state.is_paused:
+            ActivityLog.log_scraper_activity(
+                action="process_single_paper",
+                paper_id=paper_id,
+                status="skipped",
+                description="Task skipped - scraper paused (secondary check)"
             )
-            # Use Celery's ignore for paused state too
             self.retry = False
             return {"status": "paused", "paper_id": paper_id}
 
@@ -139,6 +186,18 @@ def process_single_paper(self, paper_id: int):
         if not paper:
             return {"status": "error", "message": f"Paper {paper_id} not found"}
 
+        # Third check before starting actual processing
+        scraper_state = ScraperState.get_current_state()
+        if not scraper_state.is_active:
+            ActivityLog.log_scraper_activity(
+                action="process_single_paper",
+                paper_id=paper_id,
+                status="skipped",
+                description="Task skipped - scraper not active (pre-processing check)"
+            )
+            self.retry = False
+            return {"status": "inactive", "paper_id": paper_id}
+
         # Process the paper using scraper manager
         manager = ScraperManager()
         result = manager.process_paper(paper)
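
Note (not part of the patch): the race-condition guard that both files now implement boils down to a stop flag that stop_scraper() flips before doing anything else, and that the task re-checks at several points, reverting its own work if the flag changed mid-flight. The stand-alone sketch below illustrates only that ordering. ScraperStateStub, stop_scraper_sketch and process_paper_sketch are hypothetical stand-ins for ScraperState, ScraperManager.stop_scraper and ScraperManager.process_paper; the revocation, Redis cleanup and paper-reversion steps are elided.

    import time
    from dataclasses import dataclass

    @dataclass
    class ScraperStateStub:
        """Stand-in for ScraperState; the real flag lives in the database."""
        is_active: bool = True
        is_paused: bool = False

    STATE = ScraperStateStub()

    def stop_scraper_sketch() -> None:
        # STEP 1: flip the flag first so in-flight tasks can bail out early.
        STATE.is_active = False
        STATE.is_paused = False
        # STEP 2: brief pause so running tasks can observe the new state.
        time.sleep(0.2)
        # STEPs 3-6 (task revocation, Redis cleanup, paper reversion) elided.

    def process_paper_sketch(paper_id: int) -> dict:
        # Initial check before doing any work.
        if not STATE.is_active or STATE.is_paused:
            return {"paper_id": paper_id, "status": "skipped"}
        # ... mark the paper as "processing" here ...
        # Re-check before the expensive scrape; revert and exit if stopped meanwhile.
        if not STATE.is_active:
            # ... revert the paper to its previous status here ...
            return {"paper_id": paper_id, "status": "cancelled"}
        # ... perform the actual scrape here ...
        return {"paper_id": paper_id, "status": "done"}

    if __name__ == "__main__":
        print(process_paper_sketch(1))   # {'paper_id': 1, 'status': 'done'}
        stop_scraper_sketch()
        print(process_paper_sketch(2))   # {'paper_id': 2, 'status': 'skipped'}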