import re
import string

import numpy as np
import pandas as pd
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline
from datasets import load_dataset
from transformers.pipelines.pt_utils import KeyDataset

#%%
# prepare
# install xformers (pip install xformers) for better performance

###################
# Setup directories

# WD Michael
wd = "/home/michael/Documents/PS/Data/collectTweets/"
# WD Server
# wd = '/home/yunohost.multimedia/polsoc/Politics & Society/TweetCollection/'

# datafile input directory
di = "data/IN/"

# Tweet-datafile output directory
ud = "data/OUT/"

# Name of file that all senator data will be written to
senCSV = "ALL-SENATORS-TWEETS.csv"

# Name of new datafile generated
senCSVc = "Tweets-Stub.csv"

# Name of pretest files
preTestIDsFake = "pretest-tweets_fake.txt"
preTestIDsNot = "pretest-tweets_not_fake.txt"

# Name of pretest datafiles
senCSVPretest = "Pretest.csv"
senCSVPretestPrep = "Pretest-Prep.csv"
senCSVPretestResult = "Pretest-Results.csv"

# don't change these
senCSVPath = wd + ud + senCSV
senCSVcPath = wd + ud + senCSVc
senCSVcPretestPath = wd + ud + senCSVPretest
senCSVcPretestPrepPath = wd + ud + senCSVPretestPrep
senCSVcPretestResultPath = wd + ud + senCSVPretestResult
preTestIDsFakePath = wd + di + preTestIDsFake
preTestIDsNotPath = wd + di + preTestIDsNot

# Read the lists of pretest tweet IDs from their files,
# one ID per line
preTestIDsFakeL = []
preTestIDsNotL = []

with open(preTestIDsFakePath, "r") as file:
    for line in file:
        preTestIDsFakeL.append(line.strip())  # strip the trailing newline

with open(preTestIDsNotPath, "r") as file:
    for line in file:
        preTestIDsNotL.append(line.strip())  # strip the trailing newline

# Read the full tweet dataset; keep every column as a string so that
# tweet IDs are not coerced into floats
df = pd.read_csv(senCSVPath, dtype=object)

#%%
# Create pretest dataframe: tweets from the fake list are flagged True,
# tweets from the not-fake list get False via fillna
dfPreTest = df[df['id'].isin(preTestIDsFakeL)].copy()
dfPreTest['fake'] = True
dfPreTest = pd.concat([dfPreTest, df[df['id'].isin(preTestIDsNotL)]], ignore_index=True)
dfPreTest['fake'] = dfPreTest['fake'].fillna(False)

#%%
# Model: https://huggingface.co/bvrau/covid-twitter-bert-v2-struth
# HowTo:
# https://huggingface.co/docs/transformers/main/en/model_doc/bert#transformers.BertForSequenceClassification
# https://stackoverflow.com/questions/75932605/getting-the-input-text-from-transformers-pipeline
pipe = pipeline("text-classification", model="bvrau/covid-twitter-bert-v2-struth")
model = AutoModelForSequenceClassification.from_pretrained("bvrau/covid-twitter-bert-v2-struth")
tokenizer = AutoTokenizer.from_pretrained("bvrau/covid-twitter-bert-v2-struth")

# Cleaning helpers, adapted from
# https://www.kaggle.com/code/daotan/tweet-analysis-with-transformers-bert

def remove_URL(text):
    url = re.compile(r'https?://\S+|www\.\S+')
    return url.sub(r'', text)

def remove_emoji(text):
    emoji_pattern = re.compile(
        '['
        u'\U0001F600-\U0001F64F'  # emoticons
        u'\U0001F300-\U0001F5FF'  # symbols & pictographs
        u'\U0001F680-\U0001F6FF'  # transport & map symbols
        u'\U0001F1E0-\U0001F1FF'  # flags (iOS)
        u'\U00002702-\U000027B0'
        u'\U000024C2-\U0001F251'
        ']+',
        flags=re.UNICODE)
    return emoji_pattern.sub(r'', text)

def remove_html(text):
    html = re.compile(r'<.*?>|&([a-z0-9]+|#[0-9]{1,6}|#x[0-9a-f]{1,6});')
    return html.sub('', text)

def remove_punct(text):
    table = str.maketrans('', '', string.punctuation)
    return text.translate(table)
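# Quick sanity check of the cleaning chain on a made-up tweet (hypothetical
# example string, not taken from the dataset): the URL, emoji, HTML markup
# and punctuation should all be stripped, leaving extra spaces behind.
sample = 'Masks &amp; vaccines work! 😷 https://t.co/abc123 <b>Stay safe</b>'
for fn in (remove_URL, remove_emoji, remove_html, remove_punct):
    sample = fn(sample)
print(sample.lower())  # -> 'masks  vaccines work   stay safe'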
# Apply the cleaning steps in sequence: URLs, emojis, HTML entities,
# punctuation, then lowercasing
dfPreTest['cleanContent'] = dfPreTest['rawContent'].apply(remove_URL)
dfPreTest['cleanContent'] = dfPreTest['cleanContent'].apply(remove_emoji)
dfPreTest['cleanContent'] = dfPreTest['cleanContent'].apply(remove_html)
dfPreTest['cleanContent'] = dfPreTest['cleanContent'].apply(remove_punct)
dfPreTest['cleanContent'] = dfPreTest['cleanContent'].apply(lambda x: x.lower())

#%%
# Tokenize once to inspect the input IDs; the pipeline below tokenizes
# internally, so this column is not what gets fed to the model
max_length = 128
dfPreTest['input_ids'] = dfPreTest['cleanContent'].apply(
    lambda x: tokenizer(x, max_length=max_length, padding="max_length", truncation=True)['input_ids'])

#train.rename(columns={'target': 'labels'}, inplace=True)
#train.head()

# %%
# Write only the columns the classification step needs
dfPreTest.to_csv(senCSVcPretestPrepPath, encoding='utf-8', columns=['id', 'cleanContent'])

#%%
# load_dataset returns a DatasetDict; the CSV lands in the 'train' split
dataset = load_dataset("csv", data_files=senCSVcPretestPrepPath)

# %%
# Lazy single-pass version; the batched loop below is what actually runs
results = pipe(KeyDataset(dataset['train'], "cleanContent"))

# %%
#from tqdm.auto import tqdm
#for out in tqdm(pipe(KeyDataset(dataset['train'], "cleanContent"))):
#    print(out)

#%%
# Exactly the same output as before, but the content is passed to the
# model in batches. Example output shape (from the Hugging Face docs):
# [{'label': 'POSITIVE', 'score': 0.9998743534088135}]
output_labels = []
output_score = []
for out in pipe(KeyDataset(dataset['train'], "cleanContent"), batch_size=8, truncation="only_first"):
    output_labels.append(out['label'])
    output_score.append(out['score'])

# %%
dfPreTest['output_label'] = output_labels
dfPreTest['output_score'] = output_score

# %%
dfPreTest.to_csv(senCSVcPretestResultPath, encoding='utf-8')

# %%
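# %%
# A minimal evaluation sketch (not part of the original pipeline): compare
# the model's predictions against the hand-coded 'fake' column. The label
# string 'fake' is an assumption here -- inspect model.config.id2label to
# see what the model actually emits and adjust the mapping accordingly.
print(model.config.id2label)  # shows the label strings the model uses
predicted_fake = dfPreTest['output_label'].str.lower().eq('fake')  # assumed label name
accuracy = (predicted_fake == dfPreTest['fake']).mean()
print(f"Pretest accuracy: {accuracy:.2%}")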