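"""Scraper control blueprint.

Flask routes for starting, stopping, pausing and monitoring the dummy scraper,
plus the Celery tasks that simulate scheduled paper downloads according to the
configured hourly schedule and daily volume.
"""
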
import random
import json
import time
import math
from datetime import datetime, timedelta
from flask import Blueprint, jsonify, render_template, request, current_app, flash
from ..models import VolumeConfig, ActivityLog, PaperMetadata, ActivityCategory, ScheduleConfig, ScraperState
from ..db import db
from ..celery import celery
from ..defaults import MAX_VOLUME
from celery.schedules import crontab

bp = Blueprint("scraper", __name__, url_prefix="/scraper")


# Setup periodic task to run every minute for testing purposes
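# A production deployment would presumably switch to an hourly beat schedule,
# e.g. crontab(minute=0) from the celery.schedules import above.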
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Run the dummy scraper every minute for testing purposes
    sender.add_periodic_task(60.0, run_periodic_dummy_scraper.s(), name='run dummy scraper every minute')


@celery.task
def run_periodic_dummy_scraper():
    """Periodic task to run the dummy scraper if it's active and not paused"""
    if ScraperState.is_scraper_active():
        dummy_scheduled_scraper.delay()
        return True
    return False


@bp.route("/")
def index():
    """Render the scraper control panel."""
    volume_config = VolumeConfig.query.first()

    # Ensure we have volume config
    if not volume_config:
        volume_config = VolumeConfig(volume=100)  # Default value
        db.session.add(volume_config)
        db.session.commit()

    # Get scraper state
    scraper_state = ScraperState.get_current_state()

    return render_template(
        "scraper.html.jinja",
        volume_config=volume_config,
        scraper_active=scraper_state.is_active,
        scraper_paused=scraper_state.is_paused,
        max_volume=MAX_VOLUME
    )


@bp.route("/start", methods=["POST"])
def start_scraper():
    """Start the scraper."""
    scraper_state = ScraperState.get_current_state()

    if not scraper_state.is_active:
        # Update scraper state
        ScraperState.set_active(True)
        ScraperState.set_paused(False)
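        # The periodic beat task (every 60 seconds, see setup_periodic_tasks)
        # checks this flag, so scheduled scraping picks up within about a minute.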

        # Log the action
        ActivityLog.log_scraper_command(
            action="start_scraper",
            status="success",
            description="Scheduled scraper started - will follow hourly configuration"
        )

        # Immediately trigger a task to test the scheduler and provide feedback
        dummy_scheduled_scraper.delay()

        return jsonify({
            "success": True,
            "message": "Scraper started - following hourly schedule configuration"
        })
    else:
        return jsonify({
            "success": False,
            "message": "Scraper is already running"
        })


@bp.route("/stop", methods=["POST"])
def stop_scraper():
    """Stop the scraper."""
    scraper_state = ScraperState.get_current_state()

    if scraper_state.is_active:
        # Update scraper state
        ScraperState.set_active(False)
        ScraperState.set_paused(False)

        ActivityLog.log_scraper_command(
            action="stop_scraper",
            status="success",
            description="Scraper stopped manually"
        )

        return jsonify({
            "success": True,
            "message": "Scraper stopped"
        })
    else:
        return jsonify({
            "success": False,
            "message": "Scraper is not running"
        })


@bp.route("/pause", methods=["POST"])
def pause_scraper():
    """Pause or resume the scraper."""
    scraper_state = ScraperState.get_current_state()

    if scraper_state.is_active and not scraper_state.is_paused:
        # Update scraper state
        ScraperState.set_paused(True)

        ActivityLog.log_scraper_command(
            action="pause_scraper",
            status="success",
            description="Scraper paused manually"
        )

        return jsonify({
            "success": True,
            "message": "Scraper paused"
        })
    elif scraper_state.is_active and scraper_state.is_paused:
        # Update scraper state
        ScraperState.set_paused(False)

        ActivityLog.log_scraper_command(
            action="resume_scraper",
            status="success",
            description="Scraper resumed manually"
        )

        return jsonify({
            "success": True,
            "message": "Scraper resumed"
        })
    else:
        return jsonify({
            "success": False,
            "message": "Scraper is not running"
        })


@bp.route("/status")
def scraper_status():
    """Get the current status of the scraper."""
    scraper_state = ScraperState.get_current_state()

    return jsonify({
        "active": scraper_state.is_active,
        "paused": scraper_state.is_paused,
        "current_hour": datetime.now().hour,
    })


@bp.route("/stats")
def scraper_stats():
    """Get scraper statistics for the dashboard."""
    # Get the last 24 hours of activity
    hours = 24
    if request.args.get('hours'):
        try:
            hours = int(request.args.get('hours'))
        except ValueError:
            pass

    current_time = datetime.utcnow()
    # Use timedelta for proper date calculation instead of simple hour subtraction
    cutoff_time = (current_time - timedelta(hours=hours)).replace(
        minute=0, second=0, microsecond=0
    )

    # Get activity logs for scraper actions
    logs = ActivityLog.query.filter(
        ActivityLog.category == ActivityCategory.SCRAPER_ACTIVITY.value,
        ActivityLog.timestamp >= cutoff_time
    ).all()

    # Group by hour and status
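    # Note: buckets are keyed by hour of day (0-23), so requesting more than
    # 24 hours folds older activity into the same 24 buckets.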
    stats = {}
    for hour in range(hours):
        # Calculate the hour as offset from current time
        target_hour = (current_time.hour - hour) % 24
        stats[target_hour] = {
            "success": 0,
            "error": 0,
            "pending": 0,
            "hour": target_hour,
        }

    for log in logs:
        hour = log.timestamp.hour
        if hour in stats:
            if log.status == "success":
                stats[hour]["success"] += 1
            elif log.status == "error":
                stats[hour]["error"] += 1
            elif log.status == "pending":
                stats[hour]["pending"] += 1

    # Convert to list for easier consumption by JavaScript
    result = [stats[hour] for hour in sorted(stats.keys())]

    return jsonify(result)


@bp.route("/update_config", methods=["POST"])
def update_config():
    """Update scraper configuration."""
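    # Expected JSON body, e.g. {"volume": 100}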
    data = request.json

    try:
        if "volume" in data:
            try:
                new_volume = float(data["volume"])

                # Validate volume value
                if new_volume <= 0 or new_volume > MAX_VOLUME:
                    return jsonify({
                        "success": False,
                        "message": f"Volume must be greater than 0 and at most {MAX_VOLUME}"
                    })

                volume_config = VolumeConfig.query.first()
                if not volume_config:
                    volume_config = VolumeConfig(volume=new_volume)
                    db.session.add(volume_config)
                else:
                    old_value = volume_config.volume
                    volume_config.volume = new_volume
                    ActivityLog.log_config_change(
                        config_key="scraper_volume",
                        old_value=old_value,
                        new_value=new_volume,
                        description="Updated scraper volume"
                    )

                db.session.commit()
            except (ValueError, TypeError):
                return jsonify({
                    "success": False,
                    "message": "Invalid volume value"
                })

        return jsonify({"success": True, "message": "Configuration updated"})

    except Exception as e:
        db.session.rollback()
        return jsonify({"success": False, "message": f"Unexpected error: {str(e)}"})


@celery.task(bind=True)
def dummy_scrape_paper(self):
    """Simulate scraping a single paper."""
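    # Standalone simulation task; it is not dispatched anywhere in this module
    # (the scheduler uses dummy_process_paper instead).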
    # Simulate success or failure
    success = random.random() > 0.3  # 70% success rate

    # Simulate processing time
    time.sleep(random.randint(2, 5))  # 2-5 seconds

    if success:
        # Create a dummy paper
        new_paper = PaperMetadata(
            title=f"Dummy Paper {random.randint(1000, 9999)}",
            doi=f"10.1234/dummy.{random.randint(1000, 9999)}",
            journal=random.choice([
                "Nature", "Science", "PLOS ONE", "Journal of Dummy Research",
                "Proceedings of the Dummy Society", "Cell", "Dummy Review Letters"
            ]),
            type="article",
            language="en",
            published_online=datetime.now().date(),
            status="Done",
            file_path="/path/to/dummy/paper.pdf"
        )

        db.session.add(new_paper)
        db.session.commit()

        # Log the successful scrape
        ActivityLog.log_scraper_activity(
            action="scrape_paper",
            paper_id=new_paper.id,
            status="success",
            description=f"Successfully scraped paper {new_paper.doi}"
        )

        return {
            "success": True,
            "paper_id": new_paper.id,
            "title": new_paper.title,
            "doi": new_paper.doi
        }
    else:
        # Log the failed scrape
        error_message = random.choice([
            "Connection timeout",
            "404 Not Found",
            "Access denied",
            "Invalid DOI format",
            "PDF download failed",
            "Rate limited by publisher"
        ])

        ActivityLog.log_scraper_activity(
            action="scrape_paper",
            status="error",
            description=f"Failed to scrape paper: {error_message}"
        )

        return {
            "success": False,
            "error": error_message
        }


@celery.task
def calculate_papers_for_current_hour():
    """
    Calculate how many papers should be downloaded in the current hour
    based on the schedule configuration.

    Returns:
        int: Number of papers to download this hour
    """
    current_hour = datetime.now().hour

    # Get volume configuration
    volume_config = VolumeConfig.query.first()
    if not volume_config:
        volume_config = VolumeConfig(volume=100)  # Default to 100 papers per day
        db.session.add(volume_config)
        db.session.commit()

    # Get all schedule configurations to calculate the total weight
    schedule_configs = ScheduleConfig.query.all()
    if not schedule_configs:
        # If no schedule configs exist, create defaults with equal weights
        for hour in range(24):
            config = ScheduleConfig(hour=hour, weight=1.0)
            db.session.add(config)
        db.session.commit()
        schedule_configs = ScheduleConfig.query.all()

    # Calculate total weight across all hours
    total_weight = sum(config.weight for config in schedule_configs)

    # Find the weight for the current hour
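    # Note: this lookup assumes `hour` is the primary key of ScheduleConfig;
    # with a surrogate key it would need filter_by(hour=current_hour).first() instead.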
    current_hour_config = ScheduleConfig.query.get(current_hour)
    if not current_hour_config:
        # Create config for current hour if it doesn't exist
        current_hour_config = ScheduleConfig(hour=current_hour, weight=1.0)
        db.session.add(current_hour_config)
        db.session.commit()

    # Calculate papers for current hour: (hour_weight / total_weight) * daily_volume
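    # e.g. with the default equal weights (24 hours x 1.0) and a daily volume of 100,
    # this yields floor((1.0 / 24) * 100) = 4 papers for the current hour.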
    if total_weight > 0:
        weight_ratio = current_hour_config.weight / total_weight
        papers_this_hour = math.floor(weight_ratio * volume_config.volume)
    else:
        papers_this_hour = 0

    return papers_this_hour


@celery.task
def dummy_scheduled_scraper():
    """
    The main scheduler task that processes papers according to the configured
    hourly schedule (triggered every minute by the periodic beat task for testing).
    """
    # Check if scraper is active using ScraperState
    if not ScraperState.is_scraper_active():
        ActivityLog.log_scraper_activity(
            action="scheduled_scraping",
            status="info",
            description="Scheduled scraping skipped: scraper is inactive or paused"
        )
        return False

    # Calculate how many papers to download this hour
    papers_to_download = calculate_papers_for_current_hour()

    if papers_to_download <= 0:
        ActivityLog.log_scraper_activity(
            action="scheduled_scraping",
            status="info",
            description="No papers scheduled for the current hour"
        )
        return True

    # Get all pending papers
    pending_papers = PaperMetadata.query.filter_by(status="Pending").all()

    # If no pending papers are available, create some dummy pending papers
    if not pending_papers:
        ActivityLog.log_scraper_activity(
            action="scheduled_scraping",
            status="info",
            description=f"No pending papers found - creating {papers_to_download} dummy pending papers"
        )

        # Create dummy pending papers
        for i in range(papers_to_download):
            # Generate a unique DOI by checking if it exists in the database
            while True:
                random_id = random.randint(1000, 9999)
                doi = f"10.1234/dummy-pending.{random_id}"

                # Check if the DOI already exists
                existing = PaperMetadata.query.filter_by(doi=doi).first()
                if not existing:
                    break

            new_paper = PaperMetadata(
                title=f"Dummy Pending Paper {random_id}",
                doi=doi,
                journal=random.choice([
                    "Nature", "Science", "PLOS ONE", "Journal of Dummy Research",
                    "Proceedings of the Dummy Society", "Cell", "Dummy Review Letters"
                ]),
                type="article",
                language="en",
                published_online=datetime.now().date(),
                status="Pending"
            )
            db.session.add(new_paper)

        # Commit all at once after creating all papers
        try:
            db.session.commit()
        except Exception as e:
            # Log the error and roll back
            ActivityLog.log_error(
                error_message="Failed to create dummy pending papers",
                exception=e,
                source="dummy_scheduled_scraper"
            )
            db.session.rollback()
            return False

        # Get the newly created papers
        pending_papers = PaperMetadata.query.filter_by(status="Pending").all()

    # Select up to papers_to_download random papers from pending_papers
    selected_papers = random.sample(
        pending_papers,
        min(papers_to_download, len(pending_papers))
    )

    ActivityLog.log_scraper_activity(
        action="scheduled_scraping",
        status="info",
        description=f"Starting scheduled scraping of {len(selected_papers)} papers for hour {datetime.now().hour}"
    )

    # For each paper, schedule it to run at a random time within the hour
    current_time = time.time()
    one_hour_in_seconds = 3600

    for paper in selected_papers:
        # Random delay within this hour (0 to 60 minutes)
        random_delay = random.randint(0, one_hour_in_seconds)

        # Schedule the dummy_process_paper task with the random delay
        dummy_process_paper.apply_async(
            args=[paper.id],
            countdown=random_delay
        )

    return True


@celery.task(bind=True)
def dummy_process_paper(self, paper_id):
    """
    Process a single paper for the dummy scraper.

    Args:
        paper_id (int): ID of the paper to process
    """
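    # Dispatched by dummy_scheduled_scraper via apply_async with a random
    # countdown so downloads are spread across the hour.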
    # Get the paper from database
    paper = PaperMetadata.query.get(paper_id)
    if not paper:
        # Log error if paper not found
        ActivityLog.log_scraper_activity(
            action="process_paper",
            status="error",
            description=f"Paper with ID {paper_id} not found"
        )
        return False

    # Simulate random success/failure (70% success rate)
    success = random.random() < 0.7

    # Simulate processing time (1-5 seconds)
    process_time = random.uniform(1, 5)
    time.sleep(process_time)

    if success:
        # Update paper status to "Done"
        paper.status = "Done"
        paper.file_path = f"/path/to/dummy/papers/{paper.doi.replace('/', '_')}.pdf"

        # Log success
        ActivityLog.log_scraper_activity(
            action="process_paper",
            paper_id=paper.id,
            status="success",
            description=f"Successfully processed paper: {paper.title}"
        )
    else:
        # Update paper status to "Failed"
        paper.status = "Failed"

        # Generate random error message
        error_message = random.choice([
            "Publisher website unavailable",
            "No PDF download link found",
            "Access restricted",
            "Download timeout",
            "Invalid DOI",
            "Rate limited by publisher"
        ])
        paper.error_msg = error_message

        # Log failure
        ActivityLog.log_scraper_activity(
            action="process_paper",
            paper_id=paper.id,
            status="error",
            description=f"Failed to process paper: {error_message}"
        )

    # Update the timestamp
    paper.updated_at = datetime.utcnow()

    # Commit changes to database
    db.session.commit()

    return success