Commit 981f15b5 authored by Thomas Purcell's avatar Thomas Purcell
Browse files

Begin allowing distributed nodes

phi stored across all nodes not stored on all nodes
parent d9e086e6
......@@ -62,13 +62,13 @@ void FeatureSpace::generate_feature_space()
std::vector<node_ptr> next_phi;
_n_feat = _phi.size();
std::array<int, 2> start_end = _mpi_comm->get_start_end_for_iterator(_phi.size() - _start_gen[_start_gen.size()-1], _start_gen[_start_gen.size()-1]);
// std::array<int, 2> start_end = _mpi_comm->get_start_end_for_iterator(_phi.size() - _start_gen[_start_gen.size()-1], _start_gen[_start_gen.size()-1]);
// next_phi.reserve(node_value_arrs::get_max_number_features(_allowed_ops, 1, start_end[1] - start_end[0]));
int feat_ind = _phi.size();
for(auto feat_1 = _phi.begin() + start_end[0]; feat_1 != _phi.begin() + start_end[1]; ++feat_1)
for(auto feat_1 = _phi.begin() + _start_gen.back() + _mpi_comm->rank(); feat_1 != _phi.end(); feat_1 += _mpi_comm->size())
{
for(auto& op : _un_operators)
{
......@@ -126,27 +126,150 @@ void FeatureSpace::generate_feature_space()
}
_mpi_comm->barrier();
_start_gen.push_back(_phi.size());
std::vector<std::vector<node_ptr>> next_phi_gathered;
mpi::all_gather(*_mpi_comm, next_phi, next_phi_gathered);
feat_ind = _phi.size();
for(auto& next_phi_vec : next_phi_gathered)
if(nn < _max_phi)
{
_phi.reserve(_phi.size() + next_phi_vec.size());
for(auto& feat : next_phi_vec)
std::vector<std::vector<node_ptr>> next_phi_gathered;
mpi::all_gather(*_mpi_comm, next_phi, next_phi_gathered);
feat_ind = _phi.size();
for(auto& next_phi_vec : next_phi_gathered)
{
feat->reindex(feat_ind);
_phi.push_back(feat);
++feat_ind;
_phi.reserve(_phi.size() + next_phi_vec.size());
for(auto& feat : next_phi_vec)
{
feat->reindex(feat_ind);
_phi.push_back(feat);
++feat_ind;
}
}
if(nn <= _n_rung_store)
{
// bool use_temp = (nn != _max_phi) || (_max_phi > _n_rung_store);
bool use_temp = true;
node_value_arrs::resize_values_arr(_n_rung_store, _phi.size(), use_temp);
for(int ff = _start_gen[0]; ff < _phi.size(); ++ff)
_phi[ff]->set_value();
}
}
if(nn <= _n_rung_store)
else
{
bool use_temp = (nn != _max_phi) || (_max_phi > _n_rung_store);
node_value_arrs::resize_values_arr(_n_rung_store, _phi.size(), use_temp);
for(int ff = _start_gen[0]; ff < _phi.size(); ++ff)
_phi[ff]->set_value();
std::cout << "HERE" << std::endl;
std::vector<size_t> next_phi_sizes;
mpi::all_gather(*_mpi_comm, next_phi.size(), next_phi_sizes);
size_t n_feat = std::accumulate(next_phi_sizes.begin(), next_phi_sizes.end(), _phi.size());
size_t n_feat_rank = n_feat / _mpi_comm->size();
size_t n_feat_below_rank = _mpi_comm->rank() * n_feat_rank;
if(_mpi_comm->rank() < n_feat % _mpi_comm->size())
{
++n_feat_rank;
n_feat_below_rank += _mpi_comm->rank();
}
else
{
n_feat_below_rank += n_feat % _mpi_comm->size();
}
if(n_feat_below_rank + n_feat_rank <= _phi.size())
{
_phi.erase(_phi.begin(), _phi.begin() + n_feat_below_rank);
_phi.erase(_phi.begin() + n_feat_rank, _phi.end());
}
else if(n_feat_below_rank <= _phi.size())
{
_phi.erase(_phi.begin(), _phi.begin() + n_feat_below_rank);
}
else
{
_phi = {};
}
while((_phi.size() < n_feat_rank) && (next_phi.size() > 0))
{
next_phi.back()->reindex(_phi.size() + n_feat_below_rank);
_phi.push_back(next_phi.back());
next_phi.pop_back();
}
// This can be calculated without an all_gather, using it to not introduce too many things at one time
std::vector<size_t> next_phi_needed;
std::vector<size_t> next_phi_excess;
mpi::all_gather(*_mpi_comm, next_phi.size(), next_phi_excess);
mpi::all_gather(*_mpi_comm, n_feat_rank - _phi.size(), next_phi_needed);
std::vector<size_t> send_sizes(next_phi_sizes.size(), 0);
std::vector<size_t> recv_sizes(next_phi_sizes.size(), 0);
// Is this rank sending or receiving?
std::cout << "HERE 2" << std::endl;
std::cout << std::accumulate(next_phi_excess.begin(), next_phi_excess.end(), 0);
std::cout << std::accumulate(next_phi_needed.begin(), next_phi_needed.end(), 0);
if(next_phi_excess[_mpi_comm->rank()] > 0)
{
size_t total_sent = std::accumulate(next_phi_excess.begin(), next_phi_excess.begin() + _mpi_comm->rank(), 0);
size_t prev_sent_recv = 0;
size_t send_size = 0;
int ind = 0;
while((prev_sent_recv < total_sent) && (ind < _mpi_comm->size()))
{
prev_sent_recv += next_phi_needed[ind];
++ind;
}
send_size = std::min(next_phi.size(), next_phi_needed[ind-1]);
send_sizes[ind-1] = send_size;
total_sent = send_size;
while((total_sent < next_phi.size()) && (ind < _mpi_comm->size()))
{
send_size = std::min(next_phi.size() - total_sent, next_phi_needed[ind]);
send_sizes[ind] = send_size;
total_sent += send_size;
}
total_sent = 0;
for(int pp = 0; pp < send_sizes.size(); ++pp)
{
if(send_sizes[pp] == 0)
continue;
std::vector<node_ptr> to_send(send_sizes[pp]);
std::copy_n(next_phi.begin() + total_sent, send_sizes[pp], to_send.begin());
_mpi_comm->send(pp, _mpi_comm->cantorTagGen(_mpi_comm->rank(), pp, 1, 0), to_send);
total_sent += send_sizes[pp];
}
}
else
{
size_t total_recv = std::accumulate(next_phi_excess.begin(), next_phi_excess.begin() + _mpi_comm->rank(), 0);
size_t prev_recv_sent = 0;
size_t recv_size = 0;
int ind = 0;
while((prev_recv_sent < total_recv) && (ind < _mpi_comm->size()))
{
prev_recv_sent += next_phi_excess[ind];
++ind;
}
recv_size = std::min(next_phi.size(), next_phi_excess[ind-1]);
recv_sizes[ind-1] = recv_size;
total_recv = recv_size;
while((_phi.size() < n_feat_rank) && (ind < _mpi_comm->size()))
{
recv_size = std::min(_phi.size() + total_recv, next_phi_excess[ind]);
recv_sizes[ind] = recv_size;
total_recv += recv_size;
}
total_recv = 0;
for(int pp = 0; pp < recv_sizes.size(); ++pp)
{
if(recv_sizes[pp] == 0)
continue;
std::vector<node_ptr> to_recv;
_mpi_comm->recv(pp, _mpi_comm->cantorTagGen(_mpi_comm->rank(), pp, 1, 0), to_recv);
for(auto& feat : to_recv)
{
feat->reindex(_phi.size() + n_feat_below_rank);
_phi.push_back(feat);
}
}
}
}
}
_n_feat = _phi.size();
......@@ -154,39 +277,31 @@ void FeatureSpace::generate_feature_space()
void FeatureSpace::project_r(double* prop)
{
std::array<int, 2> start_end = _mpi_comm->get_start_end_for_iterator(_phi.size(), 0);
std::vector<double> scores(start_end[1] - start_end[0], 0.0);
for(int ff = start_end[0]; ff < start_end[1]; ++ff)
scores[ff - start_end[0]] = -1.0 * std::abs(util_funcs::r(prop, _phi[ff]->value_ptr(), _n_samp));
std::vector<std::vector<double>> all_scores;
mpi::all_gather(*_mpi_comm, scores, all_scores);
int iter_start = 0;
for(auto& score_vec : all_scores)
{
for(int ii = 0; ii < score_vec.size(); ++ii)
{
_scores[ii + iter_start] = score_vec[ii];
}
iter_start += score_vec.size();
}
std::vector<double> scores(_phi.size(), 0.0);
for(int ff = 0; ff < _phi.size(); ++ff)
_scores[ff] = -1.0 * std::abs(util_funcs::r(prop, _phi[ff]->value_ptr(), _n_samp));
}
void FeatureSpace::sis(std::vector<double>& prop)
{
int cur_feat = _D.size() / prop.size();
int previous_size = _D.size();
_D.resize(_D.size() + _n_sis_select * prop.size());
_D.reserve(_D.size());
_phi_selected.reserve(_phi_selected.size());
_phi_selected.reserve(_phi_selected.size() + _n_sis_select);
project_r(prop.data());
std::vector<int> inds = util_funcs::argsort(_scores);
std::vector<double> D_temp(_n_sis_select * prop.size());
std::vector<double> scores_selected(_n_sis_select, 0.0);
std::vector<node_ptr> phi_slected(_n_sis_select, nullptr);
int ii = 0;
while((cur_feat != _D.size() / prop.size()) && (ii < _scores.size()))
int cur_feat_local = 0;
while((cur_feat_local != D_temp.size() / prop.size()) && (ii < _scores.size()))
{
bool is_valid = true;
for(int dd = 0; dd < cur_feat; ++dd)
......@@ -197,16 +312,74 @@ void FeatureSpace::sis(std::vector<double>& prop)
break;
}
}
for(int dd = 0; dd < cur_feat_local; ++dd)
{
if(1.0 - std::abs(util_funcs::r(&D_temp[dd*prop.size()], _phi[inds[ii]]->value_ptr(), prop.size())) < 1e-13)
{
is_valid = false;
break;
}
}
if(is_valid)
{
std::copy_n(_phi[inds[ii]]->value_ptr(), prop.size(), &D_temp[cur_feat_local * prop.size()]);
scores_selected[cur_feat_local] = _scores[inds[ii]];
phi_slected[cur_feat_local] = _phi[inds[ii]];
++cur_feat_local;
}
++ii;
}
phi_slected.resize(cur_feat_local);
scores_selected.resize(cur_feat_local);
D_temp.resize(cur_feat_local * prop.size());
std::vector<std::vector<double>> all_scores;
std::vector<std::vector<double>> all_D_temp;
std::vector<std::vector<node_ptr>> all_phi_selected;
mpi::all_gather(*_mpi_comm, scores_selected, all_scores);
mpi::all_gather(*_mpi_comm, D_temp, all_scores);
mpi::all_gather(*_mpi_comm, phi_slected, all_phi_selected);
int iter_start = 0;
scores_selected = {};
D_temp = {};
phi_slected = {};
for(int sv = 0; sv < _mpi_comm->size(); ++sv)
{
scores_selected.resize(all_scores[sv].size() + scores_selected.size());
D_temp.resize(all_D_temp[sv].size() + D_temp.size());
std::copy_n(all_scores[sv].begin(), all_scores[sv].size(), scores_selected.end() - all_scores[sv].size());
std::copy_n(all_D_temp[sv].begin(), all_D_temp[sv].size(), D_temp.end() - all_D_temp[sv].size());
std::copy_n(all_phi_selected[sv].begin(), all_phi_selected[sv].size(), phi_slected.end() - all_phi_selected[sv].size());
}
inds = util_funcs::argsort(scores_selected);
cur_feat_local = 0;
while((cur_feat != _D.size() / prop.size()) && (ii < scores_selected.size()))
{
bool is_valid = true;
for(int dd = 0; dd < cur_feat_local; ++dd)
{
if(1.0 - std::abs(util_funcs::r(&_D[previous_size + dd * prop.size()], &D_temp[inds[ii] * prop.size()], prop.size())) < 1e-13)
{
is_valid = false;
break;
}
}
if(is_valid)
{
std::copy_n(_phi[inds[ii]]->value_ptr(), prop.size(), &_D[cur_feat * prop.size()]);
std::copy_n(&D_temp[inds[ii] * prop.size()], prop.size(), &_D[cur_feat * prop.size()]);
_phi_selected.push_back(phi_slected[inds[ii]]);
_phi_selected.back()->set_value();
++cur_feat_local;
++cur_feat;
_phi_selected.push_back(_phi[inds[ii]]);
}
++ii;
}
if(ii >= _scores.size())
if(cur_feat != _D.size() / prop.size())
throw std::logic_error("SIS went through all features and did not select enough.");
}
......@@ -41,7 +41,6 @@ class FeatureSpace
std::vector<node_ptr> _phi_selected; //!< selected features
std::vector<node_ptr> _phi; //!< all features
std::vector<node_ptr> _phi_0; //!< initial feature space
public:
/**
* @brief Constructor for the feature space
......@@ -117,6 +116,15 @@ public:
* @param prop The property to calculate SIS from
*/
void sis(std::vector<double>& prop);
/**
* @brief Is a feature in this process' _phi?
*
* @param ind index
* @return True if feature is in this _phi
*/
inline bool feat_in_phi(int ind){return (ind >= _phi[0]->feat_ind()) && (ind <= _phi.back()->feat_ind());}
};
#endif
\ No newline at end of file
......@@ -15,6 +15,9 @@ namespace util_funcs
inline double mean(double* start, int size){return std::accumulate(start, start + size, 0.0) / size;};
inline double mean(std::vector<int>& vec){return static_cast<double>(std::accumulate(vec.begin(), vec.end(), 0)) / vec.size();};
inline double mean(int* start, int size){return static_cast<double>(std::accumulate(start, start + size, 0)) / size;};
inline double stand_dev(std::vector<double>& vec)
{
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment