print a message when the SSO PSK is configured

This commit is contained in:
Grant Limberg 2026-03-31 08:48:00 -07:00
parent 7ec42461d8
commit b047038ca1
No known key found for this signature in database
GPG key ID: 8F2F97D3BE8D7735

View file

@ -52,13 +52,12 @@ using Attrs = std::vector<std::pair<std::string, std::string> >;
using Item = std::pair<std::string, Attrs>;
using ItemStream = std::vector<Item>;
CentralDB::CentralDB(
const Identity& myId,
const char* connString,
int listenPort,
CentralDB::ListenerMode listenMode,
CentralDB::StatusWriterMode statusMode,
const ControllerConfig* cc)
CentralDB::CentralDB(const Identity& myId,
const char* connString,
int listenPort,
CentralDB::ListenerMode listenMode,
CentralDB::StatusWriterMode statusMode,
const ControllerConfig* cc)
: DB()
, _listenerMode(listenMode)
, _statusWriterMode(statusMode)
@ -102,6 +101,7 @@ CentralDB::CentralDB(
// returns something non-NULL. If the hex encodes something shorter than 48 bytes,
// it will be padded at the end with zeroes. If longer, it'll be truncated.
Utils::unhex(ssoPskHex, _ssoPsk, sizeof(_ssoPsk));
fprintf(stderr, "SSO PSK configured\n");
}
const char* redisMemberStatus = getenv("ZT_REDIS_MEMBER_STATUS");
if (redisMemberStatus && (strcmp(redisMemberStatus, "true") == 0)) {
@ -139,9 +139,8 @@ CentralDB::CentralDB(
_readyLock.lock();
fprintf(
stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S,
::_timestr(), (unsigned long long)_myAddress.toInt());
fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL waiting for initial data download..." ZT_EOL_S,
::_timestr(), (unsigned long long)_myAddress.toInt());
_waitNoticePrinted = true;
initializeNetworks();
@ -172,16 +171,16 @@ CentralDB::CentralDB(
_myAddressStr, cc->pubSubConfig->project_id, cc->pubSubConfig->member_change_recv_topic, this);
_networksDbWatcher = std::make_shared<PubSubNetworkListener>(
_myAddressStr, cc->pubSubConfig->project_id, cc->pubSubConfig->network_change_recv_topic, this);
_changeNotifier = std::make_shared<PubSubChangeNotifier>(
_myAddressStr, cc->pubSubConfig->project_id, cc->pubSubConfig->member_change_send_topic,
cc->pubSubConfig->network_change_send_topic);
_changeNotifier = std::make_shared<PubSubChangeNotifier>(_myAddressStr, cc->pubSubConfig->project_id,
cc->pubSubConfig->member_change_send_topic,
cc->pubSubConfig->network_change_send_topic);
if (! cc->pubSubConfig->sso_send_topic.empty()) {
_ssoNonceWriter = std::make_shared<PubSubWriter>(
cc->pubSubConfig->project_id, cc->pubSubConfig->sso_send_topic, _myAddressStr);
_ssoNonceWriter = std::make_shared<PubSubWriter>(cc->pubSubConfig->project_id,
cc->pubSubConfig->sso_send_topic, _myAddressStr);
}
if (! cc->pubSubConfig->sso_recv_topic.empty()) {
_ssoAuthListener = std::make_shared<PubSubSSOListener>(
_myAddressStr, cc->pubSubConfig->project_id, cc->pubSubConfig->sso_recv_topic, _pool);
_ssoAuthListener = std::make_shared<PubSubSSOListener>(_myAddressStr, cc->pubSubConfig->project_id,
cc->pubSubConfig->sso_recv_topic, _pool);
}
}
else {
@ -306,9 +305,7 @@ bool CentralDB::waitForReady()
}
bool CentralDB::isReady()
{
return ((_ready == 2) && (_connected));
}
{ return ((_ready == 2) && (_connected)); }
bool CentralDB::save(nlohmann::json& record, bool notifyListeners)
{
@ -447,11 +444,10 @@ void CentralDB::eraseMember(const uint64_t networkId, const uint64_t memberId)
_memberChanged(qi.jsonData, nullJson, true);
}
void CentralDB::nodeIsOnline(
const uint64_t networkId,
const uint64_t memberId,
const InetAddress& physicalAddress,
const char* osArch)
void CentralDB::nodeIsOnline(const uint64_t networkId,
const uint64_t memberId,
const InetAddress& physicalAddress,
const char* osArch)
{
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
auto tracer = provider->GetTracer("CentralDB");
@ -475,9 +471,7 @@ void CentralDB::nodeIsOnline(
}
void CentralDB::nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress)
{
this->nodeIsOnline(networkId, memberId, physicalAddress, "unknown/unknown");
}
{ this->nodeIsOnline(networkId, memberId, physicalAddress, "unknown/unknown"); }
AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL)
{
@ -518,17 +512,15 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri
// check if the member exists first.
pqxx::row count =
w.exec(
"SELECT count(id) FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2",
pqxx::params { memberId, networkId })
w.exec("SELECT count(id) FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2",
pqxx::params { memberId, networkId })
.one_row();
if (count[0].as<int>() == 1) {
// Query the network's frontend while the transaction is guaranteed open
std::string frontend;
if (_ssoNonceWriter) {
pqxx::result fr = w.exec(
"SELECT frontend FROM networks_ctl WHERE id = $1",
pqxx::params { networkId });
pqxx::result fr =
w.exec("SELECT frontend FROM networks_ctl WHERE id = $1", pqxx::params { networkId });
if (fr.size() == 1) {
frontend = fr.at(0)[0].as<std::optional<std::string> >().value_or("");
}
@ -564,15 +556,14 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri
nonce = std::string(nonceBuf);
uint64_t nonceExpiration = OSUtils::now() + 300000ULL;
pqxx::result ir = w.exec(
"INSERT INTO sso_expiry "
"(nonce, nonce_expiration, network_id, device_id) VALUES "
"($1, TO_TIMESTAMP($2::double precision/1000), $3, $4)",
pqxx::params { nonce, nonceExpiration, networkId, memberId });
pqxx::result ir = w.exec("INSERT INTO sso_expiry "
"(nonce, nonce_expiration, network_id, device_id) VALUES "
"($1, TO_TIMESTAMP($2::double precision/1000), $3, $4)",
pqxx::params { nonce, nonceExpiration, networkId, memberId });
if (_ssoNonceWriter) {
_ssoNonceWriter->publishSSONonceUpdate(
nonce, nonceExpiration, networkId, memberId, frontend);
_ssoNonceWriter->publishSSONonceUpdate(nonce, nonceExpiration, networkId, memberId,
frontend);
}
w.commit();
@ -598,8 +589,7 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri
// We use 0 for nonceExpiration on reused nonces since the actual
// expiration is already stored in the database.
if (_ssoNonceWriter && ! nonce.empty()) {
_ssoNonceWriter->publishSSONonceUpdate(
nonce, 0, networkId, memberId, frontend);
_ssoNonceWriter->publishSSONonceUpdate(nonce, 0, networkId, memberId, frontend);
}
r = w.exec(
@ -624,9 +614,8 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri
sso_version = r.at(0)[4].as<std::optional<uint64_t> >().value_or(1);
}
else if (r.size() > 1) {
fprintf(
stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n",
networkId.c_str());
fprintf(stderr, "ERROR: More than one auth endpoint for an organization?!?!? NetworkID: %s\n",
networkId.c_str());
}
else {
fprintf(stderr, "No client or auth endpoint?!?\n");
@ -669,9 +658,8 @@ AuthInfo CentralDB::getSSOAuthInfo(const nlohmann::json& member, const std::stri
}
}
else {
fprintf(
stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(),
authorization_endpoint.c_str());
fprintf(stderr, "client_id: %s\nauthorization_endpoint: %s\n", client_id.c_str(),
authorization_endpoint.c_str());
}
}
@ -701,33 +689,31 @@ void CentralDB::initializeNetworks()
try {
char qbuf[2048];
sprintf(
qbuf,
"SELECT id, name, configuration , (EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000)::bigint, "
"(EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000)::bigint, revision, frontend "
"FROM networks_ctl WHERE controller_id = '%s'",
_myAddressStr.c_str());
sprintf(qbuf,
"SELECT id, name, configuration , (EXTRACT(EPOCH FROM creation_time AT TIME ZONE 'UTC')*1000)::bigint, "
"(EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000)::bigint, revision, frontend "
"FROM networks_ctl WHERE controller_id = '%s'",
_myAddressStr.c_str());
auto c = _pool->borrow();
pqxx::work w(*c->c);
fprintf(stderr, "Load networks from psql...\n");
auto stream = pqxx::stream_from::query(w, qbuf);
std::tuple<
std::string // network ID
,
std::optional<std::string> // name
,
std::string // configuration
,
std::optional<uint64_t> // creation_time
,
std::optional<uint64_t> // last_modified
,
std::optional<uint64_t> // revision
,
std::string // frontend
>
std::tuple<std::string // network ID
,
std::optional<std::string> // name
,
std::string // configuration
,
std::optional<uint64_t> // creation_time
,
std::optional<uint64_t> // last_modified
,
std::optional<uint64_t> // revision
,
std::string // frontend
>
row;
uint64_t count = 0;
uint64_t total = 0;
@ -827,9 +813,8 @@ void CentralDB::initializeNetworks()
if (++this->_ready == 2) {
if (_waitNoticePrinted) {
fprintf(
stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(),
(unsigned long long)_myAddress.toInt());
fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,
_timestr(), (unsigned long long)_myAddress.toInt());
}
_readyLock.unlock();
}
@ -895,63 +880,61 @@ void CentralDB::initializeMembers()
}
char qbuf[2048];
sprintf(
qbuf,
"SELECT nm.device_id, nm.network_id, nm.authorized, nm.active_bridge, nm.ip_assignments, "
"nm.no_auto_assign_ips, "
"nm.sso_exempt, (EXTRACT(EPOCH FROM nm.authentication_expiry_time AT TIME ZONE 'UTC')*1000)::bigint, "
"(EXTRACT(EPOCH FROM nm.creation_time AT TIME ZONE 'UTC')*1000)::bigint, nm.identity, "
"(EXTRACT(EPOCH FROM nm.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
"(EXTRACT(EPOCH FROM nm.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
"nm.remote_trace_level, nm.remote_trace_target, nm.revision, nm.capabilities, nm.tags, "
"nm.frontend "
"FROM network_memberships_ctl nm "
"INNER JOIN networks_ctl n "
" ON nm.network_id = n.id "
"WHERE n.controller_id = '%s'",
_myAddressStr.c_str());
sprintf(qbuf,
"SELECT nm.device_id, nm.network_id, nm.authorized, nm.active_bridge, nm.ip_assignments, "
"nm.no_auto_assign_ips, "
"nm.sso_exempt, (EXTRACT(EPOCH FROM nm.authentication_expiry_time AT TIME ZONE 'UTC')*1000)::bigint, "
"(EXTRACT(EPOCH FROM nm.creation_time AT TIME ZONE 'UTC')*1000)::bigint, nm.identity, "
"(EXTRACT(EPOCH FROM nm.last_authorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
"(EXTRACT(EPOCH FROM nm.last_deauthorized_time AT TIME ZONE 'UTC')*1000)::bigint, "
"nm.remote_trace_level, nm.remote_trace_target, nm.revision, nm.capabilities, nm.tags, "
"nm.frontend "
"FROM network_memberships_ctl nm "
"INNER JOIN networks_ctl n "
" ON nm.network_id = n.id "
"WHERE n.controller_id = '%s'",
_myAddressStr.c_str());
auto c = _pool->borrow();
pqxx::work w(*c->c);
fprintf(stderr, "Load members from psql...\n");
auto stream = pqxx::stream_from::query(w, qbuf);
std::tuple<
std::string // device ID
,
std::string // network ID
,
bool // authorized
,
std::optional<bool> // active_bridge
,
std::optional<std::string> // ip_assignments
,
std::optional<bool> // no_auto_assign_ips
,
std::optional<bool> // sso_exempt
,
std::optional<uint64_t> // authentication_expiry_time
,
std::optional<uint64_t> // creation_time
,
std::optional<std::string> // identity
,
std::optional<uint64_t> // last_authorized_time
,
std::optional<uint64_t> // last_deauthorized_time
,
std::optional<int32_t> // remote_trace_level
,
std::optional<std::string> // remote_trace_target
,
std::optional<uint64_t> // revision
,
std::optional<std::string> // capabilities
,
std::optional<std::string> // tags
,
std::string // frontend
>
std::tuple<std::string // device ID
,
std::string // network ID
,
bool // authorized
,
std::optional<bool> // active_bridge
,
std::optional<std::string> // ip_assignments
,
std::optional<bool> // no_auto_assign_ips
,
std::optional<bool> // sso_exempt
,
std::optional<uint64_t> // authentication_expiry_time
,
std::optional<uint64_t> // creation_time
,
std::optional<std::string> // identity
,
std::optional<uint64_t> // last_authorized_time
,
std::optional<uint64_t> // last_deauthorized_time
,
std::optional<int32_t> // remote_trace_level
,
std::optional<std::string> // remote_trace_target
,
std::optional<uint64_t> // revision
,
std::optional<std::string> // capabilities
,
std::optional<std::string> // tags
,
std::string // frontend
>
row;
auto tmp = std::chrono::high_resolution_clock::now();
@ -1072,9 +1055,8 @@ void CentralDB::initializeMembers()
if (++this->_ready == 2) {
if (_waitNoticePrinted) {
fprintf(
stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S, _timestr(),
(unsigned long long)_myAddress.toInt());
fprintf(stderr, "[%s] NOTICE: %.10llx controller PostgreSQL data download complete." ZT_EOL_S,
_timestr(), (unsigned long long)_myAddress.toInt());
}
_readyLock.unlock();
}
@ -1130,14 +1112,14 @@ void CentralDB::heartbeat()
try {
pqxx::work w { *c->c };
w.exec(
"INSERT INTO controllers_ctl (id, hostname, last_heartbeat, public_identity, version, assigned_central_version) VALUES "
"($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6) "
"ON CONFLICT (id) DO UPDATE SET hostname = EXCLUDED.hostname, last_heartbeat = "
"EXCLUDED.last_heartbeat, "
"public_identity = EXCLUDED.public_identity, version = EXCLUDED.version, "
"assigned_central_version = EXCLUDED.assigned_central_version",
pqxx::params { controllerId, hostname, ts, publicIdentity, versionStr, _assignedCentralVersion })
w.exec("INSERT INTO controllers_ctl (id, hostname, last_heartbeat, public_identity, version, "
"assigned_central_version) VALUES "
"($1, $2, TO_TIMESTAMP($3::double precision/1000), $4, $5, $6) "
"ON CONFLICT (id) DO UPDATE SET hostname = EXCLUDED.hostname, last_heartbeat = "
"EXCLUDED.last_heartbeat, "
"public_identity = EXCLUDED.public_identity, version = EXCLUDED.version, "
"assigned_central_version = EXCLUDED.assigned_central_version",
pqxx::params { controllerId, hostname, ts, publicIdentity, versionStr, _assignedCentralVersion })
.no_rows();
w.commit();
}
@ -1235,9 +1217,8 @@ void CentralDB::commitThread()
// get network and the frontend it is assigned to
// if network does not exist, skip member update
pqxx::row nwrow =
w.exec(
"SELECT COUNT(id), frontend FROM networks_ctl WHERE id = $1 GROUP BY frontend",
pqxx::params { networkId })
w.exec("SELECT COUNT(id), frontend FROM networks_ctl WHERE id = $1 GROUP BY frontend",
pqxx::params { networkId })
.one_row();
int nwcount = nwrow[0].as<int>();
std::string frontend = nwrow[1].as<std::string>();
@ -1250,10 +1231,9 @@ void CentralDB::commitThread()
}
pqxx::row mrow =
w.exec(
"SELECT COUNT(device_id) FROM network_memberships_ctl WHERE device_id = $1 "
"AND network_id = $2",
pqxx::params { memberId, networkId })
w.exec("SELECT COUNT(device_id) FROM network_memberships_ctl WHERE device_id = $1 "
"AND network_id = $2",
pqxx::params { memberId, networkId })
.one_row();
int membercount = mrow[0].as<int>();
bool isNewMember = (membercount == 0);
@ -1396,15 +1376,14 @@ void CentralDB::commitThread()
_memberChanged(memOrig, memNew, qitem.notifyListeners);
}
else {
fprintf(
stderr, "%s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n",
_myAddressStr.c_str(), (unsigned long long)nwidInt, (unsigned long long)memberidInt);
fprintf(stderr, "%s: Can't notify of change. Error parsing nwid or memberid: %llu-%llu\n",
_myAddressStr.c_str(), (unsigned long long)nwidInt,
(unsigned long long)memberidInt);
}
}
catch (std::exception& e) {
fprintf(
stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(),
networkId.c_str(), memberId.c_str(), e.what());
fprintf(stderr, "%s ERROR: Error updating member %s-%s: %s\n", _myAddressStr.c_str(),
networkId.c_str(), memberId.c_str(), e.what());
mspan->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
}
}
@ -1477,9 +1456,8 @@ void CentralDB::commitThread()
_networkChanged(nwOrig, nwNew, qitem.notifyListeners);
}
else {
fprintf(
stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(),
(unsigned long long)nwidInt);
fprintf(stderr, "%s: Can't notify network changed: %llu\n", _myAddressStr.c_str(),
(unsigned long long)nwidInt);
}
}
catch (std::exception& e) {
@ -1559,9 +1537,8 @@ void CentralDB::commitThread()
std::string networkId = config["nwid"];
pqxx::result res =
w.exec(
"DELETE FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2",
pqxx::params { memberId, networkId })
w.exec("DELETE FROM network_memberships_ctl WHERE device_id = $1 AND network_id = $2",
pqxx::params { memberId, networkId })
.no_rows();
w.commit();
@ -1670,11 +1647,10 @@ void CentralDB::onlineNotificationThread()
// exec_params1 will throw pqxx::unexpected_rows if not exactly one row is returned. If that's
// the case, skip this record and move on.
pqxx::row r =
w.exec(
"SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = "
"$1 AND device_id "
"= $2",
pqxx::params { networkId, memberId })
w.exec("SELECT device_id, network_id FROM network_memberships_ctl WHERE network_id = "
"$1 AND device_id "
"= $2",
pqxx::params { networkId, memberId })
.one_row();
}
catch (pqxx::unexpected_rows& e) {
@ -1709,8 +1685,8 @@ void CentralDB::onlineNotificationThread()
arch = osArchSplit[1];
}
_statusWriter->updateNodeStatus(
networkId, memberId, os, arch, version, i->second.physicalAddress, ts, frontend);
_statusWriter->updateNodeStatus(networkId, memberId, os, arch, version, i->second.physicalAddress,
ts, frontend);
}
_statusWriter->writePending();
w.commit();
@ -1805,8 +1781,8 @@ nlohmann::json CentralDB::_getNetworkMember(pqxx::work& tx, const std::string ne
out["frontend"] = frontend;
}
catch (std::exception& e) {
fprintf(
stderr, "ERROR: Error getting network member %s-%s: %s\n", networkID.c_str(), memberID.c_str(), e.what());
fprintf(stderr, "ERROR: Error getting network member %s-%s: %s\n", networkID.c_str(), memberID.c_str(),
e.what());
return nlohmann::json();
}
@ -1825,13 +1801,13 @@ nlohmann::json CentralDB::_getNetwork(pqxx::work& tx, const std::string networkI
std::optional<uint64_t> revision;
std::string frontend;
pqxx::row row = tx.exec(
"SELECT id, name, configuration , (EXTRACT(EPOCH FROM creation_time AT TIME ZONE "
"'UTC')*1000)::bigint, "
"(EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000)::bigint, revision, frontend "
"FROM networks_ctl WHERE id = $1",
pqxx::params { networkID })
.one_row();
pqxx::row row =
tx.exec("SELECT id, name, configuration , (EXTRACT(EPOCH FROM creation_time AT TIME ZONE "
"'UTC')*1000)::bigint, "
"(EXTRACT(EPOCH FROM last_modified AT TIME ZONE 'UTC')*1000)::bigint, revision, frontend "
"FROM networks_ctl WHERE id = $1",
pqxx::params { networkID })
.one_row();
cfg = row[2].as<std::string>();
creation_time = row[3].is_null() ? std::optional<uint64_t>() : std::optional<uint64_t>(row[3].as<uint64_t>());