"""Upload functionality for paper metadata.""" import codecs import csv import datetime import traceback from io import StringIO, BytesIO import json import uuid from typing import Dict, Any import pandas as pd from flask import ( Blueprint, flash, jsonify, redirect, render_template, request, send_file, session, url_for, current_app ) from ..db import db from ..models import PaperMetadata, ActivityLog from ..defaults import DUPLICATE_STRATEGIES bp = Blueprint("upload", __name__) REQUIRED_COLUMNS = {"alternative_id", "journal", "doi", "issn", "title"} CHUNK_SIZE = 100 # Number of rows to process per batch # Store task progress in memory (for simplicity) # In production, you might want to use Redis or database task_progress = {} def parse_date(date_str): """Parse date string into datetime object.""" if not date_str or pd.isna(date_str): return None try: return datetime.datetime.strptime(date_str, "%Y-%m-%d") except ValueError: return None def _process_csv_background(task_id: str, file_content: str, delimiter: str, duplicate_strategy: str): """Background function to process CSV file using APScheduler.""" print(f"DEBUG: _process_csv_background called with task_id: {task_id}") # Get Flask app for context from flask import current_app # Get the Flask app from the scheduler context from ..scheduler import _get_flask_app app = _get_flask_app() print(f"DEBUG: Flask app obtained: {app}") if not app: # Fallback: try to get current_app try: app = current_app print(f"DEBUG: Using current_app: {app}") except RuntimeError as e: print(f"DEBUG: Failed to get current_app: {e}") task_progress[task_id] = { "state": "FAILURE", "progress": 0, "error": "Flask app context not available" } return with app.app_context(): try: print(f"DEBUG: Inside app context, starting CSV processing for task {task_id}") # Initialize progress task_progress[task_id] = { "state": "PROGRESS", "progress": 0, "message": "Starting CSV processing..." } result = process_csv(file_content, delimiter, duplicate_strategy, task_id) print(f"DEBUG: CSV processing completed for task {task_id}, result: {result}") # Mark as completed task_progress[task_id] = { "state": "SUCCESS", "progress": 100, "result": result } except Exception as e: print(f"DEBUG: Exception in _process_csv_background: {e}") import traceback traceback.print_exc() # Mark as failed task_progress[task_id] = { "state": "FAILURE", "progress": 0, "error": str(e) } try: ActivityLog.log_error( error_message=f"Background CSV processing failed: {str(e)}", source="upload._process_csv_background" ) except Exception: # If logging fails, just print the error print(f"Background CSV processing failed: {str(e)}") @bp.route("/", methods=["GET", "POST"]) def upload(): if request.method == "POST": file = request.files.get("file") delimiter = request.form.get("delimiter", ",") duplicate_strategy = request.form.get("duplicate_strategy", "skip") if not file: return jsonify({"error": "No file selected."}) stream = codecs.iterdecode(file.stream, "utf-8") content = "".join(stream) # Generate task ID task_id = str(uuid.uuid4()) # Get the APScheduler instance from the global variable from ..scheduler import _scheduler if not _scheduler: return jsonify({"error": "APScheduler not initialized."}) if not _scheduler.running: return jsonify({"error": "APScheduler not running."}) # Initialize task progress immediately task_progress[task_id] = { "state": "PENDING", "progress": 0, "message": "Task queued for processing..." 

        # Schedule background task
        job_id = f"csv_upload_{task_id}"
        # Use UTC time to match APScheduler's timezone configuration
        run_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=1)  # Start in 1 second

        try:
            _scheduler.add_job(
                func=_process_csv_background,
                trigger="date",
                run_date=run_time,
                args=[task_id, content, delimiter, duplicate_strategy],
                id=job_id,
                name=f"CSV Upload {task_id}",
                replace_existing=True,
            )

            ActivityLog.log_import_activity(
                action="schedule_csv_upload",
                status="info",
                description=f"Scheduled CSV upload task {task_id}",
                task_id=task_id,
            )
        except Exception as e:
            task_progress[task_id] = {
                "state": "FAILURE",
                "progress": 0,
                "error": f"Failed to schedule task: {str(e)}",
            }
            return jsonify({"error": f"Failed to schedule background task: {str(e)}"})

        return jsonify({"task_id": task_id})

    return render_template("upload.html.jinja", duplicate_strategies=DUPLICATE_STRATEGIES)


def process_csv(file_content, delimiter, duplicate_strategy="skip", task_id=None):
    """Process a CSV file and import paper metadata, reporting progress as it goes."""
    added_count = skipped_count = updated_count = error_count = 0
    errors = []
    skipped_records = []  # Track skipped rows and the reason they were skipped

    try:
        # Update task progress if provided
        if task_id:
            task_progress[task_id] = {
                "state": "PROGRESS",
                "progress": 10,
                "message": "Starting CSV import...",
            }

        # Log the start of the import using the ActivityLog model
        ActivityLog.log_import_activity(
            action="start_csv_import",
            status="processing",
            description=f"Starting CSV import with strategy: {duplicate_strategy}",
            file_size=len(file_content),
            delimiter=delimiter,
        )

        # Read the CSV in chunks; the first pass only counts the chunks so
        # progress can be reported as a percentage.
        csv_buffer = StringIO(file_content)
        total_chunks = len(list(pd.read_csv(csv_buffer, delimiter=delimiter, chunksize=CHUNK_SIZE)))
        csv_buffer.seek(0)

        # Process each chunk of rows
        for chunk_idx, chunk in enumerate(pd.read_csv(csv_buffer, delimiter=delimiter, chunksize=CHUNK_SIZE)):
            for index, row in chunk.iterrows():
                try:
                    doi = str(row.get("doi", "N/A"))
                    # Validate required fields
                    if pd.isna(row.get("title")) or pd.isna(row.get("doi")) or pd.isna(row.get("issn")):
                        raise ValueError("Missing required fields")

                    # Look for an existing record with the same DOI
                    existing = db.session.query(PaperMetadata).filter_by(doi=doi).first()

                    if existing:
                        if duplicate_strategy == "update":
                            existing.title = row["title"]
                            existing.alt_id = row.get("alternative_id")
                            existing.issn = row["issn"]
                            existing.journal = row.get("journal")
                            existing.published_online = parse_date(row.get("published_online"))
                            updated_count += 1
                        else:
                            # Track why this record was skipped
                            skipped_records.append({
                                "row": index + 2,
                                "doi": doi,
                                "reason": f"Duplicate DOI found and strategy is '{duplicate_strategy}'",
                            })
                            skipped_count += 1
                            continue
                    else:
                        paper = PaperMetadata(
                            title=row.get("title"),
                            doi=row.get("doi"),
                            alt_id=row.get("alt_id") or row.get("alternative_id"),  # Handle both column names
                            issn=row.get("issn"),
                            journal=row.get("journal"),
                            published_online=parse_date(row.get("published_online")),
                            status="New",
                        )
                        db.session.add(paper)
                        added_count += 1
                except Exception as e:
                    error_count += 1
                    errors.append({"row": index + 2, "doi": row.get("doi", "N/A"), "error": str(e)})

            # Commit after each chunk so a failure does not lose earlier batches
            db.session.commit()

            # Update progress
            if task_id:
                progress = min(90, 10 + int((chunk_idx + 1) * 80 / total_chunks))
                task_progress[task_id] = {
                    "state": "PROGRESS",
                    "progress": progress,
                    "message": f"Processed {chunk_idx + 1}/{total_chunks} chunks",
                }

            # Log periodic progress every 5 chunks
            if (chunk_idx + 1) % 5 == 0:
                ActivityLog.log_import_activity(
                    action="import_progress",
                    status="processing",
                    description=f"Processed {chunk_idx + 1}/{total_chunks} chunks",
                    current_stats={
                        "added": added_count,
                        "updated": updated_count,
                        "skipped": skipped_count,
                        "errors": error_count,
                    },
                )

        # Final progress update and completion log
        if task_id:
            task_progress[task_id] = {
                "state": "PROGRESS",
                "progress": 100,
                "message": "Finalizing import...",
            }

        ActivityLog.log_import_activity(
            action="complete_csv_import",
            status="success",
            description="CSV import completed",
            stats={
                "added": added_count,
                "updated": updated_count,
                "skipped": skipped_count,
                "errors": error_count,
            },
        )
    except Exception as e:
        db.session.rollback()
        if task_id:
            task_progress[task_id] = {
                "state": "FAILURE",
                "progress": 0,
                "error": str(e),
            }
        ActivityLog.log_error(
            error_message="CSV import failed",
            exception=e,
            severity="error",
            source="upload.process_csv",
        )
        return {"error": str(e), "progress": 0}
    finally:
        db.session.remove()

    # If there were errors, store an error CSV for potential download
    if errors:
        try:
            error_csv = StringIO()
            writer = csv.DictWriter(error_csv, fieldnames=["row", "doi", "error"])
            writer.writeheader()
            writer.writerows(errors)

            ActivityLog.log_import_activity(
                action="import_errors",
                status="error",
                description=f"Import completed with {error_count} errors",
                error_csv=error_csv.getvalue(),
                task_id=task_id,
                error_count=error_count,
            )
        except Exception:
            # Do not fail the task if error logging fails
            pass

    # Include skipped-record details so the UI can explain what happened
    return {
        "added": added_count,
        "updated": updated_count,
        "skipped": skipped_count,
        "skipped_records": skipped_records[:5],  # Include up to 5 examples
        "skipped_reason_summary": (
            "Records were skipped because they already exist in the database. "
            "Use the 'update' strategy to update them."
        ),
        "errors": errors[:5],
        "error_count": error_count,
    }


@bp.route("/task_status/<task_id>")
def task_status(task_id):
    """Return the status of a background upload task."""
    progress_data = task_progress.get(task_id)
    if not progress_data:
        return jsonify({"error": "Task not found."})

    return jsonify(progress_data)


@bp.route("/download_error_log/<task_id>")
def download_error_log(task_id):
    # Fetch the most recent import error log entry (the logs are not
    # currently filtered by task_id)
    error_log = ActivityLog.query.filter(
        ActivityLog.action == "import_errors"
    ).order_by(ActivityLog.timestamp.desc()).first()

    if not error_log:
        flash("No error data available.")
        return redirect(url_for("upload.upload"))

    # Get the CSV data from extra_data
    extra_data = error_log.get_extra_data()
    error_csv = extra_data.get("error_csv")

    if not error_csv:
        flash("Error data format is invalid.")
        return redirect(url_for("upload.upload"))

    # send_file needs a binary stream, so encode the CSV text
    return send_file(
        BytesIO(error_csv.encode()),
        mimetype="text/csv",
        as_attachment=True,
        download_name=f"upload_errors_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
    )
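

# ---------------------------------------------------------------------------
# Illustrative sketch (not called anywhere in the application): one way a
# client could drive the upload/poll cycle using Flask's test client. The
# `app` argument, the root-mounted blueprint paths, and the sample CSV
# payload are assumptions for demonstration only; a running APScheduler is
# required for the task to ever leave the PENDING state.
# ---------------------------------------------------------------------------
def _example_upload_roundtrip(app, poll_interval=0.5, timeout=60):
    """Upload a tiny CSV and poll the task status until it finishes."""
    import io
    import time

    csv_bytes = (
        b"title,doi,issn,journal,alternative_id,published_online\n"
        b"Example Paper,10.1000/example123,1234-5678,Example Journal,ALT-1,2024-01-01\n"
    )
    client = app.test_client()
    resp = client.post(
        "/",  # adjust if the blueprint is registered under a url_prefix
        data={
            "file": (io.BytesIO(csv_bytes), "papers.csv"),
            "delimiter": ",",
            "duplicate_strategy": "skip",
        },
        content_type="multipart/form-data",
    )
    task_id = resp.get_json()["task_id"]

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = client.get(f"/task_status/{task_id}").get_json()
        if status.get("state") in ("SUCCESS", "FAILURE"):
            return status
        time.sleep(poll_interval)
    raise TimeoutError(f"Upload task {task_id} did not finish within {timeout} seconds")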