CollectUSSenatorTweets/preTestClassification.py
2023-08-08 00:06:18 +02:00

141 lines
4.6 KiB
Python

import re
import string
import numpy as np
import pandas as pd
from datetime import datetime
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline
from datasets import load_dataset
from transformers.pipelines.pt_utils import KeyDataset
from funs.CleanTweets import remove_URL, remove_emoji, remove_html, remove_punct
#%%
# Preparation
# Tip: installing xformers (pip install xformers) gives better performance.
###################
# Directory setup
# Working directory (Michael's machine)
wd = "/home/michael/Documents/PS/Data/collectTweets/"
# Working directory (server)
# wd = '/home/yunohost.multimedia/polsoc/Politics & Society/TweetCollection/'
# Sub-directory containing the input data files
di = "data/IN/"
# Sub-directory the tweet data files are written to
ud = "data/OUT/"

# File names
# File that all senator data is written to
senCSV = "ALL-SENATORS-TWEETS.csv"
# Newly generated data file
senCSVc = "Tweets-Stub.csv"
# Text files listing the pretest tweet IDs (fake / not fake)
preTestIDsFake = "pretest-tweets_fake.txt"
preTestIDsNot = "pretest-tweets_not_fake.txt"
# Pretest data files (raw selection, prepared text, classification results)
senCSVPretest = "Pretest.csv"
senCSVPretestPrep = "Pretest-Prep.csv"
senCSVPretestResult = "Pretest-Results.csv"

# Derived full paths -- do not edit; they follow from the settings above.
_inDir = f"{wd}{di}"
_outDir = f"{wd}{ud}"
senCSVPath = f"{_outDir}{senCSV}"
senCSVcPath = f"{_outDir}{senCSVc}"
senCSVcPretestPath = f"{_outDir}{senCSVPretest}"
senCSVcPretestPrepPath = f"{_outDir}{senCSVPretestPrep}"
senCSVcPretestResultPath = f"{_outDir}{senCSVPretestResult}"
preTestIDsFakePath = f"{_inDir}{preTestIDsFake}"
preTestIDsNotPath = f"{_inDir}{preTestIDsNot}"
# Lists of tweet IDs to select for the pretest, read from text files that
# hold one ID per line.
def _read_tweet_ids(path):
    """Return the whitespace-stripped, non-empty lines of the file at *path*.

    Blank lines are skipped so that no empty ID ends up in the selection list.
    """
    with open(path, "r", encoding="utf-8") as file:
        stripped = (line.strip() for line in file)  # drops the newline character
        return [tid for tid in stripped if tid]


preTestIDsFakeL = _read_tweet_ids(preTestIDsFakePath)
preTestIDsNotL = _read_tweet_ids(preTestIDsNotPath)
# Load the complete senator tweet data. Every column is read as object
# (string) so the 'id' values can be matched against the string IDs that were
# read from the pretest text files.
df = pd.read_csv(senCSVPath, dtype=(object))
#%%
# Create pretest dataframe: rows whose ID is in the "fake" list are labelled
# fake=True, rows whose ID is in the "not fake" list are labelled fake=False.
# Both subsets are labelled explicitly before concatenation, so the 'fake'
# column never contains NaN and does not depend on fillna() downcasting.
dfPreTest = pd.concat(
    [
        df[df['id'].isin(preTestIDsFakeL)].assign(fake=True),
        df[df['id'].isin(preTestIDsNotL)].assign(fake=False),
    ],
    ignore_index=True,
)
#%%
# Classifier model card:
# https://huggingface.co/bvrau/covid-twitter-bert-v2-struth
# HowTo:
# https://huggingface.co/docs/transformers/main/en/model_doc/bert#transformers.BertForSequenceClassification
# https://stackoverflow.com/questions/75932605/getting-the-input-text-from-transformers-pipeline
modelName = "bvrau/covid-twitter-bert-v2-struth"
# Load model and tokenizer once and hand the instances to the pipeline, so the
# checkpoint is not loaded a second time inside pipeline().
model = AutoModelForSequenceClassification.from_pretrained(modelName)
tokenizer = AutoTokenizer.from_pretrained(modelName)
pipe = pipeline("text-classification", model=model, tokenizer=tokenizer)
# Cleaning steps adapted from
# https://www.kaggle.com/code/daotan/tweet-analysis-with-transformers-bert
# The cleaners run in this fixed order on the raw tweet text; the result is
# lower-cased and stored in 'cleanContent'.
_cleaned = dfPreTest['rawContent']
for _cleaner in (remove_URL, remove_emoji, remove_html, remove_punct):
    _cleaned = _cleaned.apply(_cleaner)
dfPreTest['cleanContent'] = _cleaned.apply(lambda text: text.lower())
#%%
timeStart = datetime.now()  # start counting execution time
max_length = 128
# Tokenize every cleaned tweet and keep its input IDs. truncation=True is
# needed for max_length to actually cap longer texts; padding="max_length"
# pads shorter ones up to max_length.
dfPreTest['input_ids'] = dfPreTest['cleanContent'].apply(
    lambda x: tokenizer(
        x,
        max_length=max_length,
        padding="max_length",
        truncation=True,
    )['input_ids']
)
# %%
# Write only the ID and the cleaned text; this file is the classifier input.
dfPreTest.to_csv(senCSVcPretestPrepPath, encoding='utf-8', columns=['id', 'cleanContent'])
#%%
# Re-load the prepared CSV as a dataset; the rows are accessed further below
# through dataset['train'].
dataset = load_dataset("csv", data_files=senCSVcPretestPrepPath)
# %%
# Run the classifier over the 'cleanContent' column of the 'train' split.
# The texts are passed to the model in batches of 8; truncation="only_first"
# is forwarded to the pipeline's tokenizer.
results = pipe(
    KeyDataset(dataset['train'], "cleanContent"),
    batch_size=8,
    truncation="only_first",
)
# %%
# For a progress bar, wrap `results` in tqdm (from tqdm.auto import tqdm).
#%%
# Collect label and score of every prediction, in dataset order.
# Each `out` is a dict with the keys 'label' and 'score', e.g.
# {'label': 'POSITIVE', 'score': 0.9998743534088135}
output_labels = []
output_score = []
for out in results:
    output_labels.append(out['label'])
    output_score.append(out['score'])
# %%
# Attach the predictions to the pretest dataframe.
dfPreTest['output_label'] = output_labels
dfPreTest['output_score'] = output_score
# Timing report (the timer was started before tokenization).
timeEnd = datetime.now()
timeTotal = timeEnd - timeStart
# Divide by the number of tweets actually classified instead of a hard-coded
# count; max(..., 1) avoids a ZeroDivisionError for an empty pretest set.
nClassified = len(output_labels)
timePerTweet = timeTotal / max(nClassified, 1)
# NOTE(review): 50183 is presumably the number of tweets in the full dataset
# -- confirm and keep in sync with the data.
fullDatasetSize = 50183
print(f"Total classification execution time: {timeTotal.total_seconds():.2f} seconds")
print(f"Time per tweet classification: {timePerTweet}")
print(f"Estimated time for full classification of tweets: {timePerTweet*fullDatasetSize}")
# %%
# Save the pretest data together with the classification results.
dfPreTest.to_csv(senCSVcPretestResultPath, encoding='utf-8')
# %%