diff --git a/scipaperloader/scheduler.py b/scipaperloader/scheduler.py
index f053f8d..4f61eb9 100644
--- a/scipaperloader/scheduler.py
+++ b/scipaperloader/scheduler.py
@@ -78,6 +78,7 @@ def _hourly_scraper_scheduler():
         # Schedule papers at random times within the hour
         scheduled_count = 0
         current_time = datetime.now()
+        scheduled_papers = []
 
         for paper in papers:
             # Random delay between 1 second and 58 minutes
@@ -103,19 +104,41 @@ def _hourly_scraper_scheduler():
 
             scheduled_count += 1
 
-            # Log each scheduled paper
-            ActivityLog.log_scraper_activity(
-                action="schedule_paper_apscheduler",
-                paper_id=paper.id,
-                status="info",
-                description=f"Scheduled paper {paper.doi} for processing at {run_time.strftime('%H:%M:%S')} (Job ID: {job_id})"
-            )
+            # Collect paper info for single log entry
+            paper_info = {
+                "paper_id": paper.id,
+                "paper_doi": paper.doi,
+                "job_id": job_id,
+                "scheduled_time": run_time.isoformat(),
+                "delay_seconds": delay_seconds
+            }
+            scheduled_papers.append(paper_info)
 
-        ActivityLog.log_scraper_activity(
-            action="hourly_scheduler_apscheduler",
-            status="success",
-            description=f"Scheduled {scheduled_count} papers for random processing within this hour using APScheduler"
-        )
+        # Create single comprehensive log entry with JSON data
+        try:
+            import json
+            from .models import ActivityLog
+
+            scheduling_data = {
+                "total_scheduled": scheduled_count,
+                "scheduled_papers": scheduled_papers,
+                "timestamp": datetime.now().isoformat(),
+                "hour_range": f"{current_time.strftime('%H:%M')} - {(current_time + timedelta(hours=1)).strftime('%H:%M')}"
+            }
+
+            ActivityLog.log_scraper_activity(
+                action="hourly_scheduler_apscheduler",
+                status="success",
+                description=f"Scheduled {scheduled_count} papers for random processing within this hour using APScheduler. See extra_data for details.",
+                **{"scheduling_details": json.dumps(scheduling_data)}
+            )
+        except Exception:
+            # Fallback to simple logging
+            ActivityLog.log_scraper_activity(
+                action="hourly_scheduler_apscheduler",
+                status="success",
+                description=f"Scheduled {scheduled_count} papers for random processing within this hour using APScheduler"
+            )
 
         return {"status": "success", "papers_scheduled": scheduled_count}
 
@@ -347,6 +370,9 @@ class ScraperScheduler:
                 return 0
 
             revoked_count = 0
+            revoked_jobs = []
+            already_gone_jobs = []
+            failed_jobs = []
 
             try:
                 # Get all jobs
@@ -355,51 +381,58 @@ class ScraperScheduler:
                 for job in jobs:
                     # Remove any job that processes papers or uploads (but keep the main hourly scheduler)
                     if ('paper_process_' in job.id or 'test_paper_process_' in job.id or
-                        'process_paper_' in job.id or 'csv_upload_' in job.id or 'manual_paper_' in job.id):
+                        'process_paper_' in job.id or 'csv_upload_' in job.id or 'manual_paper_' in job.id or
+                        'startup_paper_' in job.id):
                         try:
                             _scheduler.remove_job(job.id)
                             revoked_count += 1
-                            try:
-                                from .models import ActivityLog
-                                ActivityLog.log_scraper_activity(
-                                    action="revoke_apscheduler_job",
-                                    status="success",
-                                    description=f"Revoked APScheduler job: {job.name} (ID: {job.id})"
-                                )
-                            except Exception:
-                                print(f"✅ Revoked APScheduler job: {job.id}")
+                            # Collect job info for single log entry
+                            job_info = {
+                                "job_id": job.id,
+                                "job_name": job.name,
+                                "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
+                                "args": job.args
+                            }
+                            revoked_jobs.append(job_info)
+
+                            print(f"✅ Revoked APScheduler job: {job.id}")
                         except JobLookupError as e:
-                            # Job already removed/completed - this is normal, just log it
-                            try:
-                                from .models import ActivityLog
-                                ActivityLog.log_scraper_activity(
-                                    action="revoke_apscheduler_job_already_gone",
-                                    status="info",
-                                    description=f"Job {job.id} was already completed or removed: {str(e)}"
-                                )
-                            except Exception:
-                                print(f"ℹ️ Job {job.id} was already completed or removed")
+                            # Job already removed/completed - this is normal
+                            already_gone_jobs.append({
+                                "job_id": job.id,
+                                "reason": str(e)
+                            })
+                            print(f"ℹ️ Job {job.id} was already completed or removed")
                         except Exception as e:
                             # Other error - log it but continue
-                            try:
-                                from .models import ActivityLog
-                                ActivityLog.log_error(
-                                    error_message=f"Error removing job {job.id}: {str(e)}",
-                                    source="ScraperScheduler.revoke_all_scraper_jobs"
-                                )
-                            except Exception:
-                                print(f"❌ Error removing job {job.id}: {str(e)}")
+                            failed_jobs.append({
+                                "job_id": job.id,
+                                "error": str(e)
+                            })
+                            print(f"❌ Error removing job {job.id}: {str(e)}")
 
-            if revoked_count > 0:
+            # Create single comprehensive log entry with JSON data
+            if revoked_jobs or already_gone_jobs or failed_jobs:
                 try:
+                    import json
                     from .models import ActivityLog
+
+                    revocation_data = {
+                        "total_revoked": revoked_count,
+                        "revoked_jobs": revoked_jobs,
+                        "already_gone_jobs": already_gone_jobs,
+                        "failed_jobs": failed_jobs,
+                        "timestamp": datetime.now().isoformat()
+                    }
+
                     ActivityLog.log_scraper_activity(
                         action="revoke_all_scraper_jobs_apscheduler",
                         status="success",
-                        description=f"Successfully revoked {revoked_count} APScheduler jobs"
+                        description=f"Successfully revoked {revoked_count} APScheduler jobs. See extra_data for details.",
+                        **{"revocation_details": json.dumps(revocation_data)}
                     )
                 except Exception:
                     print(f"✅ Successfully revoked {revoked_count} APScheduler jobs")
diff --git a/scipaperloader/scrapers/manager.py b/scipaperloader/scrapers/manager.py
index 808d957..f39bf23 100644
--- a/scipaperloader/scrapers/manager.py
+++ b/scipaperloader/scrapers/manager.py
@@ -32,13 +32,23 @@ class ScraperManager:
         # No more Redis client initialization - using APScheduler now
 
     def _get_scheduler(self):
-        """Get the APScheduler instance from Flask app config."""
+        """Get the ScraperScheduler instance from Flask app config."""
         try:
             return current_app.config.get('SCHEDULER')
         except RuntimeError:
             # Outside application context
             return None
 
+    def _get_raw_scheduler(self):
+        """Get the raw APScheduler instance for direct job scheduling."""
+        try:
+            scheduler_wrapper = current_app.config.get('SCHEDULER')
+            if scheduler_wrapper:
+                return scheduler_wrapper.scheduler
+            return None
+        except RuntimeError:
+            return None
+
     def _clear_delayed_tasks_from_apscheduler(self) -> int:
         """Clear delayed tasks from APScheduler - clean replacement for Redis manipulation.
 
@@ -93,7 +103,7 @@ class ScraperManager:
             return 0
 
     def start_scraper(self) -> Dict[str, str]:
-        """Start the scraper system."""
+        """Start the scraper system and immediately schedule papers for the current hour."""
        try:
             # Get current scraper
             self.current_scraper = get_scraper()
@@ -104,13 +114,25 @@ class ScraperManager:
 
             scraper_name = self.current_scraper.get_name()
 
-            ActivityLog.log_scraper_command(
-                action="start_scraper",
-                status="success",
-                description=f"Started scraper: {scraper_name}. Use /trigger-immediate endpoint to immediately schedule papers instead of waiting for the next hourly boundary."
-            )
+            # Immediately schedule papers for the remaining time in the current hour
+            immediate_scheduled_count = self._schedule_papers_for_current_hour()
 
-            return {"status": "success", "message": "Scraper started successfully. Papers will be scheduled at the next hourly boundary, or use /trigger-immediate to schedule immediately."}
+            if immediate_scheduled_count > 0:
+                ActivityLog.log_scraper_command(
+                    action="start_scraper",
+                    status="success",
+                    description=f"Started scraper: {scraper_name}. Immediately scheduled {immediate_scheduled_count} papers for the remaining time in this hour."
+                )
+
+                return {"status": "success", "message": f"Scraper started successfully. Immediately scheduled {immediate_scheduled_count} papers for processing in the remaining time this hour."}
+            else:
+                ActivityLog.log_scraper_command(
+                    action="start_scraper",
+                    status="success",
+                    description=f"Started scraper: {scraper_name}. No papers available for immediate scheduling in the current hour."
+                )
+
+                return {"status": "success", "message": "Scraper started successfully. No papers available for immediate scheduling this hour."}
 
         except Exception as e:
             ActivityLog.log_error(
@@ -339,19 +361,27 @@ class ScraperManager:
                       .limit(papers_needed)
                       .all())
 
-            ActivityLog.log_scraper_activity(
-                action="select_papers",
-                status="info",
-                description=f"Selected {len(papers)} papers from statuses {input_statuses} (requested: {papers_needed})"
-            )
+            try:
+                ActivityLog.log_scraper_activity(
+                    action="select_papers",
+                    status="info",
+                    description=f"Selected {len(papers)} papers from statuses {input_statuses} (requested: {papers_needed})"
+                )
+            except RuntimeError:
+                # Outside application context - use print fallback
+                print(f"📋 Selected {len(papers)} papers from statuses {input_statuses} (requested: {papers_needed})")
 
             return papers
 
         except Exception as e:
-            ActivityLog.log_error(
-                error_message=f"Error selecting papers: {str(e)}",
-                source="ScraperManager.select_papers_for_processing"
-            )
+            try:
+                ActivityLog.log_error(
+                    error_message=f"Error selecting papers: {str(e)}",
+                    source="ScraperManager.select_papers_for_processing"
+                )
+            except RuntimeError:
+                # Outside application context - use print fallback
+                print(f"❌ Error selecting papers: {str(e)}")
             return []
 
     def process_paper(self, paper: PaperMetadata) -> Dict:
@@ -567,3 +597,119 @@ class ScraperManager:
             "processing_papers": processing_count,
             "current_hour_quota": self.get_current_hour_quota()
         }
+
+    def _schedule_papers_for_current_hour(self) -> int:
+        """Schedule papers for processing in the remaining time of the current hour.
+
+        Returns:
+            int: Number of papers scheduled
+        """
+        try:
+            # Get papers that should be processed this hour
+            papers = self.select_papers_for_processing()
+
+            if not papers:
+                return 0
+
+            # Get raw APScheduler instance for direct job scheduling
+            scheduler = self._get_raw_scheduler()
+            if not scheduler:
+                ActivityLog.log_error(
+                    error_message="Raw APScheduler not available for immediate paper scheduling",
+                    source="ScraperManager._schedule_papers_for_current_hour"
+                )
+                return 0
+
+            # Calculate remaining time in current hour
+            current_time = datetime.now()
+            next_hour = current_time.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
+            remaining_seconds = int((next_hour - current_time).total_seconds())
+
+            # Don't schedule if less than 2 minutes remaining
+            if remaining_seconds < 120:
+                ActivityLog.log_scraper_activity(
+                    action="start_scraper_immediate_scheduling",
+                    status="info",
+                    description=f"Skipping immediate scheduling - only {remaining_seconds} seconds remaining in current hour"
+                )
+                return 0
+
+            # Schedule papers at random times within the remaining time
+            scheduled_count = 0
+            scheduled_papers = []
+
+            for paper in papers:
+                try:
+                    # Random delay between 1 second and remaining time minus 60 seconds buffer
+                    max_delay = max(1, remaining_seconds - 60)
+                    delay_seconds = random.randint(1, max_delay)
+                    run_time = current_time + timedelta(seconds=delay_seconds)
+
+                    # Generate unique job ID
+                    import uuid
+                    job_id = f"startup_paper_{paper.id}_{int(current_time.timestamp())}_{uuid.uuid4().hex[:8]}"
+
+                    # Schedule the job
+                    from ..scheduler import _process_single_paper
+                    scheduler.add_job(
+                        func=_process_single_paper,
+                        trigger='date',
+                        run_date=run_time,
+                        args=[paper.id],
+                        id=job_id,
+                        name=f"Startup Process Paper {paper.id}",
+                        replace_existing=True
+                    )
+
+                    scheduled_count += 1
+
+                    # Collect paper info for logging
+                    paper_info = {
+                        "paper_id": paper.id,
+                        "paper_doi": paper.doi,
+                        "job_id": job_id,
+                        "scheduled_time": run_time.isoformat(),
+                        "delay_seconds": delay_seconds
+                    }
+                    scheduled_papers.append(paper_info)
+
+                except Exception as e:
+                    ActivityLog.log_error(
+                        error_message=f"Failed to schedule paper {paper.id} during startup: {str(e)}",
+                        source="ScraperManager._schedule_papers_for_current_hour"
+                    )
+
+            # Create single comprehensive log entry
+            if scheduled_papers:
+                try:
+                    import json
+                    scheduling_data = {
+                        "total_scheduled": scheduled_count,
+                        "scheduled_papers": scheduled_papers,
+                        "timestamp": current_time.isoformat(),
+                        "remaining_time_seconds": remaining_seconds,
+                        "trigger": "startup_immediate_scheduling"
+                    }
+
+                    ActivityLog.log_scraper_activity(
+                        action="startup_immediate_scheduling",
+                        status="success",
+                        description=f"Scheduled {scheduled_count} papers for immediate processing during startup for remaining {remaining_seconds}s in current hour. See extra_data for details.",
+                        **{"scheduling_details": json.dumps(scheduling_data)}
+                    )
+                except Exception:
+                    # Fallback to simple logging
+                    ActivityLog.log_scraper_activity(
+                        action="startup_immediate_scheduling",
+                        status="success",
+                        description=f"Scheduled {scheduled_count} papers for immediate processing during startup"
+                    )
+
+            return scheduled_count
+
+        except Exception as e:
+            ActivityLog.log_error(
+                error_message=f"Error in startup immediate scheduling: {str(e)}",
+                source="ScraperManager._schedule_papers_for_current_hour"
+            )
+            return 0