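"""Torn API faction activity scraper.

Defines the Scraper class, which periodically fetches member data for a Torn
faction, records each member's last action and status to a CSV file, and keeps
per-faction scraper state (intervals, end time, active flag) in Redis.
"""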
from typing import List, Dict, Optional
import requests
import pandas as pd
import os
import time
from datetime import datetime, timedelta
from requests.exceptions import ConnectionError, Timeout, RequestException
import redis
import threading

from flask import current_app

class Scraper:
    _instances = {}  # Track all instances by faction_id
    # RLock rather than Lock: __new__ holds it while old_instance.stop_scraping()
    # -> cleanup_redis_state() re-acquires it on the instance being replaced.
    _lock = threading.RLock()

    def __new__(cls, faction_id, *args, **kwargs):
        with cls._lock:
            # Stop any existing instance for this faction
            if faction_id in cls._instances:
                old_instance = cls._instances[faction_id]
                old_instance.stop_scraping()

            instance = super().__new__(cls)
            cls._instances[faction_id] = instance
            return instance

    def __init__(self, faction_id, fetch_interval, run_interval, config):
        # Only initialize if not already initialized
        if not hasattr(self, 'faction_id'):
            self.redis_client = redis.StrictRedis(
                host='localhost', port=6379, db=0, decode_responses=True
            )
            self.faction_id = faction_id
            self.fetch_interval = fetch_interval
            self.run_interval = run_interval
            self.API_KEY = config['DEFAULT']['API_KEY']
            self.data_file_name = os.path.join(
                config['DATA']['DATA_DIR'],
                f"{faction_id}-{datetime.now().strftime('%Y-%m-%d-%H-%M')}.csv"
            )
            self.end_time = datetime.now() + timedelta(days=int(run_interval))

            # Store scraper state in Redis (hset with mapping= replaces the
            # deprecated hmset call, which was removed in redis-py 4.x).
            self.redis_client.hset(f"scraper:{faction_id}", mapping={
                "faction_id": faction_id,
                "fetch_interval": fetch_interval,
                "run_interval": run_interval,
                "end_time": self.end_time.isoformat(),
                "data_file_name": self.data_file_name,
                "scraping_active": "0",
                "api_key": self.API_KEY,
            })

    @property
    def scraping_active(self):
        # hget returns None once the hash has been deleted (e.g. after cleanup),
        # so treat a missing value as "not active" instead of raising.
        value = self.redis_client.hget(f"scraper:{self.faction_id}", "scraping_active")
        return value == "1"

    @scraping_active.setter
    def scraping_active(self, value):
        self.redis_client.hset(f"scraper:{self.faction_id}", "scraping_active", "1" if value else "0")

    def fetch_faction_data(self) -> Optional[Dict]:
        url = f"https://api.torn.com/faction/{self.faction_id}?selections=&key={self.API_KEY}"
        try:
            # A timeout keeps the scraping loop from hanging on a stalled request.
            response = requests.get(url, timeout=10)
        except RequestException as e:
            current_app.logger.error(f"Error while fetching faction data for faction ID {self.faction_id}: {e}")
            return None
        if response.status_code == 200:
            return response.json()
        current_app.logger.warning(f"Failed to fetch faction data for faction ID {self.faction_id}. Response: {response.text}")
        return None

    def fetch_user_activity(self, user_id) -> Optional[Dict]:
        url = f"https://api.torn.com/user/{user_id}?selections=basic,profile&key={self.API_KEY}"
        retries = 3
        for attempt in range(retries):
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                return response.json()
            except ConnectionError as e:
                current_app.logger.error(f"Connection error while fetching user activity for user ID {user_id}: {e}")
            except Timeout as e:
                current_app.logger.error(f"Timeout error while fetching user activity for user ID {user_id}: {e}")
            except RequestException as e:
                current_app.logger.error(f"Error while fetching user activity for user ID {user_id}: {e}")
            if attempt < retries - 1:
                current_app.logger.debug(f"Retrying {attempt + 1}/{retries} for user {user_id}")
                time.sleep(2 ** attempt)  # Exponential backoff
        return None

    def start_scraping(self) -> None:
        """Starts the scraping process until the end time is reached or stopped manually."""
        self.scraping_active = True

        current_app.logger.info(f"Starting scraping for faction ID {self.faction_id}")
        current_app.logger.debug(f"Fetch interval: {self.fetch_interval}s, Run interval: {self.run_interval} days, End time: {self.end_time}")

        MAX_FAILURES = 5
        failure_count = 0

        while datetime.now() < self.end_time and self.scraping_active:
            current_app.logger.info(f"Fetching data at {datetime.now()}")
            faction_data = self.fetch_faction_data()

            if not faction_data or "members" not in faction_data:
                current_app.logger.warning(f"No faction data found for ID {self.faction_id} (Failure {failure_count + 1}/{MAX_FAILURES})")
                failure_count += 1
                if failure_count >= MAX_FAILURES:
                    current_app.logger.error(f"Max failures reached ({MAX_FAILURES}). Stopping scraping.")
                    break
                time.sleep(self.fetch_interval)
                continue

            current_app.logger.info(f"Fetched {len(faction_data['members'])} members for faction {self.faction_id}")
            failure_count = 0  # Reset failure count on success
            user_activity_data = self.process_faction_members(faction_data["members"])
            self.save_data(user_activity_data)

            current_app.logger.info(f"Data appended to {self.data_file_name}")
            time.sleep(self.fetch_interval)

        self.handle_scraping_end()

    def process_faction_members(self, members: Dict[str, Dict]) -> List[Dict]:
        """Processes and retrieves user activity for all faction members."""
        user_activity_data = []
        for user_id in members.keys():
            user_activity = self.fetch_user_activity(user_id)
            if user_activity:
                user_activity_data.append({
                    "user_id": user_id,
                    "name": user_activity.get("name", ""),
                    "last_action": user_activity.get("last_action", {}).get("timestamp", 0),
                    "status": user_activity.get("status", {}).get("state", ""),
                    "timestamp": datetime.now().timestamp(),
                })
                current_app.logger.info(f"Fetched data for user {user_id} ({user_activity.get('name', '')})")
            else:
                current_app.logger.warning(f"Failed to fetch data for user {user_id}")

        return user_activity_data

    def save_data(self, user_activity_data: List[Dict]) -> None:
        """Saves user activity data to a CSV file."""
        if not user_activity_data:
            current_app.logger.warning("No data to save.")
            return

        df = pd.DataFrame(user_activity_data)
        df["last_action"] = pd.to_datetime(df["last_action"], unit="s")
        df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")

        file_exists = os.path.isfile(self.data_file_name)

        try:
            # Append to the existing file, or create it with a header on first write.
            df.to_csv(self.data_file_name, mode="a" if file_exists else "w", header=not file_exists, index=False)
            current_app.logger.info(f"Data successfully saved to {self.data_file_name}")
        except Exception as e:
            current_app.logger.error(f"Error saving data to {self.data_file_name}: {e}")

    def cleanup_redis_state(self):
        """Clean up all Redis state for this scraper instance"""
        if hasattr(self, 'faction_id'):
            self.redis_client.delete(f"scraper:{self.faction_id}")
            current_id = self.redis_client.get("current_faction_id")
            if current_id and current_id == str(self.faction_id):
                self.redis_client.delete("current_faction_id")
            # Remove from instances tracking
            with self._lock:
                if self.faction_id in self._instances:
                    del self._instances[self.faction_id]

    def handle_scraping_end(self) -> None:
        """Handles cleanup and logging when scraping ends."""
        if not self.scraping_active:
            current_app.logger.warning(f"Scraping stopped manually at {datetime.now()}")
        elif datetime.now() >= self.end_time:
            current_app.logger.warning(f"Scraping stopped due to timeout at {datetime.now()} (Run interval: {self.run_interval} days)")
        else:
            current_app.logger.error(f"Unexpected stop at {datetime.now()}")

        current_app.logger.info("Scraping completed.")
        self.scraping_active = False
        self.cleanup_redis_state()

    def stop_scraping(self):
        """Stops the scraping loop and removes this scraper's persisted state."""
        self.scraping_active = False
        self.cleanup_redis_state()
        current_app.logger.debug(f"Scraping stopped for faction {self.faction_id}")

    def __del__(self):
        """Ensure Redis cleanup on object destruction"""
        self.cleanup_redis_state()
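
# Example usage (a minimal sketch, not part of the original module): the class relies
# on flask.current_app for logging and on a configparser-style config exposing
# DEFAULT/API_KEY and DATA/DATA_DIR, so it is assumed to run inside a Flask
# application context, typically from a background thread. The app object, config
# file name, faction ID, and intervals below are placeholders.
#
# import configparser
# import threading
# from flask import Flask
#
# app = Flask(__name__)
# config = configparser.ConfigParser()
# config.read("scraper.ini")  # hypothetical file with [DEFAULT] API_KEY and [DATA] DATA_DIR
#
# def run_scraper(faction_id: int) -> None:
#     # Push an app context so current_app.logger works inside the worker thread.
#     with app.app_context():
#         scraper = Scraper(faction_id, fetch_interval=300, run_interval=7, config=config)
#         scraper.start_scraping()
#
# threading.Thread(target=run_scraper, args=(12345,), daemon=True).start()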