From 5c27068b801cad779fa42884a344ca12fbaf2a6d Mon Sep 17 00:00:00 2001 From: Grant Limberg Date: Thu, 30 Oct 2025 13:47:50 +0100 Subject: [PATCH] Enable cross-service propagation of otel trace metadata --- nonfree/controller/CentralDB.cpp | 1030 +++++++++++++------------ nonfree/controller/CentralDB.hpp | 15 +- nonfree/controller/OtelCarrier.hpp | 47 ++ nonfree/controller/PubSubListener.cpp | 51 +- nonfree/controller/PubSubWriter.cpp | 21 +- service/OneService.cpp | 18 + 6 files changed, 684 insertions(+), 498 deletions(-) create mode 100644 nonfree/controller/OtelCarrier.hpp diff --git a/nonfree/controller/CentralDB.cpp b/nonfree/controller/CentralDB.cpp index 3f1a93c03..5646f8c77 100644 --- a/nonfree/controller/CentralDB.cpp +++ b/nonfree/controller/CentralDB.cpp @@ -23,12 +23,14 @@ #include "ControllerConfig.hpp" #include "CtlUtil.hpp" #include "EmbeddedNetworkController.hpp" +#include "OtelCarrier.hpp" #include "PostgresStatusWriter.hpp" #include "PubSubListener.hpp" #include "PubSubWriter.hpp" #include "Redis.hpp" #include "RedisListener.hpp" #include "RedisStatusWriter.hpp" +#include "opentelemetry/context/propagation/global_propagator.h" #include "opentelemetry/trace/provider.h" #include @@ -314,6 +316,9 @@ bool CentralDB::save(nlohmann::json& record, bool notifyListeners) } const std::string objtype = record["objtype"]; if (objtype == "network") { + auto span = tracer->StartSpan("CentralDB::save::network"); + auto scope = tracer->WithActiveSpan(span); + fprintf(stderr, "CentralDB network save %s\n", record["id"].get().c_str()); const uint64_t nwid = OSUtils::jsonIntHex(record["id"], 0ULL); if (nwid) { @@ -322,12 +327,23 @@ bool CentralDB::save(nlohmann::json& record, bool notifyListeners) if ((! old.is_object()) || (! 
_compareRecords(old, record))) { fprintf(stderr, "posting network change to commit queue\n"); record["revision"] = OSUtils::jsonInt(record["revision"], 0ULL) + 1ULL; - _commitQueue.post(std::pair(record, notifyListeners)); + _queueItem qi; + qi.jsonData = record; + qi.notifyListeners = notifyListeners; + OtelCarrier > carrier(qi.traceContext); + auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent(); + auto propagator = + opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator(); + propagator->Inject(carrier, current_ctx); + _commitQueue.post(qi); modified = true; } } } else if (objtype == "member") { + auto span = tracer->StartSpan("CentralDB::save::member"); + auto scope = tracer->WithActiveSpan(span); + std::string networkId = record["nwid"]; std::string memberId = record["id"]; const uint64_t nwid = OSUtils::jsonIntHex(record["nwid"], 0ULL); @@ -339,7 +355,17 @@ bool CentralDB::save(nlohmann::json& record, bool notifyListeners) if ((! old.is_object()) || (! 
_compareRecords(old, record))) { fprintf(stderr, "posting member change to commit queue\n"); record["revision"] = OSUtils::jsonInt(record["revision"], 0ULL) + 1ULL; - _commitQueue.post(std::pair(record, notifyListeners)); + + _queueItem qi; + qi.jsonData = record; + qi.notifyListeners = notifyListeners; + OtelCarrier > carrier(qi.traceContext); + auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent(); + auto propagator = + opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator(); + propagator->Inject(carrier, current_ctx); + _commitQueue.post(qi); + modified = true; } else { @@ -373,13 +399,19 @@ void CentralDB::eraseNetwork(const uint64_t networkId) char tmp2[24]; waitForReady(); Utils::hex(networkId, tmp2); - std::pair tmp; - tmp.first["id"] = tmp2; - tmp.first["objtype"] = "_delete_network"; - tmp.second = true; - _commitQueue.post(tmp); + + _queueItem qi; + qi.jsonData["id"] = tmp2; + qi.jsonData["objtype"] = "_delete_network"; + qi.notifyListeners = true; + OtelCarrier > carrier(qi.traceContext); + auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent(); + auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator(); + propagator->Inject(carrier, current_ctx); + _commitQueue.post(qi); + nlohmann::json nullJson; - _networkChanged(tmp.first, nullJson, true); + _networkChanged(qi.jsonData, nullJson, true); } void CentralDB::eraseMember(const uint64_t networkId, const uint64_t memberId) @@ -396,16 +428,20 @@ void CentralDB::eraseMember(const uint64_t networkId, const uint64_t memberId) fprintf(stderr, "CentralDB::eraseMember\n"); char tmp2[24]; waitForReady(); - std::pair tmp, nw; - Utils::hex(networkId, tmp2); - tmp.first["nwid"] = tmp2; - Utils::hex(memberId, tmp2); - tmp.first["id"] = tmp2; - tmp.first["objtype"] = "_delete_member"; - tmp.second = true; - _commitQueue.post(tmp); + + _queueItem qi; + qi.jsonData["nwid"] = tmp2; + qi.jsonData["id"] = tmp2; + 
qi.jsonData["objtype"] = "_delete_member"; + qi.notifyListeners = true; + OtelCarrier > carrier(qi.traceContext); + auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent(); + auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator(); + propagator->Inject(carrier, current_ctx); + _commitQueue.post(qi); + nlohmann::json nullJson; - _memberChanged(tmp.first, nullJson, true); + _memberChanged(qi.jsonData, nullJson, true); } void CentralDB::nodeIsOnline( @@ -1108,440 +1144,464 @@ void CentralDB::heartbeat() void CentralDB::commitThread() { fprintf(stderr, "%s: commitThread start\n", _myAddressStr.c_str()); - std::pair qitem; + _queueItem qitem; while (_commitQueue.get(qitem) & (_run == 1)) { auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto tracer = provider->GetTracer("CentralDB"); - auto span = tracer->StartSpan("CentralDB::commitThread"); - auto scope = tracer->WithActiveSpan(span); - fprintf(stderr, "commitThread tick\n"); - if (! qitem.first.is_object()) { - fprintf(stderr, "not an object\n"); - continue; + fprintf(stderr, "checking qitem trace context\n"); + for (auto const& kv : qitem.traceContext) { + fprintf(stderr, "traceContext: %s: %s\n", kv.first.c_str(), kv.second.c_str()); } - std::shared_ptr c; - try { - c = _pool->borrow(); - } - catch (std::exception& e) { - fprintf(stderr, "ERROR: %s\n", e.what()); - continue; - } + auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator(); + OtelCarrier > carrier(qitem.traceContext); + auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent(); + auto new_context = propagator->Extract(carrier, current_ctx); + auto remote_span = opentelemetry::trace::GetSpan(new_context); + auto remote_scope = tracer->WithActiveSpan(remote_span); - if (! 
c) { - fprintf(stderr, "Error getting database connection\n"); - continue; - } + { + auto span = tracer->StartSpan("CentralDB::commitThread"); + auto scope = tracer->WithActiveSpan(span); - Metrics::pgsql_commit_ticks++; - try { - nlohmann::json& config = (qitem.first); - const std::string objtype = config["objtype"]; - if (objtype == "member") { - auto mspan = tracer->StartSpan("CentralDB::commitThread::member"); - auto mscope = tracer->WithActiveSpan(mspan); - - // fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str()); - std::string memberId; - std::string networkId; - - try { - pqxx::work w(*c->c); - - memberId = config["id"]; - networkId = config["nwid"]; - fprintf(stderr, "commit member %s-%s\n", networkId.c_str(), memberId.c_str()); - - std::string target = "NULL"; - if (! config["remoteTraceTarget"].is_null()) { - target = config["remoteTraceTarget"]; - } - - // get network and the frontend it is assigned to - // if network does not exist, skip member update - pqxx::row nwrow = - w.exec( - "SELECT COUNT(id), frontend FROM networks_ctl WHERE id = $1 GROUP BY frontend", - pqxx::params { networkId }) - .one_row(); - int nwcount = nwrow[0].as(); - std::string frontend = nwrow[1].as(); - - if (nwcount != 1) { - fprintf(stderr, "network %s does not exist. skipping member upsert\n", networkId.c_str()); - w.abort(); - _pool->unborrow(c); - continue; - } - - pqxx::row mrow = w.exec( - "SELECT COUNT(device_id) FROM network_memberships_ctl WHERE device_id = $1 " - "AND network_id = $2", - pqxx::params { memberId, networkId }) - .one_row(); - int membercount = mrow[0].as(); - bool isNewMember = (membercount == 0); - - std::string change_source; - if (! config["change_source"].is_null()) { - change_source = config["change_source"]; - } - else { - change_source = "controller"; - } - if (! isNewMember && change_source != "controller" && frontend != change_source) { - fprintf( - stderr, "skipping member %s-%s update. 
change source: %s, frontend: %s\n", - networkId.c_str(), memberId.c_str(), change_source.c_str(), frontend.c_str()); - // if it is not a new member and the change source is not the controller and doesn't match the - // frontend, don't apply the change. - continue; - } - - std::vector ipAssignments; - fprintf( - stderr, "Saving IP Assignments: \n\tipAssignments: %s\n", - OSUtils::jsonDump(config["ipAssignments"], -1).c_str()); - if (config["ipAssignments"].is_array()) { - for (auto& ip : config["ipAssignments"]) { - if (ip.is_string()) { - ipAssignments.push_back(ip.get()); - } - } - } - - fprintf(stderr, "member json: %s\n", config.dump().c_str()); - - int64_t vMajor = OSUtils::jsonUInt(config["vMajor"], 0); - int64_t vMinor = OSUtils::jsonUInt(config["vMinor"], 0); - int64_t vRev = OSUtils::jsonUInt(config["vRev"], 0); - int64_t vProto = OSUtils::jsonUInt(config["vProto"], 0); - if (vMajor < 0) - vMajor = 0; - if (vMinor < 0) - vMinor = 0; - if (vRev < 0) - vRev = 0; - if (vProto < 0) - vProto = 0; - - pqxx::result res = - w.exec( - "INSERT INTO network_memberships_ctl (device_id, network_id, authorized, active_bridge, " - "ip_assignments, " - "no_auto_assign_ips, sso_exempt, authentication_expiry_time, capabilities, creation_time, " - "identity, last_authorized_time, last_deauthorized_time, " - "remote_trace_level, remote_trace_target, revision, tags, version_major, version_minor, " - "version_revision, version_protocol) " - "VALUES ($1, $2, $3, $4, $5, $6, $7, TO_TIMESTAMP($8::double precision/1000), $9, " - "TO_TIMESTAMP($10::double precision/1000), $11, TO_TIMESTAMP($12::double precision/1000), " - "TO_TIMESTAMP($13::double precision/1000), $14, $15, $16, $17, $18, $19, $20, $21) " - "ON CONFLICT (device_id, network_id) DO UPDATE SET " - "authorized = EXCLUDED.authorized, active_bridge = EXCLUDED.active_bridge, " - "ip_assignments = EXCLUDED.ip_assignments, no_auto_assign_ips = " - "EXCLUDED.no_auto_assign_ips, " - "sso_exempt = EXCLUDED.sso_exempt, 
authentication_expiry_time = " - "EXCLUDED.authentication_expiry_time, " - "capabilities = EXCLUDED.capabilities, creation_time = EXCLUDED.creation_time, " - "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " - "last_deauthorized_time = EXCLUDED.last_deauthorized_time, " - "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = " - "EXCLUDED.remote_trace_target, " - "revision = EXCLUDED.revision, tags = EXCLUDED.tags, version_major = " - "EXCLUDED.version_major, " - "version_minor = EXCLUDED.version_minor, version_revision = EXCLUDED.version_revision, " - "version_protocol = EXCLUDED.version_protocol", - pqxx::params { memberId, - networkId, - OSUtils::jsonBool(config["authorized"], false), - OSUtils::jsonBool(config["activeBridge"], false), - ipAssignments, - OSUtils::jsonBool(config["noAutoAssignIps"], false), - OSUtils::jsonBool(config["ssoExempt"], false), - OSUtils::jsonInt(config["authenticationExpiryTime"], 0), - OSUtils::jsonDump(config["capabilities"], -1), - OSUtils::jsonInt(config["creationTime"], OSUtils::now()), - OSUtils::jsonString(config["identity"], ""), - OSUtils::jsonInt(config["lastAuthorizedTime"], 0), - OSUtils::jsonInt(config["lastDeauthorizedTime"], 0), - OSUtils::jsonInt(config["remoteTraceLevel"], 0), - target, - OSUtils::jsonInt(config["revision"], 0), - OSUtils::jsonDump(config["tags"], -1), - vMajor, - vMinor, - vRev, - vProto }) - .no_rows(); - - w.commit(); - - if (_listenerMode == LISTENER_MODE_PUBSUB) { - // Publish change to pubsub stream - - if (config["change_source"].is_null() || config["change_source"] == "controller") { - nlohmann::json oldMember; - nlohmann::json newMember = config; - if (! 
isNewMember) { - oldMember = _getNetworkMember(w, networkId, memberId); - } - _changeNotifier->notifyMemberChange(oldMember, newMember, frontend); - } - } - - if (_smee != NULL && isNewMember) { - // TODO: Smee Notifications for New Members - // pqxx::row row = w.exec_params1( - // "SELECT " - // " count(h.hook_id) " - // "FROM " - // " ztc_hook h " - // " INNER JOIN ztc_org o ON o.org_id = h.org_id " - // " INNER JOIN ztc_network n ON n.owner_id = o.owner_id " - // " WHERE " - // "n.id = $1 ", - // networkId); - // int64_t hookCount = row[0].as(); - // if (hookCount > 0) { - // notifyNewMember(networkId, memberId); - // } - } - - const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL); - const uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL); - if (nwidInt && memberidInt) { - nlohmann::json nwOrig; - nlohmann::json memOrig; - - nlohmann::json memNew(config); - - get(nwidInt, nwOrig, memberidInt, memOrig); - - _memberChanged(memOrig, memNew, qitem.second); - } - else { - fprintf( - stderr, "%s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n", - _myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt); - } - } - catch (std::exception& e) { - fprintf( - stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(), networkId.c_str(), - memberId.c_str(), e.what()); - mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); - } + fprintf(stderr, "commitThread tick\n"); + if (! 
qitem.jsonData.is_object()) { + fprintf(stderr, "not an object\n"); + continue; } - else if (objtype == "network") { - auto nspan = tracer->StartSpan("CentralDB::commitThread::network"); - auto nscope = tracer->WithActiveSpan(nspan); - try { - // fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str()); - pqxx::work w(*c->c); + std::shared_ptr c; + try { + c = _pool->borrow(); + } + catch (std::exception& e) { + fprintf(stderr, "ERROR: %s\n", e.what()); + continue; + } - std::string id = config["id"]; - fprintf(stderr, "commit network %s\n", id.c_str()); + if (! c) { + fprintf(stderr, "Error getting database connection\n"); + continue; + } - pqxx::row nwrow = - w.exec("SELECT COUNT(id) frontend FROM networks_ctl WHERE id = $1", pqxx::params { id }) - .one_row(); - int nwcount = nwrow[0].as(); - bool isNewNetwork = (nwcount == 0); - std::string frontend = ""; + Metrics::pgsql_commit_ticks++; + try { + nlohmann::json& config = (qitem.jsonData); + const std::string objtype = config["objtype"]; + if (objtype == "member") { + auto mspan = tracer->StartSpan("CentralDB::commitThread::member"); + auto mscope = tracer->WithActiveSpan(mspan); - if (! isNewNetwork) { + // fprintf(stderr, "%s: commitThread: member\n", _myAddressStr.c_str()); + std::string memberId; + std::string networkId; + + try { + pqxx::work w(*c->c); + + memberId = config["id"]; + networkId = config["nwid"]; + fprintf(stderr, "commit member %s-%s\n", networkId.c_str(), memberId.c_str()); + + std::string target = "NULL"; + if (! 
config["remoteTraceTarget"].is_null()) { + target = config["remoteTraceTarget"]; + } + + // get network and the frontend it is assigned to + // if network does not exist, skip member update pqxx::row nwrow = - w.exec("SELECT frontend FROM networks_ctl WHERE id = $1", pqxx::params { id }).one_row(); - frontend = nwrow[0].as(); - } + w.exec( + "SELECT COUNT(id), frontend FROM networks_ctl WHERE id = $1 GROUP BY frontend", + pqxx::params { networkId }) + .one_row(); + int nwcount = nwrow[0].as(); + std::string frontend = nwrow[1].as(); - std::string change_source; - if (! config["change_source"].is_null()) { - change_source = config["change_source"]; - } + if (nwcount != 1) { + fprintf(stderr, "network %s does not exist. skipping member upsert\n", networkId.c_str()); + w.abort(); + _pool->unborrow(c); + continue; + } - if (! isNewNetwork && change_source != "controller" && frontend != change_source) { - // if it is not a new network and the change source is not the controller and doesn't match the - // frontend, don't apply the change. + pqxx::row mrow = + w.exec( + "SELECT COUNT(device_id) FROM network_memberships_ctl WHERE device_id = $1 " + "AND network_id = $2", + pqxx::params { memberId, networkId }) + .one_row(); + int membercount = mrow[0].as(); + bool isNewMember = (membercount == 0); + + std::string change_source; + if (! config["change_source"].is_null()) { + change_source = config["change_source"]; + } + else { + change_source = "controller"; + } + if (! isNewMember && change_source != "controller" && frontend != change_source) { + fprintf( + stderr, "skipping member %s-%s update. change source: %s, frontend: %s\n", + networkId.c_str(), memberId.c_str(), change_source.c_str(), frontend.c_str()); + // if it is not a new member and the change source is not the controller and doesn't match + // the frontend, don't apply the change. + continue; + } + + std::vector ipAssignments; fprintf( - stderr, "Skipping network update %s. 
isNewNetwork: %s, change_source: %s, frontend: %s\n", - id.c_str(), isNewNetwork ? "true" : "false", change_source.c_str(), frontend.c_str()); - continue; - } - - pqxx::result res = w.exec( - "INSERT INTO networks_ctl (id, name, configuration, controller_id, revision, frontend) " - "VALUES ($1, $2, $3, $4, $5, $6) " - "ON CONFLICT (id) DO UPDATE SET " - "name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = EXCLUDED.revision+1, " - "frontend = EXCLUDED.frontend", - pqxx::params { id, OSUtils::jsonString(config["name"], ""), OSUtils::jsonDump(config, -1), - _myAddressStr, ((uint64_t)config["revision"]), change_source }); - - w.commit(); - - if (_listenerMode == LISTENER_MODE_PUBSUB) { - // Publish change to pubsub stream - if (config["change_source"].is_null() || config["change_source"] == "controller") { - nlohmann::json oldNetwork; - nlohmann::json newNetwork = config; - if (! isNewNetwork) { - oldNetwork = _getNetwork(w, id); + stderr, "Saving IP Assignments: \n\tipAssignments: %s\n", + OSUtils::jsonDump(config["ipAssignments"], -1).c_str()); + if (config["ipAssignments"].is_array()) { + for (auto& ip : config["ipAssignments"]) { + if (ip.is_string()) { + ipAssignments.push_back(ip.get()); + } } - _changeNotifier->notifyNetworkChange(oldNetwork, newNetwork, frontend); + } + + fprintf(stderr, "member json: %s\n", config.dump().c_str()); + + int64_t vMajor = OSUtils::jsonUInt(config["vMajor"], 0); + int64_t vMinor = OSUtils::jsonUInt(config["vMinor"], 0); + int64_t vRev = OSUtils::jsonUInt(config["vRev"], 0); + int64_t vProto = OSUtils::jsonUInt(config["vProto"], 0); + if (vMajor < 0) + vMajor = 0; + if (vMinor < 0) + vMinor = 0; + if (vRev < 0) + vRev = 0; + if (vProto < 0) + vProto = 0; + + pqxx::result res = + w.exec( + "INSERT INTO network_memberships_ctl (device_id, network_id, authorized, " + "active_bridge, " + "ip_assignments, " + "no_auto_assign_ips, sso_exempt, authentication_expiry_time, capabilities, " + "creation_time, " + "identity, 
last_authorized_time, last_deauthorized_time, " + "remote_trace_level, remote_trace_target, revision, tags, version_major, " + "version_minor, " + "version_revision, version_protocol) " + "VALUES ($1, $2, $3, $4, $5, $6, $7, TO_TIMESTAMP($8::double precision/1000), $9, " + "TO_TIMESTAMP($10::double precision/1000), $11, TO_TIMESTAMP($12::double " + "precision/1000), " + "TO_TIMESTAMP($13::double precision/1000), $14, $15, $16, $17, $18, $19, $20, $21) " + "ON CONFLICT (device_id, network_id) DO UPDATE SET " + "authorized = EXCLUDED.authorized, active_bridge = EXCLUDED.active_bridge, " + "ip_assignments = EXCLUDED.ip_assignments, no_auto_assign_ips = " + "EXCLUDED.no_auto_assign_ips, " + "sso_exempt = EXCLUDED.sso_exempt, authentication_expiry_time = " + "EXCLUDED.authentication_expiry_time, " + "capabilities = EXCLUDED.capabilities, creation_time = EXCLUDED.creation_time, " + "identity = EXCLUDED.identity, last_authorized_time = EXCLUDED.last_authorized_time, " + "last_deauthorized_time = EXCLUDED.last_deauthorized_time, " + "remote_trace_level = EXCLUDED.remote_trace_level, remote_trace_target = " + "EXCLUDED.remote_trace_target, " + "revision = EXCLUDED.revision, tags = EXCLUDED.tags, version_major = " + "EXCLUDED.version_major, " + "version_minor = EXCLUDED.version_minor, version_revision = " + "EXCLUDED.version_revision, " + "version_protocol = EXCLUDED.version_protocol", + pqxx::params { memberId, + networkId, + OSUtils::jsonBool(config["authorized"], false), + OSUtils::jsonBool(config["activeBridge"], false), + ipAssignments, + OSUtils::jsonBool(config["noAutoAssignIps"], false), + OSUtils::jsonBool(config["ssoExempt"], false), + OSUtils::jsonInt(config["authenticationExpiryTime"], 0), + OSUtils::jsonDump(config["capabilities"], -1), + OSUtils::jsonInt(config["creationTime"], OSUtils::now()), + OSUtils::jsonString(config["identity"], ""), + OSUtils::jsonInt(config["lastAuthorizedTime"], 0), + OSUtils::jsonInt(config["lastDeauthorizedTime"], 0), + 
OSUtils::jsonInt(config["remoteTraceLevel"], 0), + target, + OSUtils::jsonInt(config["revision"], 0), + OSUtils::jsonDump(config["tags"], -1), + vMajor, + vMinor, + vRev, + vProto }) + .no_rows(); + + w.commit(); + + if (_listenerMode == LISTENER_MODE_PUBSUB) { + // Publish change to pubsub stream + + if (config["change_source"].is_null() || config["change_source"] == "controller") { + nlohmann::json oldMember; + nlohmann::json newMember = config; + if (! isNewMember) { + oldMember = _getNetworkMember(w, networkId, memberId); + } + _changeNotifier->notifyMemberChange(oldMember, newMember, frontend); + } + } + + if (_smee != NULL && isNewMember) { + // TODO: Smee Notifications for New Members + // pqxx::row row = w.exec_params1( + // "SELECT " + // " count(h.hook_id) " + // "FROM " + // " ztc_hook h " + // " INNER JOIN ztc_org o ON o.org_id = h.org_id " + // " INNER JOIN ztc_network n ON n.owner_id = o.owner_id " + // " WHERE " + // "n.id = $1 ", + // networkId); + // int64_t hookCount = row[0].as(); + // if (hookCount > 0) { + // notifyNewMember(networkId, memberId); + // } + } + + const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL); + const uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL); + if (nwidInt && memberidInt) { + nlohmann::json nwOrig; + nlohmann::json memOrig; + + nlohmann::json memNew(config); + + get(nwidInt, nwOrig, memberidInt, memOrig); + + _memberChanged(memOrig, memNew, qitem.notifyListeners); + } + else { + fprintf( + stderr, "%s: Can't notify of change. 
Error parsing nwid or memberid: %llu-%llu\n", + _myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt); } } - - const uint64_t nwidInt = OSUtils::jsonIntHex(config["id"], 0ULL); - if (nwidInt) { - nlohmann::json nwOrig; - nlohmann::json nwNew(config); - - get(nwidInt, nwOrig); - - _networkChanged(nwOrig, nwNew, qitem.second); - } - else { + catch (std::exception& e) { fprintf( - stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(), - (unsigned long long)nwidInt); + stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(), + networkId.c_str(), memberId.c_str(), e.what()); + mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); } } - catch (std::exception& e) { - nspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); - fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what()); - } - if (_listenerMode == LISTENER_MODE_REDIS && _redisMemberStatus) { + else if (objtype == "network") { + auto nspan = tracer->StartSpan("CentralDB::commitThread::network"); + auto nscope = tracer->WithActiveSpan(nspan); + try { + // fprintf(stderr, "%s: commitThread: network\n", _myAddressStr.c_str()); + pqxx::work w(*c->c); + std::string id = config["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "networks:{" + controllerId + "}"; - if (_cc->redisConfig->clusterMode) { - _cluster->sadd(key, id); + fprintf(stderr, "commit network %s\n", id.c_str()); + + pqxx::row nwrow = + w.exec("SELECT COUNT(id) frontend FROM networks_ctl WHERE id = $1", pqxx::params { id }) + .one_row(); + int nwcount = nwrow[0].as(); + bool isNewNetwork = (nwcount == 0); + std::string frontend = ""; + + if (! isNewNetwork) { + pqxx::row nwrow = + w.exec("SELECT frontend FROM networks_ctl WHERE id = $1", pqxx::params { id }) + .one_row(); + frontend = nwrow[0].as(); + } + + std::string change_source; + if (! 
config["change_source"].is_null()) { + change_source = config["change_source"]; + } + + if (! isNewNetwork && change_source != "controller" && frontend != change_source) { + // if it is not a new network and the change source is not the controller and doesn't match + // the frontend, don't apply the change. + fprintf( + stderr, + "Skipping network update %s. isNewNetwork: %s, change_source: %s, frontend: %s\n", + id.c_str(), isNewNetwork ? "true" : "false", change_source.c_str(), frontend.c_str()); + continue; + } + + pqxx::result res = w.exec( + "INSERT INTO networks_ctl (id, name, configuration, controller_id, revision, frontend) " + "VALUES ($1, $2, $3, $4, $5, $6) " + "ON CONFLICT (id) DO UPDATE SET " + "name = EXCLUDED.name, configuration = EXCLUDED.configuration, revision = " + "EXCLUDED.revision+1, " + "frontend = EXCLUDED.frontend", + pqxx::params { id, OSUtils::jsonString(config["name"], ""), OSUtils::jsonDump(config, -1), + _myAddressStr, ((uint64_t)config["revision"]), change_source }); + + w.commit(); + + if (_listenerMode == LISTENER_MODE_PUBSUB) { + // Publish change to pubsub stream + if (config["change_source"].is_null() || config["change_source"] == "controller") { + nlohmann::json oldNetwork; + nlohmann::json newNetwork = config; + if (! 
isNewNetwork) { + oldNetwork = _getNetwork(w, id); + } + _changeNotifier->notifyNetworkChange(oldNetwork, newNetwork, frontend); + } + } + + const uint64_t nwidInt = OSUtils::jsonIntHex(config["id"], 0ULL); + if (nwidInt) { + nlohmann::json nwOrig; + nlohmann::json nwNew(config); + + get(nwidInt, nwOrig); + + _networkChanged(nwOrig, nwNew, qitem.notifyListeners); } else { - _redis->sadd(key, id); + fprintf( + stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(), + (unsigned long long)nwidInt); } } - catch (sw::redis::Error& e) { + catch (std::exception& e) { nspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); - fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + fprintf(stderr, "%s ERROR: Error updating network: %s\n", _myAddressStr.c_str(), e.what()); + } + if (_listenerMode == LISTENER_MODE_REDIS && _redisMemberStatus) { + try { + std::string id = config["id"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "networks:{" + controllerId + "}"; + if (_cc->redisConfig->clusterMode) { + _cluster->sadd(key, id); + } + else { + _redis->sadd(key, id); + } + } + catch (sw::redis::Error& e) { + nspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); + fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + } } } - } - else if (objtype == "_delete_network") { - auto dspan = tracer->StartSpan("CentralDB::commitThread::_delete_network"); - auto dscope = tracer->WithActiveSpan(dspan); + else if (objtype == "_delete_network") { + auto dspan = tracer->StartSpan("CentralDB::commitThread::_delete_network"); + auto dscope = tracer->WithActiveSpan(dspan); - // fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str()); - try { - pqxx::work w(*c->c); - std::string networkId = config["id"]; - fprintf(stderr, "Deleting network %s\n", networkId.c_str()); - w.exec("DELETE FROM network_memberships_ctl WHERE network_id = $1", pqxx::params { networkId }); 
- w.exec("DELETE FROM networks_ctl WHERE id = $1", pqxx::params { networkId }); - - w.commit(); - - uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL); - json oldConfig; - get(nwidInt, oldConfig); - json empty; - _networkChanged(oldConfig, empty, qitem.second); - } - catch (std::exception& e) { - dspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); - fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what()); - } - if (_listenerMode == LISTENER_MODE_REDIS && _redisMemberStatus) { + // fprintf(stderr, "%s: commitThread: delete network\n", _myAddressStr.c_str()); try { - std::string id = config["id"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "networks:{" + controllerId + "}"; - if (_cc->redisConfig->clusterMode) { - _cluster->srem(key, id); - _cluster->del("network-nodes-online:{" + controllerId + "}:" + id); - } - else { - _redis->srem(key, id); - _redis->del("network-nodes-online:{" + controllerId + "}:" + id); - } + pqxx::work w(*c->c); + std::string networkId = config["id"]; + fprintf(stderr, "Deleting network %s\n", networkId.c_str()); + w.exec("DELETE FROM network_memberships_ctl WHERE network_id = $1", pqxx::params { networkId }); + w.exec("DELETE FROM networks_ctl WHERE id = $1", pqxx::params { networkId }); + + w.commit(); + + uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL); + json oldConfig; + get(nwidInt, oldConfig); + json empty; + _networkChanged(oldConfig, empty, qitem.notifyListeners); } - catch (sw::redis::Error& e) { + catch (std::exception& e) { dspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); - fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + fprintf(stderr, "%s ERROR: Error deleting network: %s\n", _myAddressStr.c_str(), e.what()); + } + if (_listenerMode == LISTENER_MODE_REDIS && _redisMemberStatus) { + try { + std::string id = config["id"]; + std::string controllerId = _myAddressStr.c_str(); + 
std::string key = "networks:{" + controllerId + "}"; + if (_cc->redisConfig->clusterMode) { + _cluster->srem(key, id); + _cluster->del("network-nodes-online:{" + controllerId + "}:" + id); + } + else { + _redis->srem(key, id); + _redis->del("network-nodes-online:{" + controllerId + "}:" + id); + } + } + catch (sw::redis::Error& e) { + dspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); + fprintf(stderr, "ERROR: Error adding network to Redis: %s\n", e.what()); + } } } - } - else if (objtype == "_delete_member") { - auto mspan = tracer->StartSpan("CentralDB::commitThread::_delete_member"); - auto mscope = tracer->WithActiveSpan(mspan); + else if (objtype == "_delete_member") { + auto mspan = tracer->StartSpan("CentralDB::commitThread::_delete_member"); + auto mscope = tracer->WithActiveSpan(mspan); - // fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str()); - try { - pqxx::work w(*c->c); - - std::string memberId = config["id"]; - std::string networkId = config["nwid"]; - - fprintf(stderr, "Deleting member %s-%s\n", networkId.c_str(), memberId.c_str()); - - pqxx::result res = - w.exec( - "DELETE FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2", - pqxx::params { memberId, networkId }) - .no_rows(); - - w.commit(); - - uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL); - uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL); - - nlohmann::json networkConfig; - nlohmann::json oldConfig; - - get(nwidInt, networkConfig, memberidInt, oldConfig); - json empty; - _memberChanged(oldConfig, empty, qitem.second); - } - catch (std::exception& e) { - mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); - fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what()); - } - if (_listenerMode == LISTENER_MODE_REDIS && _redisMemberStatus) { + // fprintf(stderr, "%s commitThread: delete member\n", _myAddressStr.c_str()); try { + pqxx::work w(*c->c); + 
std::string memberId = config["id"]; std::string networkId = config["nwid"]; - std::string controllerId = _myAddressStr.c_str(); - std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; - if (_cc->redisConfig->clusterMode) { - _cluster->srem(key, memberId); - _cluster->del("member:{" + controllerId + "}:" + networkId + ":" + memberId); - } - else { - _redis->srem(key, memberId); - _redis->del("member:{" + controllerId + "}:" + networkId + ":" + memberId); - } + + fprintf(stderr, "Deleting member %s-%s\n", networkId.c_str(), memberId.c_str()); + + pqxx::result res = + w.exec( + "DELETE FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2", + pqxx::params { memberId, networkId }) + .no_rows(); + + w.commit(); + + uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL); + uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL); + + nlohmann::json networkConfig; + nlohmann::json oldConfig; + + get(nwidInt, networkConfig, memberidInt, oldConfig); + json empty; + _memberChanged(oldConfig, empty, qitem.notifyListeners); } - catch (sw::redis::Error& e) { + catch (std::exception& e) { mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); - fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); + fprintf(stderr, "%s ERROR: Error deleting member: %s\n", _myAddressStr.c_str(), e.what()); + } + if (_listenerMode == LISTENER_MODE_REDIS && _redisMemberStatus) { + try { + std::string memberId = config["id"]; + std::string networkId = config["nwid"]; + std::string controllerId = _myAddressStr.c_str(); + std::string key = "network-nodes-all:{" + controllerId + "}:" + networkId; + if (_cc->redisConfig->clusterMode) { + _cluster->srem(key, memberId); + _cluster->del("member:{" + controllerId + "}:" + networkId + ":" + memberId); + } + else { + _redis->srem(key, memberId); + _redis->del("member:{" + controllerId + "}:" + networkId + ":" + memberId); + } + } + catch (sw::redis::Error& e) { + 
mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); + fprintf(stderr, "ERROR: Error deleting member from Redis: %s\n", e.what()); + } } } + else { + fprintf(stderr, "%s ERROR: unknown objtype\n", _myAddressStr.c_str()); + } } - else { - fprintf(stderr, "%s ERROR: unknown objtype\n", _myAddressStr.c_str()); + catch (std::exception& e) { + span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); + fprintf(stderr, "%s ERROR: Error getting objtype: %s\n", _myAddressStr.c_str(), e.what()); } + _pool->unborrow(c); + c.reset(); } - catch (std::exception& e) { - span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what()); - fprintf(stderr, "%s ERROR: Error getting objtype: %s\n", _myAddressStr.c_str(), e.what()); - } - _pool->unborrow(c); - c.reset(); } fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str()); @@ -1561,90 +1621,94 @@ void CentralDB::onlineNotificationThread() { waitForReady(); while (_run == 1) { - auto provider = opentelemetry::trace::Provider::GetTracerProvider(); - auto tracer = provider->GetTracer("CentralDB"); - auto span = tracer->StartSpan("CentralDB::onlineNotificationThread"); - auto scope = tracer->WithActiveSpan(span); + { + auto provider = opentelemetry::trace::Provider::GetTracerProvider(); + auto tracer = provider->GetTracer("CentralDB"); + auto span = tracer->StartSpan("CentralDB::onlineNotificationThread"); + auto scope = tracer->WithActiveSpan(span); - try { - std::unordered_map, NodeOnlineRecord, _PairHasher> lastOnline; - { - std::lock_guard l(_lastOnline_l); - lastOnline.swap(_lastOnline); + try { + std::unordered_map, NodeOnlineRecord, _PairHasher> lastOnline; + { + std::lock_guard l(_lastOnline_l); + lastOnline.swap(_lastOnline); + } + + uint64_t updateCount = 0; + auto c = _pool->borrow(); + pqxx::work w(*c->c); + for (auto i = lastOnline.begin(); i != lastOnline.end(); ++i) { + updateCount += 1; + uint64_t nwid_i = i->first.first; + char nwidTmp[64]; + char memTmp[64]; + char 
ipTmp[64]; + OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); + OSUtils::ztsnprintf(memTmp, sizeof(memTmp), "%.10llx", i->first.second); + nlohmann::json network, member; + + if (! get(nwid_i, network, i->first.second, member)) { + continue; // skip non existent networks/members + } + + std::string networkId(nwidTmp); + std::string memberId(memTmp); + + try { + // check if the member exists first. + // + // exec_params1 will throw pqxx::unexpected_rows if not exactly one row is returned. If that's + // the case, skip this record and move on. + pqxx::row r = + w.exec( + "SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = " + "$1 AND device_id " + "= $2", + pqxx::params { networkId, memberId }) + .one_row(); + } + catch (pqxx::unexpected_rows& e) { + continue; + } + + int64_t ts = i->second.lastSeen; + std::string ipAddr = i->second.physicalAddress.toIpString(ipTmp); + std::string timestamp = std::to_string(ts); + std::string osArch = i->second.osArch; + std::vector osArchSplit = split(osArch, '/'); + std::string os = "unknown"; + std::string arch = "unknown"; + std::string frontend = OSUtils::jsonString(network["frontend"], ""); + + int vMajor = OSUtils::jsonInt(member["vMajor"], 0); + int vMinor = OSUtils::jsonInt(member["vMinor"], 0); + int vRev = OSUtils::jsonInt(member["vRev"], 0); + std::string version; + if (vMajor <= 0 && vMinor <= 0 && vRev <= 0) { + vMajor = 0; + vMinor = 0; + vRev = 0; + version = "unknown"; + } + else { + version = + "v" + std::to_string(vMajor) + "." + std::to_string(vMinor) + "." 
+ std::to_string(vRev); + } + if (osArchSplit.size() == 2) { + os = osArchSplit[0]; + arch = osArchSplit[1]; + } + + _statusWriter->updateNodeStatus( + networkId, memberId, os, arch, version, i->second.physicalAddress, ts, frontend); + } + _statusWriter->writePending(); + w.commit(); + _pool->unborrow(c); } - - uint64_t updateCount = 0; - auto c = _pool->borrow(); - pqxx::work w(*c->c); - for (auto i = lastOnline.begin(); i != lastOnline.end(); ++i) { - updateCount += 1; - uint64_t nwid_i = i->first.first; - char nwidTmp[64]; - char memTmp[64]; - char ipTmp[64]; - OSUtils::ztsnprintf(nwidTmp, sizeof(nwidTmp), "%.16llx", nwid_i); - OSUtils::ztsnprintf(memTmp, sizeof(memTmp), "%.10llx", i->first.second); - nlohmann::json network, member; - - if (! get(nwid_i, network, i->first.second, member)) { - continue; // skip non existent networks/members - } - - std::string networkId(nwidTmp); - std::string memberId(memTmp); - - try { - // check if the member exists first. - // - // exec_params1 will throw pqxx::unexpected_rows if not exactly one row is returned. If that's the - // case, skip this record and move on. 
- pqxx::row r = w.exec( - "SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = " - "$1 AND device_id " - "= $2", - pqxx::params { networkId, memberId }) - .one_row(); - } - catch (pqxx::unexpected_rows& e) { - continue; - } - - int64_t ts = i->second.lastSeen; - std::string ipAddr = i->second.physicalAddress.toIpString(ipTmp); - std::string timestamp = std::to_string(ts); - std::string osArch = i->second.osArch; - std::vector osArchSplit = split(osArch, '/'); - std::string os = "unknown"; - std::string arch = "unknown"; - std::string frontend = OSUtils::jsonString(network["frontend"], ""); - - int vMajor = OSUtils::jsonInt(member["vMajor"], 0); - int vMinor = OSUtils::jsonInt(member["vMinor"], 0); - int vRev = OSUtils::jsonInt(member["vRev"], 0); - std::string version; - if (vMajor <= 0 && vMinor <= 0 && vRev <= 0) { - vMajor = 0; - vMinor = 0; - vRev = 0; - version = "unknown"; - } - else { - version = "v" + std::to_string(vMajor) + "." + std::to_string(vMinor) + "." 
+ std::to_string(vRev); - } - if (osArchSplit.size() == 2) { - os = osArchSplit[0]; - arch = osArchSplit[1]; - } - - _statusWriter->updateNodeStatus( - networkId, memberId, os, arch, version, i->second.physicalAddress, ts, frontend); + catch (std::exception& e) { + fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what()); } - _statusWriter->writePending(); - w.commit(); - _pool->unborrow(c); - } - catch (std::exception& e) { - fprintf(stderr, "%s: error in onlinenotification thread: %s\n", _myAddressStr.c_str(), e.what()); } std::this_thread::sleep_for(std::chrono::seconds(10)); diff --git a/nonfree/controller/CentralDB.hpp b/nonfree/controller/CentralDB.hpp index 932bd001f..ed4a16060 100644 --- a/nonfree/controller/CentralDB.hpp +++ b/nonfree/controller/CentralDB.hpp @@ -112,7 +112,15 @@ class CentralDB : public DB { std::string _myAddressStr; std::string _connString; - BlockingQueue > _commitQueue; + // Item passed through _commitQueue: the record to commit, whether listeners are notified, + // and the trace context captured when the record was queued. + // Deliberately declares no constructor or destructor so the implicit move operations remain available. + struct _queueItem { + nlohmann::json jsonData; + bool notifyListeners = false; + std::map<std::string, std::string> traceContext; + }; + BlockingQueue<_queueItem> _commitQueue; std::thread _heartbeatThread; std::shared_ptr _membersDbWatcher; diff --git a/nonfree/controller/OtelCarrier.hpp b/nonfree/controller/OtelCarrier.hpp new file mode 100644 index 000000000..10a710530 --- /dev/null +++ b/nonfree/controller/OtelCarrier.hpp @@ -0,0 +1,47 @@ +#ifndef OTEL_CARRIER_HPP +#define OTEL_CARRIER_HPP + +#include "opentelemetry/context/propagation/text_map_propagator.h" +#include "opentelemetry/trace/propagation/http_trace_context.h" + +namespace nostd = opentelemetry::nostd; + +namespace ZeroTier { + +template class OtelCarrier : public opentelemetry::context::propagation::TextMapCarrier { + public: + OtelCarrier(T& headers) : headers_(headers) + { + } + + OtelCarrier() = default; + + virtual nostd::string_view Get(nostd::string_view key) const noexcept override + { 
+ std::string key_to_compare(key.data(), key.size()); // explicit length: a string_view is not guaranteed to be NUL-terminated + + if (key == opentelemetry::trace::propagation::kTraceParent) { + key_to_compare = "traceparent"; + } + else if (key == opentelemetry::trace::propagation::kTraceState) { + key_to_compare = "tracestate"; + } + auto it = headers_.find(key_to_compare); + if (it != headers_.end()) { + return it->second; + } + + return ""; + } + // Stores value under key, replacing any entry already present for that key. + virtual void Set(nostd::string_view key, nostd::string_view value) noexcept override + { + headers_[std::string(key)] = std::string(value); + } + // Non-owning reference to the caller-supplied container that Get and Set operate on. + T& headers_; +}; + +} // namespace ZeroTier + +#endif // OTEL_CARRIER_HPP \ No newline at end of file diff --git a/nonfree/controller/PubSubListener.cpp b/nonfree/controller/PubSubListener.cpp index 1e93a47a3..a15586499 100644 --- a/nonfree/controller/PubSubListener.cpp +++ b/nonfree/controller/PubSubListener.cpp @@ -4,11 +4,16 @@ #include "ControllerConfig.hpp" #include "CtlUtil.hpp" #include "DB.hpp" +#include "OtelCarrier.hpp" #include "member.pb.h" #include "network.pb.h" +#include "opentelemetry/context/propagation/global_propagator.h" +#include "opentelemetry/trace/propagation/http_trace_context.h" #include "opentelemetry/trace/provider.h" +#include "opentelemetry/trace/tracer.h" #include "rustybits.h" +#include #include #include #include @@ -50,7 +55,9 @@ PubSubListener::PubSubListener(std::string controller_id, std::string project, s create_gcp_pubsub_subscription_if_needed(_project, _subscription_id, _topic, _controller_id); } - _subscriber = std::make_shared(pubsub::MakeSubscriberConnection(*_subscription)); + _subscriber = std::make_shared( + pubsub::MakeSubscriberConnection(*_subscription), + google::cloud::Options {}.set(true)); _run = true; _subscriberThread = std::thread(&PubSubListener::subscribe, this); @@ -77,20 +84,38 @@ void PubSubListener::subscribe() auto session = _subscriber->Subscribe([this](pubsub::Message const& m, pubsub::AckHandler h) { auto provider = opentelemetry::trace::Provider::GetTracerProvider(); auto 
tracer = provider->GetTracer("PubSubListener"); - auto span = tracer->StartSpan("PubSubListener::onMessage"); - auto scope = tracer->WithActiveSpan(span); - span->SetAttribute("message_id", m.message_id()); - span->SetAttribute("ordering_key", m.ordering_key()); - fprintf(stderr, "Received message %s\n", m.message_id().c_str()); - if (onNotification(m.data())) { - std::move(h).ack(); - span->SetStatus(opentelemetry::trace::StatusCode::kOk); - return true; + auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator(); + auto attrs = m.attributes(); // fetched once and reused by the loop below + std::map attrs_map; + for (auto const& kv : attrs) { + fprintf(stderr, "Message attribute: %s=%s\n", kv.first.c_str(), kv.second.c_str()); + attrs_map.emplace(kv.first, kv.second); } - else { - span->SetStatus(opentelemetry::trace::StatusCode::kError, "onNotification failed"); - return false; + // Extract the sender's trace context from the message attributes and make the extracted span active before PubSubListener::onMessage is started. + OtelCarrier > carrier(attrs_map); + + auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent(); + auto new_context = propagator->Extract(carrier, current_ctx); + auto remote_span = opentelemetry::trace::GetSpan(new_context); + auto remote_scope = tracer->WithActiveSpan(remote_span); + + { + auto span = tracer->StartSpan("PubSubListener::onMessage"); + auto scope = tracer->WithActiveSpan(span); + span->SetAttribute("message_id", m.message_id()); + span->SetAttribute("ordering_key", m.ordering_key()); + + fprintf(stderr, "Received message %s\n", m.message_id().c_str()); + if (onNotification(m.data())) { + std::move(h).ack(); + span->SetStatus(opentelemetry::trace::StatusCode::kOk); + return true; + } + else { + span->SetStatus(opentelemetry::trace::StatusCode::kError, "onNotification failed"); + return false; + } } }); diff --git a/nonfree/controller/PubSubWriter.cpp b/nonfree/controller/PubSubWriter.cpp index 00bd26598..c35800abc 100644 --- a/nonfree/controller/PubSubWriter.cpp +++ b/nonfree/controller/PubSubWriter.cpp @@ -2,9 +2,11 @@ #include 
"../../osdep/OSUtils.hpp" #include "CtlUtil.hpp" +#include "OtelCarrier.hpp" #include "member.pb.h" #include "member_status.pb.h" #include "network.pb.h" +#include "opentelemetry/context/propagation/global_propagator.h" #include #include @@ -42,7 +44,8 @@ PubSubWriter::PubSubWriter(std::string project, std::string topic, std::string c ::google::cloud::Options {} .set(pubsub::LimitedTimeRetryPolicy(std::chrono::seconds(5)).clone()) .set( - pubsub::ExponentialBackoffPolicy(std::chrono::milliseconds(100), std::chrono::seconds(2), 1.3).clone()); + pubsub::ExponentialBackoffPolicy(std::chrono::milliseconds(100), std::chrono::seconds(2), 1.3).clone()) + .set(true); auto publisher = pubsub::MakePublisherConnection(pubsub::Topic(project, topic), std::move(options)); _publisher = std::make_shared(std::move(publisher)); } @@ -56,10 +59,26 @@ bool PubSubWriter::publishMessage( const std::string& frontend, const std::string& orderingKey) { + auto provider = opentelemetry::trace::Provider::GetTracerProvider(); + auto tracer = provider->GetTracer("PubSubWriter"); + auto span = tracer->StartSpan("PubSubWriter::publishMessage"); + auto scope = tracer->WithActiveSpan(span); + fprintf(stderr, "Publishing message to %s\n", _topic.c_str()); std::vector > attributes; attributes.emplace_back("controller_id", _controller_id); + std::map attrs_map; + OtelCarrier > carrier(attrs_map); + auto propagator = opentelemetry::context::propagation::GlobalTextMapPropagator::GetGlobalPropagator(); + auto current_ctx = opentelemetry::context::RuntimeContext::GetCurrent(); + propagator->Inject(carrier, current_ctx); + + for (const auto& kv : attrs_map) { + fprintf(stderr, "Attributes injected: %s=%s\n", kv.first.c_str(), kv.second.c_str()); + attributes.emplace_back(kv.first, kv.second); + } + if (! 
frontend.empty()) { attributes.emplace_back("frontend", frontend); } diff --git a/service/OneService.cpp b/service/OneService.cpp index 605f9345a..107562cbf 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -49,6 +49,10 @@ #endif #ifdef ZT_OPENTELEMETRY_ENABLED +#include "opentelemetry/baggage/propagation/baggage_propagator.h" +#include "opentelemetry/context/propagation/composite_propagator.h" +#include "opentelemetry/context/propagation/global_propagator.h" +#include "opentelemetry/context/propagation/text_map_propagator.h" #include "opentelemetry/exporters/memory/in_memory_data.h" #include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h" #include "opentelemetry/exporters/otlp/otlp_grpc_log_record_exporter.h" @@ -68,6 +72,7 @@ #include "opentelemetry/sdk/trace/tracer.h" #include "opentelemetry/sdk/trace/tracer_context.h" #include "opentelemetry/sdk/trace/tracer_provider.h" +#include "opentelemetry/trace/propagation/http_trace_context.h" #include "opentelemetry/trace/provider.h" namespace sdktrace = opentelemetry::v1::sdk::trace; @@ -1169,6 +1174,19 @@ class OneServiceImpl : public OneService { _traceProvider = opentelemetry::nostd::shared_ptr( new sdktrace::TracerProvider(std::move(tracer_context))); sdktrace::Provider::SetTracerProvider(_traceProvider); + + std::vector > propagators; + propagators.push_back( + std::unique_ptr( + new opentelemetry::trace::propagation::HttpTraceContext())); + propagators.push_back( + std::unique_ptr( + new opentelemetry::baggage::propagation::BaggagePropagator())); + + auto p = opentelemetry::nostd::shared_ptr( + new opentelemetry::context::propagation::CompositePropagator(std::move(propagators))); + + opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator(p); } }