fixes dummy scraper

Michael Beck 2025-04-16 21:39:59 +02:00
parent 0adaed0bfa
commit a0fa541de3
2 changed files with 581 additions and 173 deletions


@ -9,23 +9,82 @@ from ..db import db
from ..celery import celery
from ..defaults import MAX_VOLUME
from celery.schedules import crontab
from sqlalchemy import func
bp = Blueprint("scraper", __name__, url_prefix="/scraper")
# Track the periodic task ID for proper revocation
PERIODIC_TASK_ID = None
# Setup periodic task to run every minute for testing purposes
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    global PERIODIC_TASK_ID
    # Run the dummy scraper every minute for testing purposes with a unique task name
    # Using a constant name allows us to revoke this specific periodic task when stopping
    result = sender.add_periodic_task(60.0, run_periodic_dummy_scraper.s(),
                                      name='run_dummy_scraper_every_minute')
    PERIODIC_TASK_ID = result.id
    # Log that we've registered the periodic task with its ID
    ActivityLog.log_scraper_command(
        action="register_periodic_task",
        status="success",
        description=f"Registered periodic scraper task with ID: {PERIODIC_TASK_ID}"
    )
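# For reference, the entry registered above is equivalent to the following static
# beat_schedule declaration. This is only an illustrative sketch, not part of this
# commit; the task path matches the auto-generated name used elsewhere in this file.
#
# celery.conf.beat_schedule = {
#     'run_dummy_scraper_every_minute': {
#         'task': 'scipaperloader.blueprints.scraper.run_periodic_dummy_scraper',
#         'schedule': 60.0,
#     },
# }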
# Function to revoke the periodic task properly
def revoke_periodic_task():
global PERIODIC_TASK_ID
if PERIODIC_TASK_ID:
try:
# Revoke by ID is more reliable than by name
celery.control.revoke(PERIODIC_TASK_ID, terminate=True)
# Attempt to revoke by name as a fallback (standard Celery method)
# Note: We're removing the non-standard revoke_from_control method
# and replacing it with a more reliable approach
i = celery.control.inspect()
scheduled = i.scheduled() or {}
# Look for our periodic task by name in the scheduled tasks
for worker, tasks in scheduled.items():
for task in tasks:
if task.get('name') == 'run_dummy_scraper_every_minute':
celery.control.revoke(task['id'], terminate=True)
# Log the action
ActivityLog.log_scraper_command(
action="revoke_periodic_task",
status="success",
description=f"Revoked periodic task with ID: {PERIODIC_TASK_ID}"
)
return True
except Exception as e:
ActivityLog.log_error(
error_message=f"Failed to revoke periodic task: {str(e)}",
source="revoke_periodic_task"
)
return False
return False
@celery.task
def run_periodic_dummy_scraper():
"""Periodic task to run the dummy scraper if it's active and not paused"""
if ScraperState.is_scraper_active():
scraper_state = ScraperState.get_current_state()
# Log every time this runs to track execution
ActivityLog.log_scraper_activity(
action="periodic_check",
status="info",
description=f"Periodic check running. Scraper active: {scraper_state.is_active}, paused: {scraper_state.is_paused}"
)
if scraper_state.is_active and not scraper_state.is_paused:
dummy_scheduled_scraper.delay()
return True
return False
@bp.route("/")
def index():
"""Render the scraper control panel."""
@ -48,6 +107,7 @@ def index():
max_volume=MAX_VOLUME
)
@bp.route("/start", methods=["POST"])
def start_scraper():
"""Start the scraper."""
@ -78,25 +138,70 @@ def start_scraper():
"message": "Scraper is already running"
})
@bp.route("/stop", methods=["POST"])
def stop_scraper():
"""Stop the scraper."""
"""Stop the scraper completely."""
scraper_state = ScraperState.get_current_state()
if scraper_state.is_active:
# Update scraper state first
ScraperState.set_active(False)
ScraperState.set_paused(False)
# Stop any running tasks
task_types_to_revoke = [
'scipaperloader.blueprints.scraper.dummy_process_paper',
'scipaperloader.blueprints.scraper.dummy_scheduled_scraper',
'scipaperloader.blueprints.scraper.run_periodic_dummy_scraper'
]
# Use our utility function to revoke tasks
revoked_count = revoke_tasks_by_type(task_types_to_revoke, terminate=True)
# Revoke the periodic task specifically
revoke_periodic_task()
# Clear all pending tasks from the queue more aggressively
try:
# This purges all tasks in all queues
celery.control.purge()
ActivityLog.log_scraper_command(
action="purge_queue",
status="success",
description="Purged all task queues"
)
except Exception as e:
ActivityLog.log_error(
error_message=f"Failed to purge task queues: {str(e)}",
source="stop_scraper"
)
# Fallback to discard_all if purge fails
celery.control.discard_all()
# Restart the worker to ensure clean state
try:
celery.control.broadcast('pool_restart', arguments={'reload': True})
ActivityLog.log_scraper_command(
action="restart_worker",
status="success",
description="Worker pool restart requested"
)
except Exception as e:
ActivityLog.log_error(
error_message=f"Failed to restart worker pool: {str(e)}",
source="stop_scraper"
)
ActivityLog.log_scraper_command(
action="stop_scraper",
status="success",
description="Scraper stopped manually"
description=f"Scraper stopped manually. Revoked {revoked_count} pending tasks. Worker pool restart requested."
)
return jsonify({
"success": True,
"message": "Scraper stopped"
"message": f"Scraper stopped. Revoked {revoked_count} pending tasks and requested worker restart."
})
else:
return jsonify({
@ -104,6 +209,7 @@ def stop_scraper():
"message": "Scraper is not running"
})
@bp.route("/pause", methods=["POST"])
def pause_scraper():
"""Pause the scraper."""
@ -113,15 +219,28 @@ def pause_scraper():
# Update scraper state
ScraperState.set_paused(True)
# Just revoke processing tasks, but leave the periodic tasks running
# so it can continue to check the state (which is now paused)
task_types_to_revoke = [
'scipaperloader.blueprints.scraper.dummy_process_paper',
'scipaperloader.blueprints.scraper.dummy_scheduled_scraper'
]
# Use our utility function to revoke tasks
revoked_count = revoke_tasks_by_type(task_types_to_revoke, terminate=True)
# Also clear the queue
celery.control.discard_all()
ActivityLog.log_scraper_command(
action="pause_scraper",
status="success",
description="Scraper paused manually"
description=f"Scraper paused manually. Revoked {revoked_count} pending tasks."
)
return jsonify({
"success": True,
"message": "Scraper paused"
"message": f"Scraper paused. Revoked {revoked_count} pending tasks."
})
elif scraper_state.is_active and scraper_state.is_paused:
# Update scraper state
@ -143,6 +262,7 @@ def pause_scraper():
"message": "Scraper is not running"
})
@bp.route("/status")
def scraper_status():
"""Get the current status of the scraper."""
@ -154,6 +274,7 @@ def scraper_status():
"current_hour": datetime.now().hour,
})
@bp.route("/stats")
def scraper_stats():
"""Get scraper statistics for the dashboard."""
@ -204,6 +325,7 @@ def scraper_stats():
return jsonify(result)
@bp.route("/update_config", methods=["POST"])
def update_config():
"""Update scraper configuration."""
@ -248,6 +370,7 @@ def update_config():
db.session.rollback()
return jsonify({"success": False, "message": f"Unexpected error: {str(e)}"})
@celery.task(bind=True)
def dummy_scrape_paper(self):
"""Simulate scraping a single paper."""
@ -313,6 +436,7 @@ def dummy_scrape_paper(self):
"error": error_message
}
@celery.task
def calculate_papers_for_current_hour():
"""
@ -365,110 +489,82 @@ def calculate_papers_for_current_hour():
@celery.task
def dummy_scheduled_scraper():
"""
The main scheduler task that runs every hour to process papers
according to the configured schedule.
Selects new papers based on the hourly schedule and marks them as Pending.
Then schedules their processing randomly within the hour.
"""
# Check if scraper is active using ScraperState
if not ScraperState.is_scraper_active():
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active or scraper_state.is_paused:
ActivityLog.log_scraper_activity(
action="scheduled_scraping",
action="dummy_scheduled_scraper_skip",
status="info",
description=f"Scheduled scraping skipped: inactive or paused"
description="Skipping run because scraper is inactive or paused."
)
return False
return False # Stop if not active/paused
# Calculate how many papers to download this hour
papers_to_download = calculate_papers_for_current_hour()
papers_to_select = calculate_papers_for_current_hour()
if papers_to_download <= 0:
if papers_to_select <= 0:
ActivityLog.log_scraper_activity(
action="scheduled_scraping",
action="dummy_scheduled_scraper_info",
status="info",
description=f"No papers scheduled for current hour"
description=f"Hourly quota is {papers_to_select}. No papers to select this hour."
)
return True # Nothing to do this hour based on schedule
# --- Core Logic Change: Select NEW papers ---
try:
# Find "New" papers, select randomly up to the calculated limit
new_papers = PaperMetadata.query.filter_by(status="New") \
.order_by(func.random()) \
.limit(papers_to_select) \
.all()
if not new_papers:
ActivityLog.log_scraper_activity(
action="dummy_scheduled_scraper_info",
status="info",
description="No 'New' papers found in the database to select."
)
# Optional: Depending on requirements, you might want to check later
# or handle this case differently. For now, we just log and exit.
return True
# Get all pending papers
pending_papers = PaperMetadata.query.filter_by(status="Pending").all()
selected_paper_ids = [p.id for p in new_papers]
# If no pending papers available, create some dummy pending papers
if not pending_papers:
ActivityLog.log_scraper_activity(
action="scheduled_scraping",
status="info",
description=f"No pending papers found - creating {papers_to_download} dummy pending papers"
)
# Create dummy pending papers
for i in range(papers_to_download):
# Generate a unique DOI by checking if it exists in the database
while True:
random_id = random.randint(1000, 9999)
doi = f"10.1234/dummy-pending.{random_id}"
# Check if the DOI already exists
existing = PaperMetadata.query.filter_by(doi=doi).first()
if not existing:
break
new_paper = PaperMetadata(
title=f"Dummy Pending Paper {random_id}",
doi=doi,
journal=random.choice([
"Nature", "Science", "PLOS ONE", "Journal of Dummy Research",
"Proceedings of the Dummy Society", "Cell", "Dummy Review Letters"
]),
type="article",
language="en",
published_online=datetime.now().date(),
status="Pending"
)
db.session.add(new_paper)
# Commit all at once after creating all papers
try:
# Update status to "Pending" in bulk for efficiency
PaperMetadata.query.filter(PaperMetadata.id.in_(selected_paper_ids)) \
.update({"status": "Pending", "updated_at": datetime.utcnow()}, synchronize_session=False)
db.session.commit()
ActivityLog.log_scraper_activity(
action="select_new_papers",
status="success",
description=f"Selected {len(selected_paper_ids)} 'New' papers and marked as 'Pending'. IDs: {selected_paper_ids}"
)
# --- Now schedule processing for the newly selected "Pending" papers ---
# (Assuming dummy_process_paper takes a paper_id)
# Add random delays for processing within the hour (e.g., up to 3600 seconds)
for paper_id in selected_paper_ids:
delay = random.uniform(1, 3500) # Random delay up to ~58 minutes
dummy_process_paper.apply_async(args=[paper_id], countdown=delay)
ActivityLog.log_scraper_activity(
action="schedule_processing",
status="success",
description=f"Scheduled processing for {len(selected_paper_ids)} papers with random delays."
)
return True
except Exception as e:
# Log the error and rollback
db.session.rollback() # Rollback DB changes on error
ActivityLog.log_error(
error_message="Failed to create dummy pending papers",
exception=e,
error_message=f"Error in dummy_scheduled_scraper: {str(e)}",
source="dummy_scheduled_scraper"
)
db.session.rollback()
return False
# Get the newly created papers
pending_papers = PaperMetadata.query.filter_by(status="Pending").all()
# Select papers_to_download random papers from pending_papers
selected_papers = random.sample(
pending_papers,
min(papers_to_download, len(pending_papers))
)
ActivityLog.log_scraper_activity(
action="scheduled_scraping",
status="info",
description=f"Starting scheduled scraping of {len(selected_papers)} papers for hour {datetime.now().hour}"
)
# For each paper, schedule it to run at a random time within the hour
current_time = time.time()
one_hour_in_seconds = 3600
for paper in selected_papers:
# Random delay within this hour (0 to 60 minutes)
random_delay = random.randint(0, one_hour_in_seconds)
# Schedule the dummy_process_paper task with the random delay
dummy_process_paper.apply_async(
args=[paper.id],
countdown=random_delay
)
return True
@celery.task(bind=True)
def dummy_process_paper(self, paper_id):
@ -478,6 +574,17 @@ def dummy_process_paper(self, paper_id):
Args:
paper_id (int): ID of the paper to process
"""
# First check if the scraper is still active and not paused
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active or scraper_state.is_paused:
# Log that task was skipped due to scraper being stopped or paused
ActivityLog.log_scraper_activity(
action="process_paper",
status="info",
description=f"Skipped processing paper ID {paper_id} because scraper is {'paused' if scraper_state.is_paused else 'stopped'}"
)
return False
# Get the paper from database
paper = PaperMetadata.query.get(paper_id)
if not paper:
@ -496,6 +603,17 @@ def dummy_process_paper(self, paper_id):
process_time = random.uniform(1, 5)
time.sleep(process_time)
# Check again if scraper is still active and not paused after the time delay
# This ensures we don't process papers if the scraper was stopped during the delay
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active or scraper_state.is_paused:
ActivityLog.log_scraper_activity(
action="process_paper",
status="info",
description=f"Cancelled processing paper ID {paper_id} because scraper is {'paused' if scraper_state.is_paused else 'stopped'}"
)
return False
if success:
# Update paper status to "Done"
paper.status = "Done"
@ -506,7 +624,7 @@ def dummy_process_paper(self, paper_id):
action="process_paper",
paper_id=paper.id,
status="success",
description=f"Successfully processed paper: {paper.title}"
description=f"Successfully processed paper: {paper.doi}"
)
else:
# Update paper status to "Failed"
@ -538,3 +656,248 @@ def dummy_process_paper(self, paper_id):
db.session.commit()
return success
@celery.task(bind=True)
def process_paper_batch(self, paper_ids):
"""
Process a batch of papers to improve throughput and reduce overhead.
Args:
paper_ids (list): List of paper IDs to process in this batch
"""
# Check if scraper is still active
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active or scraper_state.is_paused:
ActivityLog.log_scraper_activity(
action="process_paper_batch",
status="info",
description=f"Skipped batch of {len(paper_ids)} papers because scraper is {'paused' if scraper_state.is_paused else 'stopped'}"
)
return False
# Log the batch starting
ActivityLog.log_scraper_activity(
action="process_paper_batch",
status="info",
description=f"Started processing batch of {len(paper_ids)} papers"
)
# Process each paper in the batch
results = {
"success": 0,
"failure": 0,
"skipped": 0
}
# Begin a transaction for the entire batch
try:
for paper_id in paper_ids:
# Double-check scraper state before each paper
scraper_state = ScraperState.get_current_state()
if not scraper_state.is_active or scraper_state.is_paused:
results["skipped"] += 1
continue
# Get the paper from database
paper = PaperMetadata.query.get(paper_id)
if not paper:
results["skipped"] += 1
continue
# Simulate random success/failure (70% success rate)
success = random.random() < 0.7
# Simulate some processing time (0.5-2 seconds per paper in batch)
time.sleep(random.uniform(0.5, 2))
if success:
# Update paper status to "Done"
paper.status = "Done"
paper.file_path = f"/path/to/dummy/papers/{paper.doi.replace('/', '_')}.pdf"
results["success"] += 1
# Log individual paper success with minimal overhead
ActivityLog.log_scraper_activity(
action="process_paper",
paper_id=paper.id,
status="success",
description=f"Processed in batch: {paper.title}"
)
else:
# Update paper status to "Failed"
paper.status = "Failed"
# Generate random error message
error_message = random.choice([
"Publisher website unavailable",
"No PDF download link found",
"Access restricted",
"Download timeout",
"Invalid DOI",
"Rate limited by publisher"
])
paper.error_msg = error_message
results["failure"] += 1
# Log individual paper failure with minimal overhead
ActivityLog.log_scraper_activity(
action="process_paper",
paper_id=paper.id,
status="error",
description=f"Failed in batch: {error_message}"
)
# Update the timestamp
paper.updated_at = datetime.utcnow()
# Commit the entire batch at once
db.session.commit()
except Exception as e:
# If any error occurs, roll back the entire batch
db.session.rollback()
ActivityLog.log_error(
error_message=f"Error processing paper batch: {str(e)}",
source="process_paper_batch"
)
return False
# Log batch completion
ActivityLog.log_scraper_activity(
action="process_paper_batch",
status="success",
description=f"Completed batch processing: {results['success']} succeeded, {results['failure']} failed, {results['skipped']} skipped"
)
return results
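# Illustrative dispatch sketch (not part of this commit): if the scheduler were switched
# from per-paper tasks to batches, the selected IDs could be chunked and queued like this.
# The chunk size of 10 is an assumption; `random` is already imported at module level.
def schedule_paper_batches(selected_paper_ids, chunk_size=10):
    for start in range(0, len(selected_paper_ids), chunk_size):
        chunk = selected_paper_ids[start:start + chunk_size]
        delay = random.uniform(1, 3500)  # spread batches across the hour, as above
        process_paper_batch.apply_async(args=[chunk], countdown=delay)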
@bp.route("/reset", methods=["POST"])
def reset_scraper():
"""
Reset the scraper completely:
1. Stop all running tasks
2. Optionally purge all papers except those with 'Pending' status
3. Reset scraper state to active and unpaused
4. Trigger a new scraping cycle
"""
# First stop everything
stop_scraper()
# Check if we should clear papers
clear_papers = request.json.get('clear_papers', True) if request.is_json else True
if clear_papers:
try:
# Get all papers that aren't in Pending status
papers = PaperMetadata.query.filter(PaperMetadata.status != "Pending").all()
count = len(papers)
# Delete them all
for paper in papers:
db.session.delete(paper)
db.session.commit()
ActivityLog.log_scraper_command(
action="reset_scraper",
status="success",
description=f"Reset scraper and cleared {count} non-pending papers"
)
except Exception as e:
db.session.rollback()
ActivityLog.log_error(
error_message=f"Failed to reset papers: {str(e)}",
source="reset_scraper"
)
return jsonify({
"success": False,
"message": f"Error clearing papers: {str(e)}"
})
# Set state to active and unpaused
ScraperState.set_active(True)
ScraperState.set_paused(False)
# Re-register the periodic task if needed
setup_periodic_tasks(celery)
# Kick off a fresh scraping cycle
dummy_scheduled_scraper.delay()
return jsonify({
"success": True,
"message": "Scraper has been completely reset and restarted"
})
# Common utility function to revoke tasks by type
def revoke_tasks_by_type(task_types, terminate=True):
"""
Revokes all tasks of specified types across scheduled, reserved and active queues.
Args:
task_types (list): List of task name strings to revoke
terminate (bool): Whether to terminate running tasks
Returns:
int: Count of revoked tasks
"""
# Get all tasks of all types
i = celery.control.inspect()
scheduled = i.scheduled() or {}
reserved = i.reserved() or {}
active = i.active() or {}
revoked_count = 0
# Revoke all scheduled tasks
for worker, tasks in scheduled.items():
for task in tasks:
if task['name'] in task_types:
celery.control.revoke(task['id'], terminate=terminate)
revoked_count += 1
# Revoke all reserved tasks
for worker, tasks in reserved.items():
for task in tasks:
if task['name'] in task_types:
celery.control.revoke(task['id'], terminate=terminate)
revoked_count += 1
# Revoke all active tasks
for worker, tasks in active.items():
for task in tasks:
if task['name'] in task_types:
celery.control.revoke(task['id'], terminate=terminate)
revoked_count += 1
return revoked_count
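# Example call (illustrative only; mirrors how stop_scraper and pause_scraper use this helper above):
#
# revoked = revoke_tasks_by_type(
#     ['scipaperloader.blueprints.scraper.dummy_process_paper'],
#     terminate=True,
# )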
import math
from datetime import datetime
from ..models import VolumeConfig, ScheduleConfig, PaperMetadata # Ensure imports
from ..db import db # Ensure import
def calculate_papers_for_current_hour():
"""Calculates the target number of papers for the current hour using w*N/24."""
current_hour = datetime.now().hour
volume_config = VolumeConfig.query.first()
# Use default if not set
volume = volume_config.volume if volume_config else 100
current_hour_config = ScheduleConfig.query.filter_by(hour=current_hour).first()
# Use default weight 1.0 if not set for the hour
weight = current_hour_config.weight if current_hour_config else 1.0
# Calculate papers using the formula: w * N / 24
papers_this_hour = math.floor(weight * volume / 24)
# Log the calculation for debugging
ActivityLog.log_scraper_activity(
action="calculate_hourly_quota",
status="info",
description=f"Hour {current_hour}: weight={weight:.2f}, total_volume={volume}, target_papers={papers_this_hour}"
)
return papers_this_hour
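# Worked example of the w * N / 24 quota (illustrative numbers, not from the app's config):
# a daily volume of 240 papers with an hour weight of 1.5 gives floor(1.5 * 240 / 24) = 15,
# while a weight of 0 gives a quota of 0, so the scheduler skips that hour.
assert math.floor(1.5 * 240 / 24) == 15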


@ -62,6 +62,7 @@
<button id="startButton" class="btn btn-success">Start</button>
<button id="pauseButton" class="btn btn-warning" disabled>Pause</button>
<button id="stopButton" class="btn btn-danger" disabled>Stop</button>
<button id="resetButton" class="btn btn-secondary" disabled>Reset</button>
</div>
</div>
</div>
@ -160,6 +161,7 @@
const startButton = document.getElementById('startButton');
const pauseButton = document.getElementById('pauseButton');
const stopButton = document.getElementById('stopButton');
const resetButton = document.getElementById('resetButton');
const notificationsToggle = document.getElementById('notificationsToggle');
const activityLog = document.getElementById('activityLog');
@ -173,6 +175,7 @@
startButton.addEventListener('click', startScraper);
pauseButton.addEventListener('click', togglePauseScraper);
stopButton.addEventListener('click', stopScraper);
resetButton.addEventListener('click', resetScraper);
notificationsToggle.addEventListener('click', toggleNotifications);
document.getElementById('volumeForm').addEventListener('submit', function (e) {
@ -213,12 +216,14 @@
startButton.disabled = true;
pauseButton.disabled = false;
stopButton.disabled = false;
resetButton.disabled = false; // Enable reset when active
} else {
statusIndicator.className = 'status-indicator status-inactive';
statusText.textContent = 'Inactive';
startButton.disabled = false;
pauseButton.disabled = true;
stopButton.disabled = true;
resetButton.disabled = false; // Enable reset when inactive too
}
});
}
@ -276,6 +281,46 @@
});
}
function resetScraper() {
if (confirm("Are you sure you want to reset the scraper? This will stop all current tasks, optionally clear non-pending papers, and restart the scraper.")) {
// Disable button to prevent multiple clicks
resetButton.disabled = true;
// Show a loading message
showFlashMessage('Resetting scraper, please wait...', 'info');
fetch('/scraper/reset', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({
clear_papers: true // You could make this configurable with a checkbox
})
})
.then(response => response.json())
.then(data => {
if (data.success) {
showFlashMessage('Scraper has been completely reset and restarted', 'success');
// Update everything
updateStatus();
loadActivityStats(currentTimeRange);
setTimeout(() => { loadRecentActivity(); }, 1000);
} else {
showFlashMessage(data.message || 'Error resetting scraper', 'error');
}
// Re-enable button
resetButton.disabled = false;
})
.catch(error => {
console.error("Error resetting scraper:", error);
showFlashMessage('Error resetting scraper: ' + error.message, 'error');
// Re-enable button
resetButton.disabled = false;
});
}
}
function updateVolume() {
const volume = document.getElementById('volumeInput').value;