# -*- coding: utf-8 -*- """ Created on Wed May 9 14:56:32 2018 Version: 3.1.0 @author: Holger Niemann, Peter Drewelow, Yu Gao mainly to clean up the downloadversionIRdata code Tools for: checking IR images, calculate gain and offset again from check backgroundframes check coldframes ... """ import numpy as np import matplotlib.pyplot as plt import matplotlib.patches as patches from IR_config_constants import portcamdict,IRCamRefImagespath,IRCAMBadPixels_path,parameter_file_path import h5py import os import glob import datetime # set working directory to local directory of script os.chdir(os.path.dirname(__file__)) def get_OP_by_time(time_ns=None, shot_no=None, program_str=None): '''Derives operation phase (OP) of W7-X based on either: a nanosacond time stamp, a MDSplus style shot no. or a program ID. IN: time_ns - integer of nanosecond time stamp, e.g. 1511972727249834301 (OPTIONAL) shot_no - integer of MDSplus style shot number, e.g. 171207022 (OPTIONAL) program_str - string of CoDaQ ArchiveDB style prgram number or date, e.g. '20171207.022' or '20171207' (OPTIONAL) RETURN: conn - MDSplus connection object, to be used in e.g. 1511972727249834301 read_MDSplus_image_simple(), read_MDSplus_metadata() ''' # derive operation phase (OP) from time as nanosecond time stamp or string if time_ns is not None: dateOP = datetime.datetime.utcfromtimestamp(time_ns/1e9) elif shot_no is not None: dateOP = datetime.datetime.strptime(str(shot_no)[:6], '%y%m%d') elif program_str is not None: dateOP = datetime.datetime.strptime(program_str[:8], '%Y%m%d') else: raise Exception('get_OP_by_time: ERROR! neither time, shot no. or program ID provided') if dateOP.year == 2017: if dateOP.month>8 and dateOP.month<12: return "OP1.2a" elif dateOP.month==8 and dateOP.day>=28: return "OP1.2a" elif dateOP.month==12 and dateOP.day<8: return "OP1.2a" else: return None elif dateOP.year == 2018: return "OP1.2b" elif dateOP.year <= 2016 and dateOP.year >= 2015: if (dateOP.year == 2016 and dateOP.month <= 3) or (dateOP.year == 2015 and dateOP.month == 12): return "OP1.1" else: return None def bestimmtheitsmass_general(data,fit): R=0 if len(fit)==len(data): mittel=np.sum(data)/len(data) qam=quad_abweich_mittel(fit,mittel) R=qam/(qam+quad_abweich(data,fit)) else: print("bestimmtheitsmass_general: Arrays must have same dimensions") return R def bestimmheitsmass_linear(data,fit,debugmode=False): R2=0 if len(fit)==len(data): mittel_D=np.mean(data)#np.sum(data)/len(data) mittel_F=np.mean(fit) R2=quad_abweich_mittel(fit,mittel_D)/quad_abweich_mittel(data,mittel_D) if debugmode: print(mittel_D,mittel_F,quad_abweich_mittel(fit,mittel_D),quad_abweich_mittel(data,mittel_D),R2) else: print("bestimmtheitsmass_linear: Arrays must have same dimensions") return R2 def quad_abweich_mittel(data,mittel): R=0 for i in data: R=R+(i-mittel)**2 return R def quad_abweich(data,fit): R=0 if len(fit)==len(data): for i in range(len(data)): R=R+(data[i]-fit[i])**2 else: print("quad_abweich: Arrays must have same dimensions") return R def find_nearest(array,value): #a=array a = [x - value for x in array] mini = np.min(np.abs(a)) try: idx= a.index(mini) except: idx= a.index(-mini) return idx#array[idx] def check_coldframe(coldframe,references=None,threshold=0.5,plot_it=False): ''' return true/false and the quality factor ''' shapi=np.shape(coldframe) ##function (np.arange(0,768)-384)**(2)/900-50 datasets=[] for i in [int(shapi[1]//4),int(shapi[1]//2),int(shapi[1]//4*3)]: dataline=coldframe[0:shapi[0],i] datasets.append(dataline-np.mean(dataline)) if references==None: references=[] for dat in datasets: mini=np.mean(dat[shapi[0]/2-50:shapi[0]/2+50]) a=(np.mean(dat[0:50])-mini)/(int(shapi[0]/2))**2 reference=a*(np.arange(0,shapi[0])-int(shapi[0]/2))**(2)+mini references.append(reference) bestimmtheit=[] if plot_it: plt.figure() plt.imshow(coldframe,vmin=np.mean(coldframe)-500,vmax=np.mean(coldframe)+500) plt.figure() for i_dat in range(len(datasets)): dat=datasets[i_dat] reference=references[i_dat] bestimmtheit.append(bestimmtheitsmass_general(dat,reference)) if plot_it: plt.plot(dat,label='data') plt.plot(reference,label='reference') # print(int(shapi[0]/2),1*(np.max(datasets[-1])-mini),mini) plt.legend() if np.mean(bestimmtheit)>threshold: return True,bestimmtheit else: return False,bestimmtheit def check_coldframe_by_refframe(coldframe,reference_frame,threshold=0.8,plot_it=False): ''' ''' references=[] shapi=np.shape(reference_frame) for i in [int(shapi[1]//5),int(shapi[1]//2),int(shapi[1]//4*3)]: dataline=reference_frame[0:shapi[0],i] references.append(dataline-np.mean(dataline)) return check_coldframe(coldframe,references,threshold,plot_it) def check_backgroundframe(backgroundframe,threshold=50): ''' return true or false ''' shapi=np.shape(backgroundframe) valid=True dataset=[] for i in [int(shapi[1]//4),int(shapi[1]//2),int(shapi[1]//4*3)]: referenceline=backgroundframe[0:shapi[0],i] meanref=referenceline-np.mean(referenceline) dataset.append(np.max(meanref)-np.min(meanref)) if np.mean(dataset)threshold)) ) bad_pixels = (bad_pixels).tolist() bad_pixels = [tuple(l) for l in bad_pixels] if plot_it: plt.figure() plt.imshow(frame) for i in range(len(bad_pixels)): plt.scatter(bad_pixels[i][1],bad_pixels[i][0],c='None') plt.show() return bad_pixels def correct_images(images,badpixels,verbose=0): ''' ''' if type(badpixels)!=int: if type(images) == list: # return corrected images also as list of 2D arrays for i in range(len(images)): images[i]=restore_bad_pixels(images[i], np.invert(badpixels==1), verbose=verbose-1) else: # keep shape images = restore_bad_pixels(images, np.invert(badpixels==1), verbose=verbose-1) # for i in range(len(images)): # images[i]=(restore_pixels(images[i],np.invert(badpixels==1))).astype(np.float32) if verbose>0: print("correct_images: done") return images def restore_bad_pixels(frames, bad_pixel, by_list=True, check_neighbours=True, plot_it=False,verbose=0): """Restore bad pixel by interpolation of adjacent pixels. Optionally make sure that adjacent pixels are not bad (time consuming). Default is to use a list of bad pixels and a for loop. For many bad pixels consider using the optinal alternative using a bad pixel mask. IN: frames - either list of frames as 2D numpy array, or 3D numpy array (frame number, n_rows, n_cols), or 2D numpy array (n_rows, n_cols) bad_pixel - either list of tuples of bad pixel coordinates, or mask of pixel status (good=True, bad=False) by_list - boolean of whether to use a list and a for loop (True), or to use a mask of bad pixel and array operations (False) (OPTIONAL: if not provided, True (list) is default) check_neighbours - boolean of whether to check if neighbours of a bad pixel are not bad either before computing a mean (works only in list mode!) (OPTIONAL: if not provided, check is on) plot_it - boolean to decide whether to plot intermediate results or not (OPTIONAL: if not provided, switched off) verbose - integer of feedback level (amount of prints) (OPTIONAL: if not provided, only ERROR output) RETURN: frames - 3D numpy array (frame number, n_rows, n_cols) of corrected frames """ # make sure frames is correctly shaped if type(frames) == list: frames = np.array(frames) frame_shape = 'list' else: if len(np.shape(frames)) == 2: frames = np.array([frames]) frame_shape = '2D' elif len(np.shape(frames)) == 3: frame_shape = '3D' pass else: raise Exception('restore_bad_pixels: ERROR! Unexpected shape of frames.') frame_dtype = frames.dtype # frames = frames.astype(float) n_frames, n_rows, n_cols = np.shape(frames) if plot_it: start_frame = np.copy(frames[0]) # make sure bad pixel are provided as mask and list if type(bad_pixel) is list: blist = bad_pixel bmask = np.ones([n_rows, n_cols],dtype=bool) for pix in blist: try: bmask[pix] = False except Exception as E: Warning(E) bmask = np.invert(bmask) else: if np.shape(bad_pixel)[0] == n_rows and np.shape(bad_pixel)[1] == n_cols: bmask = np.invert(bad_pixel) x,y = np.where(bmask) blist = list(zip(x,y)) else: raise Exception('restore_bad_pixels: ERROR! bad_pixel in bad shape {0}'.format(np.shape(bad_pixel))) if verbose > 0: print('restore_bad_pixels: {0} bad pixels to be restored: {1} ... '.format(len(blist), blist[:3])) # expand frame by rows and columns of zeros to simplify treatment of edges frames = np.dstack([np.zeros([n_frames,n_rows], dtype=frame_dtype), frames, np.zeros([n_frames,n_rows], dtype=frame_dtype)]) frames = np.hstack([np.zeros([n_frames,1,n_cols+2], dtype=frame_dtype), frames, np.zeros([n_frames,1,n_cols+2], dtype=frame_dtype)]) bmask = np.vstack([np.zeros([1,n_cols], dtype=bool), bmask, np.zeros([1,n_cols], dtype=bool)]) bmask = np.hstack([np.zeros([n_rows+2,1], dtype=bool), bmask, np.zeros([n_rows+2,1], dtype=bool)]) # define number of neighbours (up to 4) ina an array of expanded frame size n_neighbours = np.ones([n_frames, n_rows+2, n_cols+2])*4 n_neighbours[:,1,:] = 3 n_neighbours[:,-2,:] = 3 n_neighbours[:,:,1] = 3 n_neighbours[:,:,-2] = 3 n_neighbours[:,1,1] = 2 n_neighbours[:,1,-2] = 2 n_neighbours[:,-2,1] = 2 n_neighbours[:,-2,-2] = 2 if by_list: # ===== correct bad pixels using the list of bad pixels ===== # for pos in blist: # Note: # pos points to real frame coordinates, while bmask, n_neighbours have been expanded! if check_neighbours: # takes only neighbours that are not bad pos_l = np.where(bmask[pos[0]+1,:pos[1]+1]==False)[0] if len(pos_l) != 0: pos_l = pos_l[-1] else: pos_l = pos[1]+1 pos_r = np.where(bmask[pos[0]+1,pos[1]+1:]==False)[0] if len(pos_r) != 0: pos_r = pos_r[0] + pos[1]+1 else: pos_r = pos[1]+2 pos_t = np.where(bmask[:pos[0]+1,pos[1]+1]==False)[0] if len(pos_t) != 0: pos_t = pos_t[-1] else: pos_t = pos[0]+1 pos_b = np.where(bmask[pos[0]+1:,pos[1]+1]==False)[0] if len(pos_b) != 0: pos_b = pos_b[0] + pos[0]+1 else: pos_b = pos[0]+2 else: # insensitive to neighbours being bad as well! pos_l = pos[1] pos_r = pos[1]+2 pos_t = pos[0] pos_b = pos[0]+2 average = (frames[:,pos[0]+1,pos_l].astype(float) + frames[:,pos[0]+1,pos_r].astype(float) + frames[:,pos_t,pos[1]+1].astype(float) + frames[:,pos_b,pos[1]+1].astype(float)) / n_neighbours[:,pos[0]+1,pos[1]+1] frames[:,pos[0]+1,pos[1]+1] = average.astype(frame_dtype) frames = frames[:,1:-1,1:-1] else: # ======= correct bad pixels using the bad pixel mask ======= # # (insensitive to neighbours being bad as well!) # prepare mask arrays for neighbours by shifting it to left, right, top and bottom bmask_l = np.hstack([bmask[:,1:], np.zeros([n_rows+2,1], dtype=bool)]) bmask_r = np.hstack([np.zeros([n_rows+2,1], dtype=bool), bmask[:,:-1]]) bmask_t = np.vstack([bmask[1:,:], np.zeros([1,n_cols+2], dtype=bool)]) bmask_b = np.vstack([np.zeros([1,n_cols+2], dtype=bool), bmask[:-1,:]]) # ----------------------------------- # restore by mask # frames[:,bmask] = ( (frames[:,bmask_l].astype(float) + frames[:,bmask_r].astype(float) + frames[:,bmask_t].astype(float) + frames[:,bmask_b].astype(float)) / n_neighbours[:,bmask] ).astype(frame_dtype) frames = frames[:,1:-1,1:-1] # plot comparison if plot_it: plt.figure() plt.title('bad pixel correction of first frame') m = np.mean(start_frame) s = np.std(start_frame) plt.imshow(start_frame, vmin=m-s, vmax=m+s) plt.colorbar() x,y = zip(*blist) plt.scatter(y,x, marker='o', s=5, c='r', linewidths=1) plt.tight_layout() plt.show() if frame_shape == 'list': frames = list(frames) elif frame_shape == '2D' and len(np.shape(frames))==3: frames = frames[0] return frames def generate_new_hot_image(cold,reference_cold,reference_hot): ''' ''' if cold is None or reference_cold is None or reference_hot is None: raise Exception("generate_new_hot_image: Cannot Calculate new Hot image, if images are missing!") else: return reference_hot+(cold-reference_cold) def calculate_gain_offset_image_pix(cold_image,hot_image=None,reference_cold=None,reference_hot=None,verbose=0): ''' ''' if hot_image is None: hot_image=generate_new_hot_image(cold_image,reference_cold,reference_hot) if verbose>0: print("calculate_gain_offset_image_pix: calculate gain and offset") Sh_ref = hot_image[ ( np.int( np.shape(hot_image)[0] /2 ) ) ][np.int( (np.shape(hot_image)[1] /2 ) ) ] Sc_ref = cold_image[ ( np.int( (np.shape(cold_image)[0]) /2 ) ) ][( np.int( (np.shape(cold_image)[1]) /2 ) ) ] Gain_rel = ( Sh_ref - Sc_ref ) / ( hot_image - cold_image) Off_h_rel = Sh_ref - hot_image*Gain_rel Off_c_rel = Sc_ref - cold_image*Gain_rel Offset_rel = ( Off_h_rel + Off_c_rel ) /2 return Gain_rel,Offset_rel def calculate_gain_offset_image(cold_image,hot_image=None,reference_cold=None,reference_hot=None,verbose=0): if hot_image is None: hot_image=generate_new_hot_image(cold_image,reference_cold,reference_hot) if verbose>0: print("calculate_gain_offset_image: calculate gain and offset") # Sh_ref = hot_image[ ( np.int( np.shape(hot_image)[0] /2 ) ) ][np.int( (np.shape(hot_image)[1] /2 ) ) ] # Sc_ref = cold_image[ ( np.int( (np.shape(cold_image)[0]) /2 ) ) ][( np.int( (np.shape(cold_image)[1]) /2 ) ) ] # print(hot_image[( np.int( np.shape(hot_image)[0]/2) )-2: (np.int( np.shape(hot_image)[0]/2))+3,np.int((np.shape(hot_image)[1]/2))-2:np.int((np.shape(hot_image)[1]/2))+3 ]) # print(cold_image[( np.int( np.shape(hot_image)[0]/2) )-2: (np.int( np.shape(hot_image)[0]/2))+3,np.int((np.shape(hot_image)[1]/2))-2:np.int((np.shape(hot_image)[1]/2))+3 ]) Sh_ref = np.mean( hot_image[( np.int( np.shape(hot_image)[0]/2) )-2: (np.int( np.shape(hot_image)[0]/2))+3,np.int((np.shape(hot_image)[1]/2))-2:np.int((np.shape(hot_image)[1]/2))+3 ]) Sc_ref = np.mean(cold_image[( np.int( np.shape(cold_image)[0]/2) )-2: (np.int( np.shape(cold_image)[0]/2))+3,np.int((np.shape(cold_image)[1]/2))-2:np.int((np.shape(cold_image)[1]/2))+3 ]) difference_image=hot_image - cold_image indexlist=np.where(difference_image==0) difference_image[indexlist]=0.001 Gain_rel = ( Sh_ref - Sc_ref ) / ( difference_image) Gain_rel[indexlist]=0 Off_h_rel = Sh_ref - hot_image*Gain_rel Off_c_rel = Sc_ref - cold_image*Gain_rel Offset_rel = ( Off_h_rel + Off_c_rel ) /2 return Gain_rel,Offset_rel #%% functions from Yu Gao """ functions by Yu Gao""" def load_ref_images(port, exposuretime, verbose=0): ''' load the reference cold and hot frame during calibration from local files. @port: e.g. 'AEF10' @exposuretime: int number. ''' cameraname = portcamdict['OP1.2a'][port] foldername = cameraname.split('_')[0] + '_' + cameraname.split('_')[2] scanpath = os.path.join(IRCamRefImagespath, foldername) coldref, hotref = [], [] for filename in glob.iglob(scanpath + '\*' + str(int(exposuretime)) + 'us.h5', recursive=True): if 'hot' in filename: if verbose>0: print('load_ref_images: read from ',filename) with h5py.File(filename, 'r') as h5in: hotref = h5in[os.path.basename(filename)].value elif 'cold' in filename: if verbose>0: print('load_ref_images: read from ',filename) with h5py.File(filename, 'r') as h5in: coldref = h5in[os.path.basename(filename)].value return coldref, hotref def reconstruct_coldframe (exposuretime, sT, a, bnew, coldref): cirebuild = a * sT + bnew * exposuretime + coldref return cirebuild #%% other functions def check_dublicates(array): a = array import collections return [item for item, count in collections.Counter(a).items() if count > 1] def check_dublicates_2(array): seen = set() uniq = [] for x in array: if x not in seen: uniq.append(x) seen.add(x) return uniq,seen #%% functions regarding wetted area calculation def read_finger_info(file_name=None, OP='OP1.2b', verbose=0): '''Read divertor finger information from file. The referenced fingers are those defined in the IR profile mapping, i.e. the target modlues 5 and 6 are divided in inner and outer fingers (see 'finger_part' in result dictionary). INPUT ----- file_name: str, optional file name of csv fiel with finger information (OPTIONAL: default is None, i.e. decide by OP) OP: str, optional label of operation phase of interest, e.g. 'OP1.2b' (OPTIONAL: default is 'OP1.2b', i.e. load TDU file) verbose: integer, optional feedback level (details of print messages) (OPTIONAL: if not provided, only ERROR output) RESULT ------ finger_dic: dictionary dictionary with keys 'ID', 'target', 'target_element','n_profiles', 'width', 'finger_part' (see NOTES) NOTES ----- contents of result dictionary: * 'ID' numpy array of integers of continuous finger number * 'target' list of strings of target identifier ('h_l' horizontal low-iota, 'h_m' horizontal middle part, 'h_h' horizontal high-iota, 'v' vertical) * 'target_element' numpy array of integers of target module number (1..9 on horizontal target, 1..3 on vertical target) * 'n_profiles' numpy array of integers of number of profiles defined on this finger * 'width' numpy array of floats of centre width of finger in meters * 'finger_part' numpy array of integers indicating with 0 this is a full finger and with 1 this is the second part of the previous finger ''' if file_name is None: # assume OP is given if OP.startswith('OP1'): file_name='finger_info_TDU.csv' elif OP.startswith('OP2'): file_name='finger_info_HHF.csv' full_path = os.path.join(parameter_file_path, file_name) print(full_path) if verbose>0: print('read_finger_info: reading from file {0} in {1}'.format(file_name, parameter_file_path)) if not os.path.isfile(full_path): raise Exception('read_finger_info: ERROR! file not found') finger_dic = {'ID': [], 'target': [], 'target_element':[], 'n_profiles':[], 'width':[], 'finger_part':[]} data = np.genfromtxt(full_path, delimiter=';', dtype=(int, "|S3", int, int, float, int)) for i in range(len(data)): finger_dic['ID'].append( data[i][0] ) finger_dic['target'].append( data[i][1].decode('UTF-8') ) finger_dic['target_element'].append( data[i][2] ) finger_dic['n_profiles'].append( data[i][3] ) finger_dic['width'].append( data[i][4] ) finger_dic['finger_part'].append( data[i][5] ) finger_dic['ID'] = np.array(finger_dic['ID']) finger_dic['target_element'] = np.array(finger_dic['target_element']) finger_dic['n_profiles'] = np.array(finger_dic['n_profiles']) finger_dic['width'] = np.array(finger_dic['width']) finger_dic['finger_part'] = np.array(finger_dic['finger_part']) return finger_dic def derive_wetted_area_per_module(heat_flux, mapping, mode='average', q_max=None, profile_average_range=3, noise_threshold=2E5, ports_loaded=None, plot_it=False, verbose=0): ''' Derive wetted area of heat flux array by integrating the total power load and dividing with a peak heat flux value. The peak heat flux value is either: 1) the maximum within each divertor finger (mode='finger') 2) the maximum of a whole divertor module (mode='module') 3) the maximum of the mean divertor averaged toroidally (mode='average'). Mode average is default. In case the input heat flux has only 2 dimensions (from one divertor), mode 'average' and 'module' result in the same. Returned are the wetted area(s) and the corresponding q_max value(s). INPUT ----- heat_flux: numpy array array of heat fluxes from THEODOR on the profiles defined in the IR mapping; can be 2D (one divertor), or 3D (multiple divertor modules) mapping: dictionary IR profile mapping information as returned by downloadversionIRdara.download_heatflux_mapping_reference(); minimum necessary keys are 'finger_ID' and 's' mode: str, optional label to identify the normalization mode, either 'module' (normalize by peak heat flux per torus module), 'average' (normalize by peak heat flux in the toroidally mean heat flux pattern), 'finger' (normalize by peak heat flux per finger in each torus module) (OPTIONAL: default is 'average') q_max: float or numpy array, optional either single peak heat flux value or a peak heat flux for each divertor module or each finger (depends on mode) (OPTIONAL: default is None, i.e. derived based on mode) profile_average_range: int, optional number of central profiles on each finger to average the integral heat flux on (this avoids hot leading edges and shadowed edges) (OPTIONAL: default is 3 profiles) noise_threshold: float, optional minimum heat flux level to crop heat_flux to, if heat flux has negative values (OPTIONAL: default is 200kW/m²) ports_loaded: list or str or int, optinal label of divertor modules provided in heat_flux array for plots; int of port number for single divertor data and list of port numbers for heat flux from multiple divertor modules; # gets renamed if a mean heat flux pattern is used (mode 'average') (OPTIONAL: default is None, i.e. label will be 'A_w') plot_it: bool, optional switch of whether to plot intermediate results or not (OPTIONAL: deafult is NOT to plot) verbose: integer, optional feedback level (details of print messages) (OPTIONAL: if not provided, only ERROR output) RESULT ------ total_wetted_area: float or numpy array wetted area in a shape that depends on the mode (see NOTES) q_max: float or numpy array peak heat flux used for normalizatin in a shape that depends on the mode (see NOTES) NOTES ----- The shape of the results varies depending on the dimension of the input * 2D: singel divertor modules heat flux * 3D: heat flux from multiple divertor modules and the mode to derive q_max ('module', 'average', 'finger'): * '2D' + 'module' or 'average' --> one value for total_wetted_area and q_max * '3D' + 'average' --> one value for total_wetted_area and q_max * '3D' + 'module' --> 1D numpy arays with a value for each torus module (first dimension of heat_flux) * '2D' + 'finger' --> 1D numpy arays with a value for each divertor finger * '3D' + 'finger' --> 2D numpy arays with a value for each torus module and each divertor finger ''' # prepare mapping and finger information finger_dic = read_finger_info(verbose=verbose-1) finger_ID = finger_dic['ID'] profile_no = mapping['Finger_ID'][0] # find profile IDs of central profiles on each finger central_profiles_on_finger = [] is_central_profile = [] for i_finger in range(len(finger_ID)): n_profiles = finger_dic['n_profiles'][i_finger] i_profile_start = n_profiles//2 - profile_average_range//2 -1 central_profiles = i_finger*100 + np.arange(i_profile_start, i_profile_start+profile_average_range) central_profiles_on_finger.append(central_profiles) is_central_profile.append( np.logical_or.reduce([profile_no == centre_profile for centre_profile in central_profiles]) ) central_profiles_on_finger = np.array(central_profiles_on_finger) if np.nanmin(heat_flux) < 0: heat_flux[heat_flux0: print('derive_wetted_area_per_module: set heat_flux < {0:.1f}kW/m² to 0'.format(noise_threshold/1E3)) heat_flux_dim = len(np.shape(heat_flux)) # reduce dimension of heat_flux if in 'average' mode if heat_flux_dim == 2 and mode == 'average': mode = 'module' elif heat_flux_dim == 3 and mode == 'average': heat_flux = np.nanmean(heat_flux, axis=0) heat_flux_dim = 2 ports_loaded = 'mean heat flux' if verbose>0: print('derive_wetted_area_per_module: averaged 3D heat flux array over first dimension') if heat_flux_dim==3: # assume dimensions: toroidal index (camera ports), row, column n_ports, n_rows, n_cols = np.shape(heat_flux) if verbose>0: print('derive_wetted_area_per_module: deriving wetted area on {0} divertor modules in {1} mode...'.format(n_ports, mode)) # derive q_max for normalization of integral if q_max is None: # derive q_max on every finger q_max_on_finger = [] for i_finger in range(len(finger_ID)): q_max_on_finger.append([np.nanmax(h[is_central_profile[i_finger]]) for h in heat_flux]) q_max_on_finger = np.array(q_max_on_finger) q_max_on_finger[q_max_on_finger==0] = 1 if mode == 'module': # one value per torus half module q_max = np.nanmax(q_max_on_finger, axis=0) elif mode == 'finger': # one value per finger in each torus half module q_max = q_max_on_finger q_max[q_max==0] = 1 # integrate over profiles finger_wetted_area = np.zeros([len(finger_ID), n_ports]) for i_finger in range(len(finger_ID)): # initialize temporary line integral for each module central_line_integral = np.zeros(n_ports) # integrate over each central profile and average for i_profile in central_profiles_on_finger[i_finger]: ij_profile = np.where(profile_no==i_profile) s = mapping['s'][ij_profile] h = heat_flux[:,ij_profile[0],ij_profile[1]] central_line_integral += np.nan_to_num( np.trapz(h, x=s, axis=1) ) # complete averaging process central_line_integral = central_line_integral / profile_average_range # normalize by q_max and multiply with width of finger if mode == 'module': finger_wetted_area[i_finger,:] = central_line_integral / q_max * finger_dic['width'][i_finger] elif mode == 'finger': finger_wetted_area[i_finger,:] = central_line_integral / q_max[i_finger] * finger_dic['width'][i_finger] # get example profiles and wetted area widths at local maxima i_finger_max = np.argmax(q_max_on_finger, axis=0) h_max = [] s_max = [] width_max = [] height_max = [] for i in range(len(i_finger_max)): central_profiles = central_profiles_on_finger[i_finger_max[i]] h_profiles = [ heat_flux[i][profile_no == cp] for cp in central_profiles ] i_max = np.argmax([np.sum(h) for h in h_profiles]) # i_max = np.argsort([np.sum(h) for h in h_profiles])[profile_average_range//2] h_max.append( h_profiles[ i_max ] ) s_max.append( mapping['s'][profile_no == central_profiles[i_max]] ) if mode == 'module': width_max.append(np.nan_to_num(np.trapz(h_max[-1], x=s_max[-1]) / q_max[i])) height_max.append(q_max[i]) elif mode == 'finger': width_max.append(np.nan_to_num(np.trapz(h_max[-1], x=s_max[-1]) / q_max[i_finger_max[i],i])) height_max.append(q_max[i_finger_max[i],i]) i_samples = [np.argmin(width_max), np.argsort(width_max)[len(width_max)//2], np.argmax(width_max)] i_centre = np.argsort(width_max)[len(width_max)//2] s_max = [s_max[np.argmin(width_max)], s_max[i_centre], s_max[np.argmax(width_max)]] h_max = [h_max[np.argmin(width_max)], h_max[i_centre], h_max[np.argmax(width_max)]] height_max = [height_max[np.argmin(width_max)], height_max[i_centre], height_max[np.argmax(width_max)]] width_max = [width_max[np.argmin(width_max)], width_max[i_centre], width_max[np.argmax(width_max)]] elif heat_flux_dim==2: # assume dimensions: row, column if verbose>0: print('derive_wetted_area_per_module: deriving wetted area on single divertor module in {0} mode...'.format(mode)) # derive q_max for normalization of integral if q_max is None: if mode == 'average' or mode == 'module': # one value q_max = np.nanmax(heat_flux[np.logical_or.reduce(is_central_profile)]) elif mode == 'finger': # one value per finger q_max = [] for i_finger in range(len(finger_ID)): q_max.append( np.nanmax(heat_flux[is_central_profile[i_finger]]) ) q_max = np.array(q_max) q_max[q_max==0] = 1 # integrate over profiles finger_wetted_area = np.zeros([len(finger_ID)]) for i_finger in range(len(finger_ID)): # integrate over each central profile and average central_line_integral = 0 for i_profile in central_profiles_on_finger[i_finger]: ij_profile = np.where(profile_no==i_profile) s = mapping['s'][ij_profile] h = heat_flux[ij_profile[0],ij_profile[1]] central_line_integral += np.nan_to_num( np.trapz(h, x=s, axis=0) ) central_line_integral = central_line_integral / profile_average_range if mode == 'average' or mode == 'module': finger_wetted_area[i_finger] = central_line_integral / q_max * finger_dic['width'][i_finger] elif mode == 'finger': finger_wetted_area[i_finger] = central_line_integral / q_max[i_finger] * finger_dic['width'][i_finger] # get example profiles and wetted area widths at local maxima i_samples = [np.nanargmin(finger_wetted_area), np.argsort(finger_wetted_area)[len(finger_wetted_area)//2], np.nanargmax(finger_wetted_area)] h_max = [] s_max = [] width_max = [] height_max = [] for i in range(len(i_samples)): central_profiles = central_profiles_on_finger[i_samples[i]] h_profiles = [ heat_flux[profile_no == cp] for cp in central_profiles ] i_max = np.argmax([np.sum(h) for h in h_profiles]) h_max.append( h_profiles[ i_max ] ) s_max.append( mapping['s'][profile_no == central_profiles[i_max]] ) if mode == 'module' or mode == 'average': width_max.append(np.nan_to_num(np.trapz(h_max[-1], x=s_max[-1]) / q_max)) height_max.append(q_max) elif mode == 'finger': width_max.append(np.nan_to_num(np.trapz(h_max[-1], x=s_max[-1]) / q_max[i_samples[i]])) height_max.append(q_max[i_samples[i]]) # merge half-fingers of TM5 and TM6 if np.any(finger_dic['finger_part']): if verbose>0: print('derive_wetted_area_per_module: merge wetted area on half fingers of TM05 and TM06') new_finger_ID = np.copy(finger_ID) # scan backwards over fingers, merge and delete second finger halfs for i_finger in finger_ID[:0:-1]: if finger_dic['finger_part'][i_finger]: new_finger_ID = np.delete(new_finger_ID, i_finger) if heat_flux_dim==3 and mode != 'average': finger_wetted_area[i_finger-1,:] = finger_wetted_area[i_finger-1,:] + finger_wetted_area[i_finger,:] else: finger_wetted_area[i_finger-1] = finger_wetted_area[i_finger-1] + finger_wetted_area[i_finger] finger_wetted_area = np.delete(finger_wetted_area, i_finger, axis=0) if mode == 'finger' and heat_flux_dim==3: q_max[i_finger-1,:] = np.maximum(q_max[i_finger-1,:], q_max[i_finger,:]) q_max = np.delete(q_max, i_finger, axis=0) elif mode == 'finger' and heat_flux_dim==2: q_max[i_finger-1] = np.maximum(q_max[i_finger-1], q_max[i_finger]) q_max = np.delete(q_max, i_finger, axis=0) # sum up # 'average' mode: sum all fingers over all torus modules --> wetted area of all divertors # divide by n_ports --> get average wetted area per divertor # 'module' mode: in each torus module sum over wetted area on all fingers # --> individual wetted areas per divertor # 'finger' mode: do not sum, since each finger was normalized with a differen q_max # --> individual wetted areas per finger # if only one divertors heat flux is given, proceed as in local mode if mode == 'finger': total_wetted_area = finger_wetted_area else: total_wetted_area = np.sum(finger_wetted_area, axis=0) # if requested, make some visalization if plot_it: if ports_loaded is None: if heat_flux_dim==3: ports_label = ['port index {0}'.format(i) for i in range(n_ports)] elif heat_flux_dim==2: ports_label = ['A_w'] elif isinstance(ports_loaded, list) or isinstance(ports_loaded, np.ndarray): ports_label = ['module {0} {1}'.format(int(port/10), ['L','U'][port%10]) for port in ports_loaded] elif isinstance(ports_loaded, int) or isinstance(ports_loaded, np.int_): ports_label = ['module {0} {1}'.format(int(int(ports_loaded)/10), ['L','U'][int(ports_loaded)%10])] elif isinstance(ports_loaded, str): ports_label = [ports_loaded] # wetted areas vs fingers plt.figure() if heat_flux_dim==3 and mode != 'average': for i_port in range(n_ports): plt.plot(new_finger_ID, finger_wetted_area[:,i_port]*1E4, label=ports_label[i_port]) elif heat_flux_dim==3 and mode == 'average': plt.plot(new_finger_ID, finger_wetted_area*1E4, label='mean heat flux') else: plt.plot(new_finger_ID, finger_wetted_area*1E4, label=ports_label[0]) plt.title('normalized with q_max of {0}'.format(mode)) plt.xlabel('finger no.') plt.ylabel('wetted area [cm²]') plt.legend() plt.show() # wetted areas vs torus modules if heat_flux_dim==3: if mode == 'module': i_ports = np.arange(n_ports) plt.figure() plt.plot(i_ports, total_wetted_area*1E2) plt.xticks(i_ports, [port[-3:] for port in ports_label], rotation='vertical') plt.xlabel('module') plt.ylabel('wetted area [dm²]') plt.tight_layout() plt.show() # central finger profiles and the rectangle with the same integral # (the width represented to contribution to the wetted area) plt.figure() if heat_flux_dim==3: label_str = [ports_label[i] for i in i_samples] if mode=='module': plt.title('wetted area examples from fingers with max Int[q]\nintegrated on their respective divertor and normalized to a local q_max') elif mode == 'average': plt.title('wetted area examples from fingers with max Int[q]\nintegrated on the mean heat flux pattern and normalized to a global q_max') elif mode == 'finger': plt.title('wetted area examples from fingers with max Int[q]\nintegrated on their respective divertor and normalized to a finger q_max') elif heat_flux_dim==2: label_str = ['min(A_w)', 'median(A_w)', 'max(A_w)'] if mode == 'finger': plt.title('wetted area examples from fingers\nintegrated on their respective divertor and normalized to a finger q_max') elif mode == 'module': plt.title('wetted area examples from fingers\nintegrated on their respective divertor and normalized to a local q_max') elif mode == 'average': plt.title('wetted area examples from fingers\nintegrated on a mean heat flux pattern and normalized to a its q_max') color_str = ['b', 'g', 'r'] rect = [] for i in range(len(i_samples)): plt.plot(s_max[i], h_max[i]/1E3, label=label_str[i], color=color_str[i]) width = width_max[i] height = height_max[i]/1E3 start_point = (s_max[i][np.argmax(h_max[i])]-width/2,0) rect.append(patches.Rectangle(start_point,width,height,linewidth=1, edgecolor=color_str[i], facecolor='none')) plt.gca().add_patch(rect[-1]) plt.xlabel('s [m]') plt.ylabel('q [kW/m²]') plt.legend() plt.show() return total_wetted_area, q_max