Commit e14a2d92 authored by Marcel Henrik Schubert

added complete balance

parent 8959564f
#!/bin/bash -l
# Standard output and error:
# #SBATCH --open-mode=truncate
#SBATCH -o ./out/strat_comp.out
#SBATCH -e ./out/strat_comp.err
# Initial working directory:
#SBATCH -D ./
# Job Name:
#SBATCH -J strat_comp
# Queue:
#SBATCH --partition=short
# Number of nodes and MPI tasks per node:
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
# Enable Hyperthreading:
# #SBATCH --ntasks-per-core=2
# for OpenMP:
#SBATCH --cpus-per-task=32
#SBATCH --mail-type=none
#SBATCH --mail-user=schubert@coll.mpg.de
# Wall clock limit:
#SBATCH --time=04:00:00
export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK
# For pinning threads correctly:
export OMP_PLACES=cores
module load gcc/8
module load anaconda/3/5.1
module load scikit-learn/0.19.1
# Run the program:
srun python /draco/u/mschuber/PAN/attributionfeatures/Scripts/stratified_sample_balanced_complete_balance.py
echo "job finished"
#!/usr/bin/env python3
import pandas as pd
import numpy as np
import ndjson
import jsonlines
import json
import os
import sys
import multiprocessing as mp
import re, regex
import gc
from statistics import median, mean, stdev
import math
import time
from imblearn.under_sampling import RandomUnderSampler
from sklearn.model_selection import train_test_split
import random
#
datapath = '/draco/ptmp/mschuber/PAN/Data/pan19-celebrity-profiling-training-dataset-2019-01-31/'
subset = 'workset'
filebeg = 'workset_preprocessed_'
filend = '.ndjson'
grams = ['singlegram', 'bigram']
outfolder = 'stratified_subsample/complete_balance'
sizes = [200, 500, 1000, 2000]
def make_file_paths(dirname):
    os.makedirs(dirname, exist_ok=True)
def chunkify(fname, gram, size=np.NaN):
    """Scan an ndjson file and collect, per author, the byte offsets of all
    tweet lines whose token count (after removing padding tokens) lies in (24, 108)."""
    i = 0
    strat = []
    ids = []
    authors = {}
    minTweets = 1e+6
    # single-gram lines carry two extra tokens, bigram lines one
    if gram == 'singlegram':
        subtractor = 2
    else:
        subtractor = 1
    if np.isnan(size):
        print('size is nan...look at single lines only')
        sys.stdout.flush()
        with open(fname, 'r', encoding='utf-8') as f:
            nextLineByte = f.tell()
            while True:
                line = f.readline()
                if line != '':  # an empty string signals EOF
                    line = ndjson.loads(line)[0]
                    if 24 < len(line[gram]) - subtractor < 108:
                        age = 2019 - line['birthyear']
                        autId = line['author_id']
                        if autId in authors:
                            authors[autId]['lineBytes'].append(nextLineByte)
                        else:
                            if age < 22:
                                lifePhase = 'child_21'
                            elif age < 36:
                                lifePhase = 'young_adult_35'
                            elif age < 51:
                                lifePhase = 'adult_50'
                            elif age < 66:
                                lifePhase = 'old_adult_65'
                            else:
                                lifePhase = 'retiree'
                            strat.append(line['gender'] + '_' + lifePhase)
                            ids.append(autId)
                            authors[autId] = {}
                            authors[autId]['lineBytes'] = [nextLineByte]
                            authors[autId]['age_group'] = lifePhase
                    nextLineByte = f.tell()  # byte offset at which the next line starts
                    #if i == 30000:
                    #    break
                    i += 1
                else:
                    break
    else:
        sys.exit('Chunking not yet implemented')
    ### force every author to contribute the same number of tweets:
    ### drop authors with fewer than 600 qualifying tweets and track the
    ### smallest tweet count among the remaining authors (minTweets)
    limIds = []
    for ind, key in enumerate(ids):
        leng = len(authors[key]['lineBytes'])
        if leng < 600:
            limIds.append(ind)
        else:
            if leng < minTweets:
                minTweets = leng
    count = 0
    for i in limIds:
        # earlier pops shift the remaining indices left, hence the running offset
        key = ids[i - count]
        ids.pop(i - count)
        strat.pop(i - count)
        authors.pop(key)
        count += 1
    return authors, strat, ids, minTweets
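# Sketch of what chunkify returns (illustrative values, not real data):
#   authors   -> {'4823': {'lineBytes': [0, 812, ...], 'age_group': 'adult_50'}, ...}
#   strat     -> ['male_adult_50', 'female_young_adult_35', ...]  # one label per id
#   ids       -> ['4823', ...]                                    # aligned with strat
#   minTweets -> smallest per-author tweet count among kept authors (>= 600)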
def random_strat_draw(x, strat, size):
    # stratified draw of `size` authors, preserving the gender/age-group shares
    keep, throw = train_test_split(x, random_state=123456, train_size=size, test_size=None, stratify=strat)
    return keep
def balance_sampler(indices, balance):
    # undersample so that every gender/age-group stratum keeps equally many authors
    indices = np.array(indices).reshape(-1, 1)
    rus = RandomUnderSampler(random_state=123456)
    X_resampled, y_resampled = rus.fit_resample(indices, balance)
    return list(X_resampled.reshape(1, -1)[0]), list(y_resampled)
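# Minimal usage sketch (hypothetical toy labels, not real strata):
#   ids, strat = balance_sampler([1, 2, 3, 4, 5],
#                                ['m_adult_50'] * 3 + ['f_adult_50'] * 2)
#   # every stratum is cut down to the size of the smallest one, so
#   # len(ids) == 4 and strat contains two labels of each class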
def write_to_file(authors, subsample, minTweets, infile, outfile):
    random.seed(123456)
    o = open(outfile, 'w', encoding='utf-8')
    writer = jsonlines.Writer(o, flush=True)
    f = open(infile, 'r', encoding='utf-8')
    for ident in subsample:
        author = authors[ident]
        lineBytes = author['lineBytes']
        # random sample of exactly minTweets tweets per author
        lineBytes = random.sample(lineBytes, minTweets)
        for lineByte in lineBytes:
            f.seek(lineByte)  # jump straight to the stored byte offset
            lines = f.readline()
            if lines:
                dic = ndjson.loads(lines)[0]
                dic['age_group'] = author['age_group']
                writer.write(dic)
    writer.close()
    f.close()
    o.close()
def process_wrapper(filepath, outfile, size, gram):
    print('get ids for {} with size {}'.format(gram, size))
    sys.stdout.flush()
    authors, strat, ids, minTweets = chunkify(filepath, gram)
    print('minimum number of tweets is {}'.format(minTweets))
    print('make balanced sample for {} with size {} from initially {}'.format(gram, size, len(ids)))
    sys.stdout.flush()
    ids, strat = balance_sampler(ids, strat)
    print('undersampled for {} with subset size {} - number of observations is now {}'.format(gram, size, len(ids)))
    sys.stdout.flush()
    printsize = len(ids)
    if len(ids) > size:
        # more balanced authors than requested: draw a stratified subsample
        printsize = size
        print('make random draw {} with size {}'.format(gram, size))
        sys.stdout.flush()
        ids = random_strat_draw(ids, strat, size)
    if sizes.index(size) > 0:
        index = sizes.index(size) - 1
    else:
        index = 0
    # only write if this subsample exceeds the next-smaller target size;
    # otherwise it would duplicate the file already produced for that size
    if printsize > sizes[index] or (printsize == sizes[index] and index == 0):
        print('write to file for {} with size {}'.format(gram, size))
        sys.stdout.flush()
        write_to_file(authors, ids, minTweets, filepath, outfile + str(printsize) + '.ndjson')
    else:
        print('{} with size {} was too small ({}) - did not write to file'.format(gram, size, printsize))
    print('done for {} with size {}'.format(gram, size))
    sys.stdout.flush()
    return 1
def main():
    pool = mp.Pool(mp.cpu_count())
    jobs = []
    print('make outdirs')
    sys.stdout.flush()
    make_file_paths(datapath + outfolder)
    print('done making dirs')
    sys.stdout.flush()
    # one asynchronous job per (gram, size) combination
    for gram in grams:
        for size in sizes:
            job = pool.apply_async(process_wrapper,
                                   (datapath + subset + '/' + filebeg + gram + filend,
                                    datapath + outfolder + '/stratified_subsample_preprocessed' + '_' + gram + '_',
                                    size, gram))
            jobs.append(job)
    for job in jobs:
        tmp = job.get()
    pool.close()
    pool.join()
    print('done...will now exit :)')
    sys.stdout.flush()

if __name__ == "__main__":
    main()
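# Expected output (one file per (gram, size) pair that passes the size check), e.g.:
#   <datapath>/stratified_subsample/complete_balance/stratified_subsample_preprocessed_singlegram_200.ndjson
# Each line is one tweet record augmented with its author's 'age_group'.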