"""
Simplified scraper management system with hourly quota scheduling.
"""
import random
import math
import redis
from datetime import datetime, timedelta
from typing import List, Dict, Optional
from sqlalchemy import func
from ..models import (
PaperMetadata,
ScheduleConfig,
VolumeConfig,
ScraperState,
ActivityLog,
ScraperModuleConfig
)
from ..db import db
from ..cache_utils import get_cached_hourly_quota
from .factory import get_scraper, get_available_scrapers
from ..celery import celery
class ScraperManager:
"""Manages scraper operations with hourly quota-based scheduling."""
def __init__(self):
self.current_scraper = None
self.pending_papers = [] # Track papers being processed
# Initialize Redis client for delayed task management
self.redis_client = None
self._init_redis_client()
def _init_redis_client(self):
"""Initialize Redis client for delayed task management."""
try:
# Use same Redis configuration as Celery
self.redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
# Test connection
self.redis_client.ping()
except Exception as e:
# Only log if we're in an application context
try:
ActivityLog.log_error(
error_message=f"Failed to initialize Redis client: {str(e)}",
source="ScraperManager._init_redis_client"
)
except RuntimeError:
# Outside application context - just print to console
print(f"Warning: Failed to initialize Redis client: {str(e)}")
self.redis_client = None
def _clear_delayed_tasks_from_redis(self) -> int:
"""Clear delayed tasks from Redis structures used by Celery.
Based on analysis, Celery stores delayed tasks in:
- 'unacked_index': Sorted set containing task IDs with execution timestamps
- 'unacked': Hash containing task data keyed by task ID
Returns:
int: Number of delayed tasks cleared
"""
if not self.redis_client:
try:
ActivityLog.log_error(
error_message="Redis client not available - cannot clear delayed tasks",
source="ScraperManager._clear_delayed_tasks_from_redis"
)
except RuntimeError:
# Working outside application context - just print instead
print("❌ Redis client not available - cannot clear delayed tasks")
return 0
cleared_count = 0
try:
# Define scraper task patterns to identify our tasks
scraper_patterns = [
'process_single_paper',
'process_papers_batch',
'hourly_scraper_scheduler'
]
try:
ActivityLog.log_scraper_activity(
action="check_delayed_tasks",
status="info",
description="Checking Celery delayed task structures (unacked_index, unacked)"
)
except RuntimeError:
print("🔍 Checking Celery delayed task structures (unacked_index, unacked)")
# Check 'unacked_index' (sorted set with task IDs and timestamps)
unacked_index_cleared = 0
if self.redis_client.exists('unacked_index'):
try:
# Get all task IDs from the sorted set
task_ids = self.redis_client.zrange('unacked_index', 0, -1)
if task_ids:
try:
ActivityLog.log_scraper_activity(
action="scan_unacked_index",
status="info",
description=f"Found {len(task_ids)} tasks in 'unacked_index'"
)
except RuntimeError:
print(f"📋 Found {len(task_ids)} tasks in 'unacked_index'")
# Check each task ID against the 'unacked' hash to get task details
scraper_task_ids = []
for task_id in task_ids:
try:
# Get task data from 'unacked' hash
task_data = self.redis_client.hget('unacked', task_id)
if task_data:
# Check if this task contains any of our scraper patterns
if any(pattern in str(task_data) for pattern in scraper_patterns):
scraper_task_ids.append(task_id)
except Exception:
# Skip individual task errors
continue
# Remove scraper task IDs from both structures
for task_id in scraper_task_ids:
try:
# Remove from unacked_index (sorted set)
removed_from_index = self.redis_client.zrem('unacked_index', task_id)
# Remove from unacked (hash)
removed_from_hash = self.redis_client.hdel('unacked', task_id)
if removed_from_index or removed_from_hash:
unacked_index_cleared += 1
except Exception as e:
try:
ActivityLog.log_error(
error_message=f"Error removing delayed task {task_id}: {str(e)}",
source="ScraperManager._clear_delayed_tasks_from_redis"
)
except RuntimeError:
print(f"❌ Error removing delayed task {task_id}: {str(e)}")
continue
cleared_count += unacked_index_cleared
if unacked_index_cleared > 0:
try:
ActivityLog.log_scraper_activity(
action="clear_unacked_tasks",
status="success",
description=f"Cleared {unacked_index_cleared} scraper tasks from unacked structures"
)
except RuntimeError:
print(f"✅ Cleared {unacked_index_cleared} scraper tasks from unacked structures")
else:
try:
ActivityLog.log_scraper_activity(
action="check_unacked_index",
status="info",
description="No tasks found in 'unacked_index'"
)
except RuntimeError:
print(" No tasks found in 'unacked_index'")
except Exception as e:
try:
ActivityLog.log_error(
error_message=f"Error accessing 'unacked_index': {str(e)}",
source="ScraperManager._clear_delayed_tasks_from_redis"
)
except RuntimeError:
print(f"❌ Error accessing 'unacked_index': {str(e)}")
else:
try:
ActivityLog.log_scraper_activity(
action="check_unacked_index",
status="info",
description="'unacked_index' key does not exist - no delayed tasks"
)
except RuntimeError:
print(" 'unacked_index' key does not exist - no delayed tasks")
# Also check the 'celery' queue for immediate tasks (backup check)
celery_cleared = 0
try:
queue_length = self.redis_client.llen('celery')
if queue_length and queue_length > 0:
# Scan for any scraper tasks in the immediate queue
scraper_tasks = []
for i in range(queue_length):
try:
task_data = self.redis_client.lindex('celery', i)
if task_data and any(pattern in str(task_data) for pattern in scraper_patterns):
scraper_tasks.append(task_data)
except Exception:
continue
# Remove scraper tasks from celery queue
for task_data in scraper_tasks:
try:
removed_count = self.redis_client.lrem('celery', 0, task_data)
celery_cleared += removed_count
except Exception:
continue
cleared_count += celery_cleared
if celery_cleared > 0:
try:
ActivityLog.log_scraper_activity(
action="clear_celery_tasks",
status="success",
description=f"Cleared {celery_cleared} scraper tasks from 'celery' queue"
)
except RuntimeError:
print(f"✅ Cleared {celery_cleared} scraper tasks from 'celery' queue")
except Exception as e:
try:
ActivityLog.log_error(
error_message=f"Error checking 'celery' queue: {str(e)}",
source="ScraperManager._clear_delayed_tasks_from_redis"
)
except RuntimeError:
print(f"❌ Error checking 'celery' queue: {str(e)}")
# Summary
if cleared_count > 0:
try:
ActivityLog.log_scraper_activity(
action="clear_delayed_tasks_complete",
status="success",
description=f"Total delayed scraper tasks cleared from Redis: {cleared_count} (unacked: {unacked_index_cleared}, celery: {celery_cleared})"
)
except RuntimeError:
print(f"✅ Total delayed scraper tasks cleared from Redis: {cleared_count} (unacked: {unacked_index_cleared}, celery: {celery_cleared})")
else:
try:
ActivityLog.log_scraper_activity(
action="clear_delayed_tasks_complete",
status="info",
description="No delayed scraper tasks found to clear in Redis"
)
except RuntimeError:
print(" No delayed scraper tasks found to clear in Redis")
return cleared_count
except Exception as e:
try:
ActivityLog.log_error(
error_message=f"Failed to clear delayed tasks from Redis: {str(e)}",
source="ScraperManager._clear_delayed_tasks_from_redis"
)
except RuntimeError:
print(f"❌ Failed to clear delayed tasks from Redis: {str(e)}")
return 0
def start_scraper(self) -> Dict[str, str]:
"""Start the scraper system."""
try:
# Get current scraper
self.current_scraper = get_scraper()
# Activate scraper state
ScraperState.set_active(True)
ScraperState.set_paused(False)
scraper_name = self.current_scraper.get_name()
ActivityLog.log_scraper_command(
action="start_scraper",
status="success",
description=f"Started scraper: {scraper_name}. Use /trigger-immediate endpoint to immediately schedule papers instead of waiting for the next hourly boundary."
)
return {"status": "success", "message": "Scraper started successfully. Papers will be scheduled at the next hourly boundary, or use /trigger-immediate to schedule immediately."}
except Exception as e:
ActivityLog.log_error(
error_message=f"Failed to start scraper: {str(e)}",
source="ScraperManager.start_scraper"
)
return {"status": "error", "message": str(e)}
def pause_scraper(self) -> Dict[str, str]:
"""Pause the scraper system."""
try:
ScraperState.set_paused(True)
ActivityLog.log_scraper_command(
action="pause_scraper",
status="success",
description="Scraper paused - processing will halt"
)
return {"status": "success", "message": "Scraper paused"}
except Exception as e:
return {"status": "error", "message": str(e)}
def resume_scraper(self) -> Dict[str, str]:
"""Resume the scraper system."""
try:
ScraperState.set_paused(False)
ActivityLog.log_scraper_command(
action="resume_scraper",
status="success",
description="Scraper resumed - processing will continue"
)
return {"status": "success", "message": "Scraper resumed"}
except Exception as e:
return {"status": "error", "message": str(e)}
def stop_scraper(self) -> Dict[str, str]:
"""Stop the scraper, revoke all running tasks, and revert pending papers."""
try:
# STEP 1: Immediately set scraper as inactive - this is critical for race condition prevention
ScraperState.set_active(False)
ScraperState.set_paused(False)
ActivityLog.log_scraper_command(
action="stop_scraper_start",
status="info",
description="Scraper stop initiated - marked as inactive. Beginning task revocation and delayed task clearing."
)
# STEP 2: Brief pause to allow running tasks to see the inactive state
import time
time.sleep(0.2)
# STEP 3: Revoke all running tasks
revoked_count = 0
delayed_cleared_count = 0
try:
# Get Celery inspector to check for running tasks
i = celery.control.inspect()
active = i.active() or {}
scheduled = i.scheduled() or {}
reserved = i.reserved() or {}
# Revoke active tasks
for worker, tasks in active.items():
for task in tasks:
if 'id' in task:
celery.control.revoke(task['id'], terminate=True)
revoked_count += 1
ActivityLog.log_scraper_activity(
action="revoke_task",
status="success",
description=f"Revoked active task: {task.get('name', 'unknown')} (ID: {task['id']})"
)
# Revoke scheduled tasks
for worker, tasks in scheduled.items():
for task in tasks:
if 'id' in task:
celery.control.revoke(task['id'], terminate=True)
revoked_count += 1
ActivityLog.log_scraper_activity(
action="revoke_task",
status="success",
description=f"Revoked scheduled task: {task.get('name', 'unknown')} (ID: {task['id']})"
)
# Revoke reserved tasks
for worker, tasks in reserved.items():
for task in tasks:
if 'id' in task:
celery.control.revoke(task['id'], terminate=True)
revoked_count += 1
ActivityLog.log_scraper_activity(
action="revoke_task",
status="success",
description=f"Revoked reserved task: {task.get('name', 'unknown')} (ID: {task['id']})"
)
# Purge all task queues
celery.control.purge()
ActivityLog.log_scraper_activity(
action="purge_queues",
status="success",
description="Purged all task queues"
)
# STEP 4: Clear delayed tasks from Redis sorted sets
delayed_cleared_count = self._clear_delayed_tasks_from_redis()
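                # Note (assumption based on the docstring of
                # _clear_delayed_tasks_from_redis): celery.control.purge() only
                # drains messages still waiting in the ready queue ('celery');
                # ETA/countdown tasks already reserved by workers live in the
                # 'unacked'/'unacked_index' structures, so the explicit Redis
                # cleanup above is what removes pending delayed scraper tasks.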
# Additional cleanup: revoke any remaining scraper-related tasks by name pattern
try:
# Use broadcast to revoke tasks that match scraper patterns
scraper_task_patterns = [
'process_single_paper',
'process_papers_batch',
'hourly_scraper_scheduler'
]
# Get a fresh inspection of tasks after purge
fresh_inspect = celery.control.inspect()
all_tasks = {}
all_tasks.update(fresh_inspect.active() or {})
all_tasks.update(fresh_inspect.scheduled() or {})
all_tasks.update(fresh_inspect.reserved() or {})
additional_revoked = 0
for worker, tasks in all_tasks.items():
for task in tasks:
task_name = task.get('name', '')
task_id = task.get('id', '')
if any(pattern in task_name for pattern in scraper_task_patterns) and task_id:
celery.control.revoke(task_id, terminate=True)
additional_revoked += 1
ActivityLog.log_scraper_activity(
action="revoke_scraper_task",
status="success",
description=f"Revoked lingering scraper task: {task_name} (ID: {task_id})"
)
if additional_revoked > 0:
ActivityLog.log_scraper_activity(
action="cleanup_scraper_tasks",
status="success",
description=f"Additional cleanup: revoked {additional_revoked} lingering scraper tasks"
)
except Exception as e:
ActivityLog.log_error(
error_message=f"Error during additional scraper task cleanup: {str(e)}",
source="ScraperManager.stop_scraper.cleanup"
)
except Exception as e:
ActivityLog.log_error(
error_message=f"Error revoking tasks: {str(e)}",
source="ScraperManager.stop_scraper"
)
# Continue with paper reversion even if task revocation fails
# STEP 5: Wait a bit longer for any remaining tasks to finish their checks and exit
time.sleep(1.0)
# STEP 6: Revert papers from processing status
scraper = get_scraper()
input_statuses = scraper.get_input_statuses()
# Find papers that are currently being processed
processing_status = scraper.get_output_statuses()["processing"]
pending_papers = PaperMetadata.query.filter_by(status=processing_status).all()
# Revert their status to the first input status
reverted_count = 0
if pending_papers and input_statuses:
revert_status = input_statuses[0] # Use first input status as default
for paper in pending_papers:
# Try to use previous_status if available, otherwise use first input status
if hasattr(paper, 'previous_status') and paper.previous_status:
paper.status = paper.previous_status
else:
paper.status = revert_status
paper.updated_at = datetime.utcnow()
reverted_count += 1
db.session.commit()
ActivityLog.log_scraper_activity(
action="revert_pending_papers",
status="success",
description=f"Reverted {reverted_count} papers from '{processing_status}' to previous status"
)
ActivityLog.log_scraper_command(
action="stop_scraper",
status="success",
description=f"Scraper stopped completely. Revoked {revoked_count} tasks, cleared {delayed_cleared_count} delayed tasks, and reverted {reverted_count} papers."
)
return {
"status": "success",
"message": f"Scraper stopped. Revoked {revoked_count} tasks, cleared {delayed_cleared_count} delayed tasks, and reverted {reverted_count} papers to previous status."
}
except Exception as e:
ActivityLog.log_error(
error_message=f"Failed to stop scraper: {str(e)}",
source="ScraperManager.stop_scraper"
)
return {"status": "error", "message": str(e)}
def reset_scraper(self) -> Dict[str, str]:
"""Reset scraper state, revoke all running tasks, and clear all processing statuses."""
try:
# First, revoke all running tasks (similar to stop_scraper)
revoked_count = 0
ActivityLog.log_scraper_command(
action="reset_scraper_start",
status="info",
description="Beginning scraper reset process with task revocation"
)
try:
# Get Celery inspector to check for running tasks
i = celery.control.inspect()
active = i.active() or {}
scheduled = i.scheduled() or {}
reserved = i.reserved() or {}
# Revoke all tasks (active, scheduled, reserved)
for queue_name, queue_tasks in [("active", active), ("scheduled", scheduled), ("reserved", reserved)]:
for worker, tasks in queue_tasks.items():
for task in tasks:
if 'id' in task:
celery.control.revoke(task['id'], terminate=True)
revoked_count += 1
ActivityLog.log_scraper_activity(
action="revoke_task",
status="success",
description=f"Revoked {queue_name} task: {task.get('name', 'unknown')} (ID: {task['id']})"
)
# Purge all task queues
celery.control.purge()
ActivityLog.log_scraper_activity(
action="purge_queues",
status="success",
description="Purged all task queues during reset"
)
except Exception as e:
ActivityLog.log_error(
error_message=f"Error revoking tasks during reset: {str(e)}",
source="ScraperManager.reset_scraper"
)
# Continue with paper reversion even if task revocation fails
# Get current scraper configuration
scraper = get_scraper()
input_statuses = scraper.get_input_statuses()
processing_status = scraper.get_output_statuses()["processing"]
# Reset all papers in processing status
pending_papers = PaperMetadata.query.filter_by(status=processing_status).all()
reverted_count = 0
if pending_papers and input_statuses:
revert_status = input_statuses[0]
for paper in pending_papers:
# Try to use previous_status if available, otherwise use first input status
if hasattr(paper, 'previous_status') and paper.previous_status:
paper.status = paper.previous_status
else:
paper.status = revert_status
paper.updated_at = datetime.utcnow()
paper.error_msg = None # Clear any error messages
reverted_count += 1
db.session.commit()
# Reset scraper state
ScraperState.set_active(False)
ScraperState.set_paused(False)
ActivityLog.log_scraper_command(
action="reset_scraper",
status="success",
description=f"Scraper reset. Revoked {revoked_count} tasks and reverted {reverted_count} papers."
)
return {
"status": "success",
"message": f"Scraper reset. Revoked {revoked_count} tasks and reverted {reverted_count} papers to original status."
}
except Exception as e:
return {"status": "error", "message": str(e)}
def get_current_hour_quota(self) -> int:
"""Calculate papers to process in current hour based on schedule."""
try:
return get_cached_hourly_quota(self._calculate_papers_for_current_hour)
except Exception as e:
ActivityLog.log_error(
error_message=f"Error calculating hourly quota: {str(e)}",
source="ScraperManager.get_current_hour_quota"
)
return 0
def _calculate_papers_for_current_hour(self) -> int:
"""Internal method to calculate hourly quota."""
try:
# Get current hour and volume config
current_hour = datetime.now().hour
volume_config = VolumeConfig.get_current_volume()
daily_volume = volume_config if volume_config else 100
# Get schedule config for current hour
schedule_config = ScheduleConfig.query.filter_by(hour=current_hour).first()
current_weight = schedule_config.weight if schedule_config else 1.0
# Get total weight across all hours
total_weight = db.session.query(func.sum(ScheduleConfig.weight)).scalar() or 24.0
# Calculate quota: (current_weight / total_weight) * daily_volume
quota = math.ceil((current_weight / total_weight) * daily_volume)
ActivityLog.log_scraper_activity(
action="calculate_hourly_quota",
status="info",
description=f"Hour {current_hour}: quota={quota} (weight={current_weight}, total_weight={total_weight}, daily_volume={daily_volume})"
)
return max(1, quota) # Ensure at least 1 paper per hour
except Exception as e:
ActivityLog.log_error(
error_message=f"Error in quota calculation: {str(e)}",
source="ScraperManager._calculate_papers_for_current_hour"
)
return 1 # Fallback to 1 paper per hour
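    # Worked example for the quota formula above (illustrative numbers only):
    # with daily_volume=100, hourly weights summing to total_weight=30.0, and
    # current_weight=2.5 for the current hour, the quota is
    # ceil((2.5 / 30.0) * 100) = ceil(8.33...) = 9 papers for this hour.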
def select_papers_for_processing(self, limit: Optional[int] = None) -> List[PaperMetadata]:
"""Select papers for processing based on current scraper configuration."""
try:
scraper = get_scraper()
input_statuses = scraper.get_input_statuses()
if not input_statuses:
return []
# Use provided limit or calculate from hourly quota
papers_needed = limit if limit is not None else self.get_current_hour_quota()
# Query papers with input statuses, randomize selection
papers = (PaperMetadata.query
.filter(PaperMetadata.status.in_(input_statuses))
.order_by(func.random())
.limit(papers_needed)
.all())
ActivityLog.log_scraper_activity(
action="select_papers",
status="info",
description=f"Selected {len(papers)} papers from statuses {input_statuses} (requested: {papers_needed})"
)
return papers
except Exception as e:
ActivityLog.log_error(
error_message=f"Error selecting papers: {str(e)}",
source="ScraperManager.select_papers_for_processing"
)
return []
def process_paper(self, paper: PaperMetadata) -> Dict:
"""Process a single paper using the current scraper."""
try:
# **RACE CONDITION FIX**: Double-check scraper state before proceeding
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active:
ActivityLog.log_scraper_activity(
action="process_paper",
paper_id=paper.id,
status="skipped",
description="Skipped processing - scraper deactivated during task execution"
)
return {"paper_id": paper.id, "status": "skipped", "message": "Scraper not active"}
if scraper_state.is_paused:
ActivityLog.log_scraper_activity(
action="process_paper",
paper_id=paper.id,
status="skipped",
description="Skipped processing - scraper paused during task execution"
)
return {"paper_id": paper.id, "status": "skipped", "message": "Scraper paused"}
scraper = get_scraper()
output_statuses = scraper.get_output_statuses()
# Store the previous status before changing it
previous_status = paper.status
# Update paper status to processing
paper.previous_status = previous_status
paper.status = output_statuses["processing"]
paper.updated_at = datetime.utcnow()
db.session.commit()
# **ADDITIONAL RACE CONDITION CHECK**: Verify scraper is still active before expensive scraping operation
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active:
# Scraper was deactivated after we marked paper as processing - revert and exit
paper.status = previous_status
paper.updated_at = datetime.utcnow()
db.session.commit()
ActivityLog.log_scraper_activity(
action="process_paper",
paper_id=paper.id,
status="cancelled",
description="Cancelled processing - scraper deactivated after paper marked as processing"
)
return {"paper_id": paper.id, "status": "cancelled", "message": "Scraper deactivated during processing"}
# Perform scraping
result = scraper.scrape(paper.doi)
# Update paper status based on result
if result.status == "success":
paper.status = output_statuses["success"]
paper.error_msg = None
if result.data and "file_path" in result.data:
paper.file_path = result.data["file_path"]
else:
paper.status = output_statuses["failure"]
paper.error_msg = result.message
paper.updated_at = datetime.utcnow()
db.session.commit()
# Log result
ActivityLog.log_scraper_activity(
action="process_paper",
paper_id=paper.id,
status=result.status,
description=f"Processed {paper.doi}: {result.message}"
)
return {
"paper_id": paper.id,
"status": result.status,
"message": result.message,
"duration": result.duration
}
except Exception as e:
# Revert paper status on error
try:
input_statuses = get_scraper().get_input_statuses()
if input_statuses:
paper.status = input_statuses[0]
paper.error_msg = f"Processing error: {str(e)}"
paper.updated_at = datetime.utcnow()
db.session.commit()
            except Exception:
pass # Don't fail if reversion fails
ActivityLog.log_error(
error_message=f"Error processing paper {paper.id}: {str(e)}",
source="ScraperManager.process_paper"
)
return {"paper_id": paper.id, "status": "error", "message": str(e)}
def get_status(self) -> Dict:
"""Get current scraper status."""
scraper_state = ScraperState.get_current_state()
scraper = get_scraper()
# Count papers by status
input_statuses = scraper.get_input_statuses()
output_statuses = scraper.get_output_statuses()
available_count = (PaperMetadata.query
.filter(PaperMetadata.status.in_(input_statuses))
.count())
processing_count = (PaperMetadata.query
.filter_by(status=output_statuses["processing"])
.count())
return {
"active": scraper_state.is_active,
"paused": scraper_state.is_paused,
"current_scraper": scraper.get_name(),
"input_statuses": input_statuses,
"output_statuses": output_statuses,
"available_papers": available_count,
"processing_papers": processing_count,
"current_hour_quota": self.get_current_hour_quota()
}
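    # Illustrative usage sketch (comment only, not part of the runtime code):
    # a minimal way to drive this manager, assuming a Flask application context
    # is available (e.g. inside a Celery task or a CLI command). The `create_app`
    # factory and import path below are hypothetical and depend on the project.
    #
    #     from myapp import create_app  # hypothetical application factory
    #
    #     app = create_app()
    #     with app.app_context():
    #         manager = ScraperManager()
    #         manager.start_scraper()
    #         for paper in manager.select_papers_for_processing(limit=5):
    #             manager.process_paper(paper)
    #         print(manager.get_status())
    #         manager.stop_scraper()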