"""Upload functionality for paper metadata."""
import codecs
import csv
import datetime
import traceback
from io import StringIO, BytesIO
import json
import uuid
from typing import Dict, Any
import pandas as pd
from flask import (
    Blueprint,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    session,
    url_for,
    current_app
)

from ..db import db
from ..models import PaperMetadata, ActivityLog
from ..defaults import DUPLICATE_STRATEGIES

bp = Blueprint("upload", __name__)

REQUIRED_COLUMNS = {"alternative_id", "journal", "doi", "issn", "title"}
CHUNK_SIZE = 100  # Number of rows to process per batch

# Store task progress in memory (for simplicity).
# In production, you might want to use Redis or a database instead.
task_progress = {}
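

# The helpers below are an illustrative sketch of what a Redis-backed progress
# store could look like if the in-memory dict above ever needs to be shared
# across workers. They are not used by the current implementation; they assume
# the third-party ``redis`` package and a reachable Redis server, and the key
# prefix and function names are made up for illustration.
def _set_task_progress_redis(task_id, data, redis_url="redis://localhost:6379/0"):
    """Sketch: persist task progress as JSON in Redis (assumes ``redis`` is installed)."""
    import redis  # imported lazily so this sketch adds no hard dependency

    client = redis.Redis.from_url(redis_url)
    client.set(f"csv_upload_progress:{task_id}", json.dumps(data))


def _get_task_progress_redis(task_id, redis_url="redis://localhost:6379/0"):
    """Sketch: read task progress back from Redis, mirroring ``task_progress.get``."""
    import redis

    client = redis.Redis.from_url(redis_url)
    raw = client.get(f"csv_upload_progress:{task_id}")
    return json.loads(raw) if raw else None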


def parse_date(date_str):
    """Parse date string into datetime object."""
    if not date_str or pd.isna(date_str):
        return None
    try:
        return datetime.datetime.strptime(date_str, "%Y-%m-%d")
    except ValueError:
        return None
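
# Illustrative behavior of parse_date (values shown for clarity, not a doctest):
#   parse_date("2024-01-31")  -> datetime.datetime(2024, 1, 31, 0, 0)
#   parse_date(None)          -> None
#   parse_date("31/01/2024")  -> None  (does not match the "%Y-%m-%d" format)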


def _process_csv_background(task_id: str, file_content: str, delimiter: str, duplicate_strategy: str):
    """Background function to process the uploaded CSV file, run via APScheduler."""
    print(f"DEBUG: _process_csv_background called with task_id: {task_id}")

    # Get the Flask app from the scheduler context so we can push an app context
    from ..scheduler import _get_flask_app

    app = _get_flask_app()
    print(f"DEBUG: Flask app obtained: {app}")

    if not app:
        # Fallback: try the current_app proxy (already imported at module level)
        try:
            app = current_app
            print(f"DEBUG: Using current_app: {app}")
        except RuntimeError as e:
            print(f"DEBUG: Failed to get current_app: {e}")
            task_progress[task_id] = {
                "state": "FAILURE",
                "progress": 0,
                "error": "Flask app context not available"
            }
            return

    with app.app_context():
        try:
            print(f"DEBUG: Inside app context, starting CSV processing for task {task_id}")
            # Initialize progress
            task_progress[task_id] = {
                "state": "PROGRESS",
                "progress": 0,
                "message": "Starting CSV processing..."
            }

            result = process_csv(file_content, delimiter, duplicate_strategy, task_id)
            print(f"DEBUG: CSV processing completed for task {task_id}, result: {result}")

            # Mark as completed
            task_progress[task_id] = {
                "state": "SUCCESS",
                "progress": 100,
                "result": result
            }
        except Exception as e:
            print(f"DEBUG: Exception in _process_csv_background: {e}")
            traceback.print_exc()

            # Mark as failed
            task_progress[task_id] = {
                "state": "FAILURE",
                "progress": 0,
                "error": str(e)
            }
            try:
                ActivityLog.log_error(
                    error_message=f"Background CSV processing failed: {str(e)}",
                    source="upload._process_csv_background"
                )
            except Exception:
                # If logging fails, just print the error
                print(f"Background CSV processing failed: {str(e)}")


@bp.route("/", methods=["GET", "POST"])
def upload():
    if request.method == "POST":
        file = request.files.get("file")
        delimiter = request.form.get("delimiter", ",")
        duplicate_strategy = request.form.get("duplicate_strategy", "skip")

        if not file:
            return jsonify({"error": "No file selected."})

        stream = codecs.iterdecode(file.stream, "utf-8")
        content = "".join(stream)

        # Generate task ID
        task_id = str(uuid.uuid4())

        # Get the APScheduler instance from the global variable
        from ..scheduler import _scheduler

        if not _scheduler:
            return jsonify({"error": "APScheduler not initialized."})
        if not _scheduler.running:
            return jsonify({"error": "APScheduler not running."})

        # Initialize task progress immediately
        task_progress[task_id] = {
            "state": "PENDING",
            "progress": 0,
            "message": "Task queued for processing..."
        }

        # Schedule background task
        job_id = f"csv_upload_{task_id}"
        # Use UTC time to match APScheduler's timezone configuration
        run_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=1)  # Start in 1 second

        try:
            _scheduler.add_job(
                func=_process_csv_background,
                trigger='date',
                run_date=run_time,
                args=[task_id, content, delimiter, duplicate_strategy],
                id=job_id,
                name=f"CSV Upload {task_id}",
                replace_existing=True
            )

            ActivityLog.log_import_activity(
                action="schedule_csv_upload",
                status="info",
                description=f"Scheduled CSV upload task {task_id}",
                task_id=task_id
            )
        except Exception as e:
            task_progress[task_id] = {
                "state": "FAILURE",
                "progress": 0,
                "error": f"Failed to schedule task: {str(e)}"
            }
            return jsonify({"error": f"Failed to schedule background task: {str(e)}"})

        return jsonify({"task_id": task_id})

    return render_template("upload.html.jinja", duplicate_strategies=DUPLICATE_STRATEGIES)
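
# Illustrative client flow for this endpoint (the "/upload" prefix and
# localhost:5000 are assumptions; adjust to the actual deployment):
#
#   import requests
#
#   with open("papers.csv", "rb") as fh:
#       resp = requests.post(
#           "http://localhost:5000/upload/",
#           files={"file": fh},
#           data={"delimiter": ",", "duplicate_strategy": "skip"},
#       )
#   task_id = resp.json()["task_id"]
#   status = requests.get(f"http://localhost:5000/upload/task_status/{task_id}").json()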


def process_csv(file_content, delimiter, duplicate_strategy="skip", task_id=None):
    """Process CSV file and import paper metadata."""
    added_count = skipped_count = updated_count = error_count = 0
    errors = []
    skipped_records = []  # Track skipped records and why they were skipped

    try:
        # Update task progress if a task id was provided
        if task_id:
            task_progress[task_id] = {
                "state": "PROGRESS",
                "progress": 10,
                "message": "Starting CSV import..."
            }

        # Log the start of import using ActivityLog model
        ActivityLog.log_import_activity(
            action="start_csv_import",
            status="processing",
            description=f"Starting CSV import with strategy: {duplicate_strategy}",
            file_size=len(file_content),
            delimiter=delimiter
        )

        # Read CSV into chunks
        csv_buffer = StringIO(file_content)

        # Count total chunks (requires one full pass over the buffer)
        csv_buffer.seek(0)
        total_chunks = len(list(pd.read_csv(csv_buffer, delimiter=delimiter, chunksize=CHUNK_SIZE)))
        csv_buffer.seek(0)

        # Process each chunk of rows
        for chunk_idx, chunk in enumerate(pd.read_csv(csv_buffer, delimiter=delimiter, chunksize=CHUNK_SIZE)):
            for index, row in chunk.iterrows():
                try:
                    doi = str(row.get("doi", "N/A"))

                    # Validate required fields
                    if pd.isna(row.get("title")) or pd.isna(row.get("doi")) or pd.isna(row.get("issn")):
                        raise ValueError("Missing required fields")

                    # Try finding an existing record based on DOI
                    existing = db.session.query(PaperMetadata).filter_by(doi=doi).first()
                    if existing:
                        if duplicate_strategy == "update":
                            existing.title = row["title"]
                            existing.alt_id = row.get("alternative_id")
                            existing.issn = row["issn"]
                            existing.journal = row.get("journal")
                            existing.published_online = parse_date(row.get("published_online"))
                            updated_count += 1
                        else:
                            # Track why this record was skipped
                            skipped_records.append({
                                "row": index + 2,
                                "doi": doi,
                                "reason": f"Duplicate DOI found and strategy is '{duplicate_strategy}'"
                            })
                            skipped_count += 1
                            continue
                    else:
                        paper = PaperMetadata(
                            title=row.get("title"),
                            doi=row.get("doi"),
                            alt_id=row.get("alt_id") or row.get("alternative_id"),  # Handle both column names
                            issn=row.get("issn"),
                            journal=row.get("journal"),
                            published_online=parse_date(row.get("published_online")),
                            status="New"
                        )
                        db.session.add(paper)
                        added_count += 1
                except Exception as e:
                    error_count += 1
                    errors.append({"row": index + 2, "doi": row.get("doi", "N/A"), "error": str(e)})

            # Commit the chunk so the session does not grow unbounded
            db.session.commit()

            # Update progress
            if task_id:
                progress = min(90, 10 + int((chunk_idx + 1) * 80 / total_chunks))
                task_progress[task_id] = {
                    "state": "PROGRESS",
                    "progress": progress,
                    "message": f"Processed {chunk_idx+1}/{total_chunks} chunks"
                }

            # Log periodic progress every 5 chunks
            if (chunk_idx + 1) % 5 == 0:
                ActivityLog.log_import_activity(
                    action="import_progress",
                    status="processing",
                    description=f"Processed {chunk_idx+1}/{total_chunks} chunks",
                    current_stats={
                        "added": added_count,
                        "updated": updated_count,
                        "skipped": skipped_count,
                        "errors": error_count
                    }
                )

        # Final progress update and completion log
        if task_id:
            task_progress[task_id] = {
                "state": "PROGRESS",
                "progress": 100,
                "message": "Finalizing import..."
            }

        ActivityLog.log_import_activity(
            action="complete_csv_import",
            status="success",
            description="CSV import completed",
            stats={
                "added": added_count,
                "updated": updated_count,
                "skipped": skipped_count,
                "errors": error_count
            }
        )
    except Exception as e:
        db.session.rollback()
        if task_id:
            task_progress[task_id] = {
                "state": "FAILURE",
                "progress": 0,
                "error": str(e)
            }
        ActivityLog.log_error(
            error_message="CSV import failed",
            exception=e,
            severity="error",
            source="upload.process_csv"
        )
        return {'error': str(e), 'progress': 0}
    finally:
        db.session.remove()

    # If there were errors, store an error CSV for potential download
    if errors:
        try:
            error_csv = StringIO()
            writer = csv.DictWriter(error_csv, fieldnames=["row", "doi", "error"])
            writer.writeheader()
            writer.writerows(errors)

            ActivityLog.log_import_activity(
                action="import_errors",
                status="error",
                description=f"Import completed with {error_count} errors",
                error_csv=error_csv.getvalue(),
                task_id=task_id,
                error_count=error_count
            )
        except Exception:
            # Do not fail the task if error logging fails
            pass

    # Include skipped-record information in the return value
    return {
        "added": added_count,
        "updated": updated_count,
        "skipped": skipped_count,
        "skipped_records": skipped_records[:5],  # Include up to 5 examples
        "skipped_reason_summary": "Records were skipped because they already exist in the database. Use 'update' strategy to update them.",
        "errors": errors[:5],
        "error_count": error_count
    }
@bp.route("/task_status/<task_id>")
def task_status(task_id):
"""Get status of background task."""
progress_data = task_progress.get(task_id)
if not progress_data:
return jsonify({"error": "Task not found."})
return jsonify(progress_data)
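
# Example responses a client might see while polling (values are illustrative):
#   {"state": "PENDING",  "progress": 0,   "message": "Task queued for processing..."}
#   {"state": "PROGRESS", "progress": 50,  "message": "Processed 5/10 chunks"}
#   {"state": "SUCCESS",  "progress": 100, "result": {"added": 120, "updated": 3, "skipped": 7, ...}}
#   {"state": "FAILURE",  "progress": 0,   "error": "..."}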
@bp.route("/download_error_log/<task_id>")
def download_error_log(task_id):
# Find the most recent error log for this task
error_log = ActivityLog.query.filter(
ActivityLog.action == "import_errors"
).order_by(ActivityLog.timestamp.desc()).first()
if not error_log:
flash("No error data available.")
return redirect(url_for("upload.upload"))
# Get the CSV data from extra_data
extra_data = error_log.get_extra_data()
error_csv = extra_data.get("error_csv")
if not error_csv:
flash("Error data format is invalid.")
return redirect(url_for("upload.upload"))
buffer = StringIO(error_csv)
return send_file(
BytesIO(buffer.getvalue().encode()), # Corrected to use BytesIO
mimetype="text/csv",
as_attachment=True,
download_name=f"upload_errors_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
)