Remove smee from CentralDB.

Now handled in CV1 on new member join via pubsub integration when a new member comes through
This commit is contained in:
Grant Limberg 2026-04-02 10:04:23 -07:00
parent 20f7311622
commit ea5c91b0e9
No known key found for this signature in database
GPG key ID: 8F2F97D3BE8D7735
2 changed files with 14 additions and 107 deletions

View file

@ -39,7 +39,6 @@
#include <libpq-fe.h>
#include <optional>
#include <pqxx/pqxx>
#include <rustybits.h>
#include <sstream>
// #define REDIS_TRACE 1
@ -74,15 +73,12 @@ CentralDB::CentralDB(const Identity& myId,
, _redis(NULL)
, _cluster(NULL)
, _redisMemberStatus(false)
, _smee(NULL)
{
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
auto tracer = provider->GetTracer("CentralDB");
auto span = tracer->StartSpan("CentralDB::CentralDB");
auto scope = tracer->WithActiveSpan(span);
rustybits::init_async_runtime();
char myAddress[64];
_myAddressStr = myId.address().toString(myAddress);
_connString = std::string(connString);
@ -238,19 +234,10 @@ CentralDB::CentralDB(const Identity& myId,
_commitThread[i] = std::thread(&CentralDB::commitThread, this);
}
_onlineNotificationThread = std::thread(&CentralDB::onlineNotificationThread, this);
configureSmee();
}
CentralDB::~CentralDB()
{
if (_smee != NULL) {
rustybits::smee_client_delete(_smee);
_smee = NULL;
}
rustybits::shutdown_async_runtime();
_run = 0;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
@ -262,39 +249,6 @@ CentralDB::~CentralDB()
_onlineNotificationThread.join();
}
void CentralDB::configureSmee()
{
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
auto tracer = provider->GetTracer("CentralDB");
auto span = tracer->StartSpan("CentralDB::configureSmee");
auto scope = tracer->WithActiveSpan(span);
const char* TEMPORAL_SCHEME = "ZT_TEMPORAL_SCHEME";
const char* TEMPORAL_HOST = "ZT_TEMPORAL_HOST";
const char* TEMPORAL_PORT = "ZT_TEMPORAL_PORT";
const char* TEMPORAL_NAMESPACE = "ZT_TEMPORAL_NAMESPACE";
const char* SMEE_TASK_QUEUE = "ZT_SMEE_TASK_QUEUE";
const char* scheme = getenv(TEMPORAL_SCHEME);
if (scheme == NULL) {
scheme = "http";
}
const char* host = getenv(TEMPORAL_HOST);
const char* port = getenv(TEMPORAL_PORT);
const char* ns = getenv(TEMPORAL_NAMESPACE);
const char* task_queue = getenv(SMEE_TASK_QUEUE);
if (scheme != NULL && host != NULL && port != NULL && ns != NULL && task_queue != NULL) {
fprintf(stderr, "creating smee client\n");
std::string hostPort =
std::string(scheme) + std::string("://") + std::string(host) + std::string(":") + std::string(port);
this->_smee = rustybits::smee_client_new(hostPort.c_str(), ns, task_queue);
}
else {
fprintf(stderr, "Smee client not configured\n");
}
}
bool CentralDB::waitForReady()
{
while (_ready < 2) {
@ -1337,24 +1291,6 @@ void CentralDB::commitThread()
}
}
if (_smee != NULL && isNewMember) {
// TODO: Smee Notifications for New Members
// pqxx::row row = w.exec_params1(
// "SELECT "
// " count(h.hook_id) "
// "FROM "
// " ztc_hook h "
// " INNER JOIN ztc_org o ON o.org_id = h.org_id "
// " INNER JOIN ztc_network n ON n.owner_id = o.owner_id "
// " WHERE "
// "n.id = $1 ",
// networkId);
// int64_t hookCount = row[0].as<int64_t>();
// if (hookCount > 0) {
// notifyNewMember(networkId, memberId);
// }
}
const uint64_t nwidInt = OSUtils::jsonIntHex(config["nwid"], 0ULL);
const uint64_t memberidInt = OSUtils::jsonIntHex(config["id"], 0ULL);
if (nwidInt && memberidInt) {
@ -1586,16 +1522,6 @@ void CentralDB::commitThread()
fprintf(stderr, "%s commitThread finished\n", _myAddressStr.c_str());
}
void CentralDB::notifyNewMember(const std::string& networkID, const std::string& memberID)
{
auto provider = opentelemetry::trace::Provider::GetTracerProvider();
auto tracer = provider->GetTracer("CentralDB");
auto span = tracer->StartSpan("CentralDB::notifyNewMember");
auto scope = tracer->WithActiveSpan(span);
rustybits::smee_client_notify_network_joined(_smee, networkID.c_str(), memberID.c_str());
}
void CentralDB::onlineNotificationThread()
{
waitForReady();

View file

@ -16,10 +16,6 @@
#include <pqxx/pqxx>
#include <sw/redis++/redis++.h>
namespace rustybits {
struct SmeeClient;
}
namespace ZeroTier {
struct RedisConfig;
struct ControllerConfig;
@ -40,13 +36,12 @@ class CentralDB : public DB {
STATUS_WRITER_MODE_BIGTABLE = 2,
};
CentralDB(
const Identity& myId,
const char* connString,
int listenPort,
CentralDB::ListenerMode mode,
CentralDB::StatusWriterMode statusMode,
const ControllerConfig* cc);
CentralDB(const Identity& myId,
const char* connString,
int listenPort,
CentralDB::ListenerMode mode,
CentralDB::StatusWriterMode statusMode,
const ControllerConfig* cc);
virtual ~CentralDB();
virtual bool waitForReady();
@ -55,34 +50,25 @@ class CentralDB : public DB {
virtual void eraseNetwork(const uint64_t networkId);
virtual void eraseMember(const uint64_t networkId, const uint64_t memberId);
virtual void nodeIsOnline(const uint64_t networkId, const uint64_t memberId, const InetAddress& physicalAddress);
virtual void nodeIsOnline(
const uint64_t networkId,
const uint64_t memberId,
const InetAddress& physicalAddress,
const char* osArch);
virtual void nodeIsOnline(const uint64_t networkId,
const uint64_t memberId,
const InetAddress& physicalAddress,
const char* osArch);
virtual AuthInfo getSSOAuthInfo(const nlohmann::json& member, const std::string& redirectURL);
virtual bool ready()
{
return _ready == 2;
}
{ return _ready == 2; }
virtual void _memberChanged(nlohmann::json& old, nlohmann::json& memberConfig, bool notifyListeners)
{
DB::_memberChanged(old, memberConfig, notifyListeners);
}
{ DB::_memberChanged(old, memberConfig, notifyListeners); }
virtual void _networkChanged(nlohmann::json& old, nlohmann::json& networkConfig, bool notifyListeners)
{
DB::_networkChanged(old, networkConfig, notifyListeners);
}
{ DB::_networkChanged(old, networkConfig, notifyListeners); }
protected:
struct _PairHasher {
inline std::size_t operator()(const std::pair<uint64_t, uint64_t>& p) const
{
return (std::size_t)(p.first ^ p.second);
}
{ return (std::size_t)(p.first ^ p.second); }
};
private:
@ -93,9 +79,6 @@ class CentralDB : public DB {
void commitThread();
void onlineNotificationThread();
void configureSmee();
void notifyNewMember(const std::string& networkID, const std::string& memberID);
nlohmann::json _getNetworkMember(pqxx::work& tx, const std::string networkID, const std::string memberID);
nlohmann::json _getNetwork(pqxx::work& tx, const std::string networkID);
@ -152,8 +135,6 @@ class CentralDB : public DB {
std::shared_ptr<sw::redis::Redis> _redis;
std::shared_ptr<sw::redis::RedisCluster> _cluster;
bool _redisMemberStatus;
rustybits::SmeeClient* _smee;
};
} // namespace ZeroTier