# -*- coding: utf-8 -*- """ Created on Wed May 9 14:56:32 2018 Version: 3.4.0 @author: Holger Niemann, Peter Drewelow, Yu Gao mainly to clean up the downloadversionIRdata code Tools for: checking IR images, calculate gain and offset again from check backgroundframes check coldframes ... """ import numpy as np import matplotlib.pyplot as plt import matplotlib.patches as patches from IR_config_constants import portcamdict, IRCAMBadPixels_path, parameter_file_path import os import datetime #import h5py #import glob def get_OP_by_time(time_ns=None, shot_no=None, program_str=None): '''Derives operation phase (OP) of W7-X based on either: a nanosacond time stamp, a MDSplus style shot no. or a program ID. INPUT ----- time_ns: - integer of nanosecond time stamp, e.g. 1511972727249834301 (OPTIONAL) shot_no - integer of MDSplus style shot number, e.g. 171207022 (OPTIONAL) program_str - string of CoDaQ ArchiveDB style prgram number or date, e.g. '20171207.022' or '20171207' (OPTIONAL) RESULT ------ conn - MDSplus connection object, to be used in e.g. 1511972727249834301 read_MDSplus_image_simple(), read_MDSplus_metadata() ''' # derive operation phase (OP) from time as nanosecond time stamp or string if time_ns is not None: dateOP = datetime.datetime.utcfromtimestamp(time_ns/1e9) elif shot_no is not None: dateOP = datetime.datetime.strptime(str(shot_no)[:6], '%y%m%d') elif program_str is not None: dateOP = datetime.datetime.strptime(program_str[:8], '%Y%m%d') else: raise Exception('get_OP_by_time: ERROR! neither time, shot no. or program ID provided') if dateOP.year == 2017: if dateOP.month > 8 and dateOP.month < 12: OP = "OP1.2a" elif dateOP.month == 8 and dateOP.day >= 28: OP = "OP1.2a" elif dateOP.month == 12 and dateOP.day < 8: OP = "OP1.2a" else: OP = None elif dateOP.year >= 2018: return "OP1.2b" elif dateOP.year <= 2016 and dateOP.year >= 2015: if (dateOP.year == 2016 and dateOP.month <= 3) or (dateOP.year == 2015 and dateOP.month == 12): OP = "OP1.1" else: OP = None return OP def bestimmtheitsmass_general(data, fit): """ INPUT ------ RESULT ------ NOTE ------ """ R = 0 if len(fit) == len(data): mittel = np.sum(data)/len(data) qam = quad_abweich_mittel(fit, mittel) R = qam/(qam+quad_abweich(data, fit)) else: print("bestimmtheitsmass_general: Arrays must have same dimensions") return R def bestimmheitsmass_linear(data, fit, debugmode=False): """ INPUT ------ RESULT ------ NOTE ------ """ R2 = 0 if len(fit) == len(data): mittel_D = np.mean(data)#np.sum(data)/len(data) mittel_F = np.mean(fit) R2 = quad_abweich_mittel(fit, mittel_D)/quad_abweich_mittel(data, mittel_D) if debugmode: print(mittel_D, mittel_F, quad_abweich_mittel(fit, mittel_D), quad_abweich_mittel(data, mittel_D), R2) else: print("bestimmtheitsmass_linear: Arrays must have same dimensions") return R2 def quad_abweich_mittel(data, mittel): """ INPUT ------ RESULT ------ NOTE ------ """ R = 0 for i in data: R = R+(i-mittel)**2 return R def quad_abweich(data, fit): """ INPUT ------ RESULT ------ NOTE ------ """ R = 0 if len(fit) == len(data): for i in range(len(data)): R = R+(data[i]-fit[i])**2 else: print("quad_abweich: Arrays must have same dimensions") return R def find_nearest(array, value): """ INPUT ------ RESULT ------ NOTE ------ """ #a=array a = [x - value for x in array] mini = np.min(np.abs(a)) try: idx = a.index(mini) except: idx = a.index(-mini) return idx#array[idx] def check_coldframe(coldframe, references=None, threshold=0.5, plot_it=False): ''' return true/false and the quality factor INPUT ------ RESULT ------ NOTE ------ ''' shapi = np.shape(coldframe) ##function (np.arange(0,768)-384)**(2)/900-50 datasets = [] for i in [int(shapi[1]//4), int(shapi[1]//2), int(shapi[1]//4*3)]: dataline = coldframe[0:shapi[0], i] datasets.append(dataline-np.mean(dataline)) if references == None: references = [] for dat in datasets: mini = np.mean(dat[shapi[0]/2-50:shapi[0]/2+50]) a = (np.mean(dat[0:50])-mini)/(int(shapi[0]/2))**2 reference = a*(np.arange(0, shapi[0])-int(shapi[0]/2))**(2)+mini references.append(reference) bestimmtheit = [] if plot_it: plt.figure() plt.imshow(coldframe, vmin=np.mean(coldframe)-500, vmax=np.mean(coldframe)+500) plt.figure() for i_dat in range(len(datasets)): dat = datasets[i_dat] reference = references[i_dat] bestimmtheit.append(bestimmtheitsmass_general(dat, reference)) if plot_it: plt.plot(dat, label='data') plt.plot(reference, label='reference') # print(int(shapi[0]/2),1*(np.max(datasets[-1])-mini),mini) plt.legend() if np.mean(bestimmtheit) > threshold: return True, bestimmtheit else: return False, bestimmtheit def check_coldframe_by_refframe(coldframe, reference_frame, threshold=0.8, plot_it=False): ''' INPUT ------ RESULT ------ NOTE ------ ''' references = [] shapi = np.shape(reference_frame) for i in [int(shapi[1]//5), int(shapi[1]//2), int(shapi[1]//4*3)]: dataline = reference_frame[0:shapi[0], i] references.append(dataline-np.mean(dataline)) return check_coldframe(coldframe, references, threshold, plot_it) def check_backgroundframe(backgroundframe, threshold=50): ''' return true or false INPUT ------ RESULT ------ NOTE ------ ''' shapi = np.shape(backgroundframe) valid = True dataset = [] for i in [int(shapi[1]//4), int(shapi[1]//2), int(shapi[1]//4*3)]: referenceline = backgroundframe[0:shapi[0], i] meanref = referenceline-np.mean(referenceline) dataset.append(np.max(meanref)-np.min(meanref)) if np.mean(dataset) < threshold: valid = False return valid, np.mean(dataset) def read_bad_pixels_from_file(port, shot_no=None, program=None, time_ns=None): '''Reads bad pixels stored in *.bpx file on E4 server. Requires one of the optional arguments shot_no or program. IN port - integer of port no of camera shot_no - integer of MDSplus style shot number, e.g. 171207022 (OPTIONAL) program - string of CoDaQ ArchiveDB style prgram number or date, e.g. '20171207.022' or '20171207' (OPTIONAL) OUT bad_pixle_list - list of tuples (row,column) of pixel coordinates as integer INPUT ------ RESULT ------ NOTE ------ ''' if shot_no is not None: OP = get_OP_by_time(shot_no=shot_no) elif program is not None: OP = get_OP_by_time(program_str=program) elif time_ns is not None: OP = get_OP_by_time(time_ns=time_ns) else: raise Exception('read_bad_pixels_from_file: ERROR! Need either shot no. or program string.') port_name = 'AEF{0}'.format(port) bad_pixel_file = 'badpixel_{0}.bpx'.format(portcamdict[OP][port_name][6:]) try: data = np.genfromtxt(IRCAMBadPixels_path+bad_pixel_file, dtype=int) bad_pixle_list = list(zip(data[:, 1], data[:, 0])) except: bad_pixle_list = [] return bad_pixle_list def find_outlier_pixels(frame, tolerance=3, plot_it=False):#worry_about_edges=True, ''' This function finds the bad pixels in a 2D dataset. Tolerance is the number of standard deviations used for cutoff. INPUT ------ RESULT ------ NOTE ------ ''' frame = np.array(frame)#, dtype=int) from scipy.ndimage import median_filter blurred = median_filter(frame, size=9) difference = frame - blurred threshold = tolerance*np.std(difference) mean = np.mean(difference) if plot_it: fig = plt.figure() fig.suptitle('find_outlier_pixels: histogram') plt.hist(difference.ravel(), 50, log=True, histtype='stepfilled') plt.axvline(mean, linewidth=2, color='k', label='mean') x1 = mean - np.std(difference) x2 = mean + np.std(difference) plt.axvspan(x1, x2, linewidth=2, facecolor='g', alpha=0.1, label='standard deviation') x1 = mean - tolerance*np.std(difference) x2 = mean + tolerance*np.std(difference) plt.axvspan(x1, x2, linewidth=2, facecolor='r', alpha=0.1, label='threshold for bad pixel') plt.legend() plt.show() #find the hot pixels bad_pixels = np.transpose(np.nonzero((np.abs(difference) > threshold))) bad_pixels = (bad_pixels).tolist() bad_pixels = [tuple(l) for l in bad_pixels] if plot_it: plt.figure() plt.imshow(frame) for i in range(len(bad_pixels)): plt.scatter(bad_pixels[i][1], bad_pixels[i][0], c='None') plt.show() return bad_pixels def correct_images(images, badpixels, verbose=0): ''' INPUT ------ RESULT ------ NOTE ------ ''' if type(badpixels) != int: if type(images) == list: # return corrected images also as list of 2D arrays for i in range(len(images)): images[i] = restore_bad_pixels(images[i], np.invert(badpixels == 1), verbose=verbose-1) else: # keep shape images = restore_bad_pixels(images, np.invert(badpixels == 1), verbose=verbose-1) # for i in range(len(images)): # images[i]=(restore_pixels(images[i],np.invert(badpixels==1))).astype(np.float32) if verbose > 0: print("correct_images: done") return images def restore_bad_pixels(frames, bad_pixel, by_list=True, check_neighbours=True, plot_it=False, verbose=0): """Restore bad pixel by interpolation of adjacent pixels. Optionally make sure that adjacent pixels are not bad (time consuming). Default is to use a list of bad pixels and a for loop. For many bad pixels consider using the optinal alternative using a bad pixel mask. INPUT ------ frames - either list of frames as 2D numpy array, or 3D numpy array (frame number, n_rows, n_cols), or 2D numpy array (n_rows, n_cols) bad_pixel - either list of tuples of bad pixel coordinates, or mask of pixel status (good=True, bad=False) by_list - boolean of whether to use a list and a for loop (True), or to use a mask of bad pixel and array operations (False) (OPTIONAL: if not provided, True (list) is default) check_neighbours - boolean of whether to check if neighbours of a bad pixel are not bad either before computing a mean (works only in list mode!) (OPTIONAL: if not provided, check is on) plot_it - boolean to decide whether to plot intermediate results or not (OPTIONAL: if not provided, switched off) verbose - integer of feedback level (amount of prints) (OPTIONAL: if not provided, only ERROR output) RESULT ------ frames - 3D numpy array (frame number, n_rows, n_cols) of corrected frames NOTE ------ """ # make sure frames is correctly shaped if type(frames) == list: frames = np.array(frames) frame_shape = 'list' else: if len(np.shape(frames)) == 2: frames = np.array([frames]) frame_shape = '2D' elif len(np.shape(frames)) == 3: frame_shape = '3D' pass else: raise Exception('restore_bad_pixels: ERROR! Unexpected shape of frames.') frame_dtype = frames.dtype # frames = frames.astype(float) n_frames, n_rows, n_cols = np.shape(frames) if plot_it: start_frame = np.copy(frames[0]) # make sure bad pixel are provided as mask and list if type(bad_pixel) is list: blist = bad_pixel bmask = np.ones([n_rows, n_cols], dtype=bool) for pix in blist: try: bmask[pix] = False except Exception as E: Warning(E) bmask = np.invert(bmask) else: if np.shape(bad_pixel)[0] == n_rows and np.shape(bad_pixel)[1] == n_cols: bmask = np.invert(bad_pixel) x, y = np.where(bmask) blist = list(zip(x, y)) else: raise Exception('restore_bad_pixels: ERROR! bad_pixel in bad shape {0}'.format(np.shape(bad_pixel))) if verbose > 0: print('restore_bad_pixels: {0} bad pixels to be restored: {1} ... '.format(len(blist), blist[:3])) # expand frame by rows and columns of zeros to simplify treatment of edges frames = np.dstack([np.zeros([n_frames, n_rows], dtype=frame_dtype), frames, np.zeros([n_frames, n_rows], dtype=frame_dtype)]) frames = np.hstack([np.zeros([n_frames, 1, n_cols+2], dtype=frame_dtype), frames, np.zeros([n_frames, 1, n_cols+2], dtype=frame_dtype)]) bmask = np.vstack([np.zeros([1, n_cols], dtype=bool), bmask, np.zeros([1, n_cols], dtype=bool)]) bmask = np.hstack([np.zeros([n_rows+2, 1], dtype=bool), bmask, np.zeros([n_rows+2, 1], dtype=bool)]) # define number of neighbours (up to 4) ina an array of expanded frame size n_neighbours = np.ones([n_frames, n_rows+2, n_cols+2])*4 n_neighbours[:, 1, :] = 3 n_neighbours[:, -2, :] = 3 n_neighbours[:, :, 1] = 3 n_neighbours[:, :, -2] = 3 n_neighbours[:, 1, 1] = 2 n_neighbours[:, 1, -2] = 2 n_neighbours[:, -2, 1] = 2 n_neighbours[:, -2, -2] = 2 if by_list: # ===== correct bad pixels using the list of bad pixels ===== # for pos in blist: # Note: # pos points to real frame coordinates, while bmask, n_neighbours have been expanded! if check_neighbours: # takes only neighbours that are not bad pos_l = np.where(bmask[pos[0]+1,:pos[1]+1] == False)[0] if len(pos_l) != 0: pos_l = pos_l[-1] else: pos_l = pos[1]+1 pos_r = np.where(bmask[pos[0]+1,pos[1]+1:] == False)[0] if len(pos_r) != 0: pos_r = pos_r[0] + pos[1]+1 else: pos_r = pos[1]+2 pos_t = np.where(bmask[:pos[0]+1,pos[1]+1] == False)[0] if len(pos_t) != 0: pos_t = pos_t[-1] else: pos_t = pos[0]+1 pos_b = np.where(bmask[pos[0]+1:,pos[1]+1] == False)[0] if len(pos_b) != 0: pos_b = pos_b[0] + pos[0]+1 else: pos_b = pos[0]+2 else: # insensitive to neighbours being bad as well! pos_l = pos[1] pos_r = pos[1]+2 pos_t = pos[0] pos_b = pos[0]+2 average = (frames[:,pos[0]+1,pos_l].astype(float) + frames[:,pos[0]+1,pos_r].astype(float) + frames[:,pos_t,pos[1]+1].astype(float) + frames[:,pos_b,pos[1]+1].astype(float)) / n_neighbours[:,pos[0]+1,pos[1]+1] frames[:,pos[0]+1,pos[1]+1] = average.astype(frame_dtype) frames = frames[:,1:-1,1:-1] else: # ======= correct bad pixels using the bad pixel mask ======= # # (insensitive to neighbours being bad as well!) # prepare mask arrays for neighbours by shifting it to left, right, top and bottom bmask_l = np.hstack([bmask[:, 1:], np.zeros([n_rows+2, 1], dtype=bool)]) bmask_r = np.hstack([np.zeros([n_rows+2, 1], dtype=bool), bmask[:, :-1]]) bmask_t = np.vstack([bmask[1:, :], np.zeros([1, n_cols+2], dtype=bool)]) bmask_b = np.vstack([np.zeros([1, n_cols+2], dtype=bool), bmask[:-1, :]]) # ----------------------------------- # restore by mask # frames[:,bmask] = ( (frames[:,bmask_l].astype(float) + frames[:,bmask_r].astype(float) + frames[:,bmask_t].astype(float) + frames[:,bmask_b].astype(float)) / n_neighbours[:,bmask] ).astype(frame_dtype) frames = frames[:,1:-1,1:-1] # plot comparison if plot_it: plt.figure() plt.title('bad pixel correction of first frame') m = np.mean(start_frame) s = np.std(start_frame) plt.imshow(start_frame, vmin=m-s, vmax=m+s) plt.colorbar() x,y = zip(*blist) plt.scatter(y,x, marker='o', s=5, c='r', linewidths=1) plt.tight_layout() plt.show() if frame_shape == 'list': frames = list(frames) elif frame_shape == '2D' and len(np.shape(frames))==3: frames = frames[0] return frames def generate_new_hot_image(cold,reference_cold,reference_hot): ''' INPUT ------ RESULT ------ NOTE ------ ''' if cold is None or reference_cold is None or reference_hot is None: raise Exception("generate_new_hot_image: Cannot Calculate new Hot image, if images are missing!") else: return reference_hot+(cold-reference_cold) def calculate_gain_offset_image_pix(cold_image,hot_image=None,reference_cold=None,reference_hot=None,verbose=0): ''' INPUT ------ RESULT ------ NOTE ------ ''' if hot_image is None: hot_image=generate_new_hot_image(cold_image,reference_cold,reference_hot) if verbose>0: print("calculate_gain_offset_image_pix: calculate gain and offset") Sh_ref = hot_image[ ( np.int( np.shape(hot_image)[0] /2 ) ) ][np.int( (np.shape(hot_image)[1] /2 ) ) ] Sc_ref = cold_image[ ( np.int( (np.shape(cold_image)[0]) /2 ) ) ][( np.int( (np.shape(cold_image)[1]) /2 ) ) ] Gain_rel = ( Sh_ref - Sc_ref ) / ( hot_image - cold_image) Off_h_rel = Sh_ref - hot_image*Gain_rel Off_c_rel = Sc_ref - cold_image*Gain_rel Offset_rel = ( Off_h_rel + Off_c_rel ) /2 return Gain_rel,Offset_rel def calculate_gain_offset_image(cold_image,hot_image=None,reference_cold=None,reference_hot=None,verbose=0): """ INPUT ------ RESULT ------ NOTE ------ """ if hot_image is None: hot_image=generate_new_hot_image(cold_image,reference_cold,reference_hot) if verbose>0: print("calculate_gain_offset_image: calculate gain and offset") # Sh_ref = hot_image[ ( np.int( np.shape(hot_image)[0] /2 ) ) ][np.int( (np.shape(hot_image)[1] /2 ) ) ] # Sc_ref = cold_image[ ( np.int( (np.shape(cold_image)[0]) /2 ) ) ][( np.int( (np.shape(cold_image)[1]) /2 ) ) ] # print(hot_image[( np.int( np.shape(hot_image)[0]/2) )-2: (np.int( np.shape(hot_image)[0]/2))+3,np.int((np.shape(hot_image)[1]/2))-2:np.int((np.shape(hot_image)[1]/2))+3 ]) # print(cold_image[( np.int( np.shape(hot_image)[0]/2) )-2: (np.int( np.shape(hot_image)[0]/2))+3,np.int((np.shape(hot_image)[1]/2))-2:np.int((np.shape(hot_image)[1]/2))+3 ]) Sh_ref = np.mean(hot_image[( np.int(np.shape(hot_image)[0]/2))-2: (np.int(np.shape(hot_image)[0]/2))+3,np.int((np.shape(hot_image)[1]/2))-2:np.int((np.shape(hot_image)[1]/2))+3]) Sc_ref = np.mean(cold_image[( np.int(np.shape(cold_image)[0]/2))-2: (np.int(np.shape(cold_image)[0]/2))+3,np.int((np.shape(cold_image)[1]/2))-2:np.int((np.shape(cold_image)[1]/2))+3]) difference_image = hot_image - cold_image indexlist = np.where(difference_image==0) difference_image[indexlist] = 0.001 Gain_rel = ( Sh_ref - Sc_ref ) / ( difference_image) Gain_rel[indexlist]=0 Off_h_rel = Sh_ref - hot_image*Gain_rel Off_c_rel = Sc_ref - cold_image*Gain_rel Offset_rel = ( Off_h_rel + Off_c_rel ) /2 return Gain_rel, Offset_rel #%% functions from Yu Gao #""" functions by Yu Gao""" #outdated by download_hot_cold_reference in downloadversionIRdata. removed to remove dependency on data on the E4-drive #============================================================================== # def load_ref_images(port, exposuretime, verbose=0): # ''' # load the reference cold and hot frame during calibration from local files. # @port: e.g. 'AEF10' # @exposuretime: int number. # ''' # cameraname = portcamdict['OP1.2a'][port] # foldername = cameraname.split('_')[0] + '_' + cameraname.split('_')[2] # scanpath = os.path.join(IRCamRefImagespath, foldername) # coldref, hotref = [], [] # for filename in glob.iglob(scanpath + '\*' + str(int(exposuretime)) + 'us.h5', recursive=True): # if 'hot' in filename: # if verbose>0: # print('load_ref_images: read from ',filename) # with h5py.File(filename, 'r') as h5in: # hotref = h5in[os.path.basename(filename)].value # elif 'cold' in filename: # if verbose>0: # print('load_ref_images: read from ',filename) # with h5py.File(filename, 'r') as h5in: # coldref = h5in[os.path.basename(filename)].value # return coldref, hotref #============================================================================== def reconstruct_coldframe (exposuretime, sT, a, bnew, coldref): """ INPUT ------ exposuretime: integer the exposure time sT: a: bnew: coldref: numpy array the reference cold frame as the base for the reconstruction RESULT ------ NOTE ------ """ cirebuild = a * sT + bnew * exposuretime + coldref return cirebuild #%% other functions def check_dublicates(array): """ INPUT ------ RESULT ------ NOTE ------ """ a = array import collections return [item for item, count in collections.Counter(a).items() if count > 1] def check_dublicates_2(array): """ INPUT ------ RESULT ------ NOTE ------ """ seen = set() uniq = [] for x in array: if x not in seen: uniq.append(x) seen.add(x) return uniq,seen def get_work_list(pipepath,typ="q"): """ INPUT ------ pipepath: string the path to the folder where the files are located typ: string the typ of data which is requested in the working list\n possiblities: q, Aw, qpeak, width, load\n or anything else for the problematic programs RESULT ------ cam_programs: list a list containing two coloumns, cameras and programs reasons: list, optional, only for problematic programs a list showing the reasons, why data are not processed NOTE ------ """ today=datetime.datetime.now() cam_programs=[] if typ in ('q_old','load_old'): f=open(pipepath+str(today.year)+str(today.month)+"_"+typ+"_requests.txt") elif typ in ('q','load','qpeak','Aw','width'): f=open(pipepath+"Auto_"+typ+"_requests.txt") else: reasons=[] f = open(pipepath+"problematic_programs.txt") for line in f: koline=line.split("\t") if len(koline)>1: prog = koline[0] if typ in ('q','load'): cam_programs.append((prog,koline[1].split("\n")[0])) else: cam_programs.append((prog,koline[1])) reasons.append(koline[2].split("\n")[0]) f.close() if typ in ('q','load'): bla=check_dublicates_2(cam_programs) cam_programs=bla[0] return cam_programs else: return cam_programs,reasons #%% functions regarding wetted area calculation def read_finger_info(file_name=None, OP='OP1.2b', verbose=0): '''Read divertor finger information from file. The referenced fingers are those defined in the IR profile mapping, i.e. the target modlues 5 and 6 are divided in inner and outer fingers (see 'finger_part' in result dictionary). INPUT ----- file_name: str, optional file name of csv fiel with finger information (OPTIONAL: default is None, i.e. decide by OP) OP: str, optional label of operation phase of interest, e.g. 'OP1.2b' (OPTIONAL: default is 'OP1.2b', i.e. load TDU file) verbose: integer, optional feedback level (details of print messages) (OPTIONAL: if not provided, only ERROR output) RESULT ------ finger_dic: dictionary dictionary with keys 'ID', 'target', 'target_element','n_profiles', 'width', 'finger_part' (see NOTES) NOTES ----- contents of result dictionary: * 'ID' numpy array of integers of continuous finger number * 'target' list of strings of target identifier ('h_l' horizontal low-iota, 'h_m' horizontal middle part, 'h_h' horizontal high-iota, 'v' vertical) * 'target_element' numpy array of integers of target module number (1..9 on horizontal target, 1..3 on vertical target) * 'n_profiles' numpy array of integers of number of profiles defined on this finger * 'width' numpy array of floats of centre width of finger in meters * 'finger_part' numpy array of integers indicating with 0 this is a full finger and with 1 this is the second part of the previous finger ''' if file_name is None: # assume OP is given if OP.startswith('OP1'): file_name='finger_info_TDU.csv' elif OP.startswith('OP2'): file_name='finger_info_HHF.csv' full_path = os.path.join(parameter_file_path, file_name) print(full_path) if verbose > 0: print('read_finger_info: reading from file {0} in {1}'.format(file_name, parameter_file_path)) if not os.path.isfile(full_path): raise Exception('read_finger_info: ERROR! file not found') finger_dic = {'ID': [], 'target': [], 'target_element':[], 'n_profiles':[], 'width':[], 'finger_part':[]} data = np.genfromtxt(full_path, delimiter=';', dtype=(int, "|S3", int, int, float, int)) for i in range(len(data)): finger_dic['ID'].append( data[i][0] ) finger_dic['target'].append( data[i][1].decode('UTF-8') ) finger_dic['target_element'].append( data[i][2] ) finger_dic['n_profiles'].append( data[i][3] ) finger_dic['width'].append( data[i][4] ) finger_dic['finger_part'].append( data[i][5] ) finger_dic['ID'] = np.array(finger_dic['ID']) finger_dic['target_element'] = np.array(finger_dic['target_element']) finger_dic['n_profiles'] = np.array(finger_dic['n_profiles']) finger_dic['width'] = np.array(finger_dic['width']) finger_dic['finger_part'] = np.array(finger_dic['finger_part']) return finger_dic def derive_strike_line_width_per_module(heat_flux, mapping,mode='average', q_max=None, profile_average_range=3, noise_threshold=2E5, ports_loaded=None, verbose=0): ''' Derive strike-line width of heat flux array by integrating the total power load and dividing with a peak heat flux value. The peak heat flux value is either: 1) the maximum within each divertor finger (mode='finger') 2) the maximum of a whole divertor module (mode='module') 3) the maximum of the mean divertor averaged toroidally (mode='average'). Mode average is default. In case the input heat flux has only 2 dimensions (from one divertor), mode 'average' and 'module' result in the same. Returned are the strike-line width(s) and the corresponding q_max value(s). INPUT ----- heat_flux: numpy array array of heat fluxes from THEODOR on the profiles defined in the IR mapping; can be 2D (one divertor), or 3D (multiple divertor modules) mapping: dictionary IR profile mapping information as returned by downloadversionIRdara.download_heatflux_mapping_reference(); minimum necessary keys are 'finger_ID' and 's' mode: str, optional label to identify the normalization mode, either 'module' (normalize by peak heat flux per torus module), 'average' (normalize by peak heat flux in the toroidally mean heat flux pattern), 'finger' (normalize by peak heat flux per finger in each torus module) (OPTIONAL: default is 'average') q_max: float or numpy array, optional either single peak heat flux value or a peak heat flux for each divertor module or each finger (depends on mode) (OPTIONAL: default is None, i.e. derived based on mode) profile_average_range: int, optional number of central profiles on each finger to average the integral heat flux on (this avoids hot leading edges and shadowed edges) (OPTIONAL: default is 3 profiles) noise_threshold: float, optional minimum heat flux level to crop heat_flux to, if heat flux has negative values (OPTIONAL: default is 200kW/m²) ports_loaded: list or str or int, optional if mode not 'average' label of divertor modules provided in heat_flux array for plots; int of port number for single divertor data and list of port numbers for heat flux from multiple divertor modules; # gets renamed if a mean heat flux pattern is used (mode 'average') (OPTIONAL: default is None, i.e. label will be 'w_s') verbose: integer, optional feedback level (details of print messages) (OPTIONAL: if not provided, only ERROR output) RESULT ------ total_mean_width: float or numpy array mean strike-line width in a shape that depends on the mode (see NOTES) q_max: float or numpy array peak heat flux used for normalizatin in a shape that depends on the mode (see NOTES) NOTES ----- The shape of the results varies depending on the dimension of the input * 2D: singel divertor modules heat flux * 3D: heat flux from multiple divertor modules and the mode to derive q_max ('module', 'average', 'finger'): * '2D' + 'module' or 'average' --> one value for total_wetted_area and q_max * '3D' + 'average' --> 1D numpy arrays with two values for total_wetted_area and q_max (upper and lower divertors) * '3D' + 'module' --> 1D numpy arays with a value for each torus module (first dimension of heat_flux) * '2D' + 'finger' --> 1D numpy arays with a value for each divertor finger * '3D' + 'finger' --> 2D numpy arays with a value for each torus module and each divertor finger ''' #check input heat_flux_dim = len(np.shape(heat_flux)) if mode == 'average' and ports_loaded == None and heat_flux_dim>2: raise Exception("derive_strike_line_width_per_module: ports must be specified in average mode since V3.3.2") elif mode == 'average' and heat_flux_dim > 2: try: llen=len(ports_loaded) except: raise Exception("derive_strike_line_width_per_module: each divertor need a description to calcualte proper the wetted area!") else: if llen!=len(heat_flux): raise Exception("derive_strike_line_width_per_module: number of given divertors and number of descriptions does not match!") # prepare mapping and finger information finger_dic = read_finger_info(verbose=verbose-1) finger_ID = finger_dic['ID'] profile_no = mapping['Finger_ID'][0] # find profile IDs of central profiles on each finger central_profiles_on_finger = [] is_central_profile = [] for i_finger in range(len(finger_ID)): n_profiles = finger_dic['n_profiles'][i_finger] i_profile_start = n_profiles//2 - profile_average_range//2 -1 central_profiles = i_finger*100 + np.arange(i_profile_start, i_profile_start+profile_average_range) central_profiles_on_finger.append(central_profiles) is_central_profile.append(np.logical_or.reduce([profile_no == centre_profile for centre_profile in central_profiles])) central_profiles_on_finger = np.array(central_profiles_on_finger) if np.nanmin(heat_flux) < 0: heat_flux[heat_flux0: print('derive_strike_line_width_per_module: set heat_flux < {0:.1f}kW/m² to 0'.format(noise_threshold/1E3)) # reduce dimension of heat_flux if in 'average' mode if heat_flux_dim == 2 and mode == 'average': mode = 'module' elif heat_flux_dim == 3 and mode == 'average': # heat_flux = np.nanmean(heat_flux, axis=0) ## sort the divertors updiv=[] downdiv=[] for i in range(len(ports_loaded)): # entries in the array/list are either int or str or even float try: port=int(ports_loaded[i]) except: #okay it is not an int or an int like string ## what can it be? 'AEFXX'? But what about OP2 with the A or K ports? Still be 3 letters port=int(ports_loaded[i][3:]) if port%10==0: downdiv.append(heat_flux[i]) else: updiv.append(heat_flux[i]) heat_flux=np.array([np.nanmean(np.asarray(updiv),axis=0),np.nanmean(np.asarray(downdiv),axis=0)]) del downdiv,updiv # heat_flux_dim = 3 ports_loaded = [1,0]#'upper divertor','lower divertor']#'mean heat flux' mode = 'module' if verbose>0: print('derive_strike_line_width_per_module: averaged 3D heat flux array over first dimension') if heat_flux_dim==3: # assume dimensions: toroidal index (camera ports), row, column n_ports = np.shape(heat_flux)[0]#, n_rows, n_cols if verbose>0: print('derive_strike_line_width_per_module: deriving wetted area on {0} divertor modules in {1} mode...'.format(n_ports, mode)) # derive q_max for normalization of integral if q_max is None: # derive q_max on every finger q_max_on_finger = [] for i_finger in range(len(finger_ID)): q_max_on_finger.append([np.nanmax(h[is_central_profile[i_finger]]) for h in heat_flux]) q_max_on_finger = np.array(q_max_on_finger) q_max_on_finger[q_max_on_finger==0] = 1 if mode == 'module': # one value per torus half module q_max = np.nanmax(q_max_on_finger, axis=0) elif mode == 'finger': # one value per finger in each torus half module q_max = q_max_on_finger q_max[q_max==0] = 1 # integrate over profiles # goodcounter=np.zeros([n_ports]) finger_strikeline_width = np.zeros([len(finger_ID), n_ports]) strikeline_int = np.zeros([len(finger_ID), n_ports]) for i_finger in range(len(finger_ID)): # initialize temporary line integral for each module central_line_integral = np.zeros(n_ports) # integrate over each central profile and average for i_profile in central_profiles_on_finger[i_finger]: ij_profile = np.where(profile_no==i_profile) s = mapping['s'][ij_profile] h = heat_flux[:,ij_profile[0],ij_profile[1]] central_line_integral += np.nan_to_num( np.trapz(h, x=s, axis=1) ) # complete averaging process central_line_integral = central_line_integral / profile_average_range central_line_integral = central_line_integral *((central_line_integral>noise_threshold*max(s))*1) # goodcounter+=(central_line_integral>noise_threshold*max(s))*1 # normalize by q_max and multiply with width of finger strikeline_int[i_finger,:] = central_line_integral if mode == 'module': finger_strikeline_width[i_finger,:] = central_line_integral / q_max #* finger_dic['width'][i_finger] elif mode == 'finger': finger_strikeline_width[i_finger,:] = central_line_integral / q_max[i_finger] #* finger_dic['width'][i_finger] elif heat_flux_dim==2: # assume dimensions: row, column if verbose>0: print('derive_strike_line_width_per_module: deriving wetted area on single divertor module in {0} mode...'.format(mode)) # derive q_max for normalization of integral if q_max is None: if mode == 'average' or mode == 'module': # one value q_max = np.nanmax(heat_flux[np.logical_or.reduce(is_central_profile)]) elif mode == 'finger': # one value per finger q_max = [] for i_finger in range(len(finger_ID)): q_max.append( np.nanmax(heat_flux[is_central_profile[i_finger]]) ) q_max = np.array(q_max) q_max[q_max==0] = 1 # integrate over profiles # goodcounter=0 finger_strikeline_width = np.zeros([len(finger_ID)]) strikeline_int = np.zeros([len(finger_ID)]) for i_finger in range(len(finger_ID)): # integrate over each central profile and average central_line_integral = 0 for i_profile in central_profiles_on_finger[i_finger]: ij_profile = np.where(profile_no==i_profile) s = mapping['s'][ij_profile] h = heat_flux[ij_profile[0],ij_profile[1]] central_line_integral += np.nan_to_num( np.trapz(h, x=s, axis=0) ) central_line_integral = central_line_integral / profile_average_range central_line_integral = central_line_integral *((central_line_integral>noise_threshold*max(s))*1) # goodcounter+=(central_line_integral>noise_threshold*s)*1 strikeline_int[i_finger] = central_line_integral if mode == 'average' or mode == 'module': finger_strikeline_width[i_finger] = central_line_integral / q_max #* finger_dic['width'][i_finger] elif mode == 'finger': finger_strikeline_width[i_finger] = central_line_integral / q_max[i_finger] #* finger_dic['width'][i_finger] # merge half-fingers of TM5 and TM6 if np.any(finger_dic['finger_part']): if verbose>0: print('derive_strike_line_width_per_module: merge wetted area on half fingers of TM05 and TM06') new_finger_ID = np.copy(finger_ID) # scan backwards over fingers, merge and delete second finger halfs for i_finger in finger_ID[:0:-1]: if finger_dic['finger_part'][i_finger]: new_finger_ID = np.delete(new_finger_ID, i_finger) if heat_flux_dim==3 and mode != 'average': finger_strikeline_width[i_finger-1,:] = finger_strikeline_width[i_finger-1,:] + finger_strikeline_width[i_finger,:] strikeline_int[i_finger-1,:] = strikeline_int[i_finger-1,:] + strikeline_int[i_finger,:] else: finger_strikeline_width[i_finger-1] = finger_strikeline_width[i_finger-1] + finger_strikeline_width[i_finger] strikeline_int[i_finger-1] = strikeline_int[i_finger-1] + strikeline_int[i_finger] finger_strikeline_width = np.delete(finger_strikeline_width, i_finger, axis=0) strikeline_int=np.delete(strikeline_int, i_finger, axis=0) if mode == 'finger' and heat_flux_dim==3: q_max[i_finger-1,:] = np.maximum(q_max[i_finger-1,:], q_max[i_finger,:]) q_max = np.delete(q_max, i_finger, axis=0) elif mode == 'finger' and heat_flux_dim==2: q_max[i_finger-1] = np.maximum(q_max[i_finger-1], q_max[i_finger]) q_max = np.delete(q_max, i_finger, axis=0) # sum up # 'average' mode: sum all fingers over all torus modules --> wetted area of all divertors # divide by n_ports --> get average wetted area per divertor # 'module' mode: in each torus module sum over wetted area on all fingers # --> individual wetted areas per divertor # 'finger' mode: do not sum, since each finger was normalized with a differen q_max # --> individual wetted areas per finger # if only one divertors heat flux is given, proceed as in local mode if mode == 'finger': total_mean_width = finger_strikeline_width else: Weights=np.nan_to_num(strikeline_int/np.sum(strikeline_int,axis=0)) # print(Weights) WS=np.sum(Weights,axis=0) bla=np.where(WS==0)[0] for b in bla: Weights[:,b]=Weights[:,b]+1 total_mean_width = np.average(finger_strikeline_width,axis=0,weights=Weights) return total_mean_width, q_max def derive_peaking_factor_per_module(heat_flux, mapping,mode='average', q_max=None, profile_average_range=3, noise_threshold=2E5, ports_loaded=None, verbose=0): ''' Derive peaking of heat flux array by dividing the peak heat flux value with the mean heat flux value the total power load. The peak heat flux value is either: 1) the maximum within each divertor finger (mode='finger') 2) the maximum of a whole divertor module (mode='module') 3) the maximum of the mean divertor averaged toroidally (mode='average'). Mode average is default. In case the input heat flux has only 2 dimensions (from one divertor), mode 'average' and 'module' result in the same. Returned are the strike-line width(s) and the corresponding q_max value(s). INPUT ----- heat_flux: numpy array array of heat fluxes from THEODOR on the profiles defined in the IR mapping; can be 2D (one divertor), or 3D (multiple divertor modules) mapping: dictionary IR profile mapping information as returned by downloadversionIRdara.download_heatflux_mapping_reference(); minimum necessary keys are 'finger_ID' and 's' mode: str, optional label to identify the normalization mode, either 'module' (normalize by peak heat flux per torus module), 'average' (normalize by peak heat flux in the toroidally mean heat flux pattern), 'finger' (normalize by peak heat flux per finger in each torus module) (OPTIONAL: default is 'average') q_max: float or numpy array, optional either single peak heat flux value or a peak heat flux for each divertor module or each finger (depends on mode) (OPTIONAL: default is None, i.e. derived based on mode) profile_average_range: int, optional number of central profiles on each finger to average the integral heat flux on (this avoids hot leading edges and shadowed edges) (OPTIONAL: default is 3 profiles) noise_threshold: float, optional minimum heat flux level to crop heat_flux to, if heat flux has negative values (OPTIONAL: default is 200kW/m²) ports_loaded: list or str or int, optional if mode not 'average' label of divertor modules provided in heat_flux array for plots; int of port number for single divertor data and list of port numbers for heat flux from multiple divertor modules; # gets renamed if a mean heat flux pattern is used (mode 'average') (OPTIONAL: default is None, i.e. label will be 'w_s') verbose: integer, optional feedback level (details of print messages) (OPTIONAL: if not provided, only ERROR output) RESULT ------ mean_peaking_factor: float or numpy array mean peaking factor in a shape that depends on the mode (see NOTES) q_max: float or numpy array peak heat flux used for normalizatin in a shape that depends on the mode (see NOTES) NOTES ----- The shape of the results varies depending on the dimension of the input * 2D: singel divertor modules heat flux * 3D: heat flux from multiple divertor modules and the mode to derive q_max ('module', 'average', 'finger'): * '2D' + 'module' or 'average' --> one value for total_wetted_area and q_max * '3D' + 'average' --> 1D numpy arrays with two values for total_wetted_area and q_max (upper and lower divertors) * '3D' + 'module' --> 1D numpy arays with a value for each torus module (first dimension of heat_flux) * '2D' + 'finger' --> 1D numpy arays with a value for each divertor finger * '3D' + 'finger' --> 2D numpy arays with a value for each torus module and each divertor finger ''' #check input heat_flux_dim = len(np.shape(heat_flux)) if mode == 'average' and ports_loaded == None and heat_flux_dim>2: raise Exception("derive_peaking_factor_per_module: ports must be specified in average mode since V3.3.2") elif mode == 'average' and heat_flux_dim>2: try: llen=len(ports_loaded) except: raise Exception("derive_peaking_factor_per_module: each divertor need a description to calcualte proper the wetted area!") else: if llen!=len(heat_flux): raise Exception("derive_peaking_factor_per_module: number of given divertors and number of descriptions does not match!") # prepare mapping and finger information finger_dic = read_finger_info(verbose=verbose-1) finger_ID = finger_dic['ID'] profile_no = mapping['Finger_ID'][0] # find profile IDs of central profiles on each finger central_profiles_on_finger = [] is_central_profile = [] for i_finger in range(len(finger_ID)): n_profiles = finger_dic['n_profiles'][i_finger] i_profile_start = n_profiles//2 - profile_average_range//2 -1 central_profiles = i_finger*100 + np.arange(i_profile_start, i_profile_start+profile_average_range) central_profiles_on_finger.append(central_profiles) is_central_profile.append( np.logical_or.reduce([profile_no == centre_profile for centre_profile in central_profiles]) ) central_profiles_on_finger = np.array(central_profiles_on_finger) if np.nanmin(heat_flux) < 0: heat_flux[heat_flux0: print('derive_peaking_factor_per_module: set heat_flux < {0:.1f}kW/m² to 0'.format(noise_threshold/1E3)) # reduce dimension of heat_flux if in 'average' mode if heat_flux_dim == 2 and mode == 'average': mode = 'module' elif heat_flux_dim == 3 and mode == 'average': # heat_flux = np.nanmean(heat_flux, axis=0) ## sort the divertors updiv=[] downdiv=[] for i in range(len(ports_loaded)): # entries in the array/list are either int or str or even float try: port=int(ports_loaded[i]) except: #okay it is not an int or an int like string ## what can it be? 'AEFXX'? But what about OP2 with the A or K ports? Still be 3 letters port=int(ports_loaded[i][3:]) if port%10==0: downdiv.append(heat_flux[i]) else: updiv.append(heat_flux[i]) heat_flux=np.array([np.nanmean(np.asarray(updiv),axis=0),np.nanmean(np.asarray(downdiv),axis=0)]) del downdiv,updiv # heat_flux_dim = 3 ports_loaded = [1,0]#'upper divertor','lower divertor']#'mean heat flux' mode = 'module' if verbose>0: print('derive_peaking_factor_per_module: averaged 3D heat flux array over first dimension') if heat_flux_dim==3: # assume dimensions: toroidal index (camera ports), row, column n_ports = np.shape(heat_flux)[0]#, n_rows, n_cols if verbose>0: print('derive_peaking_factor_per_module: deriving wetted area on {0} divertor modules in {1} mode...'.format(n_ports, mode)) # derive q_max for normalization of integral if q_max is None: # derive q_max on every finger q_max_on_finger = [] for i_finger in range(len(finger_ID)): q_max_on_finger.append([np.nanmax(h[is_central_profile[i_finger]]) for h in heat_flux]) q_max_on_finger = np.array(q_max_on_finger) q_max_on_finger[q_max_on_finger==0] = 1 if mode == 'module': # one value per torus half module q_max = np.nanmax(q_max_on_finger, axis=0) elif mode == 'finger': # one value per finger in each torus half module q_max = q_max_on_finger q_max[q_max==0] = 1 # integrate over profiles # goodcounter=np.zeros([n_ports]) finger_strikeline_width = np.zeros([len(finger_ID), n_ports]) strikeline_int = np.zeros([len(finger_ID), n_ports]) for i_finger in range(len(finger_ID)): # initialize temporary line integral for each module central_line_integral = np.zeros(n_ports) # integrate over each central profile and average for i_profile in central_profiles_on_finger[i_finger]: ij_profile = np.where(profile_no==i_profile) # s = mapping['s'][ij_profile] h = heat_flux[:,ij_profile[0],ij_profile[1]] hh=[] for ele in h: hh=np.append(hh,np.mean(ele[np.where(ele>0)])) central_line_integral += np.nan_to_num(hh)#np.mean(h[:][np.where(h[:]>0)]))#np.trapz(h, x=s, axis=1) ) # complete averaging process central_line_integral = central_line_integral / profile_average_range # central_line_integral = central_line_integral *((central_line_integral>noise_threshold*max(s))*1) # goodcounter+=(central_line_integral>noise_threshold*max(s))*1 # normalize by q_max and multiply with width of finger strikeline_int[i_finger,:] = central_line_integral if mode == 'module': finger_strikeline_width[i_finger,:] = q_max / central_line_integral #/ q_max #* finger_dic['width'][i_finger] elif mode == 'finger': if np.min(q_max[i_finger] / central_line_integral) < 1: print("here comes something strange", q_max[i_finger], central_line_integral, np.shape(h)) finger_strikeline_width[i_finger,:] = q_max[i_finger] / central_line_integral #/ q_max[i_finger] #* finger_dic['width'][i_finger] elif heat_flux_dim==2: # assume dimensions: row, column if verbose>0: print('derive_peaking_factor_per_module: deriving peaking factor on single divertor module in {0} mode...'.format(mode)) # derive q_max for normalization of integral if q_max is None: if mode == 'average' or mode == 'module': # one value q_max = np.nanmax(heat_flux[np.logical_or.reduce(is_central_profile)]) elif mode == 'finger': # one value per finger q_max = [] for i_finger in range(len(finger_ID)): q_max.append( np.nanmax(heat_flux[is_central_profile[i_finger]]) ) q_max = np.array(q_max) q_max[q_max==0] = 1 # integrate over profiles # goodcounter=0 finger_strikeline_width = np.zeros([len(finger_ID)]) strikeline_int = np.zeros([len(finger_ID)]) for i_finger in range(len(finger_ID)): # integrate over each central profile and average central_line_integral = 0 for i_profile in central_profiles_on_finger[i_finger]: ij_profile = np.where(profile_no==i_profile) # s = mapping['s'][ij_profile] h = heat_flux[ij_profile[0],ij_profile[1]] central_line_integral += np.nan_to_num(np.mean(h[np.where(h>0)]))#np.trapz(h, x=s, axis=0) ) central_line_integral = central_line_integral / profile_average_range # central_line_integral = central_line_integral *((central_line_integral>noise_threshold*max(s))*1) # goodcounter+=(central_line_integral>noise_threshold*s)*1 strikeline_int[i_finger] = central_line_integral if mode == 'average' or mode == 'module': finger_strikeline_width[i_finger] = q_max / central_line_integral #* finger_dic['width'][i_finger] elif mode == 'finger': if q_max[i_finger] / central_line_integral < 1: print("here comes something strange", q_max[i_finger], central_line_integral) finger_strikeline_width[i_finger] = q_max[i_finger] / central_line_integral #* finger_dic['width'][i_finger] # merge half-fingers of TM5 and TM6 if np.any(finger_dic['finger_part']): if verbose > 0: print('derive_peaking_factor_per_module: merge wetted area on half fingers of TM05 and TM06') new_finger_ID = np.copy(finger_ID) # scan backwards over fingers, merge and delete second finger halfs for i_finger in finger_ID[:0:-1]: if finger_dic['finger_part'][i_finger]: new_finger_ID = np.delete(new_finger_ID, i_finger) if heat_flux_dim==3 and mode != 'average': finger_strikeline_width[i_finger-1,:] = finger_strikeline_width[i_finger-1,:] + finger_strikeline_width[i_finger,:] strikeline_int[i_finger-1,:] = strikeline_int[i_finger-1,:] + strikeline_int[i_finger,:] else: finger_strikeline_width[i_finger-1] = finger_strikeline_width[i_finger-1] + finger_strikeline_width[i_finger] strikeline_int[i_finger-1] = strikeline_int[i_finger-1] + strikeline_int[i_finger] finger_strikeline_width = np.delete(finger_strikeline_width, i_finger, axis=0) strikeline_int=np.delete(strikeline_int, i_finger, axis=0) if mode == 'finger' and heat_flux_dim==3: q_max[i_finger-1,:] = np.maximum(q_max[i_finger-1,:], q_max[i_finger,:]) q_max = np.delete(q_max, i_finger, axis=0) elif mode == 'finger' and heat_flux_dim==2: q_max[i_finger-1] = np.maximum(q_max[i_finger-1], q_max[i_finger]) q_max = np.delete(q_max, i_finger, axis=0) # sum up # 'average' mode: sum all fingers over all torus modules --> wetted area of all divertors # divide by n_ports --> get average wetted area per divertor # 'module' mode: in each torus module sum over wetted area on all fingers # --> individual wetted areas per divertor # 'finger' mode: do not sum, since each finger was normalized with a differen q_max # --> individual wetted areas per finger # if only one divertors heat flux is given, proceed as in local mode if mode == 'finger': peaking_factor = finger_strikeline_width else: Weights=np.nan_to_num(strikeline_int/np.sum(strikeline_int,axis=0)) #deal with unloaded parts inf_pos=np.where(np.isinf(finger_strikeline_width)) finger_strikeline_width[inf_pos] = 0 Weights[inf_pos] = 0 # print(Weights) # print(finger_strikeline_width) WS=np.sum(Weights,axis=0) bla=np.where(WS==0)[0] for b in bla: Weights[:,b]=Weights[:,b]+1 peaking_factor = np.average(finger_strikeline_width,axis=0,weights=Weights) return peaking_factor, q_max def derive_wetted_area_per_module(heat_flux, mapping, mode='average', q_max=None, profile_average_range=3, noise_threshold=2E5, ports_loaded=None, plot_it=False, verbose=0): ''' Derive wetted area of heat flux array by integrating the total power load and dividing with a peak heat flux value. The peak heat flux value is either: 1) the maximum within each divertor finger (mode='finger') 2) the maximum of a whole divertor module (mode='module') 3) the maximum of the mean divertor averaged toroidally (mode='average'). Mode average is default. In case the input heat flux has only 2 dimensions (from one divertor), mode 'average' and 'module' result in the same. Returned are the wetted area(s) and the corresponding q_max value(s). INPUT ----- heat_flux: numpy array array of heat fluxes from THEODOR on the profiles defined in the IR mapping; can be 2D (one divertor), or 3D (multiple divertor modules) mapping: dictionary IR profile mapping information as returned by downloadversionIRdara.download_heatflux_mapping_reference(); minimum necessary keys are 'finger_ID' and 's' mode: str, optional label to identify the normalization mode, either 'module' (normalize by peak heat flux per torus module), 'average' (normalize by peak heat flux in the toroidally mean heat flux pattern), 'finger' (normalize by peak heat flux per finger in each torus module) (OPTIONAL: default is 'average') q_max: float or numpy array, optional either single peak heat flux value or a peak heat flux for each divertor module or each finger (depends on mode) (OPTIONAL: default is None, i.e. derived based on mode) profile_average_range: int, optional number of central profiles on each finger to average the integral heat flux on (this avoids hot leading edges and shadowed edges) (OPTIONAL: default is 3 profiles) noise_threshold: float, optional minimum heat flux level to crop heat_flux to, if heat flux has negative values (OPTIONAL: default is 200kW/m²) ports_loaded: list or str or int, optional if mode not 'average' label of divertor modules provided in heat_flux array for plots; int of port number for single divertor data and list of port numbers for heat flux from multiple divertor modules; # gets renamed if a mean heat flux pattern is used (mode 'average') (OPTIONAL: default is None, i.e. label will be 'A_w') plot_it: bool, optional switch of whether to plot intermediate results or not (OPTIONAL: deafult is NOT to plot) verbose: integer, optional feedback level (details of print messages) (OPTIONAL: if not provided, only ERROR output) RESULT ------ total_wetted_area: float or numpy array wetted area in a shape that depends on the mode (see NOTES) in average it is (upper,lower) q_max: float or numpy array peak heat flux used for normalizatin in a shape that depends on the mode (see NOTES) NOTES ----- The shape of the results varies depending on the dimension of the input * 2D: singel divertor modules heat flux * 3D: heat flux from multiple divertor modules and the mode to derive q_max ('module', 'average', 'finger'): * '2D' + 'module' or 'average' --> one value for total_wetted_area and q_max * '3D' + 'average' --> 1D numpy arrays with two values for total_wetted_area and q_max (upper and lower divertors) * '3D' + 'module' --> 1D numpy arays with a value for each torus module (first dimension of heat_flux) * '2D' + 'finger' --> 1D numpy arays with a value for each divertor finger * '3D' + 'finger' --> 2D numpy arays with a value for each torus module and each divertor finger ''' #check input heat_flux_dim = len(np.shape(heat_flux)) if mode == 'average' and ports_loaded == None and heat_flux_dim>2: raise Exception("derive_wetted_area_per_module: ports must be specified in average mode since V3.3.2") elif mode == 'average' and heat_flux_dim>2: try: llen=len(ports_loaded) except: raise Exception("derive_wetted_area_per_module: each divertor need a description to calcualte proper the wetted area!") else: if llen!=len(heat_flux): raise Exception("derive_wetted_area_per_module: number of given divertors and number of descriptions does not match!") # prepare mapping and finger information finger_dic = read_finger_info(verbose=verbose-1) finger_ID = finger_dic['ID'] profile_no = mapping['Finger_ID'][0] # find profile IDs of central profiles on each finger central_profiles_on_finger = [] is_central_profile = [] for i_finger in range(len(finger_ID)): n_profiles = finger_dic['n_profiles'][i_finger] i_profile_start = n_profiles//2 - profile_average_range//2 -1 central_profiles = i_finger*100 + np.arange(i_profile_start, i_profile_start+profile_average_range) central_profiles_on_finger.append(central_profiles) is_central_profile.append( np.logical_or.reduce([profile_no == centre_profile for centre_profile in central_profiles]) ) central_profiles_on_finger = np.array(central_profiles_on_finger) if np.nanmin(heat_flux) < 0: heat_flux[heat_flux0: print('derive_wetted_area_per_module: set heat_flux < {0:.1f}kW/m² to 0'.format(noise_threshold/1E3)) # reduce dimension of heat_flux if in 'average' mode if heat_flux_dim == 2 and mode == 'average': mode = 'module' elif heat_flux_dim == 3 and mode == 'average': # heat_flux = np.nanmean(heat_flux, axis=0) ## sort the divertors updiv=[] downdiv=[] for i in range(len(ports_loaded)): # entries in the array/list are either int or str or even float try: port=int(ports_loaded[i]) except: #okay it is not an int or an int like string ## what can it be? 'AEFXX'? But what about OP2 with the A or K ports? Still be 3 letters port=int(ports_loaded[i][3:]) if port%10==0: downdiv.append(heat_flux[i]) else: updiv.append(heat_flux[i]) heat_flux=np.array([np.nanmean(np.asarray(updiv),axis=0),np.nanmean(np.asarray(downdiv),axis=0)]) del downdiv,updiv # heat_flux_dim = 3 ports_loaded = [1,0]#'upper divertor','lower divertor']#'mean heat flux' mode = 'module' if verbose>0: print('derive_wetted_area_per_module: averaged 3D heat flux array over first dimension') if plot_it: fig,ax=plt.subplots(1,2) ax[0].imshow(heat_flux[0]/1e6,vmin=0,vmax=5) ax[1].imshow(heat_flux[1]/1e6,vmin=0,vmax=5) elif heat_flux_dim == 3 and mode == 'average_central_max': ## assuming that the strike lines on each divertor are shifted a little bit radially to each other, shift the profiles so that maximimas agree in position # if not p max is lower and the strike-line and wetted area would to too large. ## sort the divertors updiv=[] downdiv=[] for i in range(len(ports_loaded)): try: port=int(ports_loaded[i]) except: #okay it is not an int or an int like string port=int(ports_loaded[i][3:]) if port%10==0: downdiv.append(heat_flux[i]) else: updiv.append(heat_flux[i]) dummyup=updiv[0].copy() for i in range(1,len(updiv)): for j in range(len(finger_ID)): for k in range(finger_dic['n_profiles'][j]): loc=np.where(mapping['Finger_ID'][0]==j*100+k) line1=updiv[0][loc] line2=updiv[i][loc] if np.argmax(line1)!=0 and np.argmax(line2)!=0: pmax_div=np.argmax(line1)-np.argmax(line2) if pmax_div<0: pstart=0 pend=len(loc[0])+pmax_div else: pstart=pmax_div pend=len(loc[0])-pmax_div else: pmax_div=0 pstart=0 pend=len(loc[0]) # print(pend,max(loc[0]),pmax_div,np.shape(dummyup),max(loc[0]),max(loc[1])) for y in range(pstart,pend): dummyup[loc[0][y],loc[1][0]]=dummyup[loc[0][y],loc[1][0]]+updiv[i][loc[0][y]+pmax_div,loc[1][0]] dummyup=dummyup/len(updiv) dummydwn=downdiv[0].copy() for i in range(1,len(downdiv)): for j in range(len(finger_ID)): for k in range(finger_dic['n_profiles'][j]): loc=np.where(mapping['Finger_ID'][0]==j*100+k) line1=downdiv[0][loc] line2=downdiv[i][loc] if np.argmax(line1)!=0 and np.argmax(line2)!=0: pmax_div=np.argmax(line1)-np.argmax(line2) if pmax_div<0: pstart=0 pend=len(loc[0])+pmax_div else: pstart=pmax_div pend=len(loc[0])-pmax_div else: pmax_div = 0 pstart =0 pend=len(loc[0]) for y in range(pstart,pend): dummydwn[loc[0][y],loc[1][0]] = dummydwn[loc[0][y],loc[1][0]]+downdiv[i][loc[0][y]+pmax_div,loc[1][0]] dummydwn = dummydwn/len(downdiv) heat_flux = np.array([dummyup,dummydwn]) heat_flux = np.nan_to_num(heat_flux) ports_loaded = [1,0]#'upper divertor','lower divertor']#'mean heat flux' mode = 'module' if plot_it: print(np.shape(dummyup),np.shape(dummydwn)) # plt.figure() fig,ax = plt.subplots(2,2) ax[0][0].imshow(np.nanmean(np.asarray(updiv), axis=0), vmin=0, vmax=5e6) ax[1][0].imshow(dummyup, vmin=0, vmax=5e6) ax[0][1].imshow(np.nanmean(np.asarray(downdiv), axis=0),vmin=0,vmax=5e6) ax[1][1].imshow(dummydwn,vmin=0,vmax=5e6) # print("in development") if heat_flux_dim==3: # assume dimensions: toroidal index (camera ports), row, column n_ports = np.shape(heat_flux)[0]#, n_rows, n_cols if verbose>0: print('derive_wetted_area_per_module: deriving wetted area on {0} divertor modules in {1} mode...'.format(n_ports, mode)) # derive q_max for normalization of integral if q_max is None: # derive q_max on every finger q_max_on_finger = [] for i_finger in range(len(finger_ID)): q_max_on_finger.append([np.nanmax(h[is_central_profile[i_finger]]) for h in heat_flux]) q_max_on_finger = np.array(q_max_on_finger) q_max_on_finger[q_max_on_finger==0] = 1 if mode == 'module': # one value per torus half module q_max = np.nanmax(q_max_on_finger, axis=0) elif mode == 'finger': # one value per finger in each torus half module q_max = q_max_on_finger q_max[q_max==0] = 1 # integrate over profiles finger_wetted_area = np.zeros([len(finger_ID), n_ports]) for i_finger in range(len(finger_ID)): # initialize temporary line integral for each module central_line_integral = np.zeros(n_ports) # integrate over each central profile and average for i_profile in central_profiles_on_finger[i_finger]: ij_profile = np.where(profile_no==i_profile) s = mapping['s'][ij_profile] h = heat_flux[:,ij_profile[0],ij_profile[1]] central_line_integral += np.nan_to_num( np.trapz(h, x=s, axis=1) ) # complete averaging process central_line_integral = central_line_integral / profile_average_range # normalize by q_max and multiply with width of finger if mode == 'module': finger_wetted_area[i_finger,:] = central_line_integral / q_max * finger_dic['width'][i_finger] elif mode == 'finger': finger_wetted_area[i_finger,:] = central_line_integral / q_max[i_finger] * finger_dic['width'][i_finger] # get example profiles and wetted area widths at local maxima i_finger_max = np.argmax(q_max_on_finger, axis=0) h_max = [] s_max = [] width_max = [] height_max = [] for i in range(len(i_finger_max)): central_profiles = central_profiles_on_finger[i_finger_max[i]] h_profiles = [ heat_flux[i][profile_no == cp] for cp in central_profiles ] i_max = np.argmax([np.sum(h) for h in h_profiles]) # i_max = np.argsort([np.sum(h) for h in h_profiles])[profile_average_range//2] h_max.append( h_profiles[ i_max ] ) s_max.append( mapping['s'][profile_no == central_profiles[i_max]] ) if mode == 'module': width_max.append(np.nan_to_num(np.trapz(h_max[-1], x=s_max[-1]) / q_max[i])) height_max.append(q_max[i]) elif mode == 'finger': width_max.append(np.nan_to_num(np.trapz(h_max[-1], x=s_max[-1]) / q_max[i_finger_max[i],i])) height_max.append(q_max[i_finger_max[i],i]) i_samples = [np.argmin(width_max), np.argsort(width_max)[len(width_max)//2], np.argmax(width_max)] i_centre = np.argsort(width_max)[len(width_max)//2] s_max = [s_max[np.argmin(width_max)], s_max[i_centre], s_max[np.argmax(width_max)]] h_max = [h_max[np.argmin(width_max)], h_max[i_centre], h_max[np.argmax(width_max)]] height_max = [height_max[np.argmin(width_max)], height_max[i_centre], height_max[np.argmax(width_max)]] width_max = [width_max[np.argmin(width_max)], width_max[i_centre], width_max[np.argmax(width_max)]] elif heat_flux_dim==2: # assume dimensions: row, column if verbose>0: print('derive_wetted_area_per_module: deriving wetted area on single divertor module in {0} mode...'.format(mode)) # derive q_max for normalization of integral if q_max is None: if mode == 'average' or mode == 'module': # one value q_max = np.nanmax(heat_flux[np.logical_or.reduce(is_central_profile)]) elif mode == 'finger': # one value per finger q_max = [] for i_finger in range(len(finger_ID)): q_max.append( np.nanmax(heat_flux[is_central_profile[i_finger]]) ) q_max = np.array(q_max) q_max[q_max==0] = 1 # integrate over profiles finger_wetted_area = np.zeros([len(finger_ID)]) for i_finger in range(len(finger_ID)): # integrate over each central profile and average central_line_integral = 0 for i_profile in central_profiles_on_finger[i_finger]: ij_profile = np.where(profile_no==i_profile) s = mapping['s'][ij_profile] h = heat_flux[ij_profile[0],ij_profile[1]] central_line_integral += np.nan_to_num( np.trapz(h, x=s, axis=0) ) central_line_integral = central_line_integral / profile_average_range if mode in ('average','module'): finger_wetted_area[i_finger] = central_line_integral / q_max * finger_dic['width'][i_finger] elif mode == 'finger': finger_wetted_area[i_finger] = central_line_integral / q_max[i_finger] * finger_dic['width'][i_finger] # get example profiles and wetted area widths at local maxima i_samples = [np.nanargmin(finger_wetted_area), np.argsort(finger_wetted_area)[len(finger_wetted_area)//2], np.nanargmax(finger_wetted_area)] h_max = [] s_max = [] width_max = [] height_max = [] for i in range(len(i_samples)): central_profiles = central_profiles_on_finger[i_samples[i]] h_profiles = [ heat_flux[profile_no == cp] for cp in central_profiles ] i_max = np.argmax([np.sum(h) for h in h_profiles]) h_max.append( h_profiles[ i_max ] ) s_max.append( mapping['s'][profile_no == central_profiles[i_max]] ) if mode in ('module','average'): width_max.append(np.nan_to_num(np.trapz(h_max[-1], x=s_max[-1]) / q_max)) height_max.append(q_max) elif mode == 'finger': width_max.append(np.nan_to_num(np.trapz(h_max[-1], x=s_max[-1]) / q_max[i_samples[i]])) height_max.append(q_max[i_samples[i]]) # merge half-fingers of TM5 and TM6 if np.any(finger_dic['finger_part']): if verbose>0: print('derive_wetted_area_per_module: merge wetted area on half fingers of TM05 and TM06') new_finger_ID = np.copy(finger_ID) # scan backwards over fingers, merge and delete second finger halfs for i_finger in finger_ID[:0:-1]: if finger_dic['finger_part'][i_finger]: new_finger_ID = np.delete(new_finger_ID, i_finger) if heat_flux_dim==3 and mode != 'average': finger_wetted_area[i_finger-1,:] = finger_wetted_area[i_finger-1,:] + finger_wetted_area[i_finger,:] else: finger_wetted_area[i_finger-1] = finger_wetted_area[i_finger-1] + finger_wetted_area[i_finger] finger_wetted_area = np.delete(finger_wetted_area, i_finger, axis=0) if mode == 'finger' and heat_flux_dim==3: q_max[i_finger-1,:] = np.maximum(q_max[i_finger-1,:], q_max[i_finger,:]) q_max = np.delete(q_max, i_finger, axis=0) elif mode == 'finger' and heat_flux_dim==2: q_max[i_finger-1] = np.maximum(q_max[i_finger-1], q_max[i_finger]) q_max = np.delete(q_max, i_finger, axis=0) # sum up # 'average' mode: sum all fingers over all torus modules --> wetted area of all divertors # divide by n_ports --> get average wetted area per divertor # 'module' mode: in each torus module sum over wetted area on all fingers # --> individual wetted areas per divertor # 'finger' mode: do not sum, since each finger was normalized with a differen q_max # --> individual wetted areas per finger # if only one divertors heat flux is given, proceed as in local mode if mode == 'finger': total_wetted_area = finger_wetted_area else: total_wetted_area = np.sum(finger_wetted_area, axis=0) # if requested, make some visalization if plot_it: if ports_loaded is None: if heat_flux_dim==3: ports_label = ['port index {0}'.format(i) for i in range(n_ports)] elif heat_flux_dim==2: ports_label = ['A_w'] elif isinstance(ports_loaded, list) or isinstance(ports_loaded, np.ndarray): ports_label = ['module {0} {1}'.format(int(port/10), ['L','U'][port%10]) for port in ports_loaded] elif isinstance(ports_loaded, int) or isinstance(ports_loaded, np.int_): ports_label = ['module {0} {1}'.format(int(int(ports_loaded)/10), ['L','U'][int(ports_loaded)%10])] elif isinstance(ports_loaded, str): ports_label = [ports_loaded] # wetted areas vs fingers plt.figure() if heat_flux_dim==3 and mode != 'average': for i_port in range(n_ports): plt.plot(new_finger_ID, finger_wetted_area[:,i_port]*1E4, label=ports_label[i_port]) elif heat_flux_dim==3 and mode == 'average': plt.plot(new_finger_ID, finger_wetted_area*1E4, label='mean heat flux') else: plt.plot(new_finger_ID, finger_wetted_area*1E4, label=ports_label[0]) plt.title('normalized with q_max of {0}'.format(mode)) plt.xlabel('finger no.') plt.ylabel('wetted area [cm²]') plt.legend() plt.show() # wetted areas vs torus modules if heat_flux_dim==3: if mode == 'module': i_ports = np.arange(n_ports) plt.figure() plt.plot(i_ports, total_wetted_area*1E2) plt.xticks(i_ports, [port[-3:] for port in ports_label], rotation='vertical') plt.xlabel('module') plt.ylabel('wetted area [dm²]') plt.tight_layout() plt.show() # central finger profiles and the rectangle with the same integral # (the width represented to contribution to the wetted area) plt.figure() if heat_flux_dim==3: label_str = [ports_label[i] for i in i_samples] if mode=='module': plt.title('wetted area examples from fingers with max Int[q]\nintegrated on their respective divertor and normalized to a local q_max') elif mode == 'average': plt.title('wetted area examples from fingers with max Int[q]\nintegrated on the mean heat flux pattern and normalized to a global q_max') elif mode == 'finger': plt.title('wetted area examples from fingers with max Int[q]\nintegrated on their respective divertor and normalized to a finger q_max') elif heat_flux_dim==2: label_str = ['min(A_w)', 'median(A_w)', 'max(A_w)'] if mode == 'finger': plt.title('wetted area examples from fingers\nintegrated on their respective divertor and normalized to a finger q_max') elif mode == 'module': plt.title('wetted area examples from fingers\nintegrated on their respective divertor and normalized to a local q_max') elif mode == 'average': plt.title('wetted area examples from fingers\nintegrated on a mean heat flux pattern and normalized to a its q_max') color_str = ['b', 'g', 'r'] rect = [] for i in range(len(i_samples)): plt.plot(s_max[i], h_max[i]/1E3, label=label_str[i], color=color_str[i]) width = width_max[i] height = height_max[i]/1E3 start_point = (s_max[i][np.argmax(h_max[i])]-width/2,0) rect.append(patches.Rectangle(start_point,width,height,linewidth=1, edgecolor=color_str[i], facecolor='none')) plt.gca().add_patch(rect[-1]) plt.xlabel('s [m]') plt.ylabel('q [kW/m²]') plt.legend() plt.show() return total_wetted_area, q_max def convert_Bad_Pixels(Badpixels,map2D): """ transform the badpixel information from the camera images into a bad pixel information for the heat flux images INPUT ----- Badpixels: 2D array image of the bad pixels with 0 no bad pixel and >0 as bad pixel map2D: dictonary of the see downloadversionIRdata.download_heatflux_scene_model_reference() RESULT ----- heatflux_badpixel: numpy array good pixels have a zero, bad pixels have a 1 """ heatflux_badpixel=np.zeros(np.shape(map2D['Pixel_Y'])) PX=map2D['Pixel_X'] PY=map2D['Pixel_Y'] tlocos=np.where(np.asarray(Badpixels)>0) locos=[] for i in range(len(tlocos[0])): locos.append((tlocos[0][i],tlocos[1][i])) for ele in locos: Xloc=np.where(PX==ele[1]) Yloc=np.where(PY==ele[0]) if len(Xloc[0])>0 and len(Yloc[0])>0:#okay there are points in the heat flux image with the coordinates PointsX=[] # PointsY=[] for k in range(len(Xloc[0])): PointsX.append((Xloc[0][k],Xloc[1][k])) for L in range(len(Yloc[0])): YY=(Yloc[0][L],Yloc[1][L]) if YY in PointsX: heatflux_badpixel[YY[0],YY[1]]=1 # PointsY.append((Yloc[0][L],Yloc[1][L])) return heatflux_badpixel