from typing import List, Dict, Optional import requests import pandas as pd import os import time from datetime import datetime, timedelta from requests.exceptions import ConnectionError, Timeout, RequestException import redis import threading from flask import current_app class Scraper: _instances = {} # Track all instances by faction_id _lock = threading.Lock() def __new__(cls, faction_id, *args, **kwargs): with cls._lock: # Stop any existing instance for this faction if faction_id in cls._instances: old_instance = cls._instances[faction_id] old_instance.stop_scraping() instance = super().__new__(cls) cls._instances[faction_id] = instance return instance def __init__(self, faction_id, fetch_interval, run_interval, config): # Only initialize if not already initialized if not hasattr(self, 'faction_id'): self.redis_client = redis.StrictRedis( host='localhost', port=6379, db=0, decode_responses=True ) self.faction_id = faction_id self.fetch_interval = fetch_interval self.run_interval = run_interval self.API_KEY = config['DEFAULT']['API_KEY'] self.data_file_name = os.path.join( config['DATA']['DATA_DIR'], f"{faction_id}-{datetime.now().strftime('%Y-%m-%d-%H-%M')}.csv" ) self.end_time = datetime.now() + timedelta(days=int(run_interval)) # Store scraper state in Redis self.redis_client.hmset(f"scraper:{faction_id}", { "faction_id": faction_id, "fetch_interval": fetch_interval, "run_interval": run_interval, "end_time": self.end_time.isoformat(), "data_file_name": self.data_file_name, "scraping_active": "0", "api_key": self.API_KEY }) @property def scraping_active(self): return bool(int(self.redis_client.hget(f"scraper:{self.faction_id}", "scraping_active"))) @scraping_active.setter def scraping_active(self, value): self.redis_client.hset(f"scraper:{self.faction_id}", "scraping_active", "1" if value else "0") def fetch_faction_data(self): url = f"https://api.torn.com/faction/{self.faction_id}?selections=&key={self.API_KEY}" response = requests.get(url) if response.status_code == 200: return response.json() current_app.logger.warning(f"Failed to fetch faction data for faction ID {self.faction_id}. Response: {response.text}") return None def fetch_user_activity(self, user_id): url = f"https://api.torn.com/user/{user_id}?selections=basic,profile&key={self.API_KEY}" retries = 3 for attempt in range(retries): try: response = requests.get(url, timeout=10) response.raise_for_status() return response.json() except ConnectionError as e: current_app.logger.error(f"Connection error while fetching user activity for user ID {user_id}: {e}") except Timeout as e: current_app.logger.error(f"Timeout error while fetching user activity for user ID {user_id}: {e}") except RequestException as e: current_app.logger.error(f"Error while fetching user activity for user ID {user_id}: {e}") if attempt < retries - 1: current_app.logger.debug(f"Retrying {attempt + 1}/{retries} for user {user_id}") time.sleep(2 ** attempt) # Exponential backoff return None def start_scraping(self) -> None: """Starts the scraping process until the end time is reached or stopped manually.""" self.scraping_active = True current_app.logger.info(f"Starting scraping for faction ID {self.faction_id}") current_app.logger.debug(f"Fetch interval: {self.fetch_interval}s, Run interval: {self.run_interval} days, End time: {self.end_time}") MAX_FAILURES = 5 failure_count = 0 while datetime.now() < self.end_time and self.scraping_active: current_app.logger.info(f"Fetching data at {datetime.now()}") faction_data = self.fetch_faction_data() if not faction_data or "members" not in faction_data: current_app.logger.warning(f"No faction data found for ID {self.faction_id} (Failure {failure_count + 1}/{MAX_FAILURES})") failure_count += 1 if failure_count >= MAX_FAILURES: current_app.logger.error(f"Max failures reached ({MAX_FAILURES}). Stopping scraping.") break time.sleep(self.fetch_interval) continue current_app.logger.info(f"Fetched {len(faction_data['members'])} members for faction {self.faction_id}") failure_count = 0 # Reset failure count on success user_activity_data = self.process_faction_members(faction_data["members"]) self.save_data(user_activity_data) current_app.logger.info(f"Data appended to {self.data_file_name}") time.sleep(self.fetch_interval) self.handle_scraping_end() def process_faction_members(self, members: Dict[str, Dict]) -> List[Dict]: """Processes and retrieves user activity for all faction members.""" user_activity_data = [] for user_id in members.keys(): user_activity = self.fetch_user_activity(user_id) if user_activity: user_activity_data.append({ "user_id": user_id, "name": user_activity.get("name", ""), "last_action": user_activity.get("last_action", {}).get("timestamp", 0), "status": user_activity.get("status", {}).get("state", ""), "timestamp": datetime.now().timestamp(), }) current_app.logger.info(f"Fetched data for user {user_id} ({user_activity.get('name', '')})") else: current_app.logger.warning(f"Failed to fetch data for user {user_id}") return user_activity_data def save_data(self, user_activity_data: List[Dict]) -> None: """Saves user activity data to a CSV file.""" if not user_activity_data: current_app.logger.warning("No data to save.") return df = pd.DataFrame(user_activity_data) df["last_action"] = pd.to_datetime(df["last_action"], unit="s") df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s") file_exists = os.path.isfile(self.data_file_name) try: with open(self.data_file_name, "a" if file_exists else "w") as f: df.to_csv(f, mode="a" if file_exists else "w", header=not file_exists, index=False) current_app.logger.info(f"Data successfully saved to {self.data_file_name}") except Exception as e: current_app.logger.error(f"Error saving data to {self.data_file_name}: {e}") def cleanup_redis_state(self): """Clean up all Redis state for this scraper instance""" if hasattr(self, 'faction_id'): self.redis_client.delete(f"scraper:{self.faction_id}") current_id = self.redis_client.get("current_faction_id") if current_id and current_id == str(self.faction_id): self.redis_client.delete("current_faction_id") # Remove from instances tracking with self._lock: if self.faction_id in self._instances: del self._instances[self.faction_id] def handle_scraping_end(self) -> None: """Handles cleanup and logging when scraping ends.""" if not self.scraping_active: current_app.logger.warning(f"Scraping stopped manually at {datetime.now()}") elif datetime.now() >= self.end_time: current_app.logger.warning(f"Scraping stopped due to timeout at {datetime.now()} (Run interval: {self.run_interval} days)") else: current_app.logger.error(f"Unexpected stop at {datetime.now()}") current_app.logger.info("Scraping completed.") self.scraping_active = False self.cleanup_redis_state() def stop_scraping(self): self.scraping_active = False self.cleanup_redis_state() current_app.logger.debug(f"Scraping stopped for faction {self.faction_id}") def __del__(self): """Ensure Redis cleanup on object destruction""" self.cleanup_redis_state()