import random
import json
import time
import math
import os # Import os for path joining
from datetime import datetime, timedelta
from flask import Blueprint, jsonify, render_template, request, current_app, flash
# Application models, including DownloadPathConfig for the configured download directory
from ..models import VolumeConfig, ActivityLog, PaperMetadata, ActivityCategory, ScheduleConfig, ScraperState, DownloadPathConfig
from ..db import db
from ..celery import celery
from ..defaults import MAX_VOLUME
from celery.schedules import crontab
from sqlalchemy import func
bp = Blueprint("scraper", __name__, url_prefix="/scraper")
# Track the periodic task ID for proper revocation
PERIODIC_TASK_ID = None
# Setup periodic task to run every minute for testing purposes
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
global PERIODIC_TASK_ID
# Run the dummy scraper every minute for testing purposes with a unique task name
# Using a constant name allows us to revoke this specific periodic task when stopping
result = sender.add_periodic_task(60.0, run_periodic_dummy_scraper.s(),
name='run_dummy_scraper_every_minute')
PERIODIC_TASK_ID = result.id
# Log that we've registered the periodic task with its ID
ActivityLog.log_scraper_command(
action="register_periodic_task",
status="success",
description=f"Registered periodic scraper task with ID: {PERIODIC_TASK_ID}"
)
# Function to revoke the periodic task properly
def revoke_periodic_task():
global PERIODIC_TASK_ID
if PERIODIC_TASK_ID:
try:
# Revoking by ID is more reliable than revoking by name
celery.control.revoke(PERIODIC_TASK_ID, terminate=True)
# As a fallback, inspect the scheduled queues and revoke any entries
# registered under the periodic task's name
i = celery.control.inspect()
scheduled = i.scheduled() or {}
# Look for our periodic task by name in the scheduled tasks
for worker, tasks in scheduled.items():
for task in tasks:
if task.get('name') == 'run_dummy_scraper_every_minute':
celery.control.revoke(task['id'], terminate=True)
# Log the action
ActivityLog.log_scraper_command(
action="revoke_periodic_task",
status="success",
description=f"Revoked periodic task with ID: {PERIODIC_TASK_ID}"
)
return True
except Exception as e:
ActivityLog.log_error(
error_message=f"Failed to revoke periodic task: {str(e)}",
source="revoke_periodic_task"
)
return False
return False
@celery.task
def run_periodic_dummy_scraper():
"""Periodic task to run the dummy scraper if it's active and not paused"""
scraper_state = ScraperState.get_current_state()
# Log every time this runs to track execution
ActivityLog.log_scraper_activity(
action="periodic_check",
status="info",
description=f"Periodic check running. Scraper active: {scraper_state.is_active}, paused: {scraper_state.is_paused}"
)
if scraper_state.is_active and not scraper_state.is_paused:
dummy_scheduled_scraper.delay()
return True
return False
@bp.route("/")
def index():
"""Render the scraper control panel."""
volume_config = VolumeConfig.query.first()
# Ensure we have volume config
if not volume_config:
volume_config = VolumeConfig(volume=100) # Default value
db.session.add(volume_config)
db.session.commit()
# Get scraper state
scraper_state = ScraperState.get_current_state()
return render_template(
"scraper.html.jinja",
volume_config=volume_config,
scraper_active=scraper_state.is_active,
scraper_paused=scraper_state.is_paused,
max_volume=MAX_VOLUME
)
@bp.route("/start", methods=["POST"])
def start_scraper():
"""Start the scraper."""
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active:
# Update scraper state
ScraperState.set_active(True)
ScraperState.set_paused(False)
# Log the action
ActivityLog.log_scraper_command(
action="start_scraper",
status="success",
description="Scheduled scraper started - will follow hourly configuration"
)
# Immediately trigger a task to test the scheduler and provide feedback
dummy_scheduled_scraper.delay()
return jsonify({
"success": True,
"message": "Scraper started - following hourly schedule configuration"
})
else:
return jsonify({
"success": False,
"message": "Scraper is already running"
})
@bp.route("/stop", methods=["POST"])
def stop_scraper():
"""Stop the scraper completely."""
scraper_state = ScraperState.get_current_state()
if scraper_state.is_active:
# Update scraper state first
ScraperState.set_active(False)
ScraperState.set_paused(False)
# Stop any running tasks
task_types_to_revoke = [
'scipaperloader.blueprints.scraper.dummy_process_paper',
'scipaperloader.blueprints.scraper.dummy_scheduled_scraper',
'scipaperloader.blueprints.scraper.run_periodic_dummy_scraper'
]
# Use our utility function to revoke tasks
revoked_count = revoke_tasks_by_type(task_types_to_revoke, terminate=True)
# Revoke the periodic task specifically
revoke_periodic_task()
# Clear all pending tasks from the queue more aggressively
try:
# This purges all tasks in all queues
celery.control.purge()
ActivityLog.log_scraper_command(
action="purge_queue",
status="success",
description="Purged all task queues"
)
except Exception as e:
ActivityLog.log_error(
error_message=f"Failed to purge task queues: {str(e)}",
source="stop_scraper"
)
# Fallback to discard_all if purge fails
celery.control.discard_all()
# Restart the worker to ensure clean state
try:
celery.control.broadcast('pool_restart', arguments={'reload': True})
ActivityLog.log_scraper_command(
action="restart_worker",
status="success",
description="Worker pool restart requested"
)
except Exception as e:
ActivityLog.log_error(
error_message=f"Failed to restart worker pool: {str(e)}",
source="stop_scraper"
)
ActivityLog.log_scraper_command(
action="stop_scraper",
status="success",
description=f"Scraper stopped manually. Revoked {revoked_count} pending tasks. Worker pool restart requested."
)
return jsonify({
"success": True,
"message": f"Scraper stopped. Revoked {revoked_count} pending tasks and requested worker restart."
})
else:
return jsonify({
"success": False,
"message": "Scraper is not running"
})
@bp.route("/pause", methods=["POST"])
def pause_scraper():
"""Pause the scraper."""
scraper_state = ScraperState.get_current_state()
if scraper_state.is_active and not scraper_state.is_paused:
# Update scraper state
ScraperState.set_paused(True)
# Just revoke processing tasks, but leave the periodic tasks running
# so it can continue to check the state (which is now paused)
task_types_to_revoke = [
'scipaperloader.blueprints.scraper.dummy_process_paper',
'scipaperloader.blueprints.scraper.dummy_scheduled_scraper'
]
# Use our utility function to revoke tasks
revoked_count = revoke_tasks_by_type(task_types_to_revoke, terminate=True)
# Also clear the queue
celery.control.discard_all()
ActivityLog.log_scraper_command(
action="pause_scraper",
status="success",
description=f"Scraper paused manually. Revoked {revoked_count} pending tasks."
)
return jsonify({
"success": True,
"message": f"Scraper paused. Revoked {revoked_count} pending tasks."
})
elif scraper_state.is_active and scraper_state.is_paused:
# Update scraper state
ScraperState.set_paused(False)
ActivityLog.log_scraper_command(
action="resume_scraper",
status="success",
description="Scraper resumed manually"
)
return jsonify({
"success": True,
"message": "Scraper resumed"
})
else:
return jsonify({
"success": False,
"message": "Scraper is not running"
})
@bp.route("/status")
def scraper_status():
"""Get the current status of the scraper."""
scraper_state = ScraperState.get_current_state()
return jsonify({
"active": scraper_state.is_active,
"paused": scraper_state.is_paused,
"current_hour": datetime.now().hour,
})
@bp.route("/stats")
def scraper_stats():
"""Get scraper statistics for the dashboard."""
# Get the last 24 hours of activity
hours = 24
if request.args.get('hours'):
try:
hours = int(request.args.get('hours'))
except ValueError:
pass
current_time = datetime.utcnow()
# Compute the cutoff time, truncated to the start of that hour
cutoff_time = (current_time - timedelta(hours=hours)).replace(
minute=0, second=0, microsecond=0
)
# Get activity logs for scraper actions
logs = ActivityLog.query.filter(
ActivityLog.category == ActivityCategory.SCRAPER_ACTIVITY.value,
ActivityLog.timestamp >= cutoff_time
).all()
# Group by hour and status
stats = {}
for hour in range(hours):
# Calculate the hour as offset from current time
target_hour = (current_time.hour - hour) % 24
stats[target_hour] = {
"success": 0,
"error": 0,
"pending": 0,
"hour": target_hour,
}
for log in logs:
hour = log.timestamp.hour
if hour in stats:
if log.status == "success":
stats[hour]["success"] += 1
elif log.status == "error":
stats[hour]["error"] += 1
elif log.status == "pending":
stats[hour]["pending"] += 1
# Convert to list for easier consumption by JavaScript
result = [stats[hour] for hour in sorted(stats.keys())]
return jsonify(result)
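# Illustrative only: with hours=3 this endpoint returns a list shaped roughly like
# the following (hour keys depend on the current UTC hour; the counts are hypothetical,
# only the field names mirror the code above):
#
#   [
#       {"hour": 12, "success": 4, "error": 1, "pending": 0},
#       {"hour": 13, "success": 6, "error": 0, "pending": 2},
#       {"hour": 14, "success": 3, "error": 2, "pending": 1},
#   ]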
@bp.route("/update_config", methods=["POST"])
def update_config():
"""Update scraper configuration."""
data = request.json
try:
if "volume" in data:
try:
new_volume = float(data["volume"])
# Validate volume value
if new_volume <= 0 or new_volume > MAX_VOLUME:
return jsonify({
"success": False,
"message": f"Volume must be between 1 and {MAX_VOLUME}"
})
volume_config = VolumeConfig.query.first()
if not volume_config:
volume_config = VolumeConfig(volume=new_volume)
db.session.add(volume_config)
else:
old_value = volume_config.volume
volume_config.volume = new_volume
ActivityLog.log_config_change(
config_key="scraper_volume",
old_value=old_value,
new_value=new_volume,
description="Updated scraper volume"
)
db.session.commit()
except (ValueError, TypeError):
return jsonify({
"success": False,
"message": "Invalid volume value"
})
return jsonify({"success": True, "message": "Configuration updated"})
except Exception as e:
db.session.rollback()
return jsonify({"success": False, "message": f"Unexpected error: {str(e)}"})
@celery.task(bind=True)
def dummy_scrape_paper(self):
"""Simulate scraping a single paper."""
# Simulate success or failure
success = random.random() > 0.3 # 70% success rate
# Simulate processing time (2-5 seconds)
time.sleep(random.randint(2, 5))
if success:
# Create a dummy paper
new_paper = PaperMetadata(
title=f"Dummy Paper {random.randint(1000, 9999)}",
doi=f"10.1234/dummy.{random.randint(1000, 9999)}",
journal=random.choice([
"Nature", "Science", "PLOS ONE", "Journal of Dummy Research",
"Proceedings of the Dummy Society", "Cell", "Dummy Review Letters"
]),
type="article",
language="en",
published_online=datetime.now().date(),
status="Done",
file_path="/path/to/dummy/paper.pdf"
)
db.session.add(new_paper)
db.session.commit()
# Log the successful scrape
ActivityLog.log_scraper_activity(
action="scrape_paper",
paper_id=new_paper.id,
status="success",
description=f"Successfully scraped paper {new_paper.doi}"
)
return {
"success": True,
"paper_id": new_paper.id,
"title": new_paper.title,
"doi": new_paper.doi
}
else:
# Log the failed scrape
error_message = random.choice([
"Connection timeout",
"404 Not Found",
"Access denied",
"Invalid DOI format",
"PDF download failed",
"Rate limited by publisher"
])
ActivityLog.log_scraper_activity(
action="scrape_paper",
status="error",
description=f"Failed to scrape paper: {error_message}"
)
return {
"success": False,
"error": error_message
}
@celery.task
def calculate_papers_for_current_hour():
"""
Calculate how many papers should be downloaded in the current hour
based on schedule configuration.
Returns:
int: Number of papers to download this hour
"""
current_hour = datetime.now().hour
# Get volume configuration
volume_config = VolumeConfig.query.first()
if not volume_config:
volume_config = VolumeConfig(volume=100) # Default to 100 papers per day
db.session.add(volume_config)
db.session.commit()
# Get all schedule configurations to calculate total weight
schedule_configs = ScheduleConfig.query.all()
if not schedule_configs:
# If no schedule configs, create default with equal weights
for hour in range(24):
config = ScheduleConfig(hour=hour, weight=1.0)
db.session.add(config)
db.session.commit()
schedule_configs = ScheduleConfig.query.all()
# Calculate total weight across all hours
total_weight = sum(config.weight for config in schedule_configs)
# Find the weight for the current hour
current_hour_config = ScheduleConfig.query.filter_by(hour=current_hour).first()
if not current_hour_config:
# Create config for current hour if it doesn't exist
current_hour_config = ScheduleConfig(hour=current_hour, weight=1.0)
db.session.add(current_hour_config)
db.session.commit()
# Calculate papers for current hour: (hour_weight / total_weight) * daily_volume
if total_weight > 0:
weight_ratio = current_hour_config.weight / total_weight
papers_this_hour = math.floor(weight_ratio * volume_config.volume)
else:
papers_this_hour = 0
return papers_this_hour
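# Worked example (hypothetical numbers): with a daily volume of 120, a current-hour
# weight of 2.0 and a total weight of 30.0 across all 24 hours, the quota is
# floor((2.0 / 30.0) * 120) = floor(8.0) = 8 papers for this hour.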
@celery.task
def dummy_scheduled_scraper():
"""
Selects new papers based on the hourly schedule and marks them as Pending.
Then schedules their processing randomly within the hour.
"""
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active or scraper_state.is_paused:
ActivityLog.log_scraper_activity(
action="dummy_scheduled_scraper_skip",
status="info",
description="Skipping run because scraper is inactive or paused."
)
return False # Stop if not active/paused
papers_to_select = calculate_papers_for_current_hour()
if papers_to_select <= 0:
ActivityLog.log_scraper_activity(
action="dummy_scheduled_scraper_info",
status="info",
description=f"Hourly quota is {papers_to_select}. No papers to select this hour."
)
return True # Nothing to do this hour based on schedule
# --- Select "New" papers for this hour ---
try:
# Find "New" papers, select randomly up to the calculated limit
new_papers = PaperMetadata.query.filter_by(status="New") \
.order_by(func.random()) \
.limit(papers_to_select) \
.all()
if not new_papers:
ActivityLog.log_scraper_activity(
action="dummy_scheduled_scraper_info",
status="info",
description="No 'New' papers found in the database to select."
)
# Nothing available to schedule this hour; log and exit.
return True
selected_paper_ids = [p.id for p in new_papers]
# Update status to "Pending" in bulk for efficiency
PaperMetadata.query.filter(PaperMetadata.id.in_(selected_paper_ids)) \
.update({"status": "Pending", "updated_at": datetime.utcnow()}, synchronize_session=False)
db.session.commit()
ActivityLog.log_scraper_activity(
action="select_new_papers",
status="success",
description=f"Selected {len(selected_paper_ids)} 'New' papers and marked as 'Pending'. IDs: {selected_paper_ids}"
)
# Schedule processing for the newly selected "Pending" papers,
# spreading them randomly across the hour (delays of up to ~3500 seconds)
for paper_id in selected_paper_ids:
delay = random.uniform(1, 3500) # Random delay up to ~58 minutes
dummy_process_paper.apply_async(args=[paper_id], countdown=delay)
ActivityLog.log_scraper_activity(
action="schedule_processing",
status="success",
description=f"Scheduled processing for {len(selected_paper_ids)} papers with random delays."
)
return True
except Exception as e:
db.session.rollback() # Rollback DB changes on error
ActivityLog.log_error(
error_message=f"Error in dummy_scheduled_scraper: {str(e)}",
source="dummy_scheduled_scraper"
)
return False
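# A minimal sketch of triggering one selection/scheduling cycle by hand from a
# Flask shell (assumes a Celery worker is running and an app context is pushed):
#
#   >>> from scipaperloader.blueprints.scraper import dummy_scheduled_scraper
#   >>> dummy_scheduled_scraper.delay()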
@celery.task(bind=True)
def dummy_process_paper(self, paper_id):
"""
Process a single paper for the dummy scraper.
Args:
paper_id (int): ID of the paper to process
"""
# First check if the scraper is still active and not paused
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active or scraper_state.is_paused:
# Log that task was skipped due to scraper being stopped or paused
ActivityLog.log_scraper_activity(
action="process_paper",
status="info",
description=f"Skipped processing paper ID {paper_id} because scraper is {'paused' if scraper_state.is_paused else 'stopped'}"
)
return False
# Get the paper from database
paper = PaperMetadata.query.get(paper_id)
if not paper:
# Log error if paper not found
ActivityLog.log_scraper_activity(
action="process_paper",
status="error",
description=f"Paper with ID {paper_id} not found"
)
return False
# Simulate random success/failure (70% success rate)
success = random.random() < 0.7
# Simulate processing time (1-5 seconds)
process_time = random.uniform(1, 5)
time.sleep(process_time)
# Check again if scraper is still active and not paused after the time delay
# This ensures we don't process papers if the scraper was stopped during the delay
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active or scraper_state.is_paused:
ActivityLog.log_scraper_activity(
action="process_paper",
status="info",
description=f"Cancelled processing paper ID {paper_id} because scraper is {'paused' if scraper_state.is_paused else 'stopped'}"
)
return False
if success:
# --- Get configured download path ---
download_base_path = DownloadPathConfig.get_path()
# Ensure the base path exists (optional, but good practice)
# os.makedirs(download_base_path, exist_ok=True)
# --- Construct the file path ---
# Sanitize DOI for use in filename
safe_doi = paper.doi.replace('/', '_').replace(':', '_')
filename = f"{safe_doi}.pdf"
full_path = os.path.join(download_base_path, filename)
# Update paper status to "Done" and set the file path
paper.status = "Done"
paper.file_path = full_path # Use the constructed path
# Log success
ActivityLog.log_scraper_activity(
action="process_paper",
paper_id=paper.id,
status="success",
description=f"Successfully processed paper: {paper.doi}. File at: {full_path}" # Log path
)
else:
# Update paper status to "Failed"
paper.status = "Failed"
# Generate random error message
error_message = random.choice([
"Publisher website unavailable",
"No PDF download link found",
"Access restricted",
"Download timeout",
"Invalid DOI",
"Rate limited by publisher"
])
paper.error_msg = error_message
# Log failure
ActivityLog.log_scraper_activity(
action="process_paper",
paper_id=paper.id,
status="error",
description=f"Failed to process paper: {error_message}"
)
# Update the timestamp
paper.updated_at = datetime.utcnow()
# Commit changes to database
db.session.commit()
return success
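# Illustrative path construction (values hypothetical): a DOI of "10.1234/dummy:42"
# with a configured download path of "/data/papers" becomes
# "/data/papers/10.1234_dummy_42.pdf" after the '/' and ':' replacements above.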
@celery.task(bind=True)
def process_paper_batch(self, paper_ids):
"""
Process a batch of papers to improve throughput and reduce overhead.
Args:
paper_ids (list): List of paper IDs to process in this batch
"""
# Check if scraper is still active
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active or scraper_state.is_paused:
ActivityLog.log_scraper_activity(
action="process_paper_batch",
status="info",
description=f"Skipped batch of {len(paper_ids)} papers because scraper is {'paused' if scraper_state.is_paused else 'stopped'}"
)
return False
# Log the batch starting
ActivityLog.log_scraper_activity(
action="process_paper_batch",
status="info",
description=f"Started processing batch of {len(paper_ids)} papers"
)
# Process each paper in the batch
results = {
"success": 0,
"failure": 0,
"skipped": 0
}
# Begin a transaction for the entire batch
try:
for paper_id in paper_ids:
# Double-check scraper state before each paper
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active or scraper_state.is_paused:
results["skipped"] += 1
continue
# Get the paper from database
paper = PaperMetadata.query.get(paper_id)
if not paper:
results["skipped"] += 1
continue
# Simulate random success/failure (70% success rate)
success = random.random() < 0.7
# Simulate some processing time (0.5-2 seconds per paper in batch)
time.sleep(random.uniform(0.5, 2))
if success:
# Update paper status to "Done"
paper.status = "Done"
paper.file_path = f"/path/to/dummy/papers/{paper.doi.replace('/', '_')}.pdf"
results["success"] += 1
# Log individual paper success with minimal overhead
ActivityLog.log_scraper_activity(
action="process_paper",
paper_id=paper.id,
status="success",
description=f"Processed in batch: {paper.title}"
)
else:
# Update paper status to "Failed"
paper.status = "Failed"
# Generate random error message
error_message = random.choice([
"Publisher website unavailable",
"No PDF download link found",
"Access restricted",
"Download timeout",
"Invalid DOI",
"Rate limited by publisher"
])
paper.error_msg = error_message
results["failure"] += 1
# Log individual paper failure with minimal overhead
ActivityLog.log_scraper_activity(
action="process_paper",
paper_id=paper.id,
status="error",
description=f"Failed in batch: {error_message}"
)
# Update the timestamp
paper.updated_at = datetime.utcnow()
# Commit the entire batch at once
db.session.commit()
except Exception as e:
# If any error occurs, roll back the entire batch
db.session.rollback()
ActivityLog.log_error(
error_message=f"Error processing paper batch: {str(e)}",
source="process_paper_batch"
)
return False
# Log batch completion
ActivityLog.log_scraper_activity(
action="process_paper_batch",
status="success",
description=f"Completed batch processing: {results['success']} succeeded, {results['failure']} failed, {results['skipped']} skipped"
)
return results
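# Nothing in this module dispatches batches yet; a hedged sketch of how a caller
# might chunk pending papers into batches of 10 (the chunk size is an assumption):
#
#   pending_ids = [p.id for p in PaperMetadata.query.filter_by(status="Pending").all()]
#   for i in range(0, len(pending_ids), 10):
#       process_paper_batch.delay(pending_ids[i:i + 10])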
@bp.route("/reset", methods=["POST"])
def reset_scraper():
"""
Reset the scraper completely:
1. Stop all running tasks
2. Optionally purge all papers except those with 'Pending' status
3. Reset scraper state to active and unpaused
4. Trigger a new scraping cycle
"""
# First stop everything
stop_scraper()
# Check if we should clear papers
clear_papers = request.json.get('clear_papers', True) if request.is_json else True
if clear_papers:
try:
# Get all papers that aren't in Pending status
papers = PaperMetadata.query.filter(PaperMetadata.status != "Pending").all()
count = len(papers)
# Delete them all
for paper in papers:
db.session.delete(paper)
db.session.commit()
ActivityLog.log_scraper_command(
action="reset_scraper",
status="success",
description=f"Reset scraper and cleared {count} non-pending papers"
)
except Exception as e:
db.session.rollback()
ActivityLog.log_error(
error_message=f"Failed to reset papers: {str(e)}",
source="reset_scraper"
)
return jsonify({
"success": False,
"message": f"Error clearing papers: {str(e)}"
})
# Set state to active and unpaused
ScraperState.set_active(True)
ScraperState.set_paused(False)
# Re-register the periodic task if needed
setup_periodic_tasks(celery)
# Kick off a fresh scraping cycle
dummy_scheduled_scraper.delay()
return jsonify({
"success": True,
"message": "Scraper has been completely reset and restarted"
})
# Common utility function to revoke tasks by type
def revoke_tasks_by_type(task_types, terminate=True):
"""
Revokes all tasks of specified types across scheduled, reserved and active queues.
Args:
task_types (list): List of task name strings to revoke
terminate (bool): Whether to terminate running tasks
Returns:
int: Count of revoked tasks
"""
# Get all tasks of all types
i = celery.control.inspect()
scheduled = i.scheduled() or {}
reserved = i.reserved() or {}
active = i.active() or {}
revoked_count = 0
# Revoke all scheduled tasks
for worker, tasks in scheduled.items():
for task in tasks:
if task['name'] in task_types:
celery.control.revoke(task['id'], terminate=terminate)
revoked_count += 1
# Revoke all reserved tasks
for worker, tasks in reserved.items():
for task in tasks:
if task['name'] in task_types:
celery.control.revoke(task['id'], terminate=terminate)
revoked_count += 1
# Revoke all active tasks
for worker, tasks in active.items():
for task in tasks:
if task['name'] in task_types:
celery.control.revoke(task['id'], terminate=terminate)
revoked_count += 1
return revoked_count
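# Example usage (mirrors stop_scraper/pause_scraper above):
#
#   revoked = revoke_tasks_by_type(
#       ['scipaperloader.blueprints.scraper.dummy_process_paper'],
#       terminate=True,
#   )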
# NOTE: This module-level definition shadows the Celery task of the same name above;
# direct calls (as in dummy_scheduled_scraper) resolve to this version, which uses
# the simpler w * N / 24 formula. All required names are already imported at the top.
def calculate_papers_for_current_hour():
"""Calculate the target number of papers for the current hour using w * N / 24."""
current_hour = datetime.now().hour
volume_config = VolumeConfig.query.first()
# Use default if not set
volume = volume_config.volume if volume_config else 100
current_hour_config = ScheduleConfig.query.filter_by(hour=current_hour).first()
# Use default weight 1.0 if not set for the hour
weight = current_hour_config.weight if current_hour_config else 1.0
# Calculate papers using the formula: w * N / 24
papers_this_hour = math.floor(weight * volume / 24)
# Log the calculation for debugging
ActivityLog.log_scraper_activity(
action="calculate_hourly_quota",
status="info",
description=f"Hour {current_hour}: weight={weight:.2f}, total_volume={volume}, target_papers={papers_this_hour}"
)
return papers_this_hour
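# Worked example (hypothetical numbers): with volume N = 120 and an hourly weight
# w = 1.5, this yields floor(1.5 * 120 / 24) = floor(7.5) = 7 papers for the hour.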