Newer
Older

Benjamin Lindner2
committed
/*
* This file is part of the software sassena

Benjamin Lindner2
committed
*
* Authors:
* Benjamin Lindner, ben@benlabs.net
*
* Copyright 2008-2010 Benjamin Lindner

Benjamin Lindner2
committed
*
*/
// direct header
#include "scatter_devices/abstract_scatter_device.hpp"
// standard header
#include <complex>
#include <fstream>
// special library headers
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics.hpp>
#include <boost/lexical_cast.hpp>
// other headers
#include "math/coor3d.hpp"
#include "decomposition/assignment.hpp"

Benjamin Lindner2
committed
#include "decomposition/decompose.hpp"
#include <fftw3.h>
#include "control.hpp"
#include "log.hpp"
#include "sample.hpp"
#include "scatter_devices/data_stager.hpp"
using namespace std;
AbstractScatterDevice::AbstractScatterDevice(
boost::mpi::communicator allcomm,
boost::mpi::communicator partitioncomm,
Sample& sample,
size_t NAF,

Benjamin Lindner2
committed
boost::asio::ip::tcp::endpoint fileservice_endpoint,
boost::asio::ip::tcp::endpoint monitorservice_endpoint
) :
allcomm_(allcomm),
partitioncomm_(partitioncomm),
sample_(sample),

Benjamin Lindner2
committed
current_vector_(0),
assignment_(partitioncomm.size(),partitioncomm.rank(),NAF)

Benjamin Lindner2
committed
{
p_hdf5writer_ = boost::shared_ptr<HDF5WriterClient>(new HDF5WriterClient(fileservice_endpoint));
p_monitor_ = boost::shared_ptr<MonitorClient>(new MonitorClient(monitorservice_endpoint));
NN = partitioncomm_.size();
NF = sample_.coordinate_sets.size();
std::string target = Params::Inst()->scattering.target;

Benjamin Lindner2
committed
NA = sample_.atoms.selections[target]->size(); // Number of Atoms

Benjamin Lindner2
committed
sample_.coordinate_sets.set_selection(sample.atoms.selections[target]);
scatterfactors.set_sample(sample_);
scatterfactors.set_selection(sample_.atoms.selections[target]);
scatterfactors.set_background(true);
// defer memory allocation for scattering data
// atfinal_.resize(NF);
// but check limits nevertheless:
size_t scattering_databytesize = 3*NF*sizeof(fftw_complex); // peak consumption: 2*NF (padded signal) + 1*NF (integral)
if (Params::Inst()->limits.computation.memory.signal<scattering_databytesize) {
if (allcomm_.rank()==0) {
Err::Inst()->write("Insufficient Buffer size for scattering (limits.computation.memory.signal)");
Err::Inst()->write(string("Requested (bytes): ")+boost::lexical_cast<string>(scattering_databytesize));
}
throw;
}
Timer blank_timer;
timer_.insert(map<boost::thread::id,Timer>::value_type(boost::this_thread::get_id(),blank_timer));

Benjamin Lindner2
committed
}
AbstractScatterDevice::~AbstractScatterDevice() {
if (atfinal_!=NULL) {
fftw_free(atfinal_);
atfinal_=NULL;
}
}

Benjamin Lindner2
committed
void AbstractScatterDevice::run() {
Timer& timer = timer_[boost::this_thread::get_id()];

Benjamin Lindner2
committed
print_pre_stage_info();
allcomm_.barrier();
timer.start("sd:stage");

Benjamin Lindner2
committed
stage_data();
timer.stop("sd:stage");
allcomm_.barrier();

Benjamin Lindner2
committed
print_post_stage_info();
if (allcomm_.rank()==0) {
scatterfactors.update(CartesianCoor3D(0,0,0));
Info::Inst()->write("Target initialized. ");
Info::Inst()->write(string("Target produces a background scattering length density of ")+boost::lexical_cast<string>(scatterfactors.compute_background(CartesianCoor3D(0,0,0))));
}

Benjamin Lindner2
committed
// allocate memory for computation now.
atfinal_ = (fftw_complex*) fftw_malloc(NF*sizeof(fftw_complex));
memset(atfinal_,0,NF*sizeof(fftw_complex));

Benjamin Lindner2
committed
print_pre_runner_info();
allcomm_.barrier();
timer.start("sd:runner");

Benjamin Lindner2
committed
runner();
timer.stop("sd:runner");
allcomm_.barrier();

Benjamin Lindner2
committed
print_post_runner_info();
// notify the services to finalize
p_monitor_->update(allcomm_.rank(),1.0);
p_hdf5writer_->flush();
}
void AbstractScatterDevice::runner() {
Timer& timer = timer_[boost::this_thread::get_id()];

Benjamin Lindner2
committed

Benjamin Lindner2
committed
start_workers();

Benjamin Lindner2
committed
while(status()==0) {
timer.start("sd:compute");
compute();
timer.stop("sd:compute");
timer.start("sd:write");
write();
timer.stop("sd:write");

Benjamin Lindner2
committed
next();
}
stop_workers();

Benjamin Lindner2
committed
}
void AbstractScatterDevice::start_workers() {
size_t numthreads = Params::Inst()->limits.computation.threads;
// if (worker_threads>NM) {
// if (allcomm_.rank()==0) {
// Warn::Inst()->write("Number of threads limited by resolution for averaging");
// Warn::Inst()->write(string("Number of moments (resolution): ")+boost::lexical_cast<string>(NM));
// Warn::Inst()->write(string("Number of threads requested: ")+boost::lexical_cast<string>(worker_threads));
// Warn::Inst()->write(string("Setting limits.computation.threads.worker=")+boost::lexical_cast<string>(NM));
// }
// worker_threads = NM;
// }
if ( (partitioncomm_.size()!=1) && (partitioncomm_.size()<numthreads) ) {
if (allcomm_.rank()==0) {
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
Warn::Inst()->write("Partition smaller than number of threads.");
Warn::Inst()->write("Can not utilize more threads than partition size.");
Warn::Inst()->write(string("Setting limits.computation.threads.worker=")+boost::lexical_cast<string>(partitioncomm_.size()));
}
numthreads = partitioncomm_.size();
}
workerbarrier = new boost::barrier(numthreads+1);
Timer blank_timer;
for(size_t i = 0; i < numthreads; ++i)
{
boost::thread* p_t = new boost::thread(boost::bind(&AbstractScatterDevice::worker,this));
worker_threads.push(p_t);
timer_.insert(map<boost::thread::id,Timer>::value_type(p_t->get_id(),blank_timer));
}
workerbarrier->wait();
}
void AbstractScatterDevice::stop_workers() {
while (worker_threads.size()>0) {
boost::thread* p_thread = worker_threads.front();
worker_threads.pop();
p_thread->interrupt();
delete p_thread;
}
delete workerbarrier;
}

Benjamin Lindner2
committed
void AbstractScatterDevice::next() {
if (current_vector_>=vectors_.size()) return;

Benjamin Lindner2
committed
current_vector_++;
}
double AbstractScatterDevice::progress() {

Benjamin Lindner2
committed
double base = current_vector_*scale;
return base;
}
size_t AbstractScatterDevice::status() {
if (current_vector_==vectors_.size()) return 1; else return 0;

Benjamin Lindner2
committed
}
void AbstractScatterDevice::write() {
if (partitioncomm_.rank()==0) {
CartesianCoor3D vector = vectors_[current_vector_];
p_hdf5writer_->write(vector,atfinal_,NF);
}
std::map<boost::thread::id,Timer>& AbstractScatterDevice::getTimer() {

Benjamin Lindner2
committed
// collect timer information from allcomm_
return timer_;
}