Enable cross-service propagation of otel trace metadata

This commit is contained in:
Grant Limberg 2025-10-30 13:47:50 +01:00
parent 30c4484731
commit 5c27068b80
No known key found for this signature in database
GPG key ID: 8F2F97D3BE8D7735
6 changed files with 684 additions and 498 deletions

View file

@ -23,12 +23,14 @@
#include "ControllerConfig.hpp" #include "ControllerConfig.hpp"
#include "CtlUtil.hpp" #include "CtlUtil.hpp"
#include "EmbeddedNetworkController.hpp" #include "EmbeddedNetworkController.hpp"
#include "OtelCarrier.hpp"
#include "PostgresStatusWriter.hpp" #include "PostgresStatusWriter.hpp"
#include "PubSubListener.hpp" #include "PubSubListener.hpp"
#include "PubSubWriter.hpp" #include "PubSubWriter.hpp"
#include "Redis.hpp" #include "Redis.hpp"
#include "RedisListener.hpp" #include "RedisListener.hpp"
#include "RedisStatusWriter.hpp" #include "RedisStatusWriter.hpp"
#include "opentelemetry/context/propagation/global_propagator.h"
#include "opentelemetry/trace/provider.h" #include "opentelemetry/trace/provider.h"
#include <chrono> #include <chrono>
@ -314,6 +316,9 @@ bool CentralDB::save(nlohmann::json& record, bool notifyListeners)
} }
const std::string objtype = record["objtype"]; const std::string objtype = record["objtype"];
if (objtype == "network") { if (objtype == "network") {
auto span = tracer->StartSpan("CentralDB::save::network");
auto scope = tracer->WithActiveSpan(span);
fprintf(stderr, "CentralDB network save %s\n", record["id"].get<std::string>().c_str()); fprintf(stderr, "CentralDB network save %s\n", record["id"].get<std::string>().c_str());
const uint64_t nwid = OSUtils::jsonIntHex(record["id"], 0ULL); const uint64_t nwid = OSUtils::jsonIntHex(record["id"], 0ULL);
if (nwid) { if (nwid) {
@ -322,12 +327,23 @@ bool CentralDB::save(nlohmann::json& record, bool notifyListeners)
if ((! old.is_object()) || (! _compareRecords(old, record))) { if ((! old.is_object()) || (! _compareRecords(old, record))) {
fprintf(stderr, "posting network change to commit queue\n"); fprintf(stderr, "posting network change to commit queue\n");
record["revision"] = OSUtils::jsonInt(record["revision"], 0ULL) + 1ULL; record["revision"] = OSUtils::jsonInt(record["revision"], 0ULL) + 1ULL;
_commitQueue.post(std::pair<nlohmann::json, bool>(record, notifyListeners)); _queueItem qi;
qi.jsonData = record;
qi.notifyListeners = notifyListeners;
OtelCarrier<std::map<std::string, std::string> > carrier(qi.traceContext);
auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
auto propagator =
opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
propagator->Inject(carrier, current_ctx);
_commitQueue.post(qi);
modified = true; modified = true;
} }
} }
} }
else if (objtype == "member") { else if (objtype == "member") {
auto span = tracer->StartSpan("CentralDB::save::member");
auto scope = tracer->WithActiveSpan(span);
std::string networkId = record["nwid"]; std::string networkId = record["nwid"];
std::string memberId = record["id"]; std::string memberId = record["id"];
const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"], 0ULL); const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"], 0ULL);
@ -339,7 +355,17 @@ bool CentralDB::save(nlohmann::json& record, bool notifyListeners)
if ((! old.is_object()) || (! _compareRecords(old, record))) { if ((! old.is_object()) || (! _compareRecords(old, record))) {
fprintf(stderr, "posting member change to commit queue\n"); fprintf(stderr, "posting member change to commit queue\n");
record["revision"] = OSUtils::jsonInt(record["revision"], 0ULL) + 1ULL; record["revision"] = OSUtils::jsonInt(record["revision"], 0ULL) + 1ULL;
_commitQueue.post(std::pair<nlohmann::json, bool>(record, notifyListeners));
_queueItem qi;
qi.jsonData = record;
qi.notifyListeners = notifyListeners;
OtelCarrier<std::map<std::string, std::string> > carrier(qi.traceContext);
auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
auto propagator =
opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
propagator->Inject(carrier, current_ctx);
_commitQueue.post(qi);
modified = true; modified = true;
} }
else { else {
@ -373,13 +399,19 @@ void CentralDB::eraseNetwork(const uint64_t networkId)
char tmp2[24]; char tmp2[24];
waitForReady(); waitForReady();
Utils::hex(networkId, tmp2); Utils::hex(networkId, tmp2);
std::pair<nlohmann::json, bool> tmp;
tmp.first["id"] = tmp2; _queueItem qi;
tmp.first["objtype"] = "_delete_network"; qi.jsonData["id"] = tmp2;
tmp.second = true; qi.jsonData["objtype"] = "_delete_network";
_commitQueue.post(tmp); qi.notifyListeners = true;
OtelCarrier<std::map<std::string, std::string> > carrier(qi.traceContext);
auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
propagator->Inject(carrier, current_ctx);
_commitQueue.post(qi);
nlohmann::json nullJson; nlohmann::json nullJson;
_networkChanged(tmp.first, nullJson, true); _networkChanged(qi.jsonData, nullJson, true);
} }
void CentralDB::eraseMember(const uint64_t networkId, const uint64_t memberId) void CentralDB::eraseMember(const uint64_t networkId, const uint64_t memberId)
@ -396,16 +428,20 @@ void CentralDB::eraseMember(const uint64_t networkId, const uint64_t memberId)
fprintf(stderr, "CentralDB::eraseMember\n"); fprintf(stderr, "CentralDB::eraseMember\n");
char tmp2[24]; char tmp2[24];
waitForReady(); waitForReady();
std::pair<nlohmann::json, bool> tmp, nw;
Utils::hex(networkId, tmp2); _queueItem qi;
tmp.first["nwid"] = tmp2; qi.jsonData["nwid"] = tmp2;
Utils::hex(memberId, tmp2); qi.jsonData["id"] = tmp2;
tmp.first["id"] = tmp2; qi.jsonData["objtype"] = "_delete_member";
tmp.first["objtype"] = "_delete_member"; qi.notifyListeners = true;
tmp.second = true; OtelCarrier<std::map<std::string, std::string> > carrier(qi.traceContext);
_commitQueue.post(tmp); auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
propagator->Inject(carrier, current_ctx);
_commitQueue.post(qi);
nlohmann::json nullJson; nlohmann::json nullJson;
_memberChanged(tmp.first, nullJson, true); _memberChanged(qi.jsonData, nullJson, true);
} }
void CentralDB::nodeIsOnline( void CentralDB::nodeIsOnline(
@ -1108,15 +1144,29 @@ void CentralDB::heartbeat()
void CentralDB::commitThread() void CentralDB::commitThread()
{ {
fprintf(stderr, "%s: commitThread start\n", _myAddressStr.c_str()); fprintf(stderr, "%s: commitThread start\n", _myAddressStr.c_str());
std::pair<nlohmann::json, bool> qitem; _queueItem qitem;
while (_commitQueue.get(qitem) & (_run == 1)) { while (_commitQueue.get(qitem) & (_run == 1)) {
auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto provider = opentelemetry::trace::Provider::GetTracerProvider();
auto tracer = provider->GetTracer("CentralDB"); auto tracer = provider->GetTracer("CentralDB");
fprintf(stderr, "checking qitem trace context\n");
for (auto const& kv : qitem.traceContext) {
fprintf(stderr, "traceContext: %s: %s\n", kv.first.c_str(), kv.second.c_str());
}
auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
OtelCarrier<std::map<std::string, std::string> > carrier(qitem.traceContext);
auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
auto new_context = propagator->Extract(carrier, current_ctx);
auto remote_span = opentelemetry::trace::GetSpan(new_context);
auto remote_scope = tracer->WithActiveSpan(remote_span);
{
auto span = tracer->StartSpan("CentralDB::commitThread"); auto span = tracer->StartSpan("CentralDB::commitThread");
auto scope = tracer->WithActiveSpan(span); auto scope = tracer->WithActiveSpan(span);
fprintf(stderr, "commitThread tick\n"); fprintf(stderr, "commitThread tick\n");
if (! qitem.first.is_object()) { if (! qitem.jsonData.is_object()) {
fprintf(stderr, "not an object\n"); fprintf(stderr, "not an object\n");
continue; continue;
} }
@ -1137,7 +1187,7 @@ void CentralDB::commitThread()
Metrics::pgsql_commit_ticks++; Metrics::pgsql_commit_ticks++;
try { try {
nlohmann::json& config = (qitem.first); nlohmann::json& config = (qitem.jsonData);
const std::string objtype = config["objtype"]; const std::string objtype = config["objtype"];
if (objtype == "member") { if (objtype == "member") {
auto mspan = tracer->StartSpan("CentralDB::commitThread::member"); auto mspan = tracer->StartSpan("CentralDB::commitThread::member");
@ -1176,7 +1226,8 @@ void CentralDB::commitThread()
continue; continue;
} }
pqxx::row mrow = w.exec( pqxx::row mrow =
w.exec(
"SELECT COUNT(device_id) FROM network_memberships_ctl WHERE device_id = $1 " "SELECT COUNT(device_id) FROM network_memberships_ctl WHERE device_id = $1 "
"AND network_id = $2", "AND network_id = $2",
pqxx::params { memberId, networkId }) pqxx::params { memberId, networkId })
@ -1195,8 +1246,8 @@ void CentralDB::commitThread()
fprintf( fprintf(
stderr, "skipping member %s-%s update. change source: %s, frontend: %s\n", stderr, "skipping member %s-%s update. change source: %s, frontend: %s\n",
networkId.c_str(), memberId.c_str(), change_source.c_str(), frontend.c_str()); networkId.c_str(), memberId.c_str(), change_source.c_str(), frontend.c_str());
// if it is not a new member and the change source is not the controller and doesn't match the // if it is not a new member and the change source is not the controller and doesn't match
// frontend, don't apply the change. // the frontend, don't apply the change.
continue; continue;
} }
@ -1229,14 +1280,18 @@ void CentralDB::commitThread()
pqxx::result res = pqxx::result res =
w.exec( w.exec(
"INSERT INTO network_memberships_ctl (device_id, network_id, authorized, active_bridge, " "INSERT INTO network_memberships_ctl (device_id, network_id, authorized, "
"active_bridge, "
"ip_assignments, " "ip_assignments, "
"no_auto_assign_ips, sso_exempt, authentication_expiry_time, capabilities, creation_time, " "no_auto_assign_ips, sso_exempt, authentication_expiry_time, capabilities, "
"creation_time, "
"identity, last_authorized_time, last_deauthorized_time, " "identity, last_authorized_time, last_deauthorized_time, "
"remote_trace_level, remote_trace_target, revision, tags, version_major, version_minor, " "remote_trace_level, remote_trace_target, revision, tags, version_major, "
"version_minor, "
"version_revision, version_protocol) " "version_revision, version_protocol) "
"VALUES ($1, $2, $3, $4, $5, $6, $7, TO_TIMESTAMP($8::double precision/1000), $9, " "VALUES ($1, $2, $3, $4, $5, $6, $7, TO_TIMESTAMP($8::double precision/1000), $9, "
"TO_TIMESTAMP($10::double precision/1000), $11, TO_TIMESTAMP($12::double precision/1000), " "TO_TIMESTAMP($10::double precision/1000), $11, TO_TIMESTAMP($12::double "
"precision/1000), "
"TO_TIMESTAMP($13::double precision/1000), $14, $15, $16, $17, $18, $19, $20, $21) " "TO_TIMESTAMP($13::double precision/1000), $14, $15, $16, $17, $18, $19, $20, $21) "
"ON CONFLICT (device_id, network_id) DO UPDATE SET " "ON CONFLICT (device_id, network_id) DO UPDATE SET "
"authorized = EXCLUDED.authorized, active_bridge = EXCLUDED.active_bridge, " "authorized = EXCLUDED.authorized, active_bridge = EXCLUDED.active_bridge, "
@ -1251,7 +1306,8 @@ void CentralDB::commitThread()
"EXCLUDED.remote_trace_target, " "EXCLUDED.remote_trace_target, "
"revision = EXCLUDED.revision, tags = EXCLUDED.tags, version_major = " "revision = EXCLUDED.revision, tags = EXCLUDED.tags, version_major = "
"EXCLUDED.version_major, " "EXCLUDED.version_major, "
"version_minor = EXCLUDED.version_minor, version_revision = EXCLUDED.version_revision, " "version_minor = EXCLUDED.version_minor, version_revision = "
"EXCLUDED.version_revision, "
"version_protocol = EXCLUDED.version_protocol", "version_protocol = EXCLUDED.version_protocol",
pqxx::params { memberId, pqxx::params { memberId,
networkId, networkId,
@ -1319,7 +1375,7 @@ void CentralDB::commitThread()
get(nwidInt, nwOrig, memberidInt, memOrig); get(nwidInt, nwOrig, memberidInt, memOrig);
_memberChanged(memOrig, memNew, qitem.second); _memberChanged(memOrig, memNew, qitem.notifyListeners);
} }
else { else {
fprintf( fprintf(
@ -1329,8 +1385,8 @@ void CentralDB::commitThread()
} }
catch (std::exception& e) { catch (std::exception& e) {
fprintf( fprintf(
stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(), networkId.c_str(), stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(),
memberId.c_str(), e.what()); networkId.c_str(), memberId.c_str(), e.what());
mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
} }
} }
@ -1354,7 +1410,8 @@ void CentralDB::commitThread()
if (! isNewNetwork) { if (! isNewNetwork) {
pqxx::row nwrow = pqxx::row nwrow =
w.exec("SELECT frontend FROM networks_ctl WHERE id = $1", pqxx::params { id }).one_row(); w.exec("SELECT frontend FROM networks_ctl WHERE id = $1", pqxx::params { id })
.one_row();
frontend = nwrow[0].as<std::string>(); frontend = nwrow[0].as<std::string>();
} }
@ -1364,10 +1421,11 @@ void CentralDB::commitThread()
} }
if (! isNewNetwork && change_source != "controller" && frontend != change_source) { if (! isNewNetwork && change_source != "controller" && frontend != change_source) {
// if it is not a new network and the change source is not the controller and doesn't match the // if it is not a new network and the change source is not the controller and doesn't match
// frontend, don't apply the change. // the frontend, don't apply the change.
fprintf( fprintf(
stderr, "Skipping network update %s. isNewNetwork: %s, change_source: %s, frontend: %s\n", stderr,
"Skipping network update %s. isNewNetwork: %s, change_source: %s, frontend: %s\n",
id.c_str(), isNewNetwork ? "true" : "false", change_source.c_str(), frontend.c_str()); id.c_str(), isNewNetwork ? "true" : "false", change_source.c_str(), frontend.c_str());
continue; continue;
} }
@ -1376,7 +1434,8 @@ void CentralDB::commitThread()
"INSERT INTO networks_ctl (id, name, configuration, controller_id, revision, frontend) " "INSERT INTO networks_ctl (id, name, configuration, controller_id, revision, frontend) "
"VALUES ($1, $2, $3, $4, $5, $6) " "VALUES ($1, $2, $3, $4, $5, $6) "
"ON CONFLICT (id) DO UPDATE SET " "ON CONFLICT (id) DO UPDATE SET "
"name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = EXCLUDED.revision+1, " "name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = "
"EXCLUDED.revision+1, "
"frontend = EXCLUDED.frontend", "frontend = EXCLUDED.frontend",
pqxx::params { id, OSUtils::jsonString(config["name"], ""), OSUtils::jsonDump(config, -1), pqxx::params { id, OSUtils::jsonString(config["name"], ""), OSUtils::jsonDump(config, -1),
_myAddressStr, ((uint64_t)config["revision"]), change_source }); _myAddressStr, ((uint64_t)config["revision"]), change_source });
@ -1402,7 +1461,7 @@ void CentralDB::commitThread()
get(nwidInt, nwOrig); get(nwidInt, nwOrig);
_networkChanged(nwOrig, nwNew, qitem.second); _networkChanged(nwOrig, nwNew, qitem.notifyListeners);
} }
else { else {
fprintf( fprintf(
@ -1450,7 +1509,7 @@ void CentralDB::commitThread()
json oldConfig; json oldConfig;
get(nwidInt, oldConfig); get(nwidInt, oldConfig);
json empty; json empty;
_networkChanged(oldConfig, empty, qitem.second); _networkChanged(oldConfig, empty, qitem.notifyListeners);
} }
catch (std::exception& e) { catch (std::exception& e) {
dspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); dspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
@ -1505,7 +1564,7 @@ void CentralDB::commitThread()
get(nwidInt, networkConfig, memberidInt, oldConfig); get(nwidInt, networkConfig, memberidInt, oldConfig);
json empty; json empty;
_memberChanged(oldConfig, empty, qitem.second); _memberChanged(oldConfig, empty, qitem.notifyListeners);
} }
catch (std::exception& e) { catch (std::exception& e) {
mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
@ -1543,6 +1602,7 @@ void CentralDB::commitThread()
_pool->unborrow(c); _pool->unborrow(c);
c.reset(); c.reset();
} }
}
fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str()); fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str());
} }
@ -1561,6 +1621,7 @@ void CentralDB::onlineNotificationThread()
{ {
waitForReady(); waitForReady();
while (_run == 1) { while (_run == 1) {
{
auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto provider = opentelemetry::trace::Provider::GetTracerProvider();
auto tracer = provider->GetTracer("CentralDB"); auto tracer = provider->GetTracer("CentralDB");
auto span = tracer->StartSpan("CentralDB::onlineNotificationThread"); auto span = tracer->StartSpan("CentralDB::onlineNotificationThread");
@ -1596,9 +1657,10 @@ void CentralDB::onlineNotificationThread()
try { try {
// check if the member exists first. // check if the member exists first.
// //
// exec_params1 will throw pqxx::unexpected_rows if not exactly one row is returned. If that's the // exec_params1 will throw pqxx::unexpected_rows if not exactly one row is returned. If that's
// case, skip this record and move on. // the case, skip this record and move on.
pqxx::row r = w.exec( pqxx::row r =
w.exec(
"SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = " "SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = "
"$1 AND device_id " "$1 AND device_id "
"= $2", "= $2",
@ -1629,7 +1691,8 @@ void CentralDB::onlineNotificationThread()
version = "unknown"; version = "unknown";
} }
else { else {
version = "v" + std::to_string(vMajor) + "." + std::to_string(vMinor) + "." + std::to_string(vRev); version =
"v" + std::to_string(vMajor) + "." + std::to_string(vMinor) + "." + std::to_string(vRev);
} }
if (osArchSplit.size() == 2) { if (osArchSplit.size() == 2) {
os = osArchSplit[0]; os = osArchSplit[0];
@ -1646,6 +1709,7 @@ void CentralDB::onlineNotificationThread()
catch (std::exception& e) { catch (std::exception& e) {
fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what()); fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what());
} }
}
std::this_thread::sleep_for(std::chrono::seconds(10)); std::this_thread::sleep_for(std::chrono::seconds(10));
} }

View file

@ -112,7 +112,20 @@ class CentralDB : public DB {
std::string _myAddressStr; std::string _myAddressStr;
std::string _connString; std::string _connString;
BlockingQueue<std::pair<nlohmann::json, bool> > _commitQueue; struct _queueItem {
_queueItem() : jsonData(), notifyListeners(false), traceContext()
{
}
~_queueItem()
{
}
nlohmann::json jsonData;
bool notifyListeners;
std::map<std::string, std::string> traceContext;
};
BlockingQueue<_queueItem> _commitQueue;
std::thread _heartbeatThread; std::thread _heartbeatThread;
std::shared_ptr<NotificationListener> _membersDbWatcher; std::shared_ptr<NotificationListener> _membersDbWatcher;

View file

@ -0,0 +1,47 @@
#ifndef OTEL_CARRIER_HPP
#define OTEL_CARRIER_HPP
#include "opentelemetry/context/propagation/text_map_propagator.h"
#include "opentelemetry/trace/propagation/http_trace_context.h"
namespace nostd = opentelemetry::nostd;
namespace ZeroTier {
template <typename T> class OtelCarrier : public opentelemetry::context::propagation::TextMapCarrier {
public:
OtelCarrier<T>(T& headers) : headers_(headers)
{
}
OtelCarrier() = default;
virtual nostd::string_view Get(nostd::string_view key) const noexcept override
{
std::string key_to_compare = key.data();
if (key == opentelemetry::trace::propagation::kTraceParent) {
key_to_compare = "traceparent";
}
else if (key == opentelemetry::trace::propagation::kTraceState) {
key_to_compare = "tracestate";
}
auto it = headers_.find(key_to_compare);
if (it != headers_.end()) {
return it->second;
}
return "";
}
virtual void Set(nostd::string_view key, nostd::string_view value) noexcept override
{
headers_.insert(std::pair<std::string, std::string>(std::string(key), std::string(value)));
}
T& headers_;
};
} // namespace ZeroTier
#endif // OTEL_CARRIER_HPP

View file

@ -4,11 +4,16 @@
#include "ControllerConfig.hpp" #include "ControllerConfig.hpp"
#include "CtlUtil.hpp" #include "CtlUtil.hpp"
#include "DB.hpp" #include "DB.hpp"
#include "OtelCarrier.hpp"
#include "member.pb.h" #include "member.pb.h"
#include "network.pb.h" #include "network.pb.h"
#include "opentelemetry/context/propagation/global_propagator.h"
#include "opentelemetry/trace/propagation/http_trace_context.h"
#include "opentelemetry/trace/provider.h" #include "opentelemetry/trace/provider.h"
#include "opentelemetry/trace/tracer.h"
#include "rustybits.h" #include "rustybits.h"
#include <google/cloud/opentelemetry_options.h>
#include <google/cloud/pubsub/admin/subscription_admin_client.h> #include <google/cloud/pubsub/admin/subscription_admin_client.h>
#include <google/cloud/pubsub/admin/subscription_admin_connection.h> #include <google/cloud/pubsub/admin/subscription_admin_connection.h>
#include <google/cloud/pubsub/admin/topic_admin_client.h> #include <google/cloud/pubsub/admin/topic_admin_client.h>
@ -50,7 +55,9 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s
create_gcp_pubsub_subscription_if_needed(_project, _subscription_id, _topic, _controller_id); create_gcp_pubsub_subscription_if_needed(_project, _subscription_id, _topic, _controller_id);
} }
_subscriber = std::make_shared<pubsub::Subscriber>(pubsub::MakeSubscriberConnection(*_subscription)); _subscriber = std::make_shared<pubsub::Subscriber>(
pubsub::MakeSubscriberConnection(*_subscription),
google::cloud::Options {}.set<google::cloud::OpenTelemetryTracingOption>(true));
_run = true; _run = true;
_subscriberThread = std::thread(&PubSubListener::subscribe, this); _subscriberThread = std::thread(&PubSubListener::subscribe, this);
@ -77,6 +84,23 @@ void PubSubListener::subscribe()
auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) { auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) {
auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto provider = opentelemetry::trace::Provider::GetTracerProvider();
auto tracer = provider->GetTracer("PubSubListener"); auto tracer = provider->GetTracer("PubSubListener");
auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
auto attrs = m.attributes();
std::map<std::string, std::string> attrs_map;
for (auto const& kv : m.attributes()) {
fprintf(stderr, "Message attribute: %s=%s\n", kv.first.c_str(), kv.second.c_str());
attrs_map.emplace(kv.first, kv.second);
}
OtelCarrier<std::map<std::string, std::string> > carrier(attrs_map);
auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
auto new_context = propagator->Extract(carrier, current_ctx);
auto remote_span = opentelemetry::trace::GetSpan(new_context);
auto remote_scope = tracer->WithActiveSpan(remote_span);
{
auto span = tracer->StartSpan("PubSubListener::onMessage"); auto span = tracer->StartSpan("PubSubListener::onMessage");
auto scope = tracer->WithActiveSpan(span); auto scope = tracer->WithActiveSpan(span);
span->SetAttribute("message_id", m.message_id()); span->SetAttribute("message_id", m.message_id());
@ -92,6 +116,7 @@ void PubSubListener::subscribe()
span->SetStatus(opentelemetry::trace::StatusCode::kError, "onNotification failed"); span->SetStatus(opentelemetry::trace::StatusCode::kError, "onNotification failed");
return false; return false;
} }
}
}); });
auto result = session.wait_for(std::chrono::seconds(10)); auto result = session.wait_for(std::chrono::seconds(10));

View file

@ -2,9 +2,11 @@
#include "../../osdep/OSUtils.hpp" #include "../../osdep/OSUtils.hpp"
#include "CtlUtil.hpp" #include "CtlUtil.hpp"
#include "OtelCarrier.hpp"
#include "member.pb.h" #include "member.pb.h"
#include "member_status.pb.h" #include "member_status.pb.h"
#include "network.pb.h" #include "network.pb.h"
#include "opentelemetry/context/propagation/global_propagator.h"
#include <chrono> #include <chrono>
#include <google/cloud/options.h> #include <google/cloud/options.h>
@ -42,7 +44,8 @@ PubSubWriter::PubSubWriter(std::string project, std::string topic, std::string c
::google::cloud::Options {} ::google::cloud::Options {}
.set<pubsub::RetryPolicyOption>(pubsub::LimitedTimeRetryPolicy(std::chrono::seconds(5)).clone()) .set<pubsub::RetryPolicyOption>(pubsub::LimitedTimeRetryPolicy(std::chrono::seconds(5)).clone())
.set<pubsub::BackoffPolicyOption>( .set<pubsub::BackoffPolicyOption>(
pubsub::ExponentialBackoffPolicy(std::chrono::milliseconds(100), std::chrono::seconds(2), 1.3).clone()); pubsub::ExponentialBackoffPolicy(std::chrono::milliseconds(100), std::chrono::seconds(2), 1.3).clone())
.set<pubsub::MessageOrderingOption>(true);
auto publisher = pubsub::MakePublisherConnection(pubsub::Topic(project, topic), std::move(options)); auto publisher = pubsub::MakePublisherConnection(pubsub::Topic(project, topic), std::move(options));
_publisher = std::make_shared<pubsub::Publisher>(std::move(publisher)); _publisher = std::make_shared<pubsub::Publisher>(std::move(publisher));
} }
@ -56,10 +59,26 @@ bool PubSubWriter::publishMessage(
const std::string& frontend, const std::string& frontend,
const std::string& orderingKey) const std::string& orderingKey)
{ {
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
auto tracer = provider->GetTracer("PubSubWriter");
auto span = tracer->StartSpan("PubSubWriter::publishMessage");
auto scope = tracer->WithActiveSpan(span);
fprintf(stderr, "Publishing message to %s\n", _topic.c_str()); fprintf(stderr, "Publishing message to %s\n", _topic.c_str());
std::vector<std::pair<std::string, std::string> > attributes; std::vector<std::pair<std::string, std::string> > attributes;
attributes.emplace_back("controller_id", _controller_id); attributes.emplace_back("controller_id", _controller_id);
std::map<std::string, std::string> attrs_map;
OtelCarrier<std::map<std::string, std::string> > carrier(attrs_map);
auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator();
auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent();
propagator->Inject(carrier, current_ctx);
for (const auto& kv : attrs_map) {
fprintf(stderr, "Attributes injected: %s=%s\n", kv.first.c_str(), kv.second.c_str());
attributes.emplace_back(kv.first, kv.second);
}
if (! frontend.empty()) { if (! frontend.empty()) {
attributes.emplace_back("frontend", frontend); attributes.emplace_back("frontend", frontend);
} }

View file

@ -49,6 +49,10 @@
#endif #endif
#ifdef ZT_OPENTELEMETRY_ENABLED #ifdef ZT_OPENTELEMETRY_ENABLED
#include "opentelemetry/baggage/propagation/baggage_propagator.h"
#include "opentelemetry/context/propagation/composite_propagator.h"
#include "opentelemetry/context/propagation/global_propagator.h"
#include "opentelemetry/context/propagation/text_map_propagator.h"
#include "opentelemetry/exporters/memory/in_memory_data.h" #include "opentelemetry/exporters/memory/in_memory_data.h"
#include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h" #include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h"
#include "opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h" #include "opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h"
@ -68,6 +72,7 @@
#include "opentelemetry/sdk/trace/tracer.h" #include "opentelemetry/sdk/trace/tracer.h"
#include "opentelemetry/sdk/trace/tracer_context.h" #include "opentelemetry/sdk/trace/tracer_context.h"
#include "opentelemetry/sdk/trace/tracer_provider.h" #include "opentelemetry/sdk/trace/tracer_provider.h"
#include "opentelemetry/trace/propagation/http_trace_context.h"
#include "opentelemetry/trace/provider.h" #include "opentelemetry/trace/provider.h"
namespace sdktrace = opentelemetry::v1::sdk::trace; namespace sdktrace = opentelemetry::v1::sdk::trace;
@ -1169,6 +1174,19 @@ class OneServiceImpl : public OneService {
_traceProvider = opentelemetry::nostd::shared_ptr<sdktrace::TracerProvider>( _traceProvider = opentelemetry::nostd::shared_ptr<sdktrace::TracerProvider>(
new sdktrace::TracerProvider(std::move(tracer_context))); new sdktrace::TracerProvider(std::move(tracer_context)));
sdktrace::Provider::SetTracerProvider(_traceProvider); sdktrace::Provider::SetTracerProvider(_traceProvider);
std::vector<std::unique_ptr<opentelemetry::context::propagation::TextMapPropagator> > propagators;
propagators.push_back(
std::unique_ptr<opentelemetry::context::propagation::TextMapPropagator>(
new opentelemetry::trace::propagation::HttpTraceContext()));
propagators.push_back(
std::unique_ptr<opentelemetry::context::propagation::TextMapPropagator>(
new opentelemetry::baggage::propagation::BaggagePropagator()));
auto p = opentelemetry::nostd::shared_ptr<opentelemetry::context::propagation::TextMapPropagator>(
new opentelemetry::context::propagation::CompositePropagator(std::move(propagators)));
opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator(p);
} }
} }