From e5f8538cb2fd58fb67aaaef64526ca132af01fb7 Mon Sep 17 00:00:00 2001
From: Ultima <theo.steininger@ultimanet.de>
Date: Thu, 8 Oct 2015 20:57:15 +0200
Subject: [PATCH] First working and performant version of new los_response!!
 Fixed allreduce and Allreduce in MPI_dummy.py. Added Option to d2o to skip
 the parsing of the input parameters. Added cumsum function to d2o.

---
 dummys/MPI_dummy.py           |   4 +-
 nifty_mpi_data.py             |  59 ++++++++-
 operators/line_integrator.pyx | 208 +++++++++++++++++++++++++-----
 operators/nifty_los.py        | 234 +++++++++++++++++++++++++++-------
 setup.py                      |   8 +-
 test/test_nifty_mpi_data.py   |  14 ++
 6 files changed, 441 insertions(+), 86 deletions(-)

diff --git a/dummys/MPI_dummy.py b/dummys/MPI_dummy.py
index 90c697ed7..797b866b7 100644
--- a/dummys/MPI_dummy.py
+++ b/dummys/MPI_dummy.py
@@ -79,11 +79,11 @@ class Intracomm(Comm):
         return self._scattergather_helper(*args, **kwargs)
 
     def Allreduce(self, sendbuf, recvbuf, op, **kwargs):
-        recvbuf[:] = op(sendbuf)
+        recvbuf[:] = sendbuf
         return recvbuf
 
     def allreduce(self, sendbuf, recvbuf, op, **kwargs):
-        recvbuf[:] = op(sendbuf)
+        recvbuf[:] = sendbuf
         return recvbuf
 
     def sendrecv(self, sendobj, **kwargs):
diff --git a/nifty_mpi_data.py b/nifty_mpi_data.py
index ef2d05e60..559096421 100644
--- a/nifty_mpi_data.py
+++ b/nifty_mpi_data.py
@@ -87,7 +87,6 @@ class distributed_data_object(object):
             If the supplied distribution strategy is not known.
 
     """
-
     def __init__(self, global_data=None, global_shape=None, dtype=None,
                  local_data=None, local_shape=None,
                  distribution_strategy='fftw', hermitian=False,
@@ -904,6 +903,14 @@ class distributed_data_object(object):
         else:
             return work_d2o
 
+    def cumsum(self, axis=None):
+        cumsum_data = self.distributor.cumsum(self.data, axis=axis)
+        result_d2o = self.copy_empty()
+        if axis is None:
+            result_d2o = result_d2o.flatten(inplace=True)
+        result_d2o.set_local_data(cumsum_data)
+        return result_d2o
+
     def save(self, alias, path=None, overwriteQ=True):
         """
             Saves a distributed_data_object to disk utilizing h5py.
@@ -948,10 +955,20 @@ class _distributor_factory(object):
                      global_data=None, global_shape=None,
                      local_data=None, local_shape=None,
                      alias=None, path=None,
-                     dtype=None, **kwargs):
+                     dtype=None, skip_parsing=False, **kwargs):
+
+        if skip_parsing:
+            return_dict = {'comm': comm,
+                           'dtype': dtype,
+                           'name': distribution_strategy
+                           }
+            if distribution_strategy in STRATEGIES['global']:
+                return_dict['global_shape'] = global_shape
+            elif distribution_strategy in STRATEGIES['local']:
+                return_dict['local_shape'] = local_shape
+            return return_dict
 
         return_dict = {}
-
         # Check that all nodes got the same distribution_strategy
         strat_list = comm.allgather(distribution_strategy)
         if all(x == strat_list[0] for x in strat_list) == False:
@@ -998,7 +1015,7 @@ class _distributor_factory(object):
             else:
                 dtype = np.dtype(dtype)
 
-        elif distribution_strategy in ['freeform']:
+        elif distribution_strategy in STRATEGIES['local']:
             if dtype is None:
                 if isinstance(global_data, distributed_data_object):
                     dtype = global_data.dtype
@@ -1021,7 +1038,7 @@ class _distributor_factory(object):
 
         # Parse the shape
         # Case 1: global-type slicer
-        if distribution_strategy in ['not', 'equal', 'fftw']:
+        if distribution_strategy in STRATEGIES['global']:
             if dset is not None:
                 global_shape = dset.shape
             elif global_data is not None and np.isscalar(global_data) == False:
@@ -1376,6 +1393,9 @@ class _slicing_distributor(distributor):
                 hermitian = True
             elif local_data is None:
                 local_data = np.empty(self.local_shape, dtype=self.dtype)
+            elif isinstance(local_data, np.ndarray):
+                local_data = local_data.astype(
+                               self.dtype, copy=copy).reshape(self.local_shape)
             else:
                 local_data = np.array(local_data).astype(
                     self.dtype, copy=copy).reshape(self.local_shape)
@@ -2179,6 +2199,22 @@ class _slicing_distributor(distributor):
                            local_where)
         return global_where
 
+    def cumsum(self, data, axis):
+        # compute the local np.cumsum
+        local_cumsum = np.cumsum(data, axis=axis)
+        if axis is None or axis == 0:
+            # communicate the highest value from node to node
+            rank = self.comm.rank
+            if local_cumsum.shape[0] == 0:
+                local_shift = np.zeros((), dtype=local_cumsum.dtype)
+            else:
+                local_shift = local_cumsum[-1]
+            local_shift_list = self.comm.allgather(local_shift)
+            local_sum_of_shift = np.sum(local_shift_list[:rank],
+                                        axis=0)
+            local_cumsum += local_sum_of_shift
+        return local_cumsum
+
     def _sliceify(self, inp):
         sliceified = []
         result = []
@@ -2196,7 +2232,10 @@ class _slicing_distributor(distributor):
             else:
                 if x[i] >= self.global_shape[i]:
                     raise IndexError('Index out of bounds!')
-                result += [slice(x[i], x[i] + 1), ]
+                if x[i] == -1:
+                    result += [slice(-1, None)]
+                else:
+                    result += [slice(x[i], x[i] + 1), ]
                 sliceified += [True, ]
 
         return (tuple(result), sliceified)
@@ -2412,7 +2451,6 @@ class _slicing_distributor(distributor):
                     "the distributed_data_object."))
             # check dtype
             if dset.dtype != self.dtype:
-                print ('dsets', dset.dtype, self.dtype)
                 raise TypeError(about._errors.cstring(
                     "ERROR: The datatype of the given dataset does not " +
                     "match the one of the distributed_data_object."))
@@ -2542,6 +2580,8 @@ class _not_distributor(distributor):
             return np.empty(self.global_shape, dtype=self.dtype)
         elif isinstance(data, distributed_data_object):
             new_data = data.get_full_data()
+        elif isinstance(data, np.ndarray):
+            new_data = data
         else:
             new_data = np.array(data)
         return new_data.astype(self.dtype,
@@ -2608,6 +2648,11 @@ class _not_distributor(distributor):
                            local_where)
         return global_where
 
+    def cumsum(self, data, axis):
+        # compute the local results from np.cumsum
+        cumsum = np.cumsum(data, axis=axis)
+        return cumsum
+
     if 'h5py' in gdi:
         def save_data(self, data, alias, path=None, overwriteQ=True):
             comm = self.comm
diff --git a/operators/line_integrator.pyx b/operators/line_integrator.pyx
index a5f84e244..5482cb408 100644
--- a/operators/line_integrator.pyx
+++ b/operators/line_integrator.pyx
@@ -1,11 +1,12 @@
-# -*- coding: utf-8 -*-
 #cython: nonecheck=False
 #cython: boundscheck=False
 #cython: wraparound=False
+#cython: cdivision=True
 
 import numpy as np
 cimport numpy as np
 cimport cython
+from scipy.special import erf
 
 FLOAT = np.float
 ctypedef np.float_t FLOAT_t
@@ -13,16 +14,18 @@ ctypedef np.float_t FLOAT_t
 INT = np.int
 ctypedef np.int_t INT_t
 
+ctypedef np.ndarray[FLOAT_t, ndim=1] (*error_function_type)(np.ndarray[FLOAT_t,
+                                                                       ndim=1])
 cdef extern from "numpy/npy_math.h":
     bint isnan(double x)
     bint signbit(double x)
     double ceil(double x)
     double floor(double x)
     double sqrt(double x)
+#from libc.math cimport isnan, signbit, ceil, floor, sqrt
 
 cdef FLOAT_t NAN = float("NaN")
 
-
 cdef class line_integrator(object):
     cdef tuple shape
     cdef list start
@@ -37,22 +40,57 @@ cdef class line_integrator(object):
         assert(np.all(np.array(self.shape) != 0))
         assert(len(self.shape) == len(self.start) == len(self.end))
 
-    cpdef tuple integrate(self):
+    cpdef tuple integrate(self, bint with_cumsum=False):
+        cdef list indices
+        cdef np.ndarray[FLOAT_t, ndim=1] weights, normalized_cumsum_of_weights
+
         if list_equal_Q(self.start, self.end):
-            return self._empty_results()
+            return self._empty_results(with_cumsum)
         try:
             projected_start = self._project_to_cuboid('start')
             projected_end = self._project_to_cuboid('end')
         except ValueError:
-            return self._empty_results()
+            return self._empty_results(with_cumsum)
 
         (indices, weights) = self._integrate_through_cuboid(projected_start,
                                                             projected_end)
-        return (indices, weights)
+        if with_cumsum:
+            normalized_cumsum_of_weights = self._cumsum(weights,
+                                                        projected_start)
+            return (indices, weights, normalized_cumsum_of_weights)
+        else:
+            return (indices, weights)
 
-    def _empty_results(self):
-        return ([np.array([], dtype=INT)] * len(self.shape),
-                np.array([], dtype=FLOAT))
+    def _empty_results(self, with_cumsum):
+        if with_cumsum:
+            return ([np.array([], dtype=INT)] * len(self.shape),
+                     np.array([], dtype=FLOAT),
+                     np.array([], dtype=FLOAT))
+
+        else:
+            return ([np.array([], dtype=INT)] * len(self.shape),
+                     np.array([], dtype=FLOAT))
+
+    cpdef np.ndarray[FLOAT_t, ndim=1] _cumsum(self,
+                                           np.ndarray[FLOAT_t, ndim=1] weights,
+                                           list projected_start):
+        cdef list distance = list_sub(self.end, self.start)
+        cdef FLOAT_t total_length = list_norm(distance)
+        cdef list initial_skip = list_sub(projected_start, self.start)
+        cdef FLOAT_t skipped_length = list_norm(initial_skip)
+
+        cdef int i
+        cdef np.ndarray[FLOAT_t, ndim=1] cumsum = np.empty_like(weights)
+        cdef FLOAT_t[:] cumsum_view = cumsum
+        cdef FLOAT_t[:] weights_view = weights
+
+        cumsum_view[0] = (skipped_length + weights_view[0]/2.)/total_length
+        for i in xrange(1, len(weights)):
+            cumsum_view[i] = (cumsum_view[i-1] +
+                              (weights_view[i] +
+                               weights_view[i-1])/2./total_length)
+
+        return cumsum
 
     cpdef list _project_to_cuboid(self, str mode):
         cdef list a, b, c, p, surface_list, translator_list,\
@@ -153,23 +191,25 @@ cdef class line_integrator(object):
         cdef np.ndarray[FLOAT_t, ndim=1] weight_list = np.empty(num_estimate,
                                                                 FLOAT)
 
-
         current_position = start
         i = 0
         while True:
             next_position, weight = self._get_next_position(current_position,
                                                             end)
             floor_current_position = list_floor(current_position)
+            floor_next_position = list_floor(next_position)
             for j in xrange(len(start)):
-                index_list[i, j] = floor_current_position[j]
+                if floor_current_position[j] < floor_next_position[j]:
+                    index_list[i, j] = floor_current_position[j]
+                else:
+                    index_list[i, j] = floor_next_position[j]
             weight_list[i] = weight
-
-            if floor_current_position == list_floor(end):
+            if next_position == end:
                 break
 
             current_position = next_position
             i += 1
-        return (list(index_list[:i].T), weight_list[:i])
+        return (list(index_list[:i+1].T), weight_list[:i+1])
 
 
     cdef tuple _get_next_position(self,
@@ -209,15 +249,114 @@ cdef class line_integrator(object):
         if isnan(best_translator_norm):
             floor_position = list_floor(position)
             # check if position is in the same cell as the endpoint
-            assert(floor_position == list_floor(end_position))
+            #assert(floor_position == list_floor(end_position))
             weight = list_norm(list_sub(end_position, position))
-            next_position = position
+            next_position = end_position
         else:
             next_position = list_add(position, best_translator)
             weight = list_norm(best_translator)
         return (next_position, weight)
 
 
+
+cpdef list multi_integrator(list starts,
+                             list ends,
+                             list sigmas_low,
+                             list sigmas_up,
+                             tuple shape,
+                             list distances,
+                             object error_function):
+
+    cdef INT_t i, j, total_number = len(starts[0]), dim = len(shape)
+    cdef list indices_and_weights_list = []
+    cdef list start = [None]*dim, end = [None]*dim
+    cdef list sigma_low = [None]*dim, sigma_up = [None]*dim
+
+
+    for i in xrange(total_number):
+        for j in xrange(dim):
+            start[j] = starts[j][i]
+            end[j] = ends[j][i]
+            sigma_low[j] = sigmas_low[j][i]
+            sigma_up[j] = sigmas_up[j][i]
+
+        indices_and_weights_list += single_integrator(i,
+                                                      start,
+                                                      end,
+                                                      sigma_low,
+                                                      sigma_up,
+                                                      shape,
+                                                      distances,
+                                                      error_function)
+
+    return indices_and_weights_list
+
+cdef list single_integrator(INT_t index,
+                             list start,
+                             list end,
+                             list sigma_low,
+                             list sigma_up,
+                             tuple shape,
+                             list distances,
+                             object error_function):
+    cdef np.ndarray[FLOAT_t, ndim=1] pure_weights, low_weights, up_weights, \
+        low_lengths, up_lengths
+    cdef list pure_indices, low_indices, up_indices
+    # compute the three parts of a full line of sight
+    (pure_indices, pure_weights) = line_integrator(
+                                        shape, start, sigma_low).integrate()
+
+    (low_indices, low_weights, low_lengths) = line_integrator(
+                            shape, sigma_low, end).integrate(True)
+
+    (up_indices, up_weights, up_lengths) = line_integrator(
+                            shape, end, sigma_up).integrate(True)
+
+    # apply the error function on the sigma_low and sigma_up intervalls
+    low_weights = _apply_error_function(low_weights, low_lengths, False,
+                                        error_function)
+    up_weights = _apply_error_function(up_weights, up_lengths, True,
+                                       error_function)
+
+    # correct the volume distortion
+    cdef list direction = list_sub(end, start)
+    cdef FLOAT_t rescaler = (list_norm(list_mult(direction, distances))/
+                             list_norm(direction))
+    pure_weights *= rescaler
+    low_weights *= rescaler
+    up_weights *= rescaler
+
+    # construct the result tuple
+    cdef list result_list = []
+    if pure_weights.shape[0] != 0:
+        result_list += [[index, pure_indices, pure_weights],]
+    if low_weights.shape[0] != 0:
+        result_list += [[index, low_indices, low_weights],]
+    if up_weights.shape[0] != 0:
+        result_list += [[index, up_indices, up_weights],]
+
+    return result_list
+
+
+cdef np.ndarray[FLOAT_t, ndim=1] _apply_error_function(
+                                        np.ndarray[FLOAT_t, ndim=1] weights,
+                                        np.ndarray[FLOAT_t, ndim=1] lengths,
+                                        bint up_Q,
+                                        object error_function):
+    cdef np.ndarray[FLOAT_t, ndim=1] output_weights
+    if up_Q:
+        output_weights = weights * error_function(lengths)
+    else:
+        output_weights = weights * error_function(-1 + lengths)
+    return output_weights
+
+cpdef np.ndarray[FLOAT_t, ndim=1] gaussian_error_function(
+                                                np.ndarray[FLOAT_t, ndim=1] x):
+    cdef FLOAT_t sigma = 0.5
+    return 0.5*(1 - erf(x / (sqrt(2.)*sigma)))
+
+
+
 cdef INT_t strong_floor(FLOAT_t x):
     cdef FLOAT_t floor_x
     floor_x = floor(x)
@@ -226,7 +365,7 @@ cdef INT_t strong_floor(FLOAT_t x):
     else:
         return INT(floor_x)
 
-cpdef INT_t strong_ceil(FLOAT_t x):
+cdef INT_t strong_ceil(FLOAT_t x):
     cdef FLOAT_t ceil_x
     ceil_x = ceil(x)
     if ceil_x == x:
@@ -278,33 +417,40 @@ cdef bint list_all_le(list list1, list list2):
     return True
 
 cdef list list_add(list list1, list list2):
-    cdef int ndim = len(list1)
+    cdef int i, ndim = len(list1)
     cdef list result = [None]*ndim
     for i in xrange(ndim):
         result[i] = list1[i] + list2[i]
     return result
 
 cdef list list_sub(list list1, list list2):
-    cdef int ndim = len(list1)
+    cdef int i, ndim = len(list1)
     cdef list result = [None]*ndim
     for i in xrange(ndim):
         result[i] = list1[i] - list2[i]
     return result
 
+cdef list list_mult(list list1, list list2):
+    cdef int i, ndim = len(list1)
+    cdef list result = [None]*ndim
+    for i in xrange(ndim):
+        result[i] = list1[i] * list2[i]
+    return result
 
-#def test2():
-#    print ceil(1.5)
-#    print floor(1.5)
-#
-#def test():
-#    l = los_integrator_pure((1000,1000,1000), (-1,-1,-10), (1001, 1002, 1003))
-#    for i in xrange(30000):
-#        l._get_next_position([1.,1.1,1.2], [10.,10.,11.])
-#
-#def test3():
-#    print sqrt(3)
-#
 
+cdef list list_scalar_mult(list list1, FLOAT_t scaler):
+    cdef int ndim = len(list1)
+    cdef list result = [None]*ndim
+    for i in xrange(ndim):
+        result[i] = list1[i]*scaler
+    return result
+
+cdef list list_scalar_div(list list1, FLOAT_t scaler):
+    cdef int ndim = len(list1)
+    cdef list result = [None]*ndim
+    for i in xrange(ndim):
+        result[i] = list1[i]/scaler
+    return result
 
 
 
diff --git a/operators/nifty_los.py b/operators/nifty_los.py
index c03a40a47..61379280f 100644
--- a/operators/nifty_los.py
+++ b/operators/nifty_los.py
@@ -1,23 +1,36 @@
 # -*- coding: utf-8 -*-
 
 import numpy as np
-from line_integrator import line_integrator
 
-from nifty.keepers import about
+from line_integrator import multi_integrator, \
+                            gaussian_error_function
+
+from nifty.keepers import about,\
+                          global_dependency_injector as gdi
+from nifty.nifty_mpi_data import distributed_data_object,\
+                                 STRATEGIES
+from nifty.nifty_core import point_space,\
+                             field
 from nifty.rg import rg_space
 from nifty.operators import operator
 
+MPI = gdi['MPI']
 
 class los_response(operator):
 
     def __init__(self, domain, starts, ends, sigmas_low=None, sigmas_up=None,
-                 zero_point=None, error_function=lambda x: 0.5):
+                 zero_point=None, error_function=gaussian_error_function,
+                 target=None):
+
         if not isinstance(domain, rg_space):
             raise TypeError(about._errors.cstring(
                 "ERROR: The domain must be a rg_space instance."))
         self.domain = domain
+        self.codomain = self.domain.get_codomain()
 
-        if not callable(error_function):
+        if callable(error_function):
+            self.error_function = error_function
+        else:
             raise ValueError(about._errors.cstring(
                 "ERROR: error_function must be callable."))
 
@@ -29,6 +42,20 @@ class los_response(operator):
                                                     starts, ends, sigmas_low,
                                                     sigmas_up, zero_point)
 
+        self.local_weights_and_indices = self._compute_weights_and_indices()
+
+        self.number_of_los = len(self.sigmas_low)
+
+        if target is None:
+            self.target = point_space(num=self.number_of_los,
+                                      dtype=self.domain.dtype,
+                                      datamodel=self.domain.datamodel,
+                                      comm=self.domain.comm)
+        else:
+            self.target = target
+
+        self.cotarget = self.target.get_codomain()
+
         self.imp = True
         self.uni = False
         self.sym = False
@@ -96,12 +123,11 @@ class los_response(operator):
         parsed_ends = self._parse_startsends(ends, number_of_los)
 
         # check that sigmas_up/lows have the right shape and parse scalars
-        parsed_sigmas_up = self._parse_sigmas_uplows(sigmas_up, number_of_los)
         parsed_sigmas_low = self._parse_sigmas_uplows(sigmas_low,
                                                       number_of_los)
-
-        return (parsed_starts, parsed_ends, parsed_sigmas_up,
-                parsed_sigmas_low, parsed_zero_point)
+        parsed_sigmas_up = self._parse_sigmas_uplows(sigmas_up, number_of_los)
+        return (parsed_starts, parsed_ends, parsed_sigmas_low,
+                parsed_sigmas_up, parsed_zero_point)
 
     def _parse_startsends(self, coords, number_of_los):
         result_coords = [None]*len(coords)
@@ -129,42 +155,162 @@ class los_response(operator):
                 "numpy ndarray."))
         return parsed_sig
 
-    def convert_indices_to_physical(self, pixel_coordinates):
-        # first of all, compute the phyiscal distance of the given pixel
-        # from the zeroth-pixel
-        phyiscal_distance = np.array(pixel_coordinates) * \
-                            np.array(self.domain.distances)
-        # add the offset of the zeroth pixel with respect to the coordinate
-        # system
-        physical_position = phyiscal_distance + np.array(self.zero_point)
-        return physical_position.tolist()
-
-    def convert_physical_to_indices(self, physical_position):
-        # compute the distance to the zeroth pixel
-        relative_position = np.array(physical_position) - \
-                            np.array(self.zero_point)
-        # rescale the coordinates to the uniform grid
-        pixel_coordinates = relative_position / np.array(self.domain.distances)
-        return pixel_coordinates.tolist()
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+    def convert_physical_to_indices(self, physical_positions):
+        pixel_coordinates = [None]*len(physical_positions)
+        local_zero_point = self._get_local_zero_point()
+
+        for i in xrange(len(pixel_coordinates)):
+            # Compute the distance to the zeroth pixel.
+            # Then rescale the coordinates to the uniform grid.
+            pixel_coordinates[i] = ((physical_positions[i] -
+                                     local_zero_point[i]) /
+                                    self.domain.distances[i]) + 0.5
+
+        return pixel_coordinates
+
+    def _convert_physical_to_pixel_lengths(self, lengths, starts, ends):
+        directions = np.array(ends) - np.array(starts)
+        distances = np.array(self.domain.distances)[:, None]
+        rescalers = (np.linalg.norm(directions / distances, axis=0) /
+                     np.linalg.norm(directions, axis=0))
+        return lengths * rescalers
+
+    def _convert_sigmas_to_physical_coordinates(self, starts, ends,
+                                                sigmas_low, sigmas_up):
+        starts = np.array(starts)
+        ends = np.array(ends)
+        c = ends - starts
+        abs_c = np.linalg.norm(c, axis=0)
+        sigmas_low_coords = list(starts + (abs_c - sigmas_low)*c/abs_c)
+        sigmas_up_coords = list(starts + (abs_c + sigmas_up)*c/abs_c)
+        return (sigmas_low_coords, sigmas_up_coords)
+
+    def _get_local_zero_point(self):
+        if self.domain.datamodel == 'np':
+            return self.zero_point
+        elif self.domain.datamodel in STRATEGIES['not']:
+            return self.zero_point
+        elif self.domain.datamodel in STRATEGIES['slicing']:
+            dummy_d2o = distributed_data_object(
+                                global_shape=self.domain.get_shape(),
+                                dtype=np.dtype('int16'),
+                                distribution_strategy=self.domain.datamodel,
+                                skip_parsing=True)
+
+            pixel_offset = dummy_d2o.distributor.local_start
+            distance_offset = pixel_offset * self.domain.distances[0]
+            local_zero_point = self.zero_point[:]
+            local_zero_point[0] += distance_offset
+            return local_zero_point
+        else:
+            raise NotImplementedError(about._errors.cstring(
+                "ERROR: The space's datamodel is not supported:" +
+                str(self.domain.datamodel)))
+
+    def _get_local_shape(self):
+        if self.domain.datamodel == 'np':
+            return self.domain.get_shape()
+        elif self.domain.datamodel in STRATEGIES['not']:
+            return self.domain.get_shape()
+        elif self.domain.datamodel in STRATEGIES['slicing']:
+            dummy_d2o = distributed_data_object(
+                                global_shape=self.domain.get_shape(),
+                                dtype=np.dtype('int'),
+                                distribution_strategy=self.domain.datamodel,
+                                skip_parsing=True)
+            return dummy_d2o.distributor.local_shape
+
+    def _compute_weights_and_indices(self):
+        # compute the local pixel coordinates for the starts and ends
+        localized_pixel_starts = self.convert_physical_to_indices(self.starts)
+        localized_pixel_ends = self.convert_physical_to_indices(self.ends)
+
+        # Convert the sigmas from physical distances to pixel coordinates
+        # Therefore transform the distances to physical coordinates...
+        (sigmas_low_coords, sigmas_up_coords) = \
+            self._convert_sigmas_to_physical_coordinates(self.starts,
+                                                         self.ends,
+                                                         self.sigmas_low,
+                                                         self.sigmas_up)
+        # ...and then transform them to pixel coordinates
+        localized_pixel_sigmas_low = self.convert_physical_to_indices(
+                                                             sigmas_low_coords)
+        localized_pixel_sigmas_up = self.convert_physical_to_indices(
+                                                             sigmas_up_coords)
+
+        # get the shape of the local data slice
+        local_shape = self._get_local_shape()
+        # let the cython function do the hard work of integrating over
+        # the individual lines
+        local_indices_and_weights_list = multi_integrator(
+                                                  localized_pixel_starts,
+                                                  localized_pixel_ends,
+                                                  localized_pixel_sigmas_low,
+                                                  localized_pixel_sigmas_up,
+                                                  local_shape,
+                                                  list(self.domain.distances),
+                                                  self.error_function)
+        return local_indices_and_weights_list
+
+    def _multiply(self, input_field):
+        # extract the local data array from the input field
+        try:
+            local_input_data = input_field.val.data
+        except AttributeError:
+            local_input_data = input_field.val
+
+        local_result = np.zeros(self.number_of_los, dtype=self.target.dtype)
+
+        for i in xrange(len(self.local_weights_and_indices)):
+            current_weights_and_indices = self.local_weights_and_indices[i]
+            (los_index, indices, weights) = current_weights_and_indices
+            local_result[los_index] += \
+                np.sum(local_input_data[indices]*weights)
+
+        if self.domain.datamodel == 'np':
+            global_result = local_result
+        elif self.domain.datamodel is STRATEGIES['not']:
+            global_result = local_result
+        if self.domain.datamodel in STRATEGIES['slicing']:
+            global_result = np.empty_like(local_result)
+            self.domain.comm.Allreduce(local_result, global_result, op=MPI.SUM)
+
+        result_field = field(self.target, val=global_result)
+        return result_field
+
+    def _adjoint_multiply(self, input_field):
+        # get the full data as np.ndarray from the input field
+        try:
+            full_input_data = input_field.val.get_full_data()
+        except AttributeError:
+            full_input_data = input_field.val
+
+        # produce a data_object suitable to domain
+        global_result_data_object = self.domain.cast(0)
+
+        # set the references to the underlying np arrays
+        try:
+            local_result_data = global_result_data_object.data
+        except AttributeError:
+            local_result_data = global_result_data_object
+
+        for i in xrange(len(self.local_weights_and_indices)):
+            current_weights_and_indices = self.local_weights_and_indices[i]
+            (los_index, indices, weights) = current_weights_and_indices
+            local_result_data[indices] += \
+                (full_input_data[los_index]*weights)
+
+        # weight the result
+        local_result_data /= self.domain.get_vol()
+
+        # construct the result field
+        result_field = field(self.domain)
+        try:
+            result_field.val.data = local_result_data
+        except AttributeError:
+            result_field.val = local_result_data
+
+        return result_field
 
 
 
diff --git a/setup.py b/setup.py
index 6532f3227..62631a8cf 100644
--- a/setup.py
+++ b/setup.py
@@ -26,9 +26,13 @@ from Cython.Distutils import build_ext
 import sys
 import os
 
+#os.environ["CC"] = "g++-4.8"
+#os.environ["CXX"] = "g++-4.8"
+
 ext_modules=[Extension(
-                   "line_integrator_vector",
-                   ["operators/line_integrator.pyx"])]
+                   "line_integrator",
+                   ["operators/line_integrator.pyx"],)]# "vector.pxd"],
+                   #language='c++')]
 
 
 setup(name="ift_nifty",
diff --git a/test/test_nifty_mpi_data.py b/test/test_nifty_mpi_data.py
index e3e544598..6ce762ff1 100644
--- a/test/test_nifty_mpi_data.py
+++ b/test/test_nifty_mpi_data.py
@@ -1676,6 +1676,20 @@ class Test_special_methods(unittest.TestCase):
         assert_equal(obj.real.get_full_data(), a.real)
         assert_equal(obj.imag.get_full_data(), a.imag)
 
+###############################################################################
+
+    @parameterized.expand(
+        itertools.product([None, 0, 1, 2],
+                          all_distribution_strategies),
+        testcase_func_name=custom_name_func)
+    def test_cumsum(self, axis, distribution_strategy):
+        global_shape = (3, 4, 5)
+        dtype = np.dtype('float')
+        (a, obj) = generate_data(global_shape, dtype,
+                                 distribution_strategy)
+        assert_equal(obj.cumsum(axis=axis).get_full_data(),
+                     a.cumsum(axis=axis))
+
 ###############################################################################
 ###############################################################################
 
-- 
GitLab