from celery import Celery
from celery.schedules import crontab

# Create Celery instance without Flask app initially
celery = Celery(
    'scipaperloader',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)
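# The URLs above are only import-time defaults; configure_celery() below
# replaces them with the Flask app's CELERY_* settings when one is available.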


def configure_celery(app=None):
    """Configure Celery with the Flask app settings and ensure tasks run in the app context."""
    if app is None:
        # Import here to avoid circular import
        from scipaperloader import create_app
        app = create_app()

    # Update Celery configuration using the app settings
    celery.conf.update(
        broker_url=app.config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
        result_backend=app.config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
        timezone='UTC',
        enable_utc=True,
        task_time_limit=3600,  # 1 hour max runtime
        task_soft_time_limit=3000,  # 50 minutes soft limit
        worker_max_tasks_per_child=10,  # Restart workers after 10 tasks
        worker_max_memory_per_child=1000000,  # ~1 GB (value is in kilobytes)
        task_acks_late=True,  # Acknowledge tasks after completion
        task_reject_on_worker_lost=True,  # Requeue tasks if worker dies
        # Configure Beat schedule for periodic tasks
        beat_schedule={
            'scheduled-scraper-hourly': {
                'task': 'scipaperloader.blueprints.scraper.dummy_scheduled_scraper',
                'schedule': crontab(minute=0),  # Run at the start of every hour
                'options': {'expires': 3600}
            },
        }
    )
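    # Note: with task_acks_late and task_reject_on_worker_lost both enabled,
    # delivery is effectively at-least-once, so tasks should be idempotent.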

    # Create a custom task class that pushes the Flask application context
    class ContextTask(celery.Task):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
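

# --- Usage sketch (illustrative; the module names and layout below are
# assumptions, not taken from the project) ---
# configure_celery() should run before task modules are imported, so that
# tasks defined afterwards subclass ContextTask and execute inside the Flask
# app context. A hypothetical worker entrypoint:
#
#     # celery_worker.py
#     from scipaperloader.celery import configure_celery
#
#     celery = configure_celery()
#     import scipaperloader.blueprints.scraper  # noqa: F401  (registers tasks)
#
# Then start the worker and the beat scheduler as separate processes:
#
#     celery -A celery_worker.celery worker --loglevel=info
#     celery -A celery_worker.celery beat --loglevel=info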