From 18487034b5b28ac17924dd5001a6c76bcdd621c4 Mon Sep 17 00:00:00 2001 From: stefanocasazza Date: Tue, 27 Aug 2019 18:44:35 +0200 Subject: [PATCH] sync --- include/ulib/container/hash_map.h | 9 ++- include/ulib/net/client/redis.h | 39 ++++++--- src/ulib/net/client/redis.cpp | 128 +++++++++++++----------------- tests/examples/TSA/tsaserial | 2 +- 4 files changed, 90 insertions(+), 88 deletions(-) diff --git a/include/ulib/container/hash_map.h b/include/ulib/container/hash_map.h index 7a4c7529..f65f3862 100644 --- a/include/ulib/container/hash_map.h +++ b/include/ulib/container/hash_map.h @@ -1044,7 +1044,10 @@ private: #if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX11) template class UHashMapAnonIter { public: - explicit UHashMapAnonIter(UHashMap* m, uint32_t l) : map(m), length(l) {} + explicit UHashMapAnonIter(UHashMap* m, uint32_t l) : map(m), length(l) + { + if (length == 0) node = map->firstNode(); + } bool operator!=(const UHashMapAnonIter& other) const { return (length != other.length); } @@ -1052,8 +1055,8 @@ public: UHashMapAnonIter& operator++() { - node = (length++ ? map->nextNode() : map->firstNode()); - + length++; + node = map->nextNode(); return *this; } diff --git a/include/ulib/net/client/redis.h b/include/ulib/net/client/redis.h index c10f1080..49fb3620 100644 --- a/include/ulib/net/client/redis.h +++ b/include/ulib/net/client/redis.h @@ -822,7 +822,7 @@ protected: } void init(); - void processResponse(); + virtual void processResponse(); bool processRequest(char recvtype); bool sendRequest(const UString& pipeline) @@ -987,6 +987,18 @@ private: UString ipAddress; UREDISClient client; uint16_t port, lowHashSlot, highHashSlot; + + RedisNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot) : + ipAddress(_ipAddress), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot) + { + client.connect(ipAddress.data(), port); + } + + // DEBUG + +#if defined(U_STDCPP_ENABLE) && defined(DEBUG) + const char* dump(bool _reset) const { return ""; } +#endif }; enum class ClusterError : uint8_t { @@ -998,8 +1010,7 @@ private: ClusterError error; UString temporaryASKip; - std::vector redisNodes; - UREDISClient subscriptionClient; + UHashMap redisNodes; uint16_t hashslotForKey(const UString& hashableKey) { return u_crc16(U_STRING_TO_PARAM(hashableKey)); } @@ -1019,22 +1030,26 @@ private: { U_TRACE(0, "UREDISClusterClient::clientForHashslot(%u)", hashslot) - for (RedisNode& workingNode : redisNodes) + for (UHashMapNode *node : redisNodes) { - if ((workingNode.lowHashSlot <= hashslot) || (workingNode.highHashSlot >= hashslot)) return workingNode.client; + RedisNode* workingNode = (RedisNode *)(node->elem); + + if ((workingNode->lowHashSlot <= hashslot) || (workingNode->highHashSlot >= hashslot)) return workingNode->client; } - return redisNodes[0].client; + return *this; // never reached } UREDISClient& clientForASKip() { - for (RedisNode& workingNode : redisNodes) + for (UHashMapNode *node : redisNodes) { - if (temporaryASKip == workingNode.ipAddress) return workingNode.client; + RedisNode* workingNode = (RedisNode *)(node->elem); + + if (temporaryASKip == workingNode->ipAddress) return workingNode->client; } - return redisNodes[0].client; + return *this; // never reached } UREDISClient& clientForHashableKey(const UString& hashableKey) { return clientForHashslot(hashslotForKey(hashableKey)); } @@ -1050,7 +1065,7 @@ public: U_TRACE_DTOR(0, UREDISClusterClient) } - void processResponse(); + void processResponse() final; void calculateNodeMap(); bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379); @@ -1087,5 +1102,9 @@ public: private: U_DISALLOW_COPY_AND_ASSIGN(UREDISClusterClient) }; + +extern template const UVector& UREDISClusterClient::processPipeline(UString& pipeline, bool reorderable); +extern template const UVector& UREDISClusterClient::processPipeline(UString& pipeline, bool reorderable); + #endif #endif diff --git a/src/ulib/net/client/redis.cpp b/src/ulib/net/client/redis.cpp index 6bebd865..903f5d49 100644 --- a/src/ulib/net/client/redis.cpp +++ b/src/ulib/net/client/redis.cpp @@ -209,13 +209,12 @@ U_NO_EXPORT bool UREDISClient_Base::getResponseItem() while (*ptr2 != '\r') ++ptr2; len = ptr2-ptr1; + + // U_RC_INLINE example -> +OK\r\n + // U_RC_INT example -> :0\r\n + // U_RC_ERROR example -> -Error message\r\n - if (len != 1 || - ptr1[0] != '0' || - prefix != U_RC_INT) - { - pvec->push_back(UClient_Base::response.substr(ptr1, len)); - } + pvec->push_back(UClient_Base::response.substr(ptr1, len)); start += len + U_CONSTANT_SIZE(U_CRLF); @@ -294,40 +293,13 @@ U_NO_EXPORT bool UREDISClient_Base::getResponseItem() U_INTERNAL_ASSERT_EQUALS(prefix, U_RC_MULTIBULK) - UVector vec1(len); - UVector* pvec1 = pvec; - pvec = &vec1; - start += (ptr2-ptr1); for (uint32_t i = 0; i < len; ++i) { - if (getResponseItem() == false) - { - if (UClient_Base::isConnected() == false) - { - (void) UClient_Base::connect(); - - U_RETURN(false); - } - - pvec1->move(vec1); - } - else - { - typedef UVector uvectorstring; - - char buffer_output[64U * 1024U]; - uint32_t buffer_output_len = UObject2String(vec1, buffer_output, sizeof(buffer_output)); - - pvec1->push_back(UStringRep::create(buffer_output_len, buffer_output_len, (const char*)buffer_output)); - - vec1.clear(); - } + getResponseItem(); } - - pvec = pvec1; - + U_RETURN(true); } @@ -620,41 +592,51 @@ void UREDISClusterClient::calculateNodeMap() (void) UREDISClient_Base::processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS")); + UHashMap newNodes; const UVector& rawNodes = UREDISClient_Base::vitem; - for (uint32_t a = 0, b = rawNodes.size(); a < b; ++a) - { + for (uint32_t a = 0, b = rawNodes.size(); a < b; a+=2) + { + const UString& first = rawNodes[a]; + const UString& second = rawNodes[a+1]; + if (findHashSlots) + { + if (first.isNumber() && second.isNumber()) { - if (rawNodes[a].isNumber() && - rawNodes[a+1].isNumber()) - { - workingLowHashSlot = rawNodes[a++].strtoul(); - workingHighHashSlot = rawNodes[a].strtoul(); + workingLowHashSlot = first.strtoul(); + workingHighHashSlot = second.strtoul(); findHashSlots = false; - } - } - else - { - // the immediate next after hash slot is the master - - RedisNode workingNode; - - workingNode.lowHashSlot = workingLowHashSlot; - workingNode.highHashSlot = workingHighHashSlot; - - (void) workingNode.ipAddress.replace(rawNodes[a]); - - workingNode.port = rawNodes[++a].strtoul(); - - workingNode.client.connect(workingNode.ipAddress.data(), workingNode.port); - - redisNodes.push_back(std::move(workingNode)); - - findHashSlots = true; } } + else + { + // first node in the array is the master + + UString compositeAddress(50U); + compositeAddress.snprintf(U_CONSTANT_TO_PARAM("%v.%v"), first.rep, second.rep); + + RedisNode *workingNode = redisNodes.erase(compositeAddress); + + // in the case of MOVE some nodes will be new, but others we'll already be connected to + if (workingNode) + { + workingNode->lowHashSlot = workingLowHashSlot; + workingNode->highHashSlot = workingHighHashSlot; + } + else workingNode = new RedisNode(first, second.strtoul(), workingLowHashSlot, workingHighHashSlot); + + newNodes.insert(compositeAddress, workingNode); + + workingNode = newNodes[compositeAddress]; + + findHashSlots = true; + } + } + + // if any nodes were taken offline, the clients would've disconnected by default + redisNodes.assign(newNodes); } bool UREDISClusterClient::connect(const char* host, unsigned int _port) @@ -664,17 +646,12 @@ bool UREDISClusterClient::connect(const char* host, unsigned int _port) if (UREDISClient::connect(host, _port)) { calculateNodeMap(); + + // self handles the SUB/PUB traffic. Must be a dedicated client pre-Redis 6 + UEventFd::op_mask |= EPOLLET; + UEventFd::op_mask &= ~EPOLLRDHUP; - // select random master node to be responsible for SUB/PUB traffic - - const RedisNode& node = redisNodes[u_get_num_random_range0(redisNodes.size())]; - - subscriptionClient.connect(node.ipAddress.data(), node.port); - - subscriptionClient.UEventFd::op_mask |= EPOLLET; - subscriptionClient.UEventFd::op_mask &= ~EPOLLRDHUP; - - UNotifier::insert(&subscriptionClient, EPOLLEXCLUSIVE | EPOLLROUNDROBIN); // NB: we ask to listen for events to a Redis publish channel... + UNotifier::insert(this, EPOLLEXCLUSIVE | EPOLLROUNDROBIN); // NB: we ask to listen for events to a Redis publish channel... U_RETURN(true); } @@ -686,7 +663,7 @@ bool UREDISClusterClient::clusterUnsubscribe(const UString& channel) // unregist { U_TRACE(0, "UREDISClusterClient::clusterUnsubscribe(%V)", channel.rep) - if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel))) + if (processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel))) { if (pchannelCallbackMap == U_NULLPTR) { @@ -705,7 +682,7 @@ bool UREDISClusterClient::clusterSubscribe(const UString& channel, vPFcscs callb { U_TRACE(0, "UREDISClusterClient::clusterSubscribe(%V,%p)", channel.rep, callback) - if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel))) + if (processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel))) { if (pchannelCallbackMap == U_NULLPTR) { @@ -765,6 +742,9 @@ void UREDISClusterClient::processResponse() } } +template const UVector& UREDISClusterClient::processPipeline(UString& pipeline, bool reorderable); +template const UVector& UREDISClusterClient::processPipeline(UString& pipeline, bool reorderable); + template const UVector& UREDISClusterClient::processPipeline(UString& pipeline, const bool reorderable) { diff --git a/tests/examples/TSA/tsaserial b/tests/examples/TSA/tsaserial index 31493878..c60ea4b8 100644 --- a/tests/examples/TSA/tsaserial +++ b/tests/examples/TSA/tsaserial @@ -1 +1 @@ -05C5 +05CB