2025-04-16 15:19:28 +02:00

212 lines
7.0 KiB
Python

"""Schedule configuration and scheduling logic."""
from datetime import datetime
import random
import json
from flask import Blueprint, flash, render_template, request, jsonify
from ..db import db
from ..models import ScheduleConfig, VolumeConfig, ActivityLog, ActivityCategory
from ..celery import celery
from .scraper import SCRAPER_ACTIVE, SCRAPER_PAUSED, dummy_scrape_paper
from .config import _update_volume, _update_schedule
# Blueprint that groups the schedule views; its routes are mounted under /schedule.
bp = Blueprint("schedule", __name__, url_prefix="/schedule")
@bp.route("/", methods=["GET", "POST"])
def schedule():
    """Render the schedule configuration page and process its form posts.

    A POST that contains ``total_volume`` is handled as a volume update via
    ``_update_volume``. Any other POST is handled as a schedule update and
    must carry one ``hour_<n>`` field for each of the 24 hours; the collected
    values are passed to ``_update_schedule``. Outcomes are reported through
    ``flash`` messages, and any exception raised while handling the form
    rolls back the database session.

    On every request the view then ensures a ``ScheduleConfig`` row exists
    for each hour 0-23 (missing hours are created with weight 1.0) and
    renders ``schedule.html.jinja``.

    Returns:
        The rendered template response.
    """
    if request.method == "POST":
        try:
            if "total_volume" in request.form:
                # Volume update using the centralized helper
                new_volume = request.form.get("total_volume", 0)
                success, message, _ = _update_volume(new_volume)
                flash(message, "success" if success else "error")
            else:
                # Schedule update using the centralized helper
                schedule_data = {}
                for hour in range(24):
                    key = f"hour_{hour}"
                    if key not in request.form:
                        flash(f"Missing data for hour {hour}", "error")
                        break
                    schedule_data[str(hour)] = request.form.get(key, 0)
                else:
                    # Only reached when the loop did not break, i.e. all
                    # 24 hours were present in the form.
                    success, message = _update_schedule(schedule_data)
                    flash(message, "success" if success else "error")
        except Exception as e:
            db.session.rollback()
            flash(f"Error: {str(e)}", "error")

    # Ensure we have schedule config for all hours
    existing_hours = {record.hour: record for record in ScheduleConfig.query.all()}
    schedule_config = {}
    created_defaults = False
    for hour in range(24):
        record = existing_hours.get(hour)
        if record is not None:
            schedule_config[hour] = record.weight
        else:
            # Create default schedule entry (weight 1.0)
            db.session.add(ScheduleConfig(hour=hour, weight=1.0))
            schedule_config[hour] = 1.0
            created_defaults = True

    # Commit whenever a default row was actually added. Comparing the row
    # count against 24 would skip the commit if the table holds rows for
    # hours outside 0-23 while an hour inside that range is missing.
    if created_defaults:
        db.session.commit()

    volume = VolumeConfig.query.first()
    return render_template(
        "schedule.html.jinja",
        schedule=schedule_config,
        volume=volume.volume if volume else 0,
        stats=get_schedule_stats(),
        app_title="PaperScraper",
    )
@bp.route("/update_config", methods=["POST"])
def update_config():
    """Update the volume and/or schedule configuration from a JSON body.

    The body must be a JSON object. An optional ``volume`` entry is passed to
    ``_update_volume`` and an optional ``schedule`` entry to
    ``_update_schedule``; the result of each attempted update is appended to
    the ``updates`` list of the response.

    Returns:
        A JSON response with ``success`` (False if any attempted update
        failed) and ``updates``. A body that is missing, unparseable, or not
        a JSON object yields ``success: False`` with a ``message`` and HTTP
        status 400. An exception raised while applying updates rolls back the
        session and yields ``success: False`` with a ``message``.
    """
    # silent=True returns None instead of raising on a wrong content type or
    # malformed JSON, so every bad payload gets the same JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({
            "success": False,
            "message": "Request body must be a JSON object"
        }), 400

    response = {"success": True, "updates": []}
    try:
        # Update volume if provided
        if "volume" in data:
            success, message, _ = _update_volume(data["volume"])
            response["updates"].append({
                "type": "volume",
                "success": success,
                "message": message
            })
            if not success:
                response["success"] = False

        # Update schedule if provided
        if "schedule" in data:
            success, message = _update_schedule(data["schedule"])
            response["updates"].append({
                "type": "schedule",
                "success": success,
                "message": message
            })
            if not success:
                response["success"] = False

        return jsonify(response)
    except Exception as e:
        db.session.rollback()
        return jsonify({
            "success": False,
            "message": f"Unexpected error: {str(e)}"
        })
# Derive per-hour figures from the stored volume and hourly weights
def get_schedule_stats():
    """Summarise the current volume and schedule configuration.

    Returns:
        ``{"error": ...}`` when no ``VolumeConfig`` row or no
        ``ScheduleConfig`` rows exist. Otherwise a dict with:

        * ``total_volume`` - the ``volume`` of the first ``VolumeConfig`` row
        * ``total_weight`` - the sum of all hourly weights
        * ``papers_per_hour`` - hour -> that hour's share of the total
          weight multiplied by the volume (0 share if the total weight is
          not positive)
        * ``hourly_weights`` - hour -> configured weight
    """
    volume_row = VolumeConfig.query.first()
    if not volume_row:
        return {"error": "No volume configuration found"}
    total_volume = volume_row.volume

    rows = ScheduleConfig.query.all()
    if not rows:
        return {"error": "No schedule configuration found"}

    total_weight = sum(row.weight for row in rows)
    # Guard against division by zero when every weight is 0.
    has_weight = total_weight > 0

    papers_per_hour = {
        row.hour: (row.weight / total_weight if has_weight else 0) * total_volume
        for row in rows
    }
    hourly_weights = {row.hour: row.weight for row in rows}

    return {
        "total_volume": total_volume,
        "total_weight": total_weight,
        "papers_per_hour": papers_per_hour,
        "hourly_weights": hourly_weights
    }
# JSON endpoint exposing the schedule statistics
@bp.route("/schedule_info")
def schedule_info():
    """Return the result of ``get_schedule_stats`` as a JSON response."""
    return jsonify(get_schedule_stats())
# Define the Celery tasks for the scheduler
@celery.task(bind=True)
def start_scheduler(self):
    """Queue the first ``scheduler_task`` run if the scraper is running.

    Returns:
        ``{"status": "success", ...}`` after queuing ``scheduler_task``, or
        ``{"status": "error", ...}`` when the scraper is inactive or paused.
    """
    # Read the flags through the module at call time. The names imported at
    # the top of this file are bound once at import and would not reflect a
    # later reassignment of the flags inside ``.scraper``.
    from . import scraper as scraper_state

    if not scraper_state.SCRAPER_ACTIVE or scraper_state.SCRAPER_PAUSED:
        return {"status": "error", "message": "Scraper not active or paused"}

    # Schedule the first run immediately
    scheduler_task.delay()
    return {"status": "success", "message": "Scheduler started"}
@celery.task(bind=True)
def scheduler_task(self):
    """Queue scraping tasks for the current hour based on the schedule.

    The number of tasks is the integer part of the current hour's entry in
    ``get_schedule_stats()["papers_per_hour"]``. The decision is recorded via
    ``ActivityLog.log_scraper_activity`` before the tasks are queued.

    Returns:
        A dict whose ``status`` is ``"success"`` (together with
        ``papers_scheduled`` and ``hour``), or a short reason string when the
        scraper is inactive or paused, or when configuration is missing.
    """
    # Read the flags through the module at call time. The names imported at
    # the top of this file are bound once at import and would not reflect a
    # later reassignment of the flags inside ``.scraper``.
    from . import scraper as scraper_state

    if not scraper_state.SCRAPER_ACTIVE:
        return {"status": "Scraper not active"}
    if scraper_state.SCRAPER_PAUSED:
        return {"status": "Scraper paused"}

    # Calculate how many papers to scrape based on current hour and configuration
    current_hour = datetime.now().hour
    # NOTE(review): Query.get looks up by primary key, so this assumes
    # ``hour`` is the primary key of ScheduleConfig - confirm in the model.
    hour_config = ScheduleConfig.query.get(current_hour)
    volume_config = VolumeConfig.query.first()
    if not hour_config or not volume_config:
        return {"status": "Missing configuration"}

    # Calculate papers to scrape this hour
    stats = get_schedule_stats()
    if "error" in stats:
        # get_schedule_stats returns only an "error" key when configuration
        # is absent; indexing "papers_per_hour" would raise KeyError.
        return {"status": "Missing configuration"}
    papers_to_scrape = int(stats["papers_per_hour"].get(current_hour, 0))

    # Log the scheduling decision
    ActivityLog.log_scraper_activity(
        action="schedule_papers",
        status="success",
        description=f"Scheduled {papers_to_scrape} papers for scraping at hour {current_hour}",
        extra_data=json.dumps({
            "hour": current_hour,
            "weight": hour_config.weight,
            "total_volume": volume_config.volume
        })
    )

    # Execute the actual scraping tasks
    for _ in range(papers_to_scrape):
        # Queue up scraping tasks - in real implementation, this would
        # call the actual scraper task
        dummy_scrape_paper.delay()

    return {
        "status": "success",
        "papers_scheduled": papers_to_scrape,
        "hour": current_hour
    }