From fc2eb214192be4c9f8677f46fd1f6228e4251265 Mon Sep 17 00:00:00 2001 From: stefanocasazza Date: Fri, 13 Sep 2019 18:45:05 +0200 Subject: [PATCH] update redis --- include/ulib/net/client/redis.h | 53 +++++++++++++++++++++------------ src/ulib/net/client/redis.cpp | 34 ++++++++++----------- tests/examples/TSA/tsaserial | 2 +- 3 files changed, 52 insertions(+), 37 deletions(-) diff --git a/include/ulib/net/client/redis.h b/include/ulib/net/client/redis.h index ab868ddb..bc6c3b10 100644 --- a/include/ulib/net/client/redis.h +++ b/include/ulib/net/client/redis.h @@ -990,19 +990,30 @@ private: public: + U_MEMORY_TEST + U_MEMORY_ALLOCATOR + U_MEMORY_DEALLOCATOR + void processResponse(); - UREDISClusterClient(UREDISClusterMaster *_master) : UREDISClient(), master(_master) {} + + UREDISClusterClient() = delete; + UREDISClusterClient(UREDISClusterMaster *_master) : master(_master) {} }; struct RedisClusterNode { + U_MEMORY_TEST + U_MEMORY_ALLOCATOR + U_MEMORY_DEALLOCATOR + UString ipAddress; - UREDISClusterClient client; + UREDISClusterClient *client; uint16_t port, lowHashSlot, highHashSlot; - RedisClusterNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot, UREDISClusterMaster *master) : ipAddress(_ipAddress), client(master), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot) + RedisClusterNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot, UREDISClusterMaster *master) : ipAddress(_ipAddress), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot) { - client.connect(ipAddress.c_str(), port); + U_NEW(UREDISClusterClient, client, UREDISClusterClient(master)); + client->connect(ipAddress.c_str(), port); } #if defined(U_STDCPP_ENABLE) && defined(DEBUG) @@ -1019,15 +1030,12 @@ enum class ClusterError : uint8_t { class U_EXPORT UREDISClusterMaster { private: - - U_MEMORY_ALLOCATOR - U_MEMORY_DEALLOCATOR friend class UREDISClusterClient; ClusterError error; UString temporaryASKip; - UREDISClusterClient subscriptionClient; + UREDISClusterClient *subscriptionClient; UHashMap clusterNodes; // when these call they need to be processed... also when MOVED... we need to set up and recalculate uint16_t hashslotForKey(const UString& hashableKey) { return u_crc16(U_STRING_TO_PARAM(hashableKey)); } @@ -1044,7 +1052,7 @@ private: return hashslotForKey(command.substr(beginning, end - beginning)); } - UREDISClusterClient& clientForHashslot(uint16_t hashslot) + UREDISClusterClient* clientForHashslot(uint16_t hashslot) { U_TRACE(0, "UREDISClusterMaster::clientForHashslot(%u)", hashslot) @@ -1058,7 +1066,7 @@ private: return subscriptionClient; // never reached } - UREDISClusterClient& clientForASKip() + UREDISClusterClient* clientForASKip() { for (UHashMapNode *node : clusterNodes) { @@ -1070,7 +1078,7 @@ private: return subscriptionClient; // never reached } - UREDISClusterClient& clientForHashableKey(const UString& hashableKey) { return clientForHashslot(hashslotForKey(hashableKey)); } + UREDISClusterClient* clientForHashableKey(const UString& hashableKey) { return clientForHashslot(hashslotForKey(hashableKey)); } template const UVector& processPipeline(UString& pipeline, bool reorderable); @@ -1078,16 +1086,20 @@ private: void calculateNodeMap(); public: - + + U_MEMORY_TEST + U_MEMORY_ALLOCATOR + U_MEMORY_DEALLOCATOR + bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379); // all of these multis require all keys to exist within a single hash slot (on the same node isn't good enough) - UString clusterSingle(const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey).single(pipeline); } - const UVector& clusterMulti( const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey).multi(pipeline); } + UString clusterSingle(const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey)->single(pipeline); } + const UVector& clusterMulti( const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey)->multi(pipeline); } - void clusterSilencedMulti( const UString& hashableKey, UString& pipeline) { clientForHashableKey(hashableKey).silencedMulti(pipeline); } - void clusterSilencedSingle(const UString& hashableKey, UString& pipeline) { clientForHashableKey(hashableKey).silencedSingle(pipeline); } + void clusterSilencedMulti( const UString& hashableKey, UString& pipeline) { clientForHashableKey(hashableKey)->silencedMulti(pipeline); } + void clusterSilencedSingle(const UString& hashableKey, UString& pipeline) { clientForHashableKey(hashableKey)->silencedSingle(pipeline); } // anon multis are pipelined commands of various keys that might belong to many nodes. always processed in order. Commands always delimined by \r\n // example -> SET {abc}xyz 5 \r\n GET abc{xyz} \r\n SET xyz{abc} 9 \r\n @@ -1101,10 +1113,13 @@ public: bool clusterUnsubscribe(const UString& channel); bool clusterSubscribe( const UString& channel, vPFcscs callback); - UREDISClusterMaster() : subscriptionClient(this) {} - + UREDISClusterMaster() + { + U_NEW(UREDISClusterClient, subscriptionClient, UREDISClusterClient(this)); + } + #if defined(U_STDCPP_ENABLE) && defined(DEBUG) - const char* dump(bool _reset) const { return subscriptionClient.UREDISClient_Base::dump(_reset); } + const char* dump(bool _reset) const { return subscriptionClient->UREDISClient_Base::dump(_reset); } #endif }; diff --git a/src/ulib/net/client/redis.cpp b/src/ulib/net/client/redis.cpp index ca41ec44..aaa15bf5 100644 --- a/src/ulib/net/client/redis.cpp +++ b/src/ulib/net/client/redis.cpp @@ -631,10 +631,10 @@ void UREDISClusterMaster::calculateNodeMap() uint16_t workingLowHashSlot; uint16_t workingHighHashSlot; - (void) subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS")); + (void) subscriptionClient->processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS")); UHashMap newNodes; - const UVector& rawNodes = subscriptionClient.vitem; + const UVector& rawNodes = subscriptionClient->vitem; for (uint32_t a = 0, b = rawNodes.size(); a < b; a+=2) { @@ -682,11 +682,11 @@ bool UREDISClusterMaster::connect(const char* host, unsigned int _port) { U_TRACE(0, "UREDISClusterMaster::connect(%S,%u)", host, _port) - if (subscriptionClient.connect(host, _port)) + if (subscriptionClient->connect(host, _port)) { calculateNodeMap(); - UServer_Base::addHandlerEvent(&subscriptionClient); // NB: we ask to listen for events to a Redis publish channel... + UServer_Base::addHandlerEvent(subscriptionClient); // NB: we ask to listen for events to a Redis publish channel... U_RETURN(true); } @@ -698,9 +698,9 @@ bool UREDISClusterMaster::clusterUnsubscribe(const UString& channel) // unregist { U_TRACE(0, "UREDISClusterMaster::clusterUnsubscribe(%V)", channel.rep) - if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel))) + if (subscriptionClient->processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel))) { - (void)subscriptionClient.UREDISClient_Base::pchannelCallbackMap->erase(channel); + (void)subscriptionClient->UREDISClient_Base::pchannelCallbackMap->erase(channel); U_RETURN(true); } @@ -712,14 +712,14 @@ bool UREDISClusterMaster::clusterSubscribe(const UString& channel, vPFcscs callb { U_TRACE(0, "UREDISClusterMaster::clusterSubscribe(%V,%p)", channel.rep, callback) - if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel))) + if (subscriptionClient->processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel))) { - if (subscriptionClient.UREDISClient_Base::pchannelCallbackMap == U_NULLPTR) + if (subscriptionClient->UREDISClient_Base::pchannelCallbackMap == U_NULLPTR) { - U_NEW(UHashMap, subscriptionClient.UREDISClient_Base::pchannelCallbackMap, UHashMap); + U_NEW(UHashMap, subscriptionClient->UREDISClient_Base::pchannelCallbackMap, UHashMap); } - subscriptionClient.UREDISClient_Base::pchannelCallbackMap->insert(channel, (const void*)callback); + subscriptionClient->UREDISClient_Base::pchannelCallbackMap->insert(channel, (const void*)callback); U_RETURN(true); } @@ -754,12 +754,12 @@ const UVector& UREDISClusterMaster::processPipeline(UString& pipeline, } } - UREDISClusterClient& client = clientForHashslot(hashslot); + UREDISClusterClient* client = clientForHashslot(hashslot); - if constexpr (silence) (void) client.sendRequest(workingString); + if constexpr (silence) (void) client->sendRequest(workingString); else { -replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString)); +replay: (void) client->processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString)); switch (error) { @@ -772,16 +772,16 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS case ClusterError::ask: { - UREDISClusterClient& temporaryClient = clientForASKip(); + UREDISClusterClient* temporaryClient = clientForASKip(); - (void) temporaryClient.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString)); + (void) temporaryClient->processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString)); } break; case ClusterError::none: break; } - if constexpr (silence == false) subscriptionClient.vitem.move(client.vitem); + if constexpr (silence == false) subscriptionClient->vitem.move(client->vitem); count = 0; workingString.clear(); @@ -846,7 +846,7 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS } } - return subscriptionClient.vitem; + return subscriptionClient->vitem; } # endif diff --git a/tests/examples/TSA/tsaserial b/tests/examples/TSA/tsaserial index 653f980a..8dda154a 100644 --- a/tests/examples/TSA/tsaserial +++ b/tests/examples/TSA/tsaserial @@ -1 +1 @@ -05FA +0601