Commit 248264fa authored by Marcel Henrik Schubert's avatar Marcel Henrik Schubert

added all recent changes and scripts

parent 628bde6d
@@ -3,12 +3,14 @@ import sys
import gc
import json
import jsonlines
import pickle
import pandas as pd
import tables
#import tables
from scipy import sparse
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer, HashingVectorizer
def store_sparse_mat(M, name, filename='store.h5'):
'''def store_sparse_mat(M, name, filename='store.h5'):
"""
Store a csr matrix in HDF5
@@ -39,9 +41,9 @@ def store_sparse_mat(M, name, filename='store.h5'):
arr = np.array(getattr(M, attribute))
atom = tables.Atom.from_dtype(arr.dtype)
ds = f.create_carray(f.root, full_name, atom, arr.shape)
ds[:] = arr
ds[:] = arr'''
def load_sparse_mat(name, filename='store.h5'):
'''def load_sparse_mat(name, filename='store.h5'):
"""
Load a csr matrix from HDF5
@@ -67,7 +69,7 @@ def load_sparse_mat(name, filename='store.h5'):
# construct sparse matrix
M = sparse.csr_matrix(tuple(attributes[:3]), shape=attributes[3])
return M
return M'''
def split_path_unix_win(path):
#make windows-unix problem go away
@@ -84,7 +86,8 @@ def batch_read(fileobj, batchsize, to_json=True):
line = fileobj.readline()
if line and to_json:
line = jsonize(line)
line['uID'] = make_uids(line, 'tweetIDs')
line = make_uids(line, 'tweetIDs', secondary='tweetID')
lines.append(line)
elif line:
lines.append(line)
@@ -105,37 +108,295 @@ def jsonize(line):
return line
def make_uids(dic, key):
dic[key] = '|'.join(dic[key])
def make_uids(dic, key, secondary='tweetID'):
if key not in dic.keys():
key = secondary
dic['uID'] = '|'.join(dic[key])
return dic
def select_uids(df, ids):
df = df.loc[df['uID'].isin(ids),:]
return df
def pandas_to_ndjson(df:pd.DataFrame, f):
df = df.to_records()
writer = jsonlines.Writer(df)
df = df.to_dict(orient='records')
writer = jsonlines.Writer(f)
writer.write_all(df)
print('wrote subset to file...')
sys.stdout.flush()
writer.close()
def make_save_dirs(path):
# create files for later saving
path = split_path_unix_win(path)
ind = path.index('preprocessed')
os.makedirs(os.path.join(*(path[0:ind]+ ['models'] + path[(ind+1):])))
model_dir = os.path.join(*(path[0:ind]+ ['models'] + path[(ind+1):]))
os.makedirs(model_dir, exist_ok=True)
print('made models directory with subdirs...')
sys.stdout.flush()
return model_dir
def identity_prepr(text):
return text
def reset_num(num:pd.DataFrame, fix:pd.DataFrame):
num = num.sort_values(by=['uID'])
fix = fix.sort_values(by=['uid']).copy(deep=True)
#fix emoji and emoticon counts (assumes the rows of fix are a subset of num and both are sorted by uID)
mask = num['uID'].isin(fix['uID'])
num.loc[mask, 'emojis_num'] = fix.emoji.apply(len).values
num.loc[mask, 'emotic_num'] = fix.emoticon.apply(len).values
num.sort_index(inplace=True)
return num
def identity_tokenizer(text):
return text
def identity_analyzer(text):
return text
\ No newline at end of file
return text
def zero_columns(M, columns):
diag = sparse.eye(M.shape[1]).tolil()
for c in columns:
diag[c, c] = 0
return M.dot(diag)
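# note: zero_columns works by right-multiplying M with an identity matrix whose diagonal
# entries for the given columns are set to 0, which zeroes those columns of M without
# densifying the sparse matrix.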
def make_vocab(vocab, path):
with open(path, 'w', encoding='utf-8') as json_file:
json.dump(vocab, json_file, ensure_ascii=False)
print('saved vocab to disk...')
sys.stdout.flush()
def get_vocab(path):
with open(path, 'r', encoding='utf-8') as json_file:
vocab = json.load(json_file)
return vocab
def update_vocab(vocab, lines:pd.DataFrame, key):
for el in lines[key]:
subvocab = {}
#basically build a set here - we want document frequency, not total frequency (i.e. in how many docs a token appears, not how often it appears overall)
for token in el:
subvocab[token] = subvocab.get(token, 0) + 1
for token in subvocab.keys():
vocab[token] = vocab.get(token, 0) +1
return vocab
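# small worked example (hypothetical input): for lines[key] = [['a', 'a', 'b'], ['a']]
# update_vocab adds {'a': 2, 'b': 1}, i.e. document frequencies rather than raw counts.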
def reduce_vocab(vocab, cutoff):
ind = 0
new = {}
for key in vocab.keys():
if vocab[key] >=cutoff:
new[key] = ind
ind +=1
return new, ind
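# reduce_vocab keeps only tokens whose document frequency reaches the cutoff and assigns
# them consecutive column indices; it returns the mapping together with its size.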
def vocab_maker(vocab):
ind = 0
new = {}
for key in vocab.keys():
new[key] = ind
ind +=1
return new, ind
def construct_tdm(docs, vocab, maxcol):
indptr = [0]
indices = []
data = []
exception_counter = 0
term_counter = 0
for d in docs:
for term in d:
term_counter +=1
index = vocab.get(term, 'exception') #if term does not appear often enough
if index == 'exception':
exception_counter +=1
continue
indices.append(index)
data.append(1)
#make it so that we have enough columns
indices.append(maxcol+1)
data.append(1)
indptr.append(len(indices))
print('went over {} terms'.format(term_counter))
print('of these {} are not in dict'.format(exception_counter))
sys.stdout.flush()
return sparse.csr_matrix((data, indices, indptr), dtype=int)
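# note: each document additionally gets a count of 1 in column maxcol+1, apparently to
# guarantee that every matrix built from this vocabulary ends up with the same number of
# columns regardless of which terms actually occur in the batch.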
def find_variance_cutoff(pca, dat, cutoff=0.99, tol = 0.005, start = 100):
#recursive cutoff search (divide and conquer)
print('make pca with {} components'.format(start))
sys.stdout.flush()
pca = pca.set_params(n_components=start)
#pca = pca.set_params(batch_size = round(start*3))
pca = pca.fit(dat)
diff = np.abs(pca.explained_variance_ratio_.sum() - cutoff)
print('Difference to cutoff is {}'.format(diff))
sys.stdout.flush()
if diff < tol:
print('Use {} components. That is {} of num_features'.format(start, round(start/dat.shape[1], 2)))
sys.stdout.flush()
return pca
#else we continue search recursively
if pca.explained_variance_ratio_.sum() > cutoff:
start = round(3*start/4)
return find_variance_cutoff(pca, dat, cutoff, tol, start = start)
else:
start = start + round((dat.shape[1]-start)/2)
return find_variance_cutoff(pca, dat, cutoff, tol, start=start)
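# find_variance_cutoff searches recursively over n_components: if the fitted PCA already
# explains more variance than the cutoff it shrinks the candidate to 3/4 of its value,
# otherwise it moves halfway towards the number of features, and it stops once the
# explained-variance ratio is within tol of the cutoff.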
def make_gram_dict(path:str):
gram_dict = {}
path = os.path.join(*split_path_unix_win(path))
for dir in os.listdir(path):
if dir not in ['vectors', 'process', 'age_ids', 'gender_ids']:
direct = os.path.join(path, dir)
if os.path.isdir(direct):
#make key char, num, etc.
gram_dict[dir] = {}
for tweetLen in os.listdir(direct):
subdirect = os.path.join(direct, tweetLen)
if os.path.isdir(subdirect):
#make key tweetlen 500, 250, 100
gram_dict[dir][str(tweetLen)] = {}
for target in os.listdir(subdirect):
target_direct = os.path.join(subdirect, target)
if os.path.isdir(target_direct):
#make target key
gram_dict[dir][str(tweetLen)][target] = {}
for subset in os.listdir(target_direct):
subset_dir = os.path.join(target_direct, subset)
if os.path.isdir(subset_dir):
#make subset key, i.e. train, test, val
gram_dict[dir][str(tweetLen)][target][subset] = {}
#now we only have files (sometimes with and sometimes without grams)
files = os.listdir(subset_dir)
for file in files:
##now we have to go over the number of authors in each subset
authors = file.split('_')[-2]
#make or keep key with dictionary
gram_dict[dir][str(tweetLen)][target][subset][authors] = gram_dict[dir][str(tweetLen)][target][subset].get(authors, {})
if '_grams_' in file:
grams = file.split('_')[-4]
gram_dict[dir][str(tweetLen)][target][subset][authors][grams] = os.path.join(subset_dir, file)
else:
gram_dict[dir][str(tweetLen)][target][subset][authors]['no_gram'] = os.path.join(subset_dir, file)
gram_dict[dir][str(tweetLen)][target][subset][authors]['ids'] = os.path.join(path, '{}_ids'.format(target),
subset, '{}_ids_{}_{}_{}_authors_balanced.json'.format(subset,
target,
tweetLen,
authors))
return gram_dict
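# resulting structure (sketch): gram_dict[featuretype][tweetLen][target][subset][authors]
# maps each gram size (or 'no_gram') to its file path, plus an 'ids' entry pointing to the
# matching '{subset}_ids_{target}_{tweetLen}_{authors}_authors_balanced.json' file.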
def assess_equal(dat1, ids1, dat2, ids2):
sym = set(ids1).symmetric_difference(set(ids2))
if len(ids1) > len(ids2):
assert len(set(ids2) - set(ids1)) == 0
print('dat1 (shape {}) is larger than dat2 (shape {})...'.format(dat1.shape, dat2.shape))
sys.stdout.flush()
dat1 = assess_order(np.array(ids2), dat1, np.array(ids1))
ids1 = ids2
else:
assert len(set(ids1) - set(ids2)) == 0
print('dat2 (shape {}) is larger than dat1 (shape {})...'.format(dat2.shape, dat1.shape))
sys.stdout.flush()
dat2 = assess_order(np.array(ids1), dat2, np.array(ids2))
ids2 = ids1
return dat1, ids1,dat2, ids2
def assess_order(sorter,data, data_ids):
if type(sorter) == type({}):
sorter = sorter[list(sorter.keys())[0]]
sorter = np.array(sorter).flatten()
assert type(data_ids) == type(np.array([]))
d = {k:v for v,k in enumerate(data_ids)}
sort_ids = [d[el] for el in sorter] #order we want the ids in
#d = {k:v for v,k in enumerate(sorter)}
#sort_ids = [d[el] for el in data_ids]
return data[sort_ids, :] #slice the array in correct order
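# assess_order returns the rows of data re-ordered so that data_ids matches the order
# given by sorter; it assumes every id in sorter is present in data_ids.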
def load_make_vectorizers(target, path, featuretyp, minTw_gram_auth, hash):
vectorizer_dic = {}
minTW = minTw_gram_auth[0]
numAuthors = minTw_gram_auth[2]
path_vocab = os.path.join(*split_path_unix_win(path),
featuretyp, str(minTW), 'vocab', target)
for gram in sorted(minTw_gram_auth[1]):
vectorizer_dic[str(gram)] = {}
if not hash:
path_vocab_red = os.path.join(path_vocab,
'{}_grams_{}_{}_{}_authors_reduced.vocab'.format(featuretyp, gram, minTW,
numAuthors))
vocab = get_vocab(path_vocab_red)
vectorizer_dic[str(gram)]['vect'] = CountVectorizer(vocabulary=vocab, analyzer=identity_analyzer)
vectorizer_dic[str(gram)]['vocab'] = vocab
else:
path_colids = os.path.join(path_vocab,
'{}_grams_{}_{}_{}_authors_reduced.colids'.format(featuretyp, gram, minTW,
numAuthors))
vectorizer_dic[str(gram)]['vect'] = HashingVectorizer(analyzer=identity_analyzer)
with open(path_colids, 'rb') as p:
colinds = pickle.load(p)
vectorizer_dic[str(gram)]['colinds'] = colinds
return vectorizer_dic
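# depending on the hash flag this rebuilds either a CountVectorizer from the reduced
# vocabulary stored on disk, or a HashingVectorizer together with the pickled column
# indices (.colids), which presumably mark the hashed columns kept after reduction.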
def make_prediction_dir(path, featuretyp, minTweet, target):
path = split_path_unix_win(path)
ind = path.index('preprocessed')
path[ind] = 'predictions'
path = os.path.join(*path, featuretyp, minTweet,target)
return path
def y_categories_target_key(target):
if 'age' in target:
target_key = 'centered_age'
cats = np.array([1947, 1963, 1975, 1985, 1995])
else:
target_key = 'gender'
cats = np.array(['male', 'female']) # male is dropped -> female mapped to 1
return cats, target_key
def save_sparse(matrix:sparse.csr_matrix, path:str):
sparse.save_npz(path, matrix)
print('saved sparse matrix to file...')
sys.stdout.flush()
def load_sparse(path:str):
matrix = sparse.load_npz(path+'.npz')
print('loaded sparse matrix from file...')
sys.stdout.flush()
return matrix
def save_uids(uids, path:str):
with open(path, 'wb') as w:
pickle.dump(uids, w)
def load_uids(path:str):
with open(path, 'rb') as w:
uids= pickle.load(w)
return uids
@@ -377,7 +377,7 @@ if __name__ == "__main__":
args["path"] = "../../Data/pan19-celebrity-profiling-training-dataset-2019-01-31/preprocessed"
args["labelpath"] = "../../Data/pan19-celebrity-profiling-training-dataset-2019-01-31/labels.ndjson"
args["workset"] ='workset'
args["part"] =['creator']
args["part"] =['creator', 'performer']
args["minchars"] = [100, 250, 500]
args["authors"] = [1000, 500, 150, 50]
args["threshold"] = 0.1
......
#!/usr/bin/env python3
import pandas as pd
import numpy as np
import ndjson
import jsonlines
import json
import os
import sys
import random as rd
import multiprocessing as mp
import re, regex
import gc
import psutil
import time
datapath = '/cobra/ptmp/mschuber/PAN/Data/pan19-celebrity-profiling-training-dataset-2019-01-31/'
#datapath = '../Data/pan19-celebrity-profiling-training-dataset-2019-01-31/'
combinefile = 'combined_preprocess'
combined = 'combined_mult.ndjson'
#regexes for preprocessing
http = re.compile(r'(https{0,1}:\S+)')
www = re.compile(r'(www\.\S+)')
emoji = regex.compile(r'\X')
tagging = re.compile(r'#\S+')
retweet = re.compile(r'(RT\s{0,1}@\w+(?![@,\\])[\w]:{0,1})')
mention = re.compile(r'(@\w+(?![@,\\])\w)')
def chunkify(fname,size=np.NaN):
linebytes = []
if np.isnan(size):
print('size is nan...look at single lines only')
i = 0
#fileEnd = os.path.getsize(fname)
with open(fname, 'r', encoding='utf-8') as f:
nextLineByte = f.tell()
while True:
linebytes.append(nextLineByte)
line = f.readline()
nextLineByte = f.tell() #returns the location of the next line
#if i == 10000:
# break
#i+=1
if not line or line == '':
break
else:
sys.exit('Chunking not yet implemented')
return linebytes
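# chunkify records the byte offset of every line in the file so that worker processes can
# seek() straight to a single line instead of re-reading the whole file.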
def preprocess(dic):
#tweet_pre = ['§BEGIN§']
tweet_pre = []
tweet_pre.extend(retweet.split(dic.pop('text_org', None)))
tweet_pre = [retweet.sub('§RETWEET§', tweet_pre[i]) for i in range(0, len(tweet_pre))]
#tweet_pre.append('§END§')
tmp_tweet = []
#find and substitute HTTP-URLs
for i in range(0, len(tweet_pre)):
tmp = http.split(tweet_pre[i])
tmp = [http.sub('§LINK§', tmp[j]) for j in range(0, len(tmp)) if tmp[j] != '']
tmp_tweet.extend(tmp)
tweet_pre = tmp_tweet
tmp_tweet = []
#find and substitute www-URLs
for i in range(0, len(tweet_pre)):
tmp = www.split(tweet_pre[i])
tmp = [www.sub('§LINK§', tmp[j]) for j in range(0, len(tmp)) if tmp[j] != '']
tmp_tweet.extend(tmp)
tweet_pre = tmp_tweet
tmp_tweet = []
#find and substitute mentions
for i in range(0, len(tweet_pre)):
tmp = mention.split(tweet_pre[i])
tmp = [mention.sub('§MENTION§', tmp[j]) for j in range(0, len(tmp)) if tmp[j] != '']
tmp_tweet.extend(tmp)
tweet_pre = tmp_tweet
tmp_tweet = []
#find and substitute Tagging
for i in range(0, len(tweet_pre)):
tmp = tagging.split(tweet_pre[i])
tmp = [tagging.sub('§TAG§', tmp[j]) for j in range(0, len(tmp)) if tmp[j] != '']
tmp_tweet.extend(tmp)
tweet_pre = tmp_tweet
tmp_tweet = []
#make character lists; keep unicode emojis as well as the substitute tokens as single individual characters
for i in range(0, len(tweet_pre)):
if tweet_pre[i] not in ['§RETWEET§', '§LINK§', '§MENTION§', '§TAG§', '§BEGIN§', '§END§']:
tmp = emoji.findall(tweet_pre[i])
tmp_tweet.extend(tmp)
else:
tmp_tweet.append(tweet_pre[i])
#dic['text_pre'] = tmp_tweet
#del tmp_tweet
gc.collect()
return dic, tmp_tweet
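# preprocess replaces retweet markers, URLs, mentions and hashtags with the placeholder
# tokens §RETWEET§, §LINK§, §MENTION§ and §TAG§, then splits the remaining text into a
# character list in which each placeholder (and each emoji grapheme) counts as one symbol.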
def ngrams(charlist):
bigrams = []
trigrams = []
quadgrams = []
leng = len(charlist)
for i in range(0, leng):
if i <= leng-4: #quadgram needs chars i..i+3
quadgrams.append(charlist[i] + charlist[i+1] +
charlist[i+2] + charlist[i+3])
trigrams.append(charlist[i] + charlist[i+1] + charlist[i+2])
bigrams.append(charlist[i] + charlist[i+1])
elif i <= leng-3: #trigram needs chars i..i+2
trigrams.append(charlist[i] + charlist[i+1] + charlist[i+2])
bigrams.append(charlist[i] + charlist[i+1])
elif i<leng-1:
bigrams.append(charlist[i] + charlist[i+1])
return [bigrams, trigrams, quadgrams]
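# worked example (with the boundary checks above): ngrams(['a', 'b', 'c', 'd'])
# returns [['ab', 'bc', 'cd'], ['abc', 'bcd'], ['abcd']].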
def process_wrapper(chunkStart, q):
res = []
grams = []
with open(datapath+combined, 'r', encoding='utf-8') as f:
f.seek(chunkStart)
lines = f.readline()
lines = ndjson.loads(lines)[0]
dic, text_pre = preprocess(lines)
res.append(dic)
grams.append(text_pre)
del text_pre
del dic
tmp = ngrams(grams[0])
grams.extend(tmp)
res.append(grams)
q.put(res)
del lines
del grams
del tmp
return 'finished'
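# process_wrapper handles a single ndjson line: it seeks to the stored byte offset,
# preprocesses the tweet text, builds the bi-/tri-/quadgram lists and puts
# [dic, [chars, bigrams, trigrams, quadgrams]] on the shared queue for the listener.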
def listener(q):
'''listens for messages on the q, writes to file. '''
#print('finished making files..')
sys.stdout.flush()
prof = ['sports', 'performer', 'creator', 'politics', 'manager','science', 'professional', 'religious']
writers = []
filehandles = []
pathmodify = ['singlegrams/', 'bigrams/', 'trigrams/', 'quadgrams/']
types =['', '_bigrams', '_trigrams', '_quadgrams']
for i in range(0, len(types)):
item = types[i]
gramwriter = []
f = open(datapath+pathmodify[i]+combinefile+'{}.ndjson'.format(item), mode='a', encoding='utf-8')
writer = jsonlines.Writer(f, flush=True)
gramwriter.append(writer)
filehandles.append(f)
for j in range(0, len(prof)):
job = prof[j]
f_p = open(datapath+pathmodify[i]+combinefile+'{}_{}.ndjson'.format(item, job), mode='a', encoding='utf-8')
writer_p = jsonlines.Writer(f_p, flush=True)
gramwriter.append(writer_p)
filehandles.append(f_p)
writers.append(gramwriter)
print('made writers...')
sys.stdout.flush()
while True:
print('waiting for item from queue...')
sys.stdout.flush()
m = q.get()
print('got item from queue')
sys.stdout.flush()
if m == 'kill':
break
dic = m[0]
grams = m[1:5]
index = prof.index(dic['occupation'])+1
del m
for i in range(0, len(types)):
dic['text_pre'] = grams[0][i]
writers[i][0].write(dic)
writers[i][index].write(dic)
del dic
del grams
for r in range(0, len(writers)):
for w in range(0, len(writers[r])):
writers[r][w].close()
for r in range(0,len(filehandles)):
filehandles[r].close()
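# the listener is the single writer process: for every queued record it writes the dic
# (with the matching gram list as 'text_pre') to both the combined file and the
# occupation-specific file of each gram type, until it receives the 'kill' sentinel.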
def make_file_paths():
prof = ['sports', 'performer', 'creator', 'politics', 'manager','science', 'professional', 'religious']
pathmodify = ['singlegrams/', 'bigrams/', 'trigrams/', 'quadgrams/']
types =['', '_bigrams', '_trigrams', '_quadgrams']
for el in pathmodify:
os.makedirs(datapath+el, exist_ok=True)
for i in range(0, len(types)):
item = types[i]
f = open(datapath+pathmodify[i]+combinefile+'{}.ndjson'.format(item), mode='w', encoding='utf-8')
f.flush()
f.close()
for j in range(0, len(prof)):