194 lines
8.6 KiB
Python
Raw Permalink Normal View History

2025-02-22 16:55:41 +01:00
from typing import List, Dict, Optional
import requests
import pandas as pd
import os
import time
from datetime import datetime, timedelta
from requests.exceptions import ConnectionError, Timeout, RequestException
import redis
import threading
from flask import current_app
class Scraper:
_instances = {} # Track all instances by faction_id
_lock = threading.Lock()
def __new__(cls, faction_id, *args, **kwargs):
with cls._lock:
# Stop any existing instance for this faction
if faction_id in cls._instances:
old_instance = cls._instances[faction_id]
old_instance.stop_scraping()
instance = super().__new__(cls)
cls._instances[faction_id] = instance
return instance
def __init__(self, faction_id, fetch_interval, run_interval, config):
# Only initialize if not already initialized
if not hasattr(self, 'faction_id'):
self.redis_client = redis.StrictRedis(
host='localhost', port=6379, db=0, decode_responses=True
)
self.faction_id = faction_id
self.fetch_interval = fetch_interval
self.run_interval = run_interval
self.API_KEY = config['DEFAULT']['API_KEY']
self.data_file_name = os.path.join(
config['DATA']['DATA_DIR'],
f"{faction_id}-{datetime.now().strftime('%Y-%m-%d-%H-%M')}.csv"
)
self.end_time = datetime.now() + timedelta(days=int(run_interval))
# Store scraper state in Redis
self.redis_client.hmset(f"scraper:{faction_id}", {
"faction_id": faction_id,
"fetch_interval": fetch_interval,
"run_interval": run_interval,
"end_time": self.end_time.isoformat(),
"data_file_name": self.data_file_name,
"scraping_active": "0",
"api_key": self.API_KEY
})
@property
def scraping_active(self):
return bool(int(self.redis_client.hget(f"scraper:{self.faction_id}", "scraping_active")))
@scraping_active.setter
def scraping_active(self, value):
self.redis_client.hset(f"scraper:{self.faction_id}", "scraping_active", "1" if value else "0")
def fetch_faction_data(self):
url = f"https://api.torn.com/faction/{self.faction_id}?selections=&key={self.API_KEY}"
response = requests.get(url)
if response.status_code == 200:
return response.json()
current_app.logger.warning(f"Failed to fetch faction data for faction ID {self.faction_id}. Response: {response.text}")
return None
def fetch_user_activity(self, user_id):
url = f"https://api.torn.com/user/{user_id}?selections=basic,profile&key={self.API_KEY}"
retries = 3
for attempt in range(retries):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return response.json()
except ConnectionError as e:
current_app.logger.error(f"Connection error while fetching user activity for user ID {user_id}: {e}")
except Timeout as e:
current_app.logger.error(f"Timeout error while fetching user activity for user ID {user_id}: {e}")
except RequestException as e:
current_app.logger.error(f"Error while fetching user activity for user ID {user_id}: {e}")
if attempt < retries - 1:
current_app.logger.debug(f"Retrying {attempt + 1}/{retries} for user {user_id}")
time.sleep(2 ** attempt) # Exponential backoff
return None
def start_scraping(self) -> None:
"""Starts the scraping process until the end time is reached or stopped manually."""
self.scraping_active = True
current_app.logger.info(f"Starting scraping for faction ID {self.faction_id}")
current_app.logger.debug(f"Fetch interval: {self.fetch_interval}s, Run interval: {self.run_interval} days, End time: {self.end_time}")
MAX_FAILURES = 5
failure_count = 0
while datetime.now() < self.end_time and self.scraping_active:
current_app.logger.info(f"Fetching data at {datetime.now()}")
faction_data = self.fetch_faction_data()
if not faction_data or "members" not in faction_data:
current_app.logger.warning(f"No faction data found for ID {self.faction_id} (Failure {failure_count + 1}/{MAX_FAILURES})")
failure_count += 1
if failure_count >= MAX_FAILURES:
current_app.logger.error(f"Max failures reached ({MAX_FAILURES}). Stopping scraping.")
break
time.sleep(self.fetch_interval)
continue
current_app.logger.info(f"Fetched {len(faction_data['members'])} members for faction {self.faction_id}")
failure_count = 0 # Reset failure count on success
user_activity_data = self.process_faction_members(faction_data["members"])
self.save_data(user_activity_data)
current_app.logger.info(f"Data appended to {self.data_file_name}")
time.sleep(self.fetch_interval)
self.handle_scraping_end()
def process_faction_members(self, members: Dict[str, Dict]) -> List[Dict]:
"""Processes and retrieves user activity for all faction members."""
user_activity_data = []
for user_id in members.keys():
user_activity = self.fetch_user_activity(user_id)
if user_activity:
user_activity_data.append({
"user_id": user_id,
"name": user_activity.get("name", ""),
"last_action": user_activity.get("last_action", {}).get("timestamp", 0),
"status": user_activity.get("status", {}).get("state", ""),
"timestamp": datetime.now().timestamp(),
})
current_app.logger.info(f"Fetched data for user {user_id} ({user_activity.get('name', '')})")
else:
current_app.logger.warning(f"Failed to fetch data for user {user_id}")
return user_activity_data
def save_data(self, user_activity_data: List[Dict]) -> None:
"""Saves user activity data to a CSV file."""
if not user_activity_data:
current_app.logger.warning("No data to save.")
return
df = pd.DataFrame(user_activity_data)
df["last_action"] = pd.to_datetime(df["last_action"], unit="s")
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
file_exists = os.path.isfile(self.data_file_name)
try:
with open(self.data_file_name, "a" if file_exists else "w") as f:
df.to_csv(f, mode="a" if file_exists else "w", header=not file_exists, index=False)
current_app.logger.info(f"Data successfully saved to {self.data_file_name}")
except Exception as e:
current_app.logger.error(f"Error saving data to {self.data_file_name}: {e}")
def cleanup_redis_state(self):
"""Clean up all Redis state for this scraper instance"""
if hasattr(self, 'faction_id'):
self.redis_client.delete(f"scraper:{self.faction_id}")
current_id = self.redis_client.get("current_faction_id")
if current_id and current_id == str(self.faction_id):
self.redis_client.delete("current_faction_id")
# Remove from instances tracking
with self._lock:
if self.faction_id in self._instances:
del self._instances[self.faction_id]
def handle_scraping_end(self) -> None:
"""Handles cleanup and logging when scraping ends."""
if not self.scraping_active:
current_app.logger.warning(f"Scraping stopped manually at {datetime.now()}")
elif datetime.now() >= self.end_time:
current_app.logger.warning(f"Scraping stopped due to timeout at {datetime.now()} (Run interval: {self.run_interval} days)")
else:
current_app.logger.error(f"Unexpected stop at {datetime.now()}")
current_app.logger.info("Scraping completed.")
self.scraping_active = False
self.cleanup_redis_state()
def stop_scraping(self):
self.scraping_active = False
self.cleanup_redis_state()
current_app.logger.debug(f"Scraping stopped for faction {self.faction_id}")
def __del__(self):
"""Ensure Redis cleanup on object destruction"""
self.cleanup_redis_state()