Skip to content
Snippets Groups Projects
Commit aa72945a authored by Benjamin Lindner2's avatar Benjamin Lindner2
Browse files

introduced monitor performance controls for large number of cores

parent 94332e2d
No related branches found
No related tags found
No related merge requests found
......@@ -32,7 +32,7 @@
// other headers
#include "math/coor3d.hpp"
enum MonitorTag {MONITOR_HANGUP,MONITOR_UPDATE,MONITOR_RESET};
// Tags identifying the kind of message a MonitorClient sends to the
// MonitorService. The client writes the tag first, then any payload.
enum MonitorTag {
    MONITOR_HANGUP = 0,
    MONITOR_UPDATE = 1,          // payload: size_t rank, then double progress
    MONITOR_RESET = 2,
    MONITOR_SAMPLINGFACTOR = 3   // payload: size_t sampling factor
};
class MonitorService {
boost::asio::io_service& m_io_service;
......@@ -55,7 +55,9 @@ class MonitorService {
void print();
void reset_timer();
void reset_state();
size_t samplingfactor_;
public:
MonitorService(boost::asio::io_service& io_service,double from,double to);
......@@ -73,11 +75,14 @@ class MonitorClient {
std::queue<double> update_thresholds;
boost::posix_time::ptime lastupdate_;
size_t updatecounter_;
size_t samplingfactor_;
public:
MonitorClient(boost::asio::ip::tcp::endpoint server);
void reset_server();
void set_samplingfactor(size_t samplingfactor);
void set_samplingfactor_server(size_t samplingfactor);
void update(size_t rank,double progress);
};
......
......@@ -128,7 +128,17 @@ void AbstractScatterDevice::runner() {
Timer& timer = timer_[boost::this_thread::get_id()];
start_workers();
if (allcomm_.rank()==0) p_monitor_->reset_server();
size_t samplingfactor = Params::Inst()->limits.services.monitor.sampling;
if (samplingfactor==0) {
samplingfactor = allcomm_.size()/100;
if (allcomm_.size()<100) samplingfactor=1;
}
p_monitor_->set_samplingfactor(samplingfactor);
if (allcomm_.rank()==0) {
p_monitor_->set_samplingfactor_server(samplingfactor);
p_monitor_->reset_server();
}
while(status()==0) {
timer.start("sd:compute");
......
......@@ -30,7 +30,7 @@ MonitorService::MonitorService(boost::asio::io_service& io_service,double from,d
m_listener = NULL;
m_listener_status = false;
m_timerlabel = "progress";
reset_state();
}
......@@ -65,6 +65,10 @@ void MonitorService::listener() {
reset();
}
if (tag==MONITOR_SAMPLINGFACTOR) {
socket.read_some(boost::asio::buffer(&samplingfactor_,sizeof(size_t)));
}
if (tag==MONITOR_UPDATE) {
socket.read_some(boost::asio::buffer(&rank,sizeof(size_t)));
socket.read_some(boost::asio::buffer(&progress,sizeof(double)));
......@@ -75,7 +79,7 @@ void MonitorService::listener() {
for(std::map<size_t,double>::iterator pi = progresses.begin();pi!=progresses.end();pi++) {
sum+=pi->second;
}
m_current = sum;
m_current = sum*samplingfactor_;
update();
}
......@@ -197,14 +201,18 @@ MonitorClient::MonitorClient(boost::asio::ip::tcp::endpoint server) : m_endpoint
}
update_thresholds.push(1.0);
lastupdate_ = boost::posix_time::second_clock::universal_time();
updatecounter_ = 0;
lastupdate_ = boost::posix_time::second_clock::universal_time();
samplingfactor_ = 1;
}
void MonitorClient::update(size_t rank,double progress) {
if (!Params::Inst()->debug.monitor.update) return;
// first test sampling criteria
if ((rank%samplingfactor_)!=0) return;
size_t oldsize = update_thresholds.size();
if (oldsize<1) return;
......@@ -214,12 +222,7 @@ void MonitorClient::update(size_t rank,double progress) {
if (update_thresholds.size()<1) break;
}
updatecounter_++;
if (update_thresholds.size()!=0) {
// first test sampling criteria
if ((updatecounter_%Params::Inst()->limits.services.monitor.sampling)!=0) return;
if (update_thresholds.size()!=0) {
// then test for minimum time delay
if ((boost::posix_time::second_clock::universal_time()-lastupdate_) <=
(boost::posix_time::seconds(Params::Inst()->limits.services.monitor.delay)) )
......@@ -243,7 +246,8 @@ void MonitorClient::update(size_t rank,double progress) {
socket.close();
} catch(...) {
Warn::Inst()->write("Unable to send update to monitor server");
Warn::Inst()->write("Increase debug.monitor.update.delay and/or debug.monitor.update.sampling");
Warn::Inst()->write("Increase limits.services.monitor.delay and/or limits.services.monitor.sampling");
Warn::Inst()->write(string("Current sampling factor: ")+boost::lexical_cast<string>(samplingfactor_));
}
}
}
......@@ -258,4 +262,21 @@ void MonitorClient::reset_server() {
socket.close();
}
/// Sets the client-side sampling factor. update() only reports progress for
/// ranks that are a multiple of this value.
///
/// @param samplingfactor  Stride between reporting ranks. A value of 0 is
///                        stored as 1, because update() evaluates
///                        `rank % samplingfactor_` and a zero divisor would
///                        be undefined behaviour.
void MonitorClient::set_samplingfactor(size_t samplingfactor) {
    samplingfactor_ = (samplingfactor == 0) ? 1 : samplingfactor;
}
/// Sends the sampling factor to the monitor server over a fresh connection
/// to m_endpoint.
///
/// Wire format: the MONITOR_SAMPLINGFACTOR tag (sizeof(MonitorTag) bytes)
/// followed by the factor itself (sizeof(size_t) bytes), both in the host's
/// native representation.
///
/// @param samplingfactor  Value the server should use to scale sampled progress.
/// @throws boost::system::system_error if connecting or sending fails.
void MonitorClient::set_samplingfactor_server(size_t samplingfactor) {
    boost::asio::io_service io_service;
    boost::asio::ip::tcp::socket socket(io_service);
    socket.connect(m_endpoint);

    MonitorTag tag = MONITOR_SAMPLINGFACTOR;
    // boost::asio::write keeps sending until the whole buffer is transmitted;
    // write_some may return after a partial write and leave the server with a
    // truncated tag or value.
    boost::asio::write(socket, boost::asio::buffer(&tag, sizeof(MonitorTag)));
    boost::asio::write(socket, boost::asio::buffer(&samplingfactor, sizeof(size_t)));
    socket.close();
}
// end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment