adds multiprocessing to scrape tweets.

This commit is contained in:
Michael Beck 2023-06-23 16:41:20 +02:00
parent c675db9d00
commit 5d0c41407e
3 changed files with 71 additions and 52 deletions

View File

@@ -58,6 +58,7 @@ import glob
import time
import sys
from datetime import datetime
import concurrent.futures
## Setup directories
# WD Michael
@@ -131,10 +132,12 @@ tweetDFColumns = [
'lang',
'source']
##
## Import other files
import snscrape.modules.twitter as sntwitter
from funs.TimeSlice import *
from funs.ClearDupes import deDupe
from funs.Scrape import scrapeTweets
# create logfile & log all outputs
logfilen = logfile + datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.txt'
@@ -150,7 +153,6 @@ for slice in time_slices:
print(slice['suffix'] + ': ' + slice['beg_time'] + ' - ' + slice['end_time'])
print('---')
## Keywords
keywords = []
# Remove duplicate Keywords and save all non-duplicates to 'data/keywords.txt'
@@ -178,57 +180,24 @@ print("Starting scraping at:")
print(timeStartScrape.strftime('%Y-%m-%d_%H-%M-%S'))
print('---')
# Iterate over each Twitter account
for handle in accounts:
# Iterate over each time slice
for slice_data in time_slices:
# define slice data variables from time_slices
ts_beg = slice_data['beg_time']
ts_end = slice_data['end_time']
suffix = slice_data['suffix']
tweetFileName = f"Tweets-{handle}{suffix}.csv"
# create empty tweet list that will be filled with tweets of the current handle and time slice
TweetList = []
# statusmsg
print(f'Fetching: {handle:>15}{suffix:<7} - from {ts_beg} to {ts_end}')
# Snscrape query:
query = f'from:{handle} since:{ts_beg} until:{ts_end}'
for i,tweet in enumerate(sntwitter.TwitterSearchScraper(query).get_items()):
singleTweetList = []
if i>maxTweets:
break
# get tweet vars from tweetDFColumns and append to singleTweetList
# which will then be appended to TweetList. TweetList contains all tweets of the current slice.
for col in tweetDFColumns:
singleTweetList.append(eval(f'tweet.{col}'))
TweetList.append(singleTweetList)
# Check if no tweets fetched for the current time slice. If there are no tweets, skip to next time_slices loop iteration
if len(TweetList) == 0:
msg = f'return empty in {handle}{suffix} - from {ts_beg} to {ts_end}'
open(td + tweetFileName, 'a').close()
print(msg)
continue
print(f'{i:<6} tweets scraped for: {handle:>15}{suffix:<7}')
# convert to dataframe
tweet_df = pd.DataFrame(TweetList, columns=tweetDFColumns)
## Check if tweet-text contains keyword
tweet_df['contains_keyword'] = ''
tweet_df['contains_keyword'] = (tweet_df['rawContent'].str.findall('|'.join(keywords)).str.join(',').replace('', 'none'))
## Save two versions of the dataset, one with all fields and one without dict fields
# define filepaths
csv_path = td + tweetFileName
# save short csv
tweet_df.to_csv(csv_path)
# sleep half a second to not get blocked because of excessive requests
time.sleep(0.5)
# Iterate over each Twitter account using multiprocessing
with concurrent.futures.ThreadPoolExecutor() as executor:
# List to store the scraping tasks
tasks = []
for handle in accounts:
# Iterate over each time slice
for slice_data in time_slices:
# ... code to prepare the slice_data ...
# Schedule the scraping task
task = executor.submit(scrapeTweets, handle, slice_data, keywords, td, tweetDFColumns)
tasks.append(task)
# Wait for all tasks to complete
concurrent.futures.wait(tasks)
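Note on the design: the executor used here is a ThreadPoolExecutor, i.e. thread-based concurrency, which suits the network-bound snscrape requests; concurrent.futures.ProcessPoolExecutor would be the process-based variant. Also, concurrent.futures.wait does not re-raise exceptions from the workers. A minimal sketch (not part of this commit) of how the finished tasks could be inspected, assuming only the tasks list defined above:

# Sketch only, not part of this commit: surface results/exceptions of the scraping tasks.
for task in concurrent.futures.as_completed(tasks):
    try:
        task.result()  # re-raises any exception that occurred inside scrapeTweets
    except Exception as exc:
        print(f'a scraping task failed: {exc}')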
timeEndScrape = datetime.now()
print("---")
print("End of scraping at:")
print(timeEndScrape.strftime('%Y-%m-%d_%H-%M-%S'))

View File

@@ -6,6 +6,12 @@ Created on Wed Jun 21 13:58:42 2023
@author: michael
'''
def deDupe(inFile, outFile):
"""Reads file line by line and removes duplicates. Saves deduplicated lines into another file.
Args:
inFile (string): Path to file that shall be deduplicated.
outFile (string): Path to output-file.
"""
from collections import Counter
with open(inFile) as f:
lines = f.readlines()
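For reference, a minimal usage sketch of deDupe as it is used for the keyword list in the scraping script above; only 'data/keywords.txt' as the output path appears in the script, the input file name below is a placeholder:

from funs.ClearDupes import deDupe

# Illustrative call; 'data/keywords_raw.txt' is a placeholder input file name.
deDupe('data/keywords_raw.txt', 'data/keywords.txt')
# 'data/keywords.txt' now contains each keyword from the input at most once.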

44
funs/Scrape.py Normal file
View File

@@ -0,0 +1,44 @@
def scrapeTweets(handle, slice_data, keywords, td, tweetDFColumns, maxTweets=5000):
from datetime import datetime
import pandas as pd
import snscrape.modules.twitter as sntwitter
currentTime = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
ts_beg = slice_data['beg_time']
ts_end = slice_data['end_time']
suffix = slice_data['suffix']
tweetDataFilePath = td + f"Tweets-{handle}{suffix}.csv"
# create empty tweet list that will be filled with tweets of the current handle and time slice
TweetList = []
# statusmsg
print(f'{currentTime:<30} Fetching: {handle:>15}{suffix:<7} - from {ts_beg} to {ts_end}')
# Snscrape query:
query = f'from:{handle} since:{ts_beg} until:{ts_end}'
for i,tweet in enumerate(sntwitter.TwitterSearchScraper(query).get_items()):
if i>maxTweets:
break
# collect the tweet attributes listed in tweetDFColumns into singleTweetList,
# which is then appended to TweetList. TweetList contains all tweets of the current slice.
singleTweetList = [eval(f'tweet.{col}') for col in tweetDFColumns]
TweetList.append(singleTweetList)
# If no tweets were fetched for the current time slice, create an empty marker file and return early
if not TweetList:
    open(tweetDataFilePath, 'a').close()
    print(f'return empty in {handle}{suffix} - from {ts_beg} to {ts_end}')
    return
print(f'{i:<6} tweets scraped for: {handle:>15}{suffix:<7}')
# convert to dataframe
tweet_df = pd.DataFrame(TweetList, columns=tweetDFColumns)
## Check if tweet-text contains keyword
tweet_df['contains_keyword'] = ''
tweet_df['contains_keyword'] = (tweet_df['rawContent'].str.findall('|'.join(keywords)).str.join(',').replace('', 'none'))
## Save two versions of the dataset, one with all fields and one without dict fields
# define filepaths
csv_path = tweetDataFilePath
# save short csv
tweet_df.to_csv(csv_path, encoding='utf-8')
# sleep 1 second to not get blocked because of excessive requests
# time.sleep(0.5)
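A minimal usage sketch of the new function, mirroring the executor.submit call in the main script; the handle, dates, suffix, keywords, directory, and column list below are placeholders, not values taken from the commit:

from funs.Scrape import scrapeTweets

# Placeholder inputs for illustration only.
slice_data = {'beg_time': '2021-01-01', 'end_time': '2021-01-08', 'suffix': '-s1'}
tweetDFColumns = ['id', 'date', 'rawContent', 'lang', 'source']  # illustrative subset of the full column list
scrapeTweets('exampleHandle', slice_data, ['climate', 'election'], 'data/tweets/', tweetDFColumns, maxTweets=100)
# Writes data/tweets/Tweets-exampleHandle-s1.csv if any tweets match the query.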