From 5d0c41407ef873c6dcd0a1754cc9007a7976a695 Mon Sep 17 00:00:00 2001
From: Michael Beck
Date: Fri, 23 Jun 2023 16:41:20 +0200
Subject: [PATCH] adds multiprocessing to scrape tweets.

---
 collect.py         | 73 +++++++++++++---------------------------------
 funs/ClearDupes.py |  6 ++++
 funs/Scrape.py     | 44 ++++++++++++++++++++++++++++
 3 files changed, 71 insertions(+), 52 deletions(-)
 create mode 100644 funs/Scrape.py

diff --git a/collect.py b/collect.py
index 9eb711c..ae030b1 100644
--- a/collect.py
+++ b/collect.py
@@ -58,6 +58,7 @@ import glob
 import time
 import sys
 from datetime import datetime
+import concurrent.futures
 
 ## Setup directories
 # WD Michael
@@ -131,10 +132,12 @@ tweetDFColumns = [
     'lang',
     'source']
 
+## 
+
 ## Import other files
-import snscrape.modules.twitter as sntwitter
 from funs.TimeSlice import *
 from funs.ClearDupes import deDupe
+from funs.Scrape import scrapeTweets
 
 # create logfile & log all outputs
 logfilen = logfile + datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '.txt'
@@ -150,7 +153,6 @@ for slice in time_slices:
     print(slice['suffix'] + ': ' + slice['beg_time'] + ' - ' + slice['end_time'])
 print('---')
 
-
 ## Keywords
 keywords = []
 # Remove duplicate Keywords and save all non-duplicates to 'data/keywords.txt'
@@ -178,57 +180,24 @@ print("Starting scraping at:")
 print(timeStartScrape.strftime('%Y-%m-%d_%H-%M-%S'))
 print('---')
 
-# Iterate over each Twitter account
-for handle in accounts:
-    # Iterate over each time slice
-    for slice_data in time_slices:
-        # define slice data variables from time_slices
-        ts_beg = slice_data['beg_time']
-        ts_end = slice_data['end_time']
-        suffix = slice_data['suffix']
-        tweetFileName = "Tweets-{handle}{suffix}.csv"
-
-        # create empty tweetlist that will be filled with tweets of current sen
-        TweetList = []
-
-        # statusmsg
-        print(f'Fetching: {handle:>15}{suffix:<7} - from {ts_beg} to {ts_end}')
-
-        # Snscrape query:
-        query = f'from:{handle} since:{ts_beg} until:{ts_end}'
-        for i,tweet in enumerate(sntwitter.TwitterSearchScraper(query).get_items()):
-            singleTweetList = []
-            if i>maxTweets:
-                break
-            # get tweet vars from tweetDFColumns and append to singleTweetList
-            # which will then be appended to TweetList. TweetList contains all tweets of the current slice.
-            for col in tweetDFColumns:
-                singleTweetList.append(eval(f'tweet.{col}'))
-            TweetList.append(singleTweetList)
-        # Check if no tweets fetched for the current time slice. If there are no tweets, skip to next time_slices loop iteration
-        if len(TweetList) == 0:
-            msg = f'return empty in {handle}{suffix} - from {ts_beg} to {ts_end}'
-            open(file, 'a').close()
-            print(msg)
-            continue
-
-        print(f'{i:<6} tweets scraped for: {handle:>15}{suffix:<7}')
-
-        # convert to dataframe
-        tweet_df = pd.DataFrame(TweetList, columns=tweetDFColumns)
-
-        ## Check if tweet-text contains keyword
-        tweet_df['contains_keyword'] = ''
-        tweet_df['contains_keyword'] = (tweet_df['rawContent'].str.findall('|'.join(keywords)).str.join(',').replace('', 'none'))
-        ## Save two versions of the dataset, one with all fields and one without dict fields
-        # define filepaths
-        csv_path = td + tweetFileName
-        # save short csv
-        tweet_df.to_csv(csv_path)
-        # sleep 1 second to not get blocked because of excessive requests
-        time.sleep(0.5)
+# Iterate over each Twitter account concurrently using a thread pool
+with concurrent.futures.ThreadPoolExecutor() as executor:
+    # List to store the scraping tasks
+    tasks = []
+
+    for handle in accounts:
+        # Iterate over each time slice
+        for slice_data in time_slices:
+            # ... code to prepare the slice_data ...
+
+            # Schedule the scraping task
+            task = executor.submit(scrapeTweets, handle, slice_data, keywords, td, tweetDFColumns)
+            tasks.append(task)
+
+    # Wait for all tasks to complete
+    concurrent.futures.wait(tasks)
 
-timeEndScrape = datetime.now()tweetFileName
+timeEndScrape = datetime.now()
 print("---")
 print("End of scraping at:")
 print(timeEndScrape.strftime('%Y-%m-%d_%H-%M-%S'))
diff --git a/funs/ClearDupes.py b/funs/ClearDupes.py
index 17c921a..9124825 100644
--- a/funs/ClearDupes.py
+++ b/funs/ClearDupes.py
@@ -6,6 +6,12 @@ Created on Wed Jun 21 13:58:42 2023
 @author: michael
 '''
 def deDupe(inFile, outFile):
+    """Reads a file line by line and removes duplicate lines. Saves the deduplicated lines to another file.
+
+    Args:
+        inFile (string): Path to the file that shall be deduplicated.
+        outFile (string): Path to the output file.
+    """
     from collections import Counter
     with open(inFile) as f:
         lines = f.readlines()
diff --git a/funs/Scrape.py b/funs/Scrape.py
new file mode 100644
index 0000000..1ee4edb
--- /dev/null
+++ b/funs/Scrape.py
@@ -0,0 +1,44 @@
+def scrapeTweets(handle, slice_data, keywords, td, tweetDFColumns, maxTweets=5000):
+    from datetime import datetime
+    import pandas as pd
+    import snscrape.modules.twitter as sntwitter
+    currentTime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+    ts_beg = slice_data['beg_time']
+    ts_end = slice_data['end_time']
+    suffix = slice_data['suffix']
+    tweetDataFilePath = td + f"Tweets-{handle}{suffix}.csv"
+
+    # create empty tweet list that will be filled with the tweets of the current handle and slice
+    TweetList = []
+
+    # statusmsg
+    print(f'{currentTime:<30} Fetching: {handle:>15}{suffix:<7} - from {ts_beg} to {ts_end}')
+    # Snscrape query:
+    query = f'from:{handle} since:{ts_beg} until:{ts_end}'
+    for i, tweet in enumerate(sntwitter.TwitterSearchScraper(query).get_items()):
+        if i > maxTweets:
+            break
+        # get tweet vars from tweetDFColumns and collect them in singleTweetList,
+        # which is then appended to TweetList. TweetList contains all tweets of the current slice.
+        singleTweetList = [eval(f'tweet.{col}') for col in tweetDFColumns]
+        TweetList.append(singleTweetList)
+    # Check if no tweets were fetched for the current time slice. If so, create an empty marker file and return.
+    if not TweetList:
+        open(tweetDataFilePath, 'a').close()
+        print(f'return empty in {handle}{suffix} - from {ts_beg} to {ts_end}')
+        return
+    print(f'{i:<6} tweets scraped for: {handle:>15}{suffix:<7}')
+
+    # convert to dataframe
+    tweet_df = pd.DataFrame(TweetList, columns=tweetDFColumns)
+
+    ## Check if tweet-text contains keyword
+    tweet_df['contains_keyword'] = ''
+    tweet_df['contains_keyword'] = (tweet_df['rawContent'].str.findall('|'.join(keywords)).str.join(',').replace('', 'none'))
+    ## Save two versions of the dataset, one with all fields and one without dict fields
+    # define filepaths
+    csv_path = tweetDataFilePath
+    # save short csv
+    tweet_df.to_csv(csv_path, encoding='utf-8')
+    # sleep to avoid getting blocked because of excessive requests
+    # time.sleep(0.5)
\ No newline at end of file