Skip to content
Snippets Groups Projects
Commit 97af39ea authored by Benjamin Lindner2's avatar Benjamin Lindner2
Browse files

introduced monitor performance controls for large number of cores

parent 43069c23
No related branches found
No related tags found
No related merge requests found
......@@ -588,6 +588,26 @@ public:
LimitsServicesSignalTimesParameters times;
};
/**
 * Throttling parameters for the progress monitor client.
 *
 * Both values are read by MonitorClient::update to decide whether a progress
 * update is actually sent to the monitor server.
 */
class LimitsServicesMonitorParameters {
private:
	// ---- MPI support -------------------------------------------------
	// boost::serialization hook: lets this parameter set be transmitted
	// via MPI together with the rest of the parameters.
	friend class boost::serialization::access;
	template<class Archive> void serialize(Archive & ar, const unsigned int /*version*/)
	{
		// Field order defines the archive layout: delay first, then sampling.
		ar & delay & sampling;
	}
	// -------------------------------------------------------------------
public:
	// Time span in seconds; an update is skipped while no more than this
	// many seconds have passed since the last update that was sent.
	size_t delay;
	// Modulo divisor applied to the update call counter; calls whose counter
	// is not a multiple of this value are skipped. Must not be 0.
	size_t sampling;
};
class LimitsServicesParameters {
private:
/////////////////// MPI related
......@@ -597,11 +617,13 @@ private:
template<class Archive> void serialize(Archive & ar, const unsigned int version)
{
ar & signal;
ar & monitor;
}
///////////////////
public:
LimitsServicesSignalParameters signal;
LimitsServicesMonitorParameters monitor;
};
class LimitsComputationMemoryParameters {
......
......@@ -71,6 +71,9 @@ class MonitorClient {
boost::asio::ip::tcp::endpoint m_endpoint;
std::queue<double> update_thresholds;
boost::posix_time::ptime lastupdate_;
size_t updatecounter_;
public:
MonitorClient(boost::asio::ip::tcp::endpoint server);
void reset_server();
......
......@@ -493,6 +493,9 @@ void Params::read_xml(std::string filename) {
limits.services.signal.memory.client = 10*1024*1024; // 10MB
limits.services.signal.times.serverflush = 600; // 600 seconds
limits.services.signal.times.clientflush = 600; // 600 seconds
limits.services.monitor.delay = 1; // 1 second: time that has to pass after a sent monitor update before the next one
limits.services.monitor.sampling = 1; // call count, not a time: only every sampling-th update call is considered (1 = every call)
limits.decomposition.utilization = 0.95; // 5% max loss
limits.decomposition.partitions.automatic = true; // pick number of independent partitions based on some heuristics
......@@ -550,6 +553,17 @@ void Params::read_xml(std::string filename) {
}
}
}
// Optional overrides for the monitor throttling parameters. Values that are
// not present in the XML keep the defaults assigned earlier in this function.
if (xmli.exists("//limits/services")) {
	if (xmli.exists("//limits/services/monitor")) {
		if (xmli.exists("//limits/services/monitor/delay")) {
			limits.services.monitor.delay = xmli.get_value<size_t>("//limits/services/monitor/delay");
		}
		if (xmli.exists("//limits/services/monitor/sampling")) {
			limits.services.monitor.sampling = xmli.get_value<size_t>("//limits/services/monitor/sampling");
			// sampling is used as a modulo divisor in MonitorClient::update;
			// a configured value of 0 would be a division by zero there, so
			// fall back to considering every update call.
			if (limits.services.monitor.sampling == 0) {
				limits.services.monitor.sampling = 1;
			}
		}
	}
}
if (xmli.exists("//limits/decomposition")) {
if (xmli.exists("//limits/decomposition/utilization")) {
limits.decomposition.utilization = xmli.get_value<double>("//limits/decomposition/utilization");
......
......@@ -128,7 +128,7 @@ void AbstractScatterDevice::runner() {
Timer& timer = timer_[boost::this_thread::get_id()];
start_workers();
p_monitor_->reset_server();
if (allcomm_.rank()==0) p_monitor_->reset_server();
while(status()==0) {
timer.start("sd:compute");
......
......@@ -14,6 +14,7 @@
#include <boost/lexical_cast.hpp>
#include <boost/filesystem.hpp>
#include <boost/random/uniform_int.hpp>
#include <log.hpp>
#include <control.hpp>
#include <report/timer.hpp>
......@@ -195,6 +196,9 @@ MonitorClient::MonitorClient(boost::asio::ip::tcp::endpoint server) : m_endpoint
p+=0.01;
}
update_thresholds.push(1.0);
lastupdate_ = boost::posix_time::second_clock::universal_time();
updatecounter_ = 0;
}
void MonitorClient::update(size_t rank,double progress) {
......@@ -209,17 +213,38 @@ void MonitorClient::update(size_t rank,double progress) {
update_thresholds.pop();
if (update_thresholds.size()<1) break;
}
updatecounter_++;
if (update_thresholds.size()!=0) {
// first test sampling criteria
if ((updatecounter_%Params::Inst()->limits.services.monitor.sampling)!=0) return;
// then test for minimum time delay
if ((boost::posix_time::second_clock::universal_time()-lastupdate_) <=
(boost::posix_time::seconds(Params::Inst()->limits.services.monitor.delay)) )
{
return;
}
}
if (update_thresholds.size()!=oldsize) {
// setup monitoring service
boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket( io_service );
socket.connect(m_endpoint);
MonitorTag tag = MONITOR_UPDATE;
socket.write_some(boost::asio::buffer(&tag,sizeof(MonitorTag)));
socket.write_some(boost::asio::buffer(&rank,sizeof(size_t)));
socket.write_some(boost::asio::buffer(&progress,sizeof(double)));
socket.close();
lastupdate_ = boost::posix_time::second_clock::universal_time();
// Sending the update is best effort: a failure to reach the monitor server
// must not abort the computation, so any exception is turned into a warning.
try {
	socket.connect(m_endpoint);
	// Wire format: tag, then rank, then progress, each as raw bytes.
	// NOTE(review): write_some may transmit fewer bytes than requested;
	// confirm whether a full write is needed here.
	MonitorTag tag = MONITOR_UPDATE;
	socket.write_some(boost::asio::buffer(&tag,sizeof(MonitorTag)));
	socket.write_some(boost::asio::buffer(&rank,sizeof(size_t)));
	socket.write_some(boost::asio::buffer(&progress,sizeof(double)));
	socket.close();
} catch(...) {
	Warn::Inst()->write("Unable to send update to monitor server");
	// Name the parameters as they are actually configured
	// (limits.services.monitor.*), so the hint is actionable.
	Warn::Inst()->write("Increase limits.services.monitor.delay and/or limits.services.monitor.sampling");
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment