2023-07-07 18:18:51 +02:00

130 lines
4.2 KiB
Python

import re
import string
import numpy as np
import pandas as pd
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline
from datasets import load_dataset
from transformers.pipelines.pt_utils import KeyDataset
from funs.CleanTweets import remove_URL, remove_emoji, remove_html, remove_punct
#%%
# prepare
# install xformers (pip install xformers) for better performance
###################
# Setup directories
# WD Michael
wd = "/home/michael/Documents/PS/Data/collectTweets/"
# WD Server
# wd = '/home/yunohost.multimedia/polsoc/Politics & Society/TweetCollection/'
# datafile input directory
di = "data/IN/"
# Tweet-datafile output directory
ud = "data/OUT/"
# Name of file that all senator data will be written to
senCSV = "ALL-SENATORS-TWEETS.csv"
# Name of new datafile generated
senCSVc = "Tweets-Stub.csv"
# Name of pretest files
preTestIDsFake = "pretest-tweets_fake.txt"
preTestIDsNot = "pretest-tweets_not_fake.txt"
# Name of pretest datafile
senCSVPretest = "Pretest.csv"
senCSVPretestPrep = "Pretest-Prep.csv"
senCSVPretestResult = "Pretest-Results.csv"
# don't change this one
senCSVPath = wd + ud + senCSV
senCSVcPath = wd + ud + senCSVc
senCSVcPretestPath = wd + ud + senCSVPretest
senCSVcPretestPrepPath = wd + ud + senCSVPretestPrep
senCSVcPretestResultPath = wd + ud + senCSVPretestResult
preTestIDsFakePath = wd + di + preTestIDsFake
preTestIDsNotPath = wd + di + preTestIDsNot
# List of IDs to select
# Read the IDs from a file
preTestIDsFakeL = []
preTestIDsNotL = []
with open(preTestIDsFakePath, "r") as file:
lines = file.readlines()
for line in lines:
tid = line.strip() # Remove the newline character
preTestIDsFakeL.append(tid)
with open(preTestIDsNotPath, "r") as file:
lines = file.readlines()
for line in lines:
tid = line.strip() # Remove the newline character
preTestIDsNotL.append(tid)
# Select rows based on the IDs
df = pd.read_csv(senCSVPath, dtype=(object))
#%%
# Create pretest dataframe
dfPreTest = df[df['id'].isin(preTestIDsFakeL)].copy()
dfPreTest['fake'] = True
dfPreTest = pd.concat([dfPreTest, df[df['id'].isin(preTestIDsNotL)]], ignore_index=True)
dfPreTest['fake'] = dfPreTest['fake'].fillna(False)
#%%
# https://huggingface.co/bvrau/covid-twitter-bert-v2-struth
# HowTo:
# https://huggingface.co/docs/transformers/main/en/model_doc/bert#transformers.BertForSequenceClassification
# https://stackoverflow.com/questions/75932605/getting-the-input-text-from-transformers-pipeline
pipe = pipeline("text-classification", model="bvrau/covid-twitter-bert-v2-struth")
model = AutoModelForSequenceClassification.from_pretrained("bvrau/covid-twitter-bert-v2-struth")
tokenizer = AutoTokenizer.from_pretrained("bvrau/covid-twitter-bert-v2-struth")
# Source https://www.kaggle.com/code/daotan/tweet-analysis-with-transformers-bert
dfPreTest['cleanContent'] = dfPreTest['rawContent'].apply(remove_URL)
dfPreTest['cleanContent'] = dfPreTest['cleanContent'].apply(remove_emoji)
dfPreTest['cleanContent'] = dfPreTest['cleanContent'].apply(remove_html)
dfPreTest['cleanContent'] = dfPreTest['cleanContent'].apply(remove_punct)
dfPreTest['cleanContent'] = dfPreTest['cleanContent'].apply(lambda x: x.lower())
#%%
max_length = 128
dfPreTest['input_ids'] = dfPreTest['cleanContent'].apply(lambda x: tokenizer(x, max_length=max_length, padding="max_length",)['input_ids'])
#train.rename(columns={'target': 'labels'}, inplace=True)
#train.head()
# %%
dfPreTest.to_csv(senCSVcPretestPrepPath, encoding='utf-8', columns=['id', 'cleanContent'])
#%%
dataset = load_dataset("csv", data_files=senCSVcPretestPrepPath)
# %%
results = pipe(KeyDataset(dataset, "text"))
# %%
#from tqdm.auto import tqdm
#for out in tqdm(pipe(KeyDataset(dataset['train'], "cleanContent"))):
# print(out)
#%%
output_labels = []
output_score = []
for out in pipe(KeyDataset(dataset['train'], "cleanContent"), batch_size=8, truncation="only_first"):
output_labels.append(out['label'])
output_score.append(out['score'])
# [{'label': 'POSITIVE', 'score': 0.9998743534088135}]
# Exactly the same output as before, but the content are passed
# as batches to the model
# %%
dfPreTest['output_label'] = output_labels
dfPreTest['output_score'] = output_score
# %%
dfPreTest.to_csv(senCSVcPretestResultPath, encoding='utf-8')
# %%