from celery import Celery
import redis

from app.models import Scraper


def create_celery():
    celery = Celery('tasks', broker='redis://localhost:6379/0')
    celery.conf.update(
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
        timezone='UTC',
    )
    return celery
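
# A worker for this module can be started with the standard Celery CLI. The
# module path below is an assumption based on the app/ package layout seen in
# the imports; adjust it to wherever this file actually lives:
#
#     celery -A app.tasks worker --loglevel=info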


def init_celery(app):
    """Initialize Celery with Flask app context"""
    celery = create_celery()
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        # Wrap every task invocation in the Flask app context so tasks can
        # use app-bound extensions (db, config, etc.).
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery


# Module-level instance so tasks register at import time; re-initialized
# properly with the Flask app in app/__init__.py.
celery = create_celery()
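
# Example wiring (a sketch, not verbatim from app/__init__.py — the factory
# name `create_app` is assumed):
#
#     from flask import Flask
#     from app.tasks import init_celery
#
#     def create_app():
#         app = Flask(__name__)
#         app.config.from_object('config.Config')
#         celery = init_celery(app)
#         return app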


def get_redis():
    """Return a Redis client; decode_responses=True yields str values."""
    return redis.StrictRedis(
        host='localhost',
        port=6379,
        db=0,
        decode_responses=True,
    )


@celery.task(bind=True)
def start_scraping_task(self, faction_id, fetch_interval, run_interval, config_dict):
    """
    Start scraping task with serializable parameters

    Args:
        faction_id: ID of the faction to scrape
        fetch_interval: Interval between fetches in seconds
        run_interval: How long to run the scraper in days
        config_dict: Dictionary containing configuration
    """
    redis_client = get_redis()
    try:
        # Record the active faction and this task's id so that
        # stop_scraping_task can find and revoke the right task later.
        redis_client.set("current_faction_id", str(faction_id))
        redis_client.set(f"scraper:{faction_id}:task_id", self.request.id)

        scraper = Scraper(
            faction_id=faction_id,
            fetch_interval=int(fetch_interval),
            run_interval=int(run_interval),
            config=config_dict,
        )
        scraper.start_scraping()
        return {"status": "success"}
    except Exception as e:
        # Clean up Redis state on error
        redis_client.delete("current_faction_id")
        redis_client.delete(f"scraper:{faction_id}:task_id")
        return {"status": "error", "message": str(e)}


@celery.task
def stop_scraping_task(faction_id):
    """Stop scraping task and clean up Redis state"""
    try:
        redis_client = get_redis()

        # Revoke the start task recorded for this faction, if any. Revoking
        # celery.current_task here would terminate this stop task itself
        # rather than the scraper.
        task_id = redis_client.get(f"scraper:{faction_id}:task_id")
        if task_id:
            celery.control.revoke(task_id, terminate=True, signal='SIGTERM')

        # Clean up Redis state
        redis_client.hset(f"scraper:{faction_id}", "scraping_active", "0")
        redis_client.delete(f"scraper:{faction_id}")
        redis_client.delete(f"scraper:{faction_id}:task_id")

        # Clean up current_faction_id if it matches
        current_id = redis_client.get("current_faction_id")
        if current_id and current_id == str(faction_id):
            redis_client.delete("current_faction_id")

        return {"status": "success", "message": f"Stopped scraping for faction {faction_id}"}
    except Exception as e:
        return {"status": "error", "message": str(e)}