From a4eb7648d57dfd183a481cffb985dfa45eea0f4a Mon Sep 17 00:00:00 2001
From: Michael Beck
Date: Wed, 11 Jun 2025 21:32:01 +0200
Subject: [PATCH] fixes scraper

---
 scipaperloader/blueprints/scraper.py        |  17 ++-
 scipaperloader/scheduler.py                 | 139 +++++++++++++++++---
 scipaperloader/scrapers/manager.py          |  85 ++++++++++++
 scipaperloader/scrapers/tasks.py            |  36 ++++-
 scipaperloader/static/js/scraper-control.js |  15 ++-
 5 files changed, 264 insertions(+), 28 deletions(-)

diff --git a/scipaperloader/blueprints/scraper.py b/scipaperloader/blueprints/scraper.py
index bc25354..b73e17b 100644
--- a/scipaperloader/blueprints/scraper.py
+++ b/scipaperloader/blueprints/scraper.py
@@ -365,7 +365,8 @@ def trigger_immediate_processing():
     scheduled_count = 0
     for paper in papers:
         try:
-            job_id = f"immediate_paper_{paper.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+            import uuid
+            job_id = f"immediate_paper_{paper.id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}_{uuid.uuid4().hex[:8]}"
             scheduler.schedule_paper_processing(paper.id, delay_seconds=1, job_id=job_id)
             scheduled_count += 1
         except Exception as e:
@@ -544,20 +545,24 @@ def process_single_paper_endpoint(paper_id):
                "message": "APScheduler not available"
            }), 500

-        # Schedule the paper for immediate processing via APScheduler
-        job_id = f"manual_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+        # Schedule the paper for immediate manual processing via APScheduler
+        # Use UUID suffix to ensure unique job IDs
+        import uuid
+        job_id = f"manual_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}_{uuid.uuid4().hex[:8]}"

        try:
-            scheduler.schedule_paper_processing(paper_id, delay_seconds=1, job_id=job_id)
+            scheduler.schedule_manual_paper_processing(paper_id, scraper_name=scraper_name, delay_seconds=1, job_id=job_id)

            ActivityLog.log_scraper_command(
                action="manual_process_single",
                status="success",
-                description=f"Scheduled manual processing for paper {paper.doi} via APScheduler"
+                description=f"Scheduled manual processing for paper {paper.doi} via APScheduler" +
+                    (f" using scraper '{scraper_name}'" if scraper_name else " using system default scraper")
            )

            return jsonify({
                "success": True,
-                "message": f"Processing scheduled for paper {paper.doi}",
+                "message": f"Processing scheduled for paper {paper.doi}" +
+                    (f" using {scraper_name} scraper" if scraper_name else " using system default scraper"),
                "paper_id": paper_id
            })
        except Exception as e:
diff --git a/scipaperloader/scheduler.py b/scipaperloader/scheduler.py
index dd71089..a351411 100644
--- a/scipaperloader/scheduler.py
+++ b/scipaperloader/scheduler.py
@@ -11,6 +11,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 from apscheduler.executors.pool import ThreadPoolExecutor
 from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED
+from apscheduler.jobstores.base import JobLookupError

 # Configure APScheduler logging
 logging.getLogger('apscheduler').setLevel(logging.WARNING)
@@ -83,8 +84,10 @@ def _hourly_scraper_scheduler():
             delay_seconds = random.randint(1, 3480)  # Up to 58 minutes
             run_time = current_time + timedelta(seconds=delay_seconds)

-            # Schedule the individual paper processing job
-            job_id = f"process_paper_{paper.id}_{int(current_time.timestamp())}"
+            # Schedule the individual paper processing job with unique ID
+            # Include microseconds and random suffix to prevent collisions
+            import uuid
+            job_id = f"process_paper_{paper.id}_{int(current_time.timestamp())}_{uuid.uuid4().hex[:8]}"

             global _scheduler
             if _scheduler:
@@ -94,7 +97,7 @@
                     run_date=run_time,
                     args=[paper.id],
                     id=job_id,
-                    replace_existing=False,
+                    replace_existing=True,  # Changed to True to handle conflicts gracefully
                     name=f"Process Paper {paper.doi}"
                 )

@@ -187,6 +190,37 @@ def _process_single_paper(paper_id: int):
             return {"status": "error", "paper_id": paper_id, "message": str(e)}


+def _process_single_paper_manual(paper_id: int, scraper_name: Optional[str] = None):
+    """Standalone function to process a single paper manually (bypasses scraper state checks)."""
+    app = _get_flask_app()
+    if not app:
+        return
+
+    with app.app_context():
+        try:
+            from .models import ActivityLog, PaperMetadata
+
+            # Get the paper
+            paper = PaperMetadata.query.get(paper_id)
+            if not paper:
+                return {"status": "error", "message": f"Paper {paper_id} not found"}
+
+            # Process the paper using manual method (bypasses scraper state checks)
+            from .scrapers.manager import ScraperManager
+            manager = ScraperManager()
+            result = manager.process_paper_manual(paper, scraper_name=scraper_name)
+
+            return result
+
+        except Exception as e:
+            from .models import ActivityLog
+            ActivityLog.log_error(
+                error_message=f"Error manually processing paper {paper_id} in APScheduler: {str(e)}",
+                source="_process_single_paper_manual"
+            )
+            return {"status": "error", "paper_id": paper_id, "message": str(e)}
+
+
 def _job_listener(event):
     """Listen to job execution events."""
     app = _get_flask_app()
@@ -317,19 +351,43 @@ class ScraperScheduler:
                for job in jobs:
                    # Remove any job that processes papers or uploads (but keep the main hourly scheduler)
                    if ('paper_process_' in job.id or 'test_paper_process_' in job.id or
-                        'process_paper_' in job.id or 'csv_upload_' in job.id):
-                        _scheduler.remove_job(job.id)
-                        revoked_count += 1
-
+                        'process_paper_' in job.id or 'csv_upload_' in job.id or 'manual_paper_' in job.id):
                        try:
-                            from .models import ActivityLog
-                            ActivityLog.log_scraper_activity(
-                                action="revoke_apscheduler_job",
-                                status="success",
-                                description=f"Revoked APScheduler job: {job.name} (ID: {job.id})"
-                            )
-                        except Exception:
-                            print(f"✅ Revoked APScheduler job: {job.id}")
+                            _scheduler.remove_job(job.id)
+                            revoked_count += 1
+
+                            try:
+                                from .models import ActivityLog
+                                ActivityLog.log_scraper_activity(
+                                    action="revoke_apscheduler_job",
+                                    status="success",
+                                    description=f"Revoked APScheduler job: {job.name} (ID: {job.id})"
+                                )
+                            except Exception:
+                                print(f"✅ Revoked APScheduler job: {job.id}")
+
+                        except JobLookupError as e:
+                            # Job already removed/completed - this is normal, just log it
+                            try:
+                                from .models import ActivityLog
+                                ActivityLog.log_scraper_activity(
+                                    action="revoke_apscheduler_job_already_gone",
+                                    status="info",
+                                    description=f"Job {job.id} was already completed or removed: {str(e)}"
+                                )
+                            except Exception:
+                                print(f"ℹ️ Job {job.id} was already completed or removed")
+
+                        except Exception as e:
+                            # Other error - log it but continue
+                            try:
+                                from .models import ActivityLog
+                                ActivityLog.log_error(
+                                    error_message=f"Error removing job {job.id}: {str(e)}",
+                                    source="ScraperScheduler.revoke_all_scraper_jobs"
+                                )
+                            except Exception:
+                                print(f"❌ Error removing job {job.id}: {str(e)}")

                if revoked_count > 0:
                    try:
@@ -418,7 +476,9 @@ class ScraperScheduler:

        # Generate job ID if not provided
        if not job_id:
-            job_id = f"process_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+            # Use microseconds and UUID suffix to prevent collisions
+            import uuid
+            job_id = f"process_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}_{uuid.uuid4().hex[:8]}"

        # Calculate run time
        run_time = datetime.now() + timedelta(seconds=delay_seconds)
@@ -447,3 +507,50 @@ class ScraperScheduler:
            print(f"✅ Scheduled paper {paper_id} for processing (Job ID: {job_id})")

        return job_id
+
+    def schedule_manual_paper_processing(self, paper_id: int, scraper_name: Optional[str] = None, delay_seconds: int = 0, job_id: Optional[str] = None) -> str:
+        """
+        Schedule manual paper processing that bypasses scraper state checks.
+
+        Args:
+            paper_id: ID of the paper to process
+            scraper_name: Optional specific scraper module to use (defaults to system scraper)
+            delay_seconds: Delay before processing starts (default: 0)
+            job_id: Optional custom job ID (auto-generated if not provided)
+
+        Returns:
+            Job ID of the scheduled task
+        """
+        global _scheduler
+        if not _scheduler:
+            raise RuntimeError("APScheduler not initialized")
+
+        if job_id is None:
+            job_id = f"manual_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
+
+        run_time = datetime.now() + timedelta(seconds=delay_seconds)
+
+        # Schedule the manual processing job
+        job = _scheduler.add_job(
+            func=_process_single_paper_manual,
+            trigger='date',
+            run_date=run_time,
+            args=[paper_id, scraper_name],
+            id=job_id,
+            name=f"Manual Process Paper {paper_id}",
+            replace_existing=True
+        )
+
+        # Log the scheduling
+        try:
+            from .models import ActivityLog
+            ActivityLog.log_scraper_activity(
+                action="schedule_manual_paper_processing",
+                paper_id=paper_id,
+                status="info",
+                description=f"Scheduled manual processing for paper {paper_id} at {run_time.strftime('%H:%M:%S')} (Job ID: {job_id})"
+            )
+        except Exception:
+            pass  # Don't fail if logging fails
+
+        return job_id
diff --git a/scipaperloader/scrapers/manager.py b/scipaperloader/scrapers/manager.py
index 376e635..808d957 100644
--- a/scipaperloader/scrapers/manager.py
+++ b/scipaperloader/scrapers/manager.py
@@ -455,6 +455,91 @@ class ScraperManager:

            return {"paper_id": paper.id, "status": "error", "message": str(e)}

+    def process_paper_manual(self, paper: PaperMetadata, scraper_name: Optional[str] = None) -> Dict:
+        """Process a single paper manually, bypassing scraper state checks."""
+        try:
+            # Get scraper configuration but skip state validation for manual processing
+            if scraper_name:
+                # Use the specified scraper
+                import importlib
+                from .base import BaseScraper
+                try:
+                    module = importlib.import_module(f"scipaperloader.scrapers.{scraper_name}")
+                    scraper_cls = getattr(module, "Scraper")
+                    if not issubclass(scraper_cls, BaseScraper):
+                        raise TypeError(f"Scraper class in module '{scraper_name}' does not inherit from BaseScraper")
+                    scraper = scraper_cls()
+                except (ImportError, AttributeError, TypeError) as e:
+                    ActivityLog.log_error(
+                        error_message=f"Failed to load specified scraper '{scraper_name}': {str(e)}. Falling back to system default.",
+                        source="ScraperManager.process_paper_manual"
+                    )
+                    scraper = get_scraper()
+            else:
+                # Use system default scraper
+                scraper = get_scraper()
+
+            output_statuses = scraper.get_output_statuses()
+
+            # Store the previous status before changing it
+            previous_status = paper.status
+
+            # Update paper status to processing
+            paper.previous_status = previous_status
+            paper.status = output_statuses["processing"]
+            paper.updated_at = datetime.now(UTC)
+            db.session.commit()
+
+            # Perform scraping (no state checks for manual processing)
+            result = scraper.scrape(paper.doi)
+
+            # Update paper status based on result
+            if result.status == "success":
+                paper.status = output_statuses["success"]
+                paper.error_msg = None
+                if result.data and "file_path" in result.data:
+                    paper.file_path = result.data["file_path"]
+            else:
+                paper.status = output_statuses["failure"]
+                paper.error_msg = result.message
+
+            paper.updated_at = datetime.now(UTC)
+            db.session.commit()
+
+            # Log result
+            ActivityLog.log_scraper_activity(
+                action="process_paper_manual",
+                paper_id=paper.id,
+                status=result.status,
+                description=f"Manually processed {paper.doi}: {result.message}"
+            )
+
+            return {
+                "paper_id": paper.id,
+                "status": result.status,
+                "message": result.message,
+                "duration": result.duration
+            }
+
+        except Exception as e:
+            # Revert paper status on error
+            try:
+                input_statuses = get_scraper().get_input_statuses()
+                if input_statuses:
+                    paper.status = input_statuses[0]
+                    paper.error_msg = f"Manual processing error: {str(e)}"
+                    paper.updated_at = datetime.now(UTC)
+                    db.session.commit()
+            except:
+                pass  # Don't fail if reversion fails
+
+            ActivityLog.log_error(
+                error_message=f"Error manually processing paper {paper.id}: {str(e)}",
+                source="ScraperManager.process_paper_manual"
+            )
+
+            return {"paper_id": paper.id, "status": "error", "message": str(e)}
+
    def get_status(self) -> Dict:
        """Get current scraper status."""
        scraper_state = ScraperState.get_current_state()
diff --git a/scipaperloader/scrapers/tasks.py b/scipaperloader/scrapers/tasks.py
index faa599a..b796acb 100644
--- a/scipaperloader/scrapers/tasks.py
+++ b/scipaperloader/scrapers/tasks.py
@@ -189,15 +189,45 @@ def process_single_paper(paper_id: int):
            source="process_single_paper"
        )
        return {"status": "error", "paper_id": paper_id, "message": str(e)}
+
+
+def process_single_paper_manual(paper_id: int, scraper_name: Optional[str] = None):
+    """
+    Process a single paper manually, bypassing scraper state checks.
+    Used for manual paper processing from the UI.
+
+    Args:
+        paper_id: ID of the paper to process
+        scraper_name: Optional specific scraper module to use
+    """
+    try:
+        # Get the paper without checking scraper state
+        paper = PaperMetadata.query.get(paper_id)
+        if not paper:
+            ActivityLog.log_error(
+                error_message=f"Paper {paper_id} not found for manual processing",
+                source="process_single_paper_manual"
+            )
+            return {"status": "error", "message": f"Paper {paper_id} not found"}
+
+        # Process the paper using the manual processing method (bypasses state checks)
        manager = ScraperManager()
-        result = manager.process_paper(paper)
+        result = manager.process_paper_manual(paper, scraper_name=scraper_name)
+
+        ActivityLog.log_scraper_activity(
+            action="manual_process_complete",
+            paper_id=paper_id,
+            status=result["status"],
+            description=f"Manual processing completed for paper {paper.doi}" +
+                (f" using scraper '{scraper_name}'" if scraper_name else " using system default scraper")
+        )

        return result

    except Exception as e:
        ActivityLog.log_error(
-            error_message=f"Error processing paper {paper_id}: {str(e)}",
-            source="process_single_paper"
+            error_message=f"Error manually processing paper {paper_id}: {str(e)}",
+            source="process_single_paper_manual"
        )
        return {"status": "error", "paper_id": paper_id, "message": str(e)}
diff --git a/scipaperloader/static/js/scraper-control.js b/scipaperloader/static/js/scraper-control.js
index 410d519..614653f 100644
--- a/scipaperloader/static/js/scraper-control.js
+++ b/scipaperloader/static/js/scraper-control.js
@@ -120,7 +120,10 @@ class ScraperController {
    console.log("Start button clicked - sending request to /scraper/start");

    try {
-      const data = await apiRequest("/scraper/start", { method: "POST" });
+      const data = await apiRequest("/scraper/start", {
+        method: "POST",
+        body: JSON.stringify({}),
+      });
      console.log("Data received:", data);

      if (data.success) {
@@ -144,7 +147,10 @@
   */
  async togglePauseScraper() {
    try {
-      const data = await apiRequest("/scraper/pause", { method: "POST" });
+      const data = await apiRequest("/scraper/pause", {
+        method: "POST",
+        body: JSON.stringify({}),
+      });

      if (data.success) {
        showFlashMessage(data.message, "info");
@@ -166,7 +172,10 @@
   */
  async stopScraper() {
    try {
-      const data = await apiRequest("/scraper/stop", { method: "POST" });
+      const data = await apiRequest("/scraper/stop", {
+        method: "POST",
+        body: JSON.stringify({}),
+      });

      if (data.success) {
        showFlashMessage("Scraper stopped successfully", "warning");
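
A note on the job-ID scheme introduced above: APScheduler refuses to add a job whose id already exists in the job store (it raises ConflictingIdError) unless replace_existing=True is passed, which is why the old second-resolution timestamp IDs could collide when several jobs for the same paper were scheduled within the same second. The following is a minimal standalone sketch of the pattern this patch adopts (microsecond timestamp plus short UUID suffix, one-shot date trigger); the BackgroundScheduler instance and the process_paper placeholder are illustrative, not the application's own objects:

    import time
    import uuid
    from datetime import datetime, timedelta

    from apscheduler.schedulers.background import BackgroundScheduler


    def process_paper(paper_id: int) -> None:
        # Stand-in for the real paper-processing task.
        print(f"processing paper {paper_id}")


    scheduler = BackgroundScheduler()
    scheduler.start()

    paper_id = 42  # hypothetical paper ID
    # Second-resolution timestamps can repeat; %f (microseconds) plus a uuid4
    # suffix makes each generated job ID effectively unique.
    job_id = f"process_paper_{paper_id}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}_{uuid.uuid4().hex[:8]}"

    scheduler.add_job(
        func=process_paper,
        trigger='date',
        run_date=datetime.now() + timedelta(seconds=1),
        args=[paper_id],
        id=job_id,
        name=f"Process Paper {paper_id}",
        replace_existing=True,  # tolerate the (now unlikely) case of an ID clash
    )

    time.sleep(2)  # give the one-shot job a chance to fire before shutdown
    scheduler.shutdown()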
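For completeness, the new manual path can also be exercised directly (for example from a Flask shell) instead of going through the process_single_paper_endpoint route. A sketch under the assumption that `scheduler` is the application's already-initialized ScraperScheduler instance; the paper ID and scraper module name below are illustrative placeholders, not values from this patch:

    # Bypasses the ScraperState checks and runs almost immediately via the date trigger.
    job_id = scheduler.schedule_manual_paper_processing(
        paper_id=123,          # hypothetical paper ID
        scraper_name="dummy",  # hypothetical module under scipaperloader.scrapers; omit to use the system default
        delay_seconds=1,
    )
    print(f"Scheduled manual processing as job {job_id}")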