PoC prometheus exporter implementation

This commit is contained in:
Lev Dragunov 2019-08-01 17:20:26 +03:00 committed by Lev Dragunov
parent 9f80f6d64d
commit 8845b7133c
20 changed files with 363 additions and 18 deletions

View File

@ -9,6 +9,7 @@
#include "engine/api/trip_parameters.hpp"
#include "engine/datafacade_provider.hpp"
#include "engine/engine_config.hpp"
#include "engine/engine_info.hpp"
#include "engine/plugins/match.hpp"
#include "engine/plugins/nearest.hpp"
#include "engine/plugins/table.hpp"
@ -39,6 +40,8 @@ class EngineInterface
virtual Status Trip(const api::TripParameters &parameters, api::ResultT &result) const = 0;
virtual Status Match(const api::MatchParameters &parameters, api::ResultT &result) const = 0;
virtual Status Tile(const api::TileParameters &parameters, api::ResultT &result) const = 0;
virtual const EngineInfo &GetInfo() const = 0;
};
template <typename Algorithm> class Engine final : public EngineInterface
@ -76,6 +79,9 @@ template <typename Algorithm> class Engine final : public EngineInterface
<< routing_algorithms::name<Algorithm>();
facade_provider = std::make_unique<ImmutableProvider<Algorithm>>(config.storage_config);
}
info["algorithm"] = routing_algorithms::name<Algorithm>();
info["code_version"] = OSRM_VERSION;
info["data_version"] = GetAlgorithms(api::RouteParameters()).GetFacade().GetTimestamp();
}
Engine(Engine &&) noexcept = delete;
@ -115,11 +121,14 @@ template <typename Algorithm> class Engine final : public EngineInterface
return tile_plugin.HandleRequest(GetAlgorithms(params), params, result);
}
const EngineInfo &GetInfo() const override final { return info; };
private:
template <typename ParametersT> auto GetAlgorithms(const ParametersT &params) const
{
return RoutingAlgorithms<Algorithm>{heaps, facade_provider->Get(params)};
}
EngineInfo info;
std::unique_ptr<DataFacadeProvider<Algorithm>> facade_provider;
mutable SearchEngineData<Algorithm> heaps;
@ -130,7 +139,7 @@ template <typename Algorithm> class Engine final : public EngineInterface
const plugins::MatchPlugin match_plugin;
const plugins::TilePlugin tile_plugin;
};
}
}
} // namespace engine
} // namespace osrm
#endif // OSRM_IMPL_HPP

View File

@ -0,0 +1,48 @@
/*
Copyright (c) 2019, Project OSRM contributors
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this list
of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef ENGINE_INFO_HPP
#define ENGINE_INFO_HPP
#include <unordered_map>
namespace osrm
{
namespace engine
{
/**
* EngineInfo to return information about engine state into the monitoring
* \see OSRM
*/
using EngineInfo = std::unordered_map<std::string, std::string>;
} // namespace engine
} // namespace osrm
#endif

View File

@ -0,0 +1,103 @@
#ifndef MONITORING_HPP
#define MONITORING_HPP
#include "monitoring/monitoring_request_handler.hpp"
#include "server/connection.hpp"
#include "util/log.hpp"
#include <boost/asio.hpp>
#include <boost/bind.hpp>
namespace osrm
{
namespace server
{
class Monitoring
{
public:
static std::shared_ptr<Monitoring>
CreateMonitoring(std::string &ip_address, const int ip_port, const unsigned working_threads)
{
return std::make_shared<Monitoring>(ip_address, ip_port, working_threads);
}
explicit Monitoring(const std::string &address, const int port, const unsigned working_threads)
: acceptor(io_service), request_handler(working_threads),
new_connection(std::make_shared<Connection>(io_service, request_handler))
{
if (port == 0)
{
working = false;
return;
}
working = true;
const auto port_string = std::to_string(port);
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::ip::tcp::resolver::query query(address, port_string);
boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
acceptor.open(endpoint.protocol());
#ifdef SO_REUSEPORT
const int option = 1;
setsockopt(acceptor.native_handle(), SOL_SOCKET, SO_REUSEPORT, &option, sizeof(option));
#endif
acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
acceptor.bind(endpoint);
acceptor.listen();
util::Log() << "Monitoring endpoint listening on: " << acceptor.local_endpoint();
acceptor.async_accept(
new_connection->socket(),
boost::bind(&Monitoring::HandleAccept, this, boost::asio::placeholders::error));
}
void Run()
{
if (working)
{
io_service.run();
}
}
void Stop()
{
if (working)
{
io_service.stop();
}
}
void RegisterServiceHandler(std::shared_ptr<ServiceHandlerInterface> service_handler_)
{
request_handler.RegisterServiceHandler(service_handler_);
}
private:
void HandleAccept(const boost::system::error_code &e)
{
if (!e)
{
new_connection->start();
new_connection = std::make_shared<Connection>(io_service, request_handler);
acceptor.async_accept(
new_connection->socket(),
boost::bind(&Monitoring::HandleAccept, this, boost::asio::placeholders::error));
}
}
boost::asio::io_service io_service;
boost::asio::ip::tcp::acceptor acceptor;
MonitoringRequestHandler request_handler;
std::shared_ptr<Connection> new_connection;
std::shared_ptr<std::thread> working_thread;
bool working;
};
} // namespace server
} // namespace osrm
#endif // MONITORING_HPP

View File

@ -0,0 +1,54 @@
#ifndef MONITORING_REQUEST_HANDLER_HPP
#define MONITORING_REQUEST_HANDLER_HPP
#include "server/request_handler.hpp"
#include "util/log.hpp"
#include <sstream>
#include <string>
namespace osrm
{
namespace server
{
class MonitoringRequestHandler : public RequestHandler
{
public:
MonitoringRequestHandler(const unsigned _working_threads) : working_threads(_working_threads) {}
void HandleRequest(const http::request &, http::reply &current_reply)
{
std::stringstream out_stream;
out_stream << "osrm_routed_instance_info{";
auto engine_info = service_handler->GetEngineInfo();
for (auto record : engine_info)
{
out_stream << record.first << "=\"" << record.second << '"';
out_stream << ',';
}
out_stream << "working_threads=\"" << working_threads << '"';
out_stream << "} 1\n";
auto counters = service_handler->GetUsage();
for (auto counter : counters)
{
out_stream << "http_requests_count{plugin=\"" << counter.first << "\"} "
<< counter.second << "\n";
}
out_stream << "workers_busy " << service_handler->GetLoad() << "\n";
auto result = out_stream.str();
current_reply.content.resize(result.size());
std::copy(result.cbegin(), result.cend(), current_reply.content.begin());
}
private:
unsigned working_threads;
};
} // namespace server
} // namespace osrm
#endif // MONITORING_REQUEST_HANDLER_HPP

View File

@ -0,0 +1,38 @@
/*
Copyright (c) 2017, Project OSRM contributors
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
Redistributions of source code must retain the above copyright notice, this list
of conditions and the following disclaimer.
Redistributions in binary form must reproduce the above copyright notice, this
list of conditions and the following disclaimer in the documentation and/or
other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef OSRM_ENGINE_INFO_HPP
#define OSRM_ENGINE_INFO_HPP
#include "engine/engine_info.hpp"
namespace osrm
{
using engine::EngineInfo;
}
#endif

View File

@ -31,6 +31,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "engine/api/base_result.hpp"
#include "osrm/osrm_fwd.hpp"
#include "osrm/status.hpp"
#include "osrm/engine_info.hpp"
#include <memory>
#include <string>
@ -131,6 +132,11 @@ class OSRM final
*/
Status Tile(const TileParameters &parameters, osrm::engine::api::ResultT &result) const;
/**
* Tile: engine state information for monitoring purposes
*/
const EngineInfo & Info() const;
private:
std::unique_ptr<engine::EngineInterface> engine_;
};

View File

@ -24,12 +24,13 @@ class RequestHandler
RequestHandler(const RequestHandler &) = delete;
RequestHandler &operator=(const RequestHandler &) = delete;
void RegisterServiceHandler(std::unique_ptr<ServiceHandlerInterface> service_handler);
void RegisterServiceHandler(std::shared_ptr<ServiceHandlerInterface> service_handler);
void HandleRequest(const http::request &current_request, http::reply &current_reply);
//TODO make interface and two siblings
virtual void HandleRequest(const http::request &current_request, http::reply &current_reply);
private:
std::unique_ptr<ServiceHandlerInterface> service_handler;
protected:
std::shared_ptr<ServiceHandlerInterface> service_handler;
};
}
}

View File

@ -85,11 +85,13 @@ class Server
void Stop() { io_service.stop(); }
void RegisterServiceHandler(std::unique_ptr<ServiceHandlerInterface> service_handler_)
void RegisterServiceHandler(std::shared_ptr<ServiceHandlerInterface> service_handler_)
{
request_handler.RegisterServiceHandler(std::move(service_handler_));
request_handler.RegisterServiceHandler(service_handler_);
}
unsigned GetWorkerThreadsCount() { return thread_pool_size; }
private:
void HandleAccept(const boost::system::error_code &e)
{

View File

@ -7,6 +7,7 @@
#include <mapbox/variant.hpp>
#include <atomic>
#include <string>
#include <vector>
@ -20,16 +21,18 @@ namespace service
class BaseService
{
public:
BaseService(OSRM &routing_machine) : routing_machine(routing_machine) {}
BaseService(OSRM &routing_machine) : routing_machine(routing_machine), usage(0) {}
virtual ~BaseService() = default;
virtual engine::Status
RunQuery(std::size_t prefix_length, std::string &query, osrm::engine::api::ResultT &result) = 0;
virtual unsigned GetVersion() = 0;
uint32_t GetUsage() {return usage;}
protected:
OSRM &routing_machine;
std::atomic_uint usage;
};
}
}

View File

@ -5,7 +5,9 @@
#include "engine/api/base_api.hpp"
#include "osrm/osrm.hpp"
#include "engine/engine_info.hpp"
#include <atomic>
#include <unordered_map>
namespace osrm
@ -24,12 +26,17 @@ namespace api
struct ParsedURL;
}
using HandlersCounter = std::unordered_map<std::string, std::uint32_t>;
class ServiceHandlerInterface
{
public:
virtual ~ServiceHandlerInterface() {}
virtual engine::Status RunQuery(api::ParsedURL parsed_url,
osrm::engine::api::ResultT &result) = 0;
virtual const engine::EngineInfo & GetEngineInfo() const = 0;
virtual const HandlersCounter GetUsage() const = 0;
virtual std::uint32_t GetLoad() const = 0;
};
class ServiceHandler final : public ServiceHandlerInterface
@ -40,9 +47,15 @@ class ServiceHandler final : public ServiceHandlerInterface
virtual engine::Status RunQuery(api::ParsedURL parsed_url, ResultT &result) override;
virtual const engine::EngineInfo & GetEngineInfo() const override;
virtual const HandlersCounter GetUsage() const override;
virtual std::uint32_t GetLoad() const override;
private:
std::unordered_map<std::string, std::unique_ptr<service::BaseService>> service_map;
OSRM routing_machine;
std::atomic_uint errors;
std::atomic_uint load;
};
}
}

View File

@ -92,4 +92,9 @@ engine::Status OSRM::Tile(const engine::api::TileParameters &params,
return engine_->Tile(params, result);
}
const engine::EngineInfo & OSRM::Info() const
{
return engine_->GetInfo();
}
} // ns osrm

View File

@ -33,9 +33,9 @@ namespace server
{
void RequestHandler::RegisterServiceHandler(
std::unique_ptr<ServiceHandlerInterface> service_handler_)
std::shared_ptr<ServiceHandlerInterface> service_handler_)
{
service_handler = std::move(service_handler_);
service_handler = service_handler_;
}
void RequestHandler::HandleRequest(const http::request &current_request, http::reply &current_reply)

View File

@ -45,6 +45,7 @@ engine::Status MatchService::RunQuery(std::size_t prefix_length,
std::string &query,
osrm::engine::api::ResultT &result)
{
usage++;
result = util::json::Object();
auto &json_result = result.get<util::json::Object>();

View File

@ -39,6 +39,7 @@ engine::Status NearestService::RunQuery(std::size_t prefix_length,
std::string &query,
osrm::engine::api::ResultT &result)
{
usage++;
result = util::json::Object();
auto &json_result = result.get<util::json::Object>();

View File

@ -43,6 +43,7 @@ engine::Status RouteService::RunQuery(std::size_t prefix_length,
std::string &query,
osrm::engine::api::ResultT &result)
{
usage++;
result = util::json::Object();
auto &json_result = result.get<util::json::Object>();

View File

@ -74,6 +74,7 @@ engine::Status TableService::RunQuery(std::size_t prefix_length,
std::string &query,
osrm::engine::api::ResultT &result)
{
usage++;
result = util::json::Object();
auto &json_result = result.get<util::json::Object>();

View File

@ -19,6 +19,7 @@ engine::Status TileService::RunQuery(std::size_t prefix_length,
std::string &query,
osrm::engine::api::ResultT &result)
{
usage++;
auto query_iterator = query.begin();
auto parameters =
api::parseParameters<engine::api::TileParameters>(query_iterator, query.end());

View File

@ -45,6 +45,7 @@ engine::Status TripService::RunQuery(std::size_t prefix_length,
std::string &query,
osrm::engine::api::ResultT &result)
{
usage++;
result = util::json::Object();
auto &json_result = result.get<util::json::Object>();

View File

@ -12,11 +12,14 @@
#include <memory>
#define BAD_SERVICE_NAME "invalid"
namespace osrm
{
namespace server
{
ServiceHandler::ServiceHandler(osrm::EngineConfig &config) : routing_machine(config)
ServiceHandler::ServiceHandler(osrm::EngineConfig &config)
: routing_machine(config), errors(0), load(0)
{
service_map["route"] = std::make_unique<service::RouteService>(routing_machine);
service_map["table"] = std::make_unique<service::TableService>(routing_machine);
@ -29,27 +32,49 @@ ServiceHandler::ServiceHandler(osrm::EngineConfig &config) : routing_machine(con
engine::Status ServiceHandler::RunQuery(api::ParsedURL parsed_url,
osrm::engine::api::ResultT &result)
{
load++;
const auto &service_iter = service_map.find(parsed_url.service);
if (service_iter == service_map.end())
{
errors++;
result = util::json::Object();
auto &json_result = result.get<util::json::Object>();
json_result.values["code"] = "InvalidService";
json_result.values["message"] = "Service " + parsed_url.service + " not found!";
load--;
return engine::Status::Error;
}
auto &service = service_iter->second;
if (service->GetVersion() != parsed_url.version)
{
errors++;
result = util::json::Object();
auto &json_result = result.get<util::json::Object>();
json_result.values["code"] = "InvalidVersion";
json_result.values["message"] = "Service " + parsed_url.service + " not found!";
load--;
return engine::Status::Error;
}
return service->RunQuery(parsed_url.prefix_length, parsed_url.query, result);
}
const auto status = service->RunQuery(parsed_url.prefix_length, parsed_url.query, result);
load--;
return status;
}
const engine::EngineInfo &ServiceHandler::GetEngineInfo() const { return routing_machine.Info(); }
std::uint32_t ServiceHandler::GetLoad() const { return load; }
const HandlersCounter ServiceHandler::GetUsage() const
{
HandlersCounter handlers_counter;
for (const auto &service : service_map)
{
handlers_counter[service.first] = service.second->GetUsage();
}
handlers_counter[BAD_SERVICE_NAME] = errors;
return handlers_counter;
}
} // namespace server
} // namespace osrm

View File

@ -1,3 +1,4 @@
#include "monitoring/monitoring.hpp"
#include "server/server.hpp"
#include "util/exception_utils.hpp"
#include "util/log.hpp"
@ -29,6 +30,7 @@
#ifdef _WIN32
boost::function0<void> console_ctrl_function;
boost::function0<void> console_ctrl_function_monitoring;
BOOL WINAPI console_ctrl_handler(DWORD ctrl_type)
{
@ -39,6 +41,7 @@ BOOL WINAPI console_ctrl_handler(DWORD ctrl_type)
case CTRL_CLOSE_EVENT:
case CTRL_SHUTDOWN_EVENT:
console_ctrl_function();
console_ctrl_function_monitoring();
return TRUE;
default:
return FALSE;
@ -79,6 +82,7 @@ inline unsigned generateServerProgramOptions(const int argc,
boost::filesystem::path &base_path,
std::string &ip_address,
int &ip_port,
int &ip_port_metrics,
bool &trial,
EngineConfig &config,
int &requested_thread_num)
@ -111,6 +115,11 @@ inline unsigned generateServerProgramOptions(const int argc,
("port,p",
value<int>(&ip_port)->default_value(5000),
"TCP/IP port") //
(
"metrics_port,P",
value<int>(&ip_port_metrics)->default_value(0),
"TCP/IP port for prometheus metrics exporter. Leave 0 if do not start server for "
"metrics") //
("threads,t",
value<int>(&requested_thread_num)->default_value(hardware_threads),
"Number of threads to use") //
@ -227,13 +236,21 @@ int main(int argc, const char *argv[]) try
bool trial_run = false;
std::string ip_address;
int ip_port;
int ip_port_metrics;
EngineConfig config;
boost::filesystem::path base_path;
int requested_thread_num = 1;
const unsigned init_result = generateServerProgramOptions(
argc, argv, base_path, ip_address, ip_port, trial_run, config, requested_thread_num);
const unsigned init_result = generateServerProgramOptions(argc,
argv,
base_path,
ip_address,
ip_port,
ip_port_metrics,
trial_run,
config,
requested_thread_num);
if (init_result == INIT_OK_DO_NOT_START_ENGINE)
{
return EXIT_SUCCESS;
@ -284,10 +301,13 @@ int main(int argc, const char *argv[]) try
pthread_sigmask(SIG_BLOCK, &wait_mask, nullptr); // only block necessary signals
#endif
auto service_handler = std::make_unique<server::ServiceHandler>(config);
auto service_handler = std::make_shared<server::ServiceHandler>(config);
auto routing_server = server::Server::CreateServer(ip_address, ip_port, requested_thread_num);
auto monitoring_server = server::Monitoring::CreateMonitoring(
ip_address, ip_port_metrics, routing_server->GetWorkerThreadsCount());
routing_server->RegisterServiceHandler(std::move(service_handler));
routing_server->RegisterServiceHandler(service_handler);
monitoring_server->RegisterServiceHandler(service_handler);
if (trial_run)
{
@ -299,8 +319,13 @@ int main(int argc, const char *argv[]) try
routing_server->Run();
return 0;
});
std::packaged_task<int()> monitoring_task([&] {
monitoring_server->Run();
return 0;
});
auto future = server_task.get_future();
std::thread server_thread(std::move(server_task));
std::thread monitoring_thread(std::move(monitoring_task));
#ifndef _WIN32
util::Log() << "running and waiting for requests";
@ -313,12 +338,17 @@ int main(int argc, const char *argv[]) try
#else
// Set console control handler to allow server to be stopped.
console_ctrl_function = std::bind(&server::Server::Stop, routing_server);
console_ctrl_function_monitoring =
std::bind(&monitoring::Monitoring::Stop, monitoring_server);
SetConsoleCtrlHandler(console_ctrl_handler, TRUE);
SetConsoleCtrlHandler(console_ctrl_handler_monitoring, TRUE);
util::Log() << "running and waiting for requests";
monitoring_server->Run();
routing_server->Run();
#endif
util::Log() << "initiating shutdown";
routing_server->Stop();
monitoring_server->Stop();
util::Log() << "stopping threads";
auto status = future.wait_for(std::chrono::seconds(2));
@ -326,6 +356,7 @@ int main(int argc, const char *argv[]) try
if (status == std::future_status::ready)
{
server_thread.join();
monitoring_thread.join();
}
else
{
@ -336,6 +367,7 @@ int main(int argc, const char *argv[]) try
util::Log() << "freeing objects";
routing_server.reset();
monitoring_server.reset();
util::Log() << "shutdown completed";
}
catch (const osrm::RuntimeError &e)