import random
import json
import time
import math
from datetime import datetime, timedelta

from flask import Blueprint, jsonify, render_template, request, current_app, flash

from ..models import VolumeConfig, ActivityLog, PaperMetadata, ActivityCategory, ScheduleConfig
from ..db import db
from ..celery import celery
from ..defaults import MAX_VOLUME

bp = Blueprint("scraper", __name__, url_prefix="/scraper")

# Global variables to track scraper state
SCRAPER_ACTIVE = False
SCRAPER_PAUSED = False


@bp.route("/")
def index():
    """Render the scraper control panel."""
    volume_config = VolumeConfig.query.first()

    # Ensure we have a volume config
    if not volume_config:
        volume_config = VolumeConfig(volume=100)  # Default value
        db.session.add(volume_config)
        db.session.commit()

    return render_template(
        "scraper.html.jinja",
        volume_config=volume_config,
        scraper_active=SCRAPER_ACTIVE,
        scraper_paused=SCRAPER_PAUSED,
        max_volume=MAX_VOLUME,
    )


@bp.route("/start", methods=["POST"])
def start_scraper():
    """Start the scraper."""
    global SCRAPER_ACTIVE, SCRAPER_PAUSED

    if not SCRAPER_ACTIVE:
        SCRAPER_ACTIVE = True
        SCRAPER_PAUSED = False

        # Log the action
        ActivityLog.log_scraper_command(
            action="start_scraper",
            status="success",
            description="Scheduled scraper started - will follow hourly configuration",
        )

        # Immediately trigger a task to test the scheduler and provide feedback
        dummy_scheduled_scraper.delay()

        return jsonify({
            "success": True,
            "message": "Scraper started - following hourly schedule configuration",
        })
    else:
        return jsonify({
            "success": False,
            "message": "Scraper is already running",
        })


@bp.route("/stop", methods=["POST"])
def stop_scraper():
    """Stop the scraper."""
    global SCRAPER_ACTIVE, SCRAPER_PAUSED

    if SCRAPER_ACTIVE:
        SCRAPER_ACTIVE = False
        SCRAPER_PAUSED = False

        ActivityLog.log_scraper_command(
            action="stop_scraper",
            status="success",
            description="Scraper stopped manually",
        )

        return jsonify({
            "success": True,
            "message": "Scraper stopped",
        })
    else:
        return jsonify({
            "success": False,
            "message": "Scraper is not running",
        })


@bp.route("/pause", methods=["POST"])
def pause_scraper():
    """Pause or resume the scraper."""
    global SCRAPER_ACTIVE, SCRAPER_PAUSED

    if SCRAPER_ACTIVE and not SCRAPER_PAUSED:
        SCRAPER_PAUSED = True

        ActivityLog.log_scraper_command(
            action="pause_scraper",
            status="success",
            description="Scraper paused manually",
        )

        return jsonify({
            "success": True,
            "message": "Scraper paused",
        })
    elif SCRAPER_ACTIVE and SCRAPER_PAUSED:
        SCRAPER_PAUSED = False

        ActivityLog.log_scraper_command(
            action="resume_scraper",
            status="success",
            description="Scraper resumed manually",
        )

        return jsonify({
            "success": True,
            "message": "Scraper resumed",
        })
    else:
        return jsonify({
            "success": False,
            "message": "Scraper is not running",
        })


@bp.route("/status")
def scraper_status():
    """Get the current status of the scraper."""
    return jsonify({
        "active": SCRAPER_ACTIVE,
        "paused": SCRAPER_PAUSED,
        "current_hour": datetime.now().hour,
    })


@bp.route("/stats")
def scraper_stats():
    """Get scraper statistics for the dashboard."""
    # Get the last 24 hours of activity by default
    hours = 24
    if request.args.get('hours'):
        try:
            hours = int(request.args.get('hours'))
        except ValueError:
            pass

    current_time = datetime.utcnow()
    # Use timedelta for proper date calculation instead of simple hour subtraction
    cutoff_time = (current_time - timedelta(hours=hours)).replace(
        minute=0, second=0, microsecond=0
    )

    # Get activity logs for scraper actions
    logs = ActivityLog.query.filter(
        ActivityLog.category == ActivityCategory.SCRAPER_ACTIVITY.value,
        ActivityLog.timestamp >= cutoff_time,
    ).all()
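
    # Note: the buckets below are keyed by hour of day (0-23), so a requested
    # window longer than 24 hours folds multiple days into the same bucket.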

    # Group by hour and status
    stats = {}
    for hour in range(hours):
        # Calculate the hour as an offset from the current time
        target_hour = (current_time.hour - hour) % 24
        stats[target_hour] = {
            "success": 0,
            "error": 0,
            "pending": 0,
            "hour": target_hour,
        }

    for log in logs:
        hour = log.timestamp.hour
        if hour in stats:
            if log.status == "success":
                stats[hour]["success"] += 1
            elif log.status == "error":
                stats[hour]["error"] += 1
            elif log.status == "pending":
                stats[hour]["pending"] += 1

    # Convert to a list for easier consumption by JavaScript
    result = [stats[hour] for hour in sorted(stats.keys())]
    return jsonify(result)


@bp.route("/update_config", methods=["POST"])
def update_config():
    """Update scraper configuration."""
    data = request.json

    try:
        if "volume" in data:
            try:
                new_volume = float(data["volume"])

                # Validate volume value
                if new_volume <= 0 or new_volume > MAX_VOLUME:
                    return jsonify({
                        "success": False,
                        "message": f"Volume must be greater than 0 and at most {MAX_VOLUME}",
                    })

                volume_config = VolumeConfig.query.first()
                if not volume_config:
                    volume_config = VolumeConfig(volume=new_volume)
                    db.session.add(volume_config)
                else:
                    old_value = volume_config.volume
                    volume_config.volume = new_volume
                    ActivityLog.log_config_change(
                        config_key="scraper_volume",
                        old_value=old_value,
                        new_value=new_volume,
                        description="Updated scraper volume",
                    )

                db.session.commit()
            except (ValueError, TypeError):
                return jsonify({
                    "success": False,
                    "message": "Invalid volume value",
                })

        return jsonify({"success": True, "message": "Configuration updated"})
    except Exception as e:
        db.session.rollback()
        return jsonify({"success": False, "message": f"Unexpected error: {str(e)}"})


@celery.task(bind=True)
def dummy_scrape_paper(self):
    """Simulate scraping a single paper."""
    # Simulate success or failure (70% success rate)
    success = random.random() > 0.3

    # Simulate processing time (2-5 seconds); time is imported at module level
    time.sleep(random.randint(2, 5))

    if success:
        # Create a dummy paper
        new_paper = PaperMetadata(
            title=f"Dummy Paper {random.randint(1000, 9999)}",
            doi=f"10.1234/dummy.{random.randint(1000, 9999)}",
            journal=random.choice([
                "Nature", "Science", "PLOS ONE",
                "Journal of Dummy Research",
                "Proceedings of the Dummy Society",
                "Cell", "Dummy Review Letters",
            ]),
            type="article",
            language="en",
            published_online=datetime.now().date(),
            status="Done",
            file_path="/path/to/dummy/paper.pdf",
        )
        db.session.add(new_paper)
        db.session.commit()

        # Log the successful scrape
        ActivityLog.log_scraper_activity(
            action="scrape_paper",
            paper_id=new_paper.id,
            status="success",
            description=f"Successfully scraped paper {new_paper.doi}",
        )

        return {
            "success": True,
            "paper_id": new_paper.id,
            "title": new_paper.title,
            "doi": new_paper.doi,
        }
    else:
        # Log the failed scrape
        error_message = random.choice([
            "Connection timeout",
            "404 Not Found",
            "Access denied",
            "Invalid DOI format",
            "PDF download failed",
            "Rate limited by publisher",
        ])
        ActivityLog.log_scraper_activity(
            action="scrape_paper",
            status="error",
            description=f"Failed to scrape paper: {error_message}",
        )

        return {
            "success": False,
            "error": error_message,
        }


@celery.task
def calculate_papers_for_current_hour():
    """
    Calculate how many papers should be downloaded in the current hour
    based on the schedule configuration.
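
    Example (illustrative): with 24 equally weighted hours (total weight 24.0)
    and a daily volume of 100, each hour gets floor(1.0 / 24.0 * 100) = 4 papers.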

    Returns:
        int: Number of papers to download this hour
    """
    current_hour = datetime.now().hour

    # Get the volume configuration
    volume_config = VolumeConfig.query.first()
    if not volume_config:
        volume_config = VolumeConfig(volume=100)  # Default to 100 papers per day
        db.session.add(volume_config)
        db.session.commit()

    # Get all schedule configurations to calculate the total weight
    schedule_configs = ScheduleConfig.query.all()
    if not schedule_configs:
        # If no schedule configs exist, create defaults with equal weights
        for hour in range(24):
            config = ScheduleConfig(hour=hour, weight=1.0)
            db.session.add(config)
        db.session.commit()
        schedule_configs = ScheduleConfig.query.all()

    # Calculate the total weight across all hours
    total_weight = sum(config.weight for config in schedule_configs)

    # Find the weight for the current hour
    current_hour_config = ScheduleConfig.query.get(current_hour)
    if not current_hour_config:
        # Create a config for the current hour if it doesn't exist
        current_hour_config = ScheduleConfig(hour=current_hour, weight=1.0)
        db.session.add(current_hour_config)
        db.session.commit()

    # Papers for the current hour: (hour_weight / total_weight) * daily_volume
    if total_weight > 0:
        weight_ratio = current_hour_config.weight / total_weight
        papers_this_hour = math.floor(weight_ratio * volume_config.volume)
    else:
        papers_this_hour = 0

    return papers_this_hour


@celery.task
def dummy_scheduled_scraper():
    """
    The main scheduler task that runs every hour to process papers
    according to the configured schedule.
    """
    global SCRAPER_ACTIVE, SCRAPER_PAUSED

    if not SCRAPER_ACTIVE or SCRAPER_PAUSED:
        ActivityLog.log_scraper_activity(
            action="scheduled_scraping",
            status="info",
            description=f"Scheduled scraping skipped: active={SCRAPER_ACTIVE}, paused={SCRAPER_PAUSED}",
        )
        return False

    # Calculate how many papers to download this hour
    papers_to_download = calculate_papers_for_current_hour()

    if papers_to_download <= 0:
        ActivityLog.log_scraper_activity(
            action="scheduled_scraping",
            status="info",
            description="No papers scheduled for the current hour",
        )
        return True

    # Get all pending papers
    pending_papers = PaperMetadata.query.filter_by(status="Pending").all()

    # If no pending papers are available, create some dummy pending papers
    if not pending_papers:
        ActivityLog.log_scraper_activity(
            action="scheduled_scraping",
            status="info",
            description=f"No pending papers found - creating {papers_to_download} dummy pending papers",
        )

        # Create dummy pending papers
        for i in range(papers_to_download):
            new_paper = PaperMetadata(
                title=f"Dummy Pending Paper {random.randint(1000, 9999)}",
                doi=f"10.1234/dummy-pending.{random.randint(1000, 9999)}",
                journal=random.choice([
                    "Nature", "Science", "PLOS ONE",
                    "Journal of Dummy Research",
                    "Proceedings of the Dummy Society",
                    "Cell", "Dummy Review Letters",
                ]),
                type="article",
                language="en",
                published_online=datetime.now().date(),
                status="Pending",
            )
            db.session.add(new_paper)

        db.session.commit()

        # Get the newly created papers
        pending_papers = PaperMetadata.query.filter_by(status="Pending").all()

    # Select up to papers_to_download random papers from pending_papers
    selected_papers = random.sample(
        pending_papers,
        min(papers_to_download, len(pending_papers)),
    )

    ActivityLog.log_scraper_activity(
        action="scheduled_scraping",
        status="info",
        description=f"Starting scheduled scraping of {len(selected_papers)} papers for hour {datetime.now().hour}",
    )

    # For each paper, schedule it to run at a random time within the hour
    one_hour_in_seconds = 3600
    for paper in selected_papers:
        # Random delay within this hour (0 to 60 minutes)
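        # (apply_async with a countdown is standard Celery; spreading the
        # delays avoids firing all dummy downloads at the same moment)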
        random_delay = random.randint(0, one_hour_in_seconds)

        # Schedule the dummy_process_paper task with the random delay
        dummy_process_paper.apply_async(
            args=[paper.id],
            countdown=random_delay,
        )

    return True


@celery.task(bind=True)
def dummy_process_paper(self, paper_id):
    """
    Process a single paper for the dummy scraper.

    Args:
        paper_id (int): ID of the paper to process
    """
    # Get the paper from the database
    paper = PaperMetadata.query.get(paper_id)
    if not paper:
        # Log an error if the paper is not found
        ActivityLog.log_scraper_activity(
            action="process_paper",
            status="error",
            description=f"Paper with ID {paper_id} not found",
        )
        return False

    # Simulate random success/failure (70% success rate)
    success = random.random() < 0.7

    # Simulate processing time (1-5 seconds)
    process_time = random.uniform(1, 5)
    time.sleep(process_time)

    if success:
        # Update paper status to "Done"
        paper.status = "Done"
        paper.file_path = f"/path/to/dummy/papers/{paper.doi.replace('/', '_')}.pdf"

        # Log success
        ActivityLog.log_scraper_activity(
            action="process_paper",
            paper_id=paper.id,
            status="success",
            description=f"Successfully processed paper: {paper.title}",
        )
    else:
        # Update paper status to "Failed"
        paper.status = "Failed"

        # Generate a random error message
        error_message = random.choice([
            "Publisher website unavailable",
            "No PDF download link found",
            "Access restricted",
            "Download timeout",
            "Invalid DOI",
            "Rate limited by publisher",
        ])
        paper.error_msg = error_message

        # Log failure
        ActivityLog.log_scraper_activity(
            action="process_paper",
            paper_id=paper.id,
            status="error",
            description=f"Failed to process paper: {error_message}",
        )

    # Update the timestamp
    paper.updated_at = datetime.utcnow()

    # Commit changes to the database
    db.session.commit()

    return success
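

# --- Illustrative sketch (not part of the original module) -------------------
# dummy_scheduled_scraper is described above as running every hour. One assumed
# way to wire that up is a Celery beat entry like the one below; the schedule
# name and where this configuration lives in the project are assumptions, only
# the Celery API calls are real.
#
# from celery.schedules import crontab
#
# celery.conf.beat_schedule = {
#     "dummy-scheduled-scraper-hourly": {
#         "task": dummy_scheduled_scraper.name,
#         "schedule": crontab(minute=0),  # at the top of every hour
#     },
# }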