"""
Simplified scraper blueprint using the new ScraperManager and hourly scheduling system.
"""
from flask import Blueprint, jsonify, render_template, request, current_app
from ..models import ActivityLog, PaperMetadata, ScraperState, VolumeConfig
from ..scrapers.manager import ScraperManager
from ..scrapers.factory import get_available_scrapers
from ..db import db
from ..defaults import MAX_VOLUME
from datetime import datetime, timedelta
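# Routes exposed under the /scraper prefix: /, /start, /pause, /stop, /reset,
# /status, /logs, /scrapers, /process-papers, /trigger-immediate,
# /available_scrapers, /stats, /process_single/<id>, /update_config, /publishers.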
bp = Blueprint("scraper", __name__, url_prefix="/scraper")
# Initialize the scraper manager
scraper_manager = ScraperManager()
@bp.route("/")
def index():
"""Main scraper page."""
# Get current scraper state
scraper_state = ScraperState.get_current_state()
# Get available scrapers
available_scrapers = get_available_scrapers()
# Get recent activity logs
recent_logs = ActivityLog.query.order_by(ActivityLog.timestamp.desc()).limit(50).all()
# Get volume configuration
volume_config = VolumeConfig.get_current_volume()
# Get scraper module configuration
from ..models import ScraperModuleConfig
current_scraper_module = ScraperModuleConfig.get_current_module()
# Get paper counts by status
paper_counts = {
'new': PaperMetadata.query.filter_by(status='New').count(),
'processing': PaperMetadata.query.filter_by(status='Processing').count(),
'done': PaperMetadata.query.filter_by(status='Done').count(),
'failed': PaperMetadata.query.filter_by(status='Failed').count(),
'pending': PaperMetadata.query.filter_by(status='Pending').count(),
'retrying': PaperMetadata.query.filter_by(status='Retrying').count(),
}
return render_template(
"scraper.html.jinja",
scraper_state=scraper_state,
available_scrapers=available_scrapers,
recent_logs=recent_logs,
paper_counts=paper_counts,
volume_config=volume_config,
max_volume=MAX_VOLUME,
current_scraper_module=current_scraper_module,
available_scraper_modules=[s["name"] for s in available_scrapers],
scraper_details={s["name"]: s for s in available_scrapers}
)
@bp.route("/start", methods=["POST"])
def start_scraper():
"""Start the hourly scraper scheduling."""
try:
        # Require a JSON payload; an empty or missing body is treated as {}
        if request.is_json:
            data = request.get_json(silent=True)
            if data is None:
                data = {}
else:
return jsonify({"success": False, "message": "Invalid payload format. Expected JSON."}), 400
# Start the scraper using manager
result = scraper_manager.start_scraper()
if result["status"] == "success":
ActivityLog.log_scraper_command(
action="start_scraper",
status="success",
description="Scraper started successfully."
)
return jsonify({"success": True, "message": result["message"]})
else:
ActivityLog.log_scraper_command(
action="start_scraper",
status="failure",
description=f"Failed to start scraper: {result['message']}"
)
return jsonify({"success": False, "message": result["message"]}), 400
except Exception as e:
ActivityLog.log_scraper_command(
action="start_scraper",
status="error",
description=f"Failed to start scraper: {str(e)}"
)
return jsonify({"success": False, "message": f"An error occurred: {str(e)}"}), 500
@bp.route("/pause", methods=["POST"])
def pause_scraper():
"""Pause the scraper."""
try:
result = scraper_manager.pause_scraper()
if result["status"] == "success":
ActivityLog.log_scraper_command(
action="pause_scraper",
status="success",
description="Scraper paused successfully"
)
return jsonify({
"success": True,
"message": result["message"]
})
else:
return jsonify({
"success": False,
"message": result["message"]
}), 400
except Exception as e:
ActivityLog.log_scraper_command(
action="pause_scraper",
status="error",
description=f"Failed to pause scraper: {str(e)}"
)
return jsonify({
"success": False,
"message": f"Error pausing scraper: {str(e)}"
}), 500
@bp.route("/stop", methods=["POST"])
def stop_scraper():
"""Stop the scraper and revert processing papers."""
try:
result = scraper_manager.stop_scraper()
        # Log what the manager returned (debug-level, instead of printing to stdout)
        current_app.logger.debug(f"stop_scraper result: {result}")
# Always log the stop attempt regardless of result
ActivityLog.log_scraper_command(
action="stop_scraper_attempt",
status=result.get("status", "unknown"),
description=f"Stop scraper called - result: {result}"
)
if result["status"] == "success":
ActivityLog.log_scraper_command(
action="stop_scraper",
status="success",
description="Scraper stopped and papers reverted to original status"
)
return jsonify({
"success": True,
"message": result["message"]
})
else:
return jsonify({
"success": False,
"message": result["message"]
}), 400
except Exception as e:
ActivityLog.log_scraper_command(
action="stop_scraper",
status="error",
description=f"Failed to stop scraper: {str(e)}"
)
return jsonify({
"success": False,
"message": f"Error stopping scraper: {str(e)}"
}), 500
@bp.route("/reset", methods=["POST"])
def reset_scraper():
"""Reset the scraper state and revert all processing papers."""
try:
result = scraper_manager.reset_scraper()
if result["status"] == "success":
ActivityLog.log_scraper_command(
action="reset_scraper",
status="success",
description="Scraper reset and all processing papers reverted"
)
return jsonify({
"success": True,
"message": result["message"]
})
else:
return jsonify({
"success": False,
"message": result["message"]
}), 400
except Exception as e:
ActivityLog.log_scraper_command(
action="reset_scraper",
status="error",
description=f"Failed to reset scraper: {str(e)}"
)
return jsonify({
"success": False,
"message": f"Error resetting scraper: {str(e)}"
}), 500
@bp.route("/status")
def get_status():
"""Get current scraper status and statistics."""
try:
scraper_state = ScraperState.get_current_state()
# Get paper counts by status
paper_counts = {
'new': PaperMetadata.query.filter_by(status='New').count(),
'processing': PaperMetadata.query.filter_by(status='Processing').count(),
'done': PaperMetadata.query.filter_by(status='Done').count(),
'failed': PaperMetadata.query.filter_by(status='Failed').count(),
'pending': PaperMetadata.query.filter_by(status='Pending').count(),
'retrying': PaperMetadata.query.filter_by(status='Retrying').count(),
}
# Get current hour quota info
current_quota = scraper_manager.get_current_hour_quota()
# Get current scraper module configuration
from ..models import ScraperModuleConfig
current_scraper_module = ScraperModuleConfig.get_current_module()
# Get volume configuration
current_volume = VolumeConfig.get_current_volume()
return jsonify({
"success": True,
"scraper_state": {
"active": scraper_state.is_active,
"paused": scraper_state.is_paused,
"last_updated": scraper_state.last_updated.isoformat() if scraper_state.last_updated else None
},
"paper_counts": paper_counts,
"current_quota": current_quota,
"current_scraper_module": current_scraper_module,
"volume_config": current_volume
})
except Exception as e:
return jsonify({
"success": False,
"message": f"Error getting status: {str(e)}"
}), 500
@bp.route("/logs")
def get_logs():
"""Get recent activity logs with pagination support."""
try:
# Pagination parameters
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
# Legacy limit parameter for backward compatibility
limit = request.args.get('limit', type=int)
if limit and not request.args.get('page'):
# Legacy mode: use limit without pagination
logs = ActivityLog.query.order_by(ActivityLog.timestamp.desc()).limit(limit).all()
return jsonify({
"success": True,
"logs": [{
"id": log.id,
"timestamp": log.timestamp.isoformat(),
"action": log.action,
"status": log.status,
"description": log.description,
"category": log.category
} for log in logs]
})
# Ensure reasonable per_page limits
per_page = min(per_page, 100) # Cap at 100 items per page
# Build query with optional filtering
query = ActivityLog.query
# Filter by categories if specified
categories = request.args.getlist('category')
if categories:
query = query.filter(ActivityLog.category.in_(categories))
# Filter by status if specified
status = request.args.get('status')
if status:
query = query.filter(ActivityLog.status == status)
# Order by most recent first and paginate
pagination = query.order_by(ActivityLog.timestamp.desc()).paginate(
page=page,
per_page=per_page,
error_out=False
)
return jsonify({
"success": True,
"logs": [{
"id": log.id,
"timestamp": log.timestamp.isoformat(),
"action": log.action,
"status": log.status,
"description": log.description,
"category": log.category
} for log in pagination.items],
"pagination": {
"page": pagination.page,
"pages": pagination.pages,
"per_page": pagination.per_page,
"total": pagination.total,
"has_next": pagination.has_next,
"has_prev": pagination.has_prev,
"next_num": pagination.next_num if pagination.has_next else None,
"prev_num": pagination.prev_num if pagination.has_prev else None
}
})
except Exception as e:
return jsonify({
"success": False,
"message": f"Error getting logs: {str(e)}"
}), 500
@bp.route("/scrapers")
def get_scrapers():
"""Get available scrapers and their configurations."""
try:
available_scrapers = get_available_scrapers()
scraper_info = []
for scraper_dict in available_scrapers:
try:
scraper_class = scraper_dict["class"]
scraper_info.append({
"name": scraper_dict["name"],
"description": scraper_dict["description"],
"input_statuses": list(scraper_class.INPUT_STATUSES),
"output_status_success": scraper_class.OUTPUT_STATUS_SUCCESS,
"output_status_failure": scraper_class.OUTPUT_STATUS_FAILURE,
"output_status_processing": scraper_class.OUTPUT_STATUS_PROCESSING
})
except Exception as e:
scraper_info.append({
"name": scraper_dict.get("name", "unknown"),
"error": f"Failed to load scraper info: {str(e)}"
})
return jsonify({
"success": True,
"scrapers": scraper_info
})
except Exception as e:
return jsonify({
"success": False,
"message": f"Error getting scrapers: {str(e)}"
}), 500
@bp.route("/process-papers", methods=["POST"])
def process_papers_manually():
"""Manually trigger paper processing for current hour."""
try:
        data = request.get_json(silent=True) or {}
scraper_name = data.get('scraper_name')
if not scraper_name:
return jsonify({
"success": False,
"message": "Scraper name is required"
}), 400
# Process papers for current hour
papers = scraper_manager.select_papers_for_processing()
processed_count = len(papers) if papers else 0
result_msg = f"Manual processing triggered - {processed_count} papers selected for processing"
ActivityLog.log_scraper_command(
action="manual_process",
status="success",
description=result_msg
)
return jsonify({
"success": True,
"message": result_msg,
"processed_count": processed_count
})
except Exception as e:
ActivityLog.log_scraper_command(
action="manual_process",
status="error",
description=f"Failed to manually process papers: {str(e)}"
)
return jsonify({
"success": False,
"message": f"Error processing papers: {str(e)}"
}), 500
@bp.route("/trigger-immediate", methods=["POST"])
def trigger_immediate_processing():
"""Trigger immediate processing of papers without waiting for hourly schedule."""
try:
# Get papers that should be processed this hour
        papers = scraper_manager.select_papers_for_processing()
if not papers:
return jsonify({
"success": True,
"message": "No papers available for immediate processing",
"papers_scheduled": 0
})
# Get APScheduler instance
scheduler = current_app.config.get('SCHEDULER')
if not scheduler:
return jsonify({
"success": False,
"message": "APScheduler not available"
}), 500
# Schedule papers for immediate processing via APScheduler
        import uuid
        scheduled_count = 0
        for paper in papers:
            try:
                # Timestamp plus random UUID suffix keeps each job ID unique
                job_id = f"immediate_paper_{paper.id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}_{uuid.uuid4().hex[:8]}"
scheduler.schedule_paper_processing(paper.id, delay_seconds=1, job_id=job_id)
scheduled_count += 1
except Exception as e:
ActivityLog.log_error(
error_message=f"Failed to schedule paper {paper.id}: {str(e)}",
source="trigger_immediate_processing"
)
ActivityLog.log_scraper_command(
action="trigger_immediate_processing",
status="success",
description=f"Triggered immediate processing of {scheduled_count} papers via APScheduler"
)
return jsonify({
"success": True,
"message": f"Immediate processing started for {scheduled_count} papers",
"papers_scheduled": scheduled_count
})
except Exception as e:
ActivityLog.log_scraper_command(
action="trigger_immediate_processing",
status="error",
description=f"Failed to trigger immediate processing: {str(e)}"
)
return jsonify({
"success": False,
"message": f"Error triggering immediate processing: {str(e)}"
}), 500
@bp.route("/available_scrapers")
def get_available_scrapers_endpoint():
"""Get available scrapers for the UI dropdown."""
try:
        available_scrapers = get_available_scrapers()
        # Mark the currently configured module so the UI dropdown can preselect it
        from ..models import ScraperModuleConfig
        current_module = ScraperModuleConfig.get_current_module()
        return jsonify({
            "success": True,
            "scrapers": [{
                "name": scraper["name"],
                "description": scraper["description"],
                "is_current": scraper["name"] == current_module
            } for scraper in available_scrapers]
})
except Exception as e:
return jsonify({
"success": False,
"message": f"Error getting scrapers: {str(e)}"
}), 500
@bp.route("/stats")
def get_stats():
"""Get scraper statistics for the dashboard."""
try:
        hours = request.args.get('hours', 24, type=int)
current_time = datetime.utcnow()
# Get activity logs for scraper actions in the last N hours
from ..models import ActivityCategory
start_time = current_time - timedelta(hours=hours)
logs = ActivityLog.query.filter(
ActivityLog.category == ActivityCategory.SCRAPER_ACTIVITY.value,
ActivityLog.timestamp >= start_time
).all()
# Get scraper command logs for state changes in the same time period
state_logs = ActivityLog.query.filter(
ActivityLog.category == ActivityCategory.SCRAPER_COMMAND.value,
ActivityLog.action.in_(['start_scraper', 'pause_scraper', 'stop_scraper', 'reset_scraper']),
ActivityLog.timestamp >= start_time
).order_by(ActivityLog.timestamp.asc()).all()
# Group by chronological hour buckets (not hour of day)
stats = []
for hour_offset in range(hours):
# Calculate the hour bucket (most recent hour first when hour_offset=0)
bucket_end_time = current_time - timedelta(hours=hour_offset)
bucket_start_time = bucket_end_time - timedelta(hours=1)
            # Label the bucket by its start time for display, e.g. "14:00"
hour_label = bucket_start_time.strftime("%H:%M")
# Initialize counters for this hour bucket
bucket_stats = {
"success": 0,
"error": 0,
"pending": 0,
"hour": hour_label,
"hour_offset": hour_offset, # For sorting
"bucket_start": bucket_start_time,
"bucket_end": bucket_end_time,
"scraper_active": 0 # Default to inactive
}
# Count logs that fall within this hour bucket
for log in logs:
if bucket_start_time <= log.timestamp < bucket_end_time:
if log.status == "success":
bucket_stats["success"] += 1
elif log.status == "error":
bucket_stats["error"] += 1
elif log.status in ("pending", "info"):
bucket_stats["pending"] += 1
            # Approximate activity: treat any successful scrape in this bucket as "scraper active"
            bucket_stats["scraper_active"] = 1 if bucket_stats["success"] > 0 else 0
stats.append(bucket_stats)
# Reverse so oldest hour comes first (better for chronological chart display)
stats.reverse()
# Prepare precise scraper state changes for timeline
scraper_timeline = []
for log in state_logs:
# Calculate hours ago from current time
time_diff = current_time - log.timestamp
hours_ago = time_diff.total_seconds() / 3600
# Only include logs within our time range
if hours_ago <= hours:
scraper_timeline.append({
"timestamp": log.timestamp.isoformat(),
"hours_ago": hours_ago,
"action": log.action,
"status": log.status,
"active": 1 if log.action == "start_scraper" and log.status == "success" else 0
})
# Clean up the response (remove internal fields)
result = []
for stat in stats:
result.append({
"success": stat["success"],
"error": stat["error"],
"pending": stat["pending"],
"hour": stat["hour"],
"scraper_active": stat["scraper_active"]
})
        return jsonify({
            "success": True,
            "hourly_stats": result,
            "scraper_timeline": scraper_timeline
        })
except Exception as e:
return jsonify({
"success": False,
"message": f"Error getting stats: {str(e)}"
}), 500
@bp.route("/process_single/<int:paper_id>", methods=["POST"])
def process_single_paper_endpoint(paper_id):
"""Process a single paper by ID."""
try:
        data = request.get_json(silent=True) or {}
scraper_name = data.get('scraper_module')
# Get the paper
paper = PaperMetadata.query.get(paper_id)
if not paper:
return jsonify({
"success": False,
"message": "Paper not found"
}), 404
# Get APScheduler instance
scheduler = current_app.config.get('SCHEDULER')
if not scheduler:
return jsonify({
"success": False,
"message": "APScheduler not available"
}), 500
# Schedule the paper for immediate manual processing via APScheduler
# Use UUID suffix to ensure unique job IDs
import uuid
job_id = f"manual_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}_{uuid.uuid4().hex[:8]}"
try:
scheduler.schedule_manual_paper_processing(paper_id, scraper_name=scraper_name, delay_seconds=1, job_id=job_id)
ActivityLog.log_scraper_command(
action="manual_process_single",
status="success",
description=f"Scheduled manual processing for paper {paper.doi} via APScheduler" +
(f" using scraper '{scraper_name}'" if scraper_name else " using system default scraper")
)
return jsonify({
"success": True,
"message": f"Processing scheduled for paper {paper.doi}" +
(f" using {scraper_name} scraper" if scraper_name else " using system default scraper"),
"paper_id": paper_id
})
except Exception as e:
return jsonify({
"success": False,
"message": f"Failed to schedule processing: {str(e)}"
}), 500
except Exception as e:
ActivityLog.log_scraper_command(
action="manual_process_single",
status="error",
description=f"Failed to process paper {paper_id}: {str(e)}"
)
return jsonify({
"success": False,
"message": f"Error processing paper: {str(e)}"
}), 500
@bp.route("/update_config", methods=["POST"])
def update_scraper_config():
"""Update scraper configuration."""
try:
        data = request.get_json(silent=True) or {}
# Handle volume configuration updates for daily quota
if "volume" in data:
# Import the helper function from config module
from .config import _update_volume
new_volume = data["volume"]
success, message, volume_config = _update_volume(new_volume)
if success:
ActivityLog.log_scraper_command(
action="update_volume_config",
status="success",
description=f"Updated daily volume to {new_volume} papers per day"
)
return jsonify({
"success": True,
"message": message
})
else:
return jsonify({
"success": False,
"message": message
}), 400
# Handle scraper module configuration updates
if "scraper_module" in data:
from ..models import ScraperModuleConfig
new_module = data["scraper_module"]
# Validate that the module exists and is valid
available_modules = [m["name"] for m in get_available_scrapers()]
if new_module not in available_modules:
return jsonify({
"success": False,
"message": f"Invalid scraper module: {new_module}"
}), 400
# Update the database configuration
ScraperModuleConfig.set_module(new_module)
ActivityLog.log_scraper_command(
action="update_scraper_module",
status="success",
description=f"Updated scraper module to '{new_module}'"
)
return jsonify({
"success": True,
"message": f"Scraper module updated to '{new_module}' successfully"
})
# Handle other configuration updates here if needed in the future
return jsonify({
"success": True,
"message": "Configuration updated successfully"
})
except Exception as e:
ActivityLog.log_scraper_command(
action="update_scraper_config",
status="error",
description=f"Failed to update scraper config: {str(e)}"
)
return jsonify({
"success": False,
"message": f"Error updating scraper config: {str(e)}"
}), 500
@bp.route("/publishers")
def get_publishers():
"""Get publisher overview data for the scraper overview modal."""
try:
import os
import glob
# Get available parser modules
parsers_dir = os.path.join(current_app.root_path, 'parsers')
parser_files = glob.glob(os.path.join(parsers_dir, '*_parser.py'))
available_parsers = []
for parser_file in parser_files:
filename = os.path.basename(parser_file)
if filename != 'base_parser.py': # Skip the base parser
parser_name = filename.replace('_parser.py', '')
available_parsers.append(parser_name)
# Get publishers from database (papers that have publisher detected)
publisher_query = db.session.query(
PaperMetadata.publisher,
db.func.count(PaperMetadata.id).label('paper_count')
).filter(
PaperMetadata.publisher.isnot(None),
PaperMetadata.publisher != ''
).group_by(PaperMetadata.publisher).all()
publishers_data = []
for publisher, count in publisher_query:
# Check if a parser exists for this publisher
has_parser = publisher in available_parsers
publishers_data.append({
'name': publisher,
'paper_count': count,
'has_parser': has_parser,
'parser_status': 'available' if has_parser else 'missing'
})
# Sort by paper count descending
publishers_data.sort(key=lambda x: x['paper_count'], reverse=True)
# Get totals
total_papers_with_publisher = sum(p['paper_count'] for p in publishers_data)
total_papers_without_publisher = PaperMetadata.query.filter(
db.or_(PaperMetadata.publisher.is_(None), PaperMetadata.publisher == '')
).count()
return jsonify({
'success': True,
'data': {
'publishers': publishers_data,
'available_parsers': available_parsers,
'stats': {
'total_publishers': len(publishers_data),
'publishers_with_parsers': len([p for p in publishers_data if p['has_parser']]),
'publishers_without_parsers': len([p for p in publishers_data if not p['has_parser']]),
'total_papers_with_publisher': total_papers_with_publisher,
'total_papers_without_publisher': total_papers_without_publisher
}
}
})
except Exception as e:
return jsonify({
'success': False,
'message': f'Error getting publisher data: {str(e)}'
}), 500