fixes multiprocessing.

Michael Beck 2023-06-23 19:18:03 +02:00
parent b00f75e9fe
commit e8ba02ca0f
2 changed files with 34 additions and 26 deletions

View File

@@ -188,21 +188,20 @@ print(timeStartScrape.strftime(fTimeFormat))
 print("---")
 # Iterate over each Twitter account using multiprocessing
-# Iterate over each Twitter account using multiprocessing
-with concurrent.futures.ProcessPoolExecutor() as executor:
-    # List to store the scraping tasks
-    tasks = []
-    for handle in accounts:
-        # Iterate over each time slice
-        for slice_data in time_slices:
-            # ... Code to prepare the slice_data ...
-            # Schedule the scraping task
-            task = executor.submit(
-                scrapeTweets, handle, slice_data, keywords, td, tweetDFColumns
-            )
-            tasks.append(task)
-    # Wait for all tasks to complete
-    concurrent.futures.wait(tasks)
+# with concurrent.futures.ProcessPoolExecutor() as executor:
+#     # List to store the scraping tasks
+#     tasks = []
+#     for handle in accounts:
+#         # Iterate over each time slice
+#         for slice_data in time_slices:
+#             # ... Code to prepare the slice_data ...
+#             # Schedule the scraping task
+#             task = executor.submit(
+#                 scrapeTweets, handle, slice_data, keywords, td, tweetDFColumns
+#             )
+#             # Store the handle and slice_data as attributes of the task
+#     # Wait for all tasks to complete
+#     concurrent.futures.wait(tasks)
 timeEndScrape = datetime.now()
 print("---")
@@ -219,10 +218,14 @@ for handle in accounts:
     for tslice in time_slices:
         suffix = tslice['suffix']
         AllFilesList.append(f"Tweets-{handle}{suffix}.csv")
-with open(f"{logfile}missing-"+timeStartScrape.strftime(fTimeFormat)+".txt", "w") as fout:
+with open(f"{logfile}"+timeStartScrape.strftime(fTimeFormat)+"_missing.txt", "w") as fout:
     for file in AllFilesList:
         if file not in tweetfiles:
             fout.write(f'Missing: {file}.\n') # if file is not in tweetfiles, print error message.
+        else:
+            fout.write('all slices scraped.')
 # check if file_alltweets (previously scraped tweets that have been merged into one file) exists, if it exists, remove from list to not include it in the following merge
 if file_alltweets in tweetfiles:
     tweetfiles.remove(file_alltweets)
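As the indentation of the added lines suggests, the new else branch writes 'all slices scraped.' once for every file that is present, so the message can repeat in the log. A hedged alternative sketch that reports each missing slice once and writes the success line only when nothing is missing; the names match the hunk above, the values here are illustrative placeholders.

from datetime import datetime

# Illustrative placeholder values; in the script these are built earlier.
logfile = ""  # placeholder; the script prepends its own log directory
fTimeFormat = "%Y-%m-%d_%H-%M-%S"  # assumed format string
timeStartScrape = datetime.now()
AllFilesList = ["Tweets-example_handle-slice1.csv", "Tweets-example_handle-slice2.csv"]
tweetfiles = ["Tweets-example_handle-slice1.csv"]

# Report each missing slice once; write the success line only if nothing is missing.
missing = sorted(set(AllFilesList) - set(tweetfiles))
with open(f"{logfile}" + timeStartScrape.strftime(fTimeFormat) + "_missing.txt", "w") as fout:
    if missing:
        fout.writelines(f"Missing: {name}.\n" for name in missing)
    else:
        fout.write("all slices scraped.")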

View File

@@ -1,15 +1,16 @@
-from datetime import datetime, time
+from datetime import datetime
+import time
 import pandas as pd
 import snscrape.modules.twitter as sntwitter
 def scrapeTweets(handle, slice_data, keywords, td, tweetDFColumns, maxTweets = 5000):
     i = 0
-    currentTime = datetime.now
+    currentTime = datetime.now()
     ts_beg = slice_data['beg_time']
     ts_end = slice_data['end_time']
     suffix = slice_data['suffix']
-    tweetDataFilePath = td + "Tweets-{handle}{suffix}.csv"
+    tweetDataFilePath = td + f"Tweets-{handle}{suffix}.csv"
     # create empty tweetlist that will be filled with tweets of current sen
     TweetList = []
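Two of the fixes in this hunk are easy to miss: datetime.now without parentheses assigns the method object rather than a timestamp, and without the f prefix the braces in the path are written literally instead of being interpolated. A small sketch with illustrative values (handle, suffix and td are placeholders, not taken from the project's data):

from datetime import datetime

currentTime = datetime.now()  # a datetime value; `datetime.now` would store the function itself
handle, suffix = "example_handle", "-slice1"  # illustrative values only
td = "data/tweets/"                           # illustrative output directory
print(td + "Tweets-{handle}{suffix}.csv")   # literal braces: data/tweets/Tweets-{handle}{suffix}.csv
print(td + f"Tweets-{handle}{suffix}.csv")  # interpolated: data/tweets/Tweets-example_handle-slice1.csv
print(currentTime.isoformat())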
@@ -24,14 +25,17 @@ def scrapeTweets(handle, slice_data, keywords, td, tweetDFColumns, maxTweets = 5
             break
         # get tweet vars from tweetDFColumns and append to singleTweetList
         # which will then be appended to TweetList. TweetList contains all tweets of the current slice.
-        singleTweetList = [singleTweetList.append(eval(f'tweet.{col}')) for col in tweetDFColumns]
+        singleTweetList = []
+        for col in tweetDFColumns:
+            singleTweetList.append(eval(f'tweet.{col}'))
         TweetList.append(singleTweetList)
-    # Check if no tweets fetched for the current time slice. If there are no tweets, skip to next time_slices loop iteration
-    if not TweetList:
-        open(tweetDataFilePath, 'a').close()
-        print(f'return empty in {handle}{suffix} - from {ts_beg} to {ts_end}')
-        return
+    # # Check if no tweets fetched for the current time slice. If there are no tweets, skip to next time_slices loop iteration
+    # if not TweetList:
+    #     open(tweetDataFilePath, 'a').close()
+    #     print(f'return empty in {handle}{suffix} - from {ts_beg} to {ts_end}')
+    #     return
     print(f'{i:<6} tweets scraped for: {handle:>15}{suffix:<7}')
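The replaced list comprehension referenced singleTweetList inside its own definition, so it would have raised a NameError; the explicit loop fixes that. If the entries of tweetDFColumns are attribute names, the same extraction can also be written without eval, for example with operator.attrgetter, which resolves both plain and dotted names such as 'user.username'. A sketch under that assumption, with a stand-in tweet object for illustration:

from operator import attrgetter
from types import SimpleNamespace

# Stand-in for an sntwitter tweet object, for illustration only.
tweet = SimpleNamespace(id=1, rawContent="example tweet",
                        user=SimpleNamespace(username="example_handle"))
tweetDFColumns = ["id", "rawContent", "user.username"]  # assumed shape of the real column list

# attrgetter resolves plain and dotted attribute names without eval
singleTweetList = [attrgetter(col)(tweet) for col in tweetDFColumns]
print(singleTweetList)  # [1, 'example tweet', 'example_handle']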
@@ -43,7 +47,8 @@ def scrapeTweets(handle, slice_data, keywords, td, tweetDFColumns, maxTweets = 5
     tweet_df['contains_keyword'] = (
         tweet_df['rawContent'].str.findall('|'.join(keywords)).str.join(',').replace('', 'none')
     )
-    ## Save two versions of the dataset, one with all fields and one without dict fields
+    #return(tweet_df)
+    # Save two versions of the dataset, one with all fields and one without dict fields
     # define filepaths
     csv_path = tweetDataFilePath
     # save short csv
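The trailing context refers to saving two versions of the dataset, one with all fields and one without dict fields, but the diff does not show that code. A hedged sketch of one way this could look; the DataFrame contents, the dict-column test and the "-long" suffix are illustrative assumptions, and csv_path stands in for tweetDataFilePath from the hunk above.

import pandas as pd

# Illustrative frame; the real tweet_df is built from TweetList and tweetDFColumns above.
tweet_df = pd.DataFrame({
    "id": [1],
    "rawContent": ["example tweet"],
    "user": [{"username": "example_handle"}],  # a dict field, as snscrape returns for some columns
})
csv_path = "Tweets-example_handle-slice1.csv"  # placeholder for tweetDataFilePath

# Columns whose values are dicts (or lists) do not round-trip cleanly through CSV.
dict_cols = [c for c in tweet_df.columns
             if tweet_df[c].map(lambda v: isinstance(v, (dict, list))).any()]

tweet_df.drop(columns=dict_cols).to_csv(csv_path, index=False)        # short version
tweet_df.to_csv(csv_path.replace(".csv", "-long.csv"), index=False)   # full version; suffix is illustrative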