From b440d80be1f1225613bd60577f258e9f0b5976b2 Mon Sep 17 00:00:00 2001 From: Victor Stewart Date: Mon, 24 Feb 2020 16:57:03 -0500 Subject: [PATCH] Update redis.h --- include/ulib/net/client/redis.h | 907 +++++++++++++++++++++++--------- 1 file changed, 644 insertions(+), 263 deletions(-) diff --git a/include/ulib/net/client/redis.h b/include/ulib/net/client/redis.h index c0a245f1..97197ee7 100644 --- a/include/ulib/net/client/redis.h +++ b/include/ulib/net/client/redis.h @@ -806,7 +806,63 @@ protected: } void init(); - void processResponse(); + + static void parseResponse(UString& response, UVector& parsed, size_t index, size_t terminalIndex) + { + U_TRACE(0, "UREDISClient_Base::parseResponse(index = %lu, terminalIndex = %lu)", index, terminalIndex); + + const char *ptr1, *ptr2, *pend = response.c_pointer(terminalIndex); + + ptr1 = (ptr2 = response.c_pointer(index)); + + while (ptr2 < pend) + { + while (*ptr2 != '\r') ++ptr2; + + switch (*ptr1++) + { + // :0\r\n + case U_RC_INT: + // -Error message\r\n + case U_RC_ERROR: + // +OK\r\n + case U_RC_INLINE: + { + parsed.push_back(response.substr(ptr1, ptr2 - ptr1)); + break; + } + case U_RC_BULK: + { + // $-1\r\n (Null Bulk String) + if (ptr1[0] == '-') parsed.push_back(UString::getStringNull()); + else + { + size_t length = u_strtoul(ptr1, ptr2); + parsed.push_back(response.substr((ptr2 += U_CONSTANT_SIZE(U_CRLF)), length)); + ptr2 += length; + } + + break; + } + // *2\r\n$10\r\n1439822796\r\n$6\r\n311090\r\n + case U_RC_MULTIBULK: + // never + default: + break; + } + + ptr1 = (ptr2 += U_CONSTANT_SIZE(U_CRLF)); + } + + //U_DUMP_CONTAINER(parsed); + } + + void processResponse() + { + vitem.clear(); + parseResponse(response, vitem, 0, response.size()); + } + bool processRequest(char recvtype); #if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__) @@ -815,6 +871,8 @@ protected: bool sendRequest(const UString& pipeline) #endif { + U_TRACE_NO_PARAM(0, "UREDISClient_Base::sendRequest()"); + UClient_Base::iov[0].iov_base = (caddr_t)pipeline.data(); UClient_Base::iov[0].iov_len = pipeline.size(); UClient_Base::iov[1].iov_base = (caddr_t)U_CRLF; @@ -981,236 +1039,16 @@ private: #if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__) -class U_EXPORT UREDISClusterClient : public UREDISClient, public UEventFd { -public: - - enum class ClientType : uint8_t { - - subscription, - management, - node - }; - - const ClientType type; - UREDISClusterMaster *master; - - virtual int handlerRead() U_DECL_FINAL; - - virtual void handlerDelete() U_DECL_FINAL - { - U_TRACE_NO_PARAM(0, "UREDISClusterClient::handlerDelete()") - - U_INTERNAL_DUMP("UREDISClusterClient::handlerDelete() -> client = %p", this); - U_INTERNAL_DUMP("UEventFd::fd = %d", UEventFd::fd) - - UEventFd::fd = -1; - } - - UREDISClusterClient(UREDISClusterMaster *_master, const ClientType _type) : UREDISClient(), type(_type), master(_master) {} - -#if defined(U_STDCPP_ENABLE) && defined(DEBUG) - const char* dump(bool _reset) const { return UREDISClient_Base::dump(_reset); } -#endif -}; - -struct RedisClusterNode { - - U_MEMORY_TEST - U_MEMORY_ALLOCATOR - U_MEMORY_DEALLOCATOR - - UString ipAddress; - UREDISClusterClient *client; - uint16_t port, lowHashSlot, highHashSlot; - - RedisClusterNode(UREDISClusterMaster *master, const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot) : ipAddress(_ipAddress), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot) - { - U_NEW(UREDISClusterClient, client, UREDISClusterClient(master, UREDISClusterClient::ClientType::node)); - client->setHostPort(ipAddress, _port); - client->connect(ipAddress.c_str(), port); - } - -#if defined(DEBUG) - const char* dump(bool _reset) const { return ""; } -#endif -}; - -enum class ClusterError : uint8_t { - none, - moved, - ask, - tryagain -}; - -class AnonymousClusterPipeline; - -class U_EXPORT UREDISClusterMaster { -private: - - friend class AnonymousClusterPipeline; - friend class UREDISClusterClient; - - UREDISClusterClient *subscriptionClient; - UREDISClusterClient *managementClient; - UHashMap *clusterNodes; - UHashMap* pchannelCallbackMap; - - static uint16_t hashslotForKey(UStringType&& hashableKey) {return u_crc16(U_STRING_TO_PARAM(hashableKey)) % 16384;} - - UREDISClusterClient* clientForHashslot(uint16_t hashslot) - { - for (UHashMapNode *node : *clusterNodes) - { - RedisClusterNode* workingNode = (RedisClusterNode *)(node->elem); - - if ((workingNode->lowHashSlot <= hashslot) && (workingNode->highHashSlot >= hashslot)) - { - return workingNode->client; - } - } - - return U_NULLPTR; // never reached - } - - UREDISClusterClient* clientForIP(const UString& ip) - { - for (UHashMapNode *node : *clusterNodes) - { - RedisClusterNode* workingNode = (RedisClusterNode *)(node->elem); - - if (ip == workingNode->ipAddress) return workingNode->client; - } - - return U_NULLPTR; // never reached - } - - template - UREDISClusterClient* clientForHashableKey(A&& hashableKey) { return clientForHashslot(hashslotForKey(std::forward(hashableKey)));} - - // this might delete cluster nodes so be careful of client pointers after - void calculateNodeMap(); - - static ClusterError checkResponseForClusterErrors(const UString& response, size_t offset); - - template - UREDISClusterClient* sendToCluster(uint16_t hashslot, UStringType&& pipeline, UREDISClusterClient* workingClient) - { - U_TRACE_NO_PARAM(0, "UREDISClusterMaster::sendToCluster"); - - U_DUMP("pipeline = %.*s", pipeline.size(), pipeline.data()); - - ClusterError error; - - retry: - - workingClient->response.setEmpty(); - workingClient->vitem.clear(); - - workingClient->sendRequest(pipeline); - workingClient->readResponse(U_SINGLE_READ); - - error = checkResponseForClusterErrors(workingClient->response, 0); - - while (error != ClusterError::none) - { - switch (error) - { - case ClusterError::moved: - { - calculateNodeMap(); - workingClient = clientForHashslot(hashslot); - break; - } - case ClusterError::ask: - { - uint32_t _start = workingClient->response.find(' ', U_CONSTANT_SIZE("-ASK 3999")) + 1, - end = workingClient->response.find(':', _start); - - workingClient = clientForIP(workingClient->response.substr(_start, end - _start)); - break; - } - case ClusterError::tryagain: - { - UTimeVal(0L, 1000L).nanosleep(); // 0 sec, 1000 microsec = 1ms - break; - } - case ClusterError::none: break; - } - - goto retry; - } - - if constexpr (!psuedoSilence) workingClient->processResponse(); - - return workingClient; - } - - template - inline UREDISClusterClient* routeToCluster(A&& hashableKey, B&& pipeline) - { - uint16_t hashslot = UREDISClusterMaster::hashslotForKey(std::forward(hashableKey)); - return sendToCluster(hashslot, std::forward(pipeline), clientForHashslot(hashslot)); - } - - public: - - U_MEMORY_TEST - U_MEMORY_ALLOCATOR - U_MEMORY_DEALLOCATOR - - bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379); - - template - const UString clusterSingle(A&& hashableKey, B&& pipeline) {return routeToCluster(std::forward(hashableKey), std::forward(pipeline))->vitem[0];} - - // both of these multis require all keys to exist within a single hash slot (on the same node isn't good enough) - template - const UVector& clusterMulti(A&& hashableKey, B&& pipeline) {return routeToCluster(std::forward(hashableKey), std::forward(pipeline))->vitem;} - -// these are "psuedo-silenced", aka we wait on responses to ensure no cluster errors, but don't waste resources processing the responses - template - void clusterSilencedSingle(A&& hashableKey, B&& pipeline) {routeToCluster(std::forward(hashableKey), std::forward(pipeline));} - template - void clusterSilencedMulti(A&& hashableKey, B&& pipeline) {routeToCluster(std::forward(hashableKey), std::forward(pipeline));} - - // if reorderable == false, commands are grouped and pushed SEQUENTIALLY BY HASHSLOT. even if other commands point to hashslots on the same cluster node, we are unable to garuntee ordering since Redis only checks for -MOVED etc errors command by command as it executes them, and does not fail upon reaching a -MOVED etc error. this requires waiting for each response, to ensure no errors occured, before moving onto the next batch of commands. - - // if reorderable == true, we are able to group and push commands BY NODE regardless of sequence. we assume the fast-path 99.999% occurance that -MOVED and other errors did not occur, and push commands to redis as rapidly as possible without waiting on responses. we same a copy of all commands, and then at the end, check for -MOVED or other errors, and correct those if need be, while we process all resposnes. - const UVector& clusterAnonMulti(const AnonymousClusterPipeline& pipeline, bool reorderable); - - void clusterUnsubscribe(const UString& channel); - void clusterSubscribe( const UString& channel, vPFcscs callback); - - UREDISClusterMaster() - { - clusterNodes = U_NULLPTR; - U_NEW(UREDISClusterClient, managementClient, UREDISClusterClient(this, UREDISClusterClient::ClientType::management)); - U_NEW(UREDISClusterClient, subscriptionClient, UREDISClusterClient(this, UREDISClusterClient::ClientType::subscription)); - } - - ~UREDISClusterMaster() - { - U_DELETE(subscriptionClient); - U_DELETE(managementClient); - if (clusterNodes) U_DELETE(clusterNodes); - if (pchannelCallbackMap) U_DELETE(pchannelCallbackMap); - } - -#if defined(DEBUG) - const char* dump(bool _reset) const { return subscriptionClient->UREDISClient_Base::dump(_reset); } -#endif -}; - class UCompileTimeRESPEncoder : public UCompileTimeStringFormatter { private: #ifdef GCC_IS_GNU -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-but-set-parameter" + #pragma GCC diagnostic push + #pragma GCC diagnostic ignored "-Wunused-but-set-parameter" #endif - template - static constexpr auto generateSegments(StringClass format, size_t& outputCount, std::tuple&& workingCommand, T&& t, Ts&&... ts) + template + static constexpr auto generateSegments(StringClass format, size_t& outputCount, std::tuple&& workingCommand, std::tuple&& workingSegment, size_t workingSegmentLength, T&& t, Ts&&... ts) { constexpr size_t segmentStart = StringClass::instance.find(workingIndex, " "_ctv, StringClass::notChars); @@ -1224,7 +1062,7 @@ private: else { constexpr auto segmentCountString = "*"_ctv + integerToString() + "\r\n"_ctv; - constexpr size_t nextCommand = StringClass::instance.find(segmentStart, " \r\n"_ctv, StringClass::notChars); + constexpr size_t nextCommand = StringClass::instance.find(segmentStart + 1, " \r\n"_ctv, StringClass::notChars); outputCount += 1; @@ -1232,7 +1070,8 @@ private: { return std::apply([&] (auto... params) { - return generateSegments(format, outputCount, std::tuple(), std::forward(t), std::forward(ts)..., segmentCountString, params...); + // this murks length readings + return generateSegments(format, outputCount, std::tuple(), std::tuple(), 0, std::forward(t), std::forward(ts)..., segmentCountString, params...); }, workingCommand); } @@ -1241,35 +1080,44 @@ private: } else { - constexpr size_t segmentEnd = StringClass::instance.find(segmentStart, " \r"_ctv); - constexpr size_t formatStart = StringClass::instance.find(segmentStart, "{"_ctv, StringClass::skipDoubles, segmentEnd); + // this will find the same segment end each time, even though the segment start will change if multiple formats + constexpr size_t segmentEnd = StringClass::instance.find(segmentStart + 1, " \r"_ctv, 0, StringClass::length); + constexpr size_t formatStart = StringClass::instance.find(segmentStart, "{}"_ctv, StringClass::matchWholeString, segmentEnd); + // there is at least one format in the segment if constexpr (formatStart < segmentEnd) { constexpr size_t formatTermination = formatStart + 1; - constexpr size_t lengthSurplus = (segmentEnd + formatStart) - (segmentStart + formatTermination) - 1; - - // some ocassional bug where the length surplus ctor seems to pull references and the first number value in overflowing, so bypass it for now - return generateSegments(format, outputCount, std::tuple_cat(workingCommand, std::make_tuple("$"_ctv, getLength(t) + lengthSurplus, "\r\n"_ctv, StringClass::instance.template substr(), std::forward(t), StringClass::instance.template substr<(std::min(formatTermination + 1, segmentEnd)), segmentEnd>() + "\r\n"_ctv)), std::forward(ts)...); - } + constexpr size_t nextSubsegment = StringClass::instance.find(formatTermination + 1, "{}"_ctv, StringClass::matchWholeString, segmentEnd); + + if constexpr (nextSubsegment < segmentEnd) + { + constexpr size_t lengthSurplus = nextSubsegment - segmentStart - 2; + + return generateSegments(format, outputCount, std::tuple_cat(workingCommand), std::tuple_cat(workingSegment, std::make_tuple(StringClass::instance.template substr(), std::forward(t), StringClass::instance.template substr<(std::min(formatTermination + 1, nextSubsegment)), nextSubsegment>())), getLength(t) + lengthSurplus + workingSegmentLength, std::forward(ts)...); + } + else + { + constexpr size_t lengthSurplus = segmentEnd - segmentStart - 2; // -2 for the tag length, segmentEnd will always overshoot by 1 so that factors the 0 index + return generateSegments(format, outputCount, std::tuple_cat(workingCommand, std::make_tuple("$"_ctv, getLength(std::forward(t)) + lengthSurplus, "\r\n"_ctv), workingSegment, std::tuple(StringClass::instance.template substr(), std::forward(t), StringClass::instance.template substr<(std::min(formatTermination + 1, segmentEnd)), segmentEnd>() + "\r\n"_ctv)), std::tuple(), 0, std::forward(ts)...); + } + } + // no format at all in the segment else { constexpr auto segmentString = "$"_ctv + integerToString() + "\r\n"_ctv + StringClass::instance.template substr() + "\r\n"_ctv; - return generateSegments(format, outputCount, std::tuple_cat(workingCommand, std::tie(segmentString)), std::forward(t), std::forward(ts)...); + return generateSegments(format, outputCount, std::tuple_cat(workingCommand, std::tie(segmentString)), std::tuple(), 0, std::forward(t), std::forward(ts)...); } } } - #ifdef GCC_IS_GNU -#pragma GCC diagnostic pop + #pragma GCC diagnostic pop #endif template static size_t encode_impl(size_t writePosition, UString& workingString, Ts&&... ts) - { - U_TRACE_NO_PARAM(0, "UCompileTimeRESPEncoder::encode_impl()"); - + { // if partial will output segment count // if full, will output command count size_t count = 0; @@ -1278,7 +1126,7 @@ private: UCompileTimeStringFormatter::snprintf_impl(writePosition, workingString, params...); - }, generateSegments(format, count, std::tuple(), std::forward(ts)..., ""_ctv)); + }, generateSegments(format, count, std::tuple(), std::tuple(), 0, std::forward(ts)..., ""_ctv)); return count; } @@ -1302,6 +1150,7 @@ public: template static size_t encode_add(UString& workingString, Ts&&... ts) { + //U_TRACE_NO_PARAM(0, "encode_add"); return encode_impl(workingString.size(), workingString, std::forward(ts)...); } @@ -1330,7 +1179,101 @@ public: } }; -class AnonymousClusterPipeline { +enum class RedisOptions : uint8_t { + + one = 0b0000'0001, // const UString& + many = 0b0000'0010, // const UVector& + + //these are "psuedo-silenced", aka we wait on responses to ensure no cluster errors, but don't waste resources processing the responses + silenced = 0b0000'0100, // void + reorderable = 0b0000'1000, + + copy = 0b0001'0000 // if data needs to persist through subsequent Redis calls, at the cost of copy operation +}; + +constexpr RedisOptions operator |(RedisOptions lhs, RedisOptions rhs) +{ + using underlying = typename std::underlying_type::type; + return static_cast + ( + static_cast(lhs) | + static_cast(rhs) + ); +} + +constexpr bool operator &(RedisOptions lhs, RedisOptions rhs) +{ + using underlying = typename std::underlying_type::type; + return static_cast + ( + static_cast(lhs) & + static_cast(rhs) + ); +} + +static uint16_t hashslotForKey(UStringType&& hashableKey) +{ + return u_crc16(U_STRING_TO_PARAM(hashableKey)) % 16384; +} + +class RedisClusterPipeline { +private: + + friend class UREDISClusterMaster; + + uint16_t hashslot; + size_t commandCount; + UString pipeline; + +public: + + size_t size() + { + return pipeline.size(); + } + + void setEmpty() + { + commandCount = 0; + pipeline.setEmpty(); + } + + void setHashslot(UStringType&& hashableKey) + { + hashslot = hashslotForKey(std::forward(hashableKey)); + } + + void append(const UString& command, uint8_t count) + { + commandCount += count; + pipeline.reserve(pipeline.size() + command.size()); + pipeline.append(command); + } + + template + void append(Ts&&... ts) + { + commandCount += UCompileTimeRESPEncoder::encode_add(pipeline, std::forward(ts)...); + } + + RedisClusterPipeline() : pipeline(300U) {} +}; + +enum class RedisClusterError : uint8_t { + none, + moved, + ask, + tryagain +}; + +struct RedisReadReport { + + size_t start, end; + RedisClusterError error; + USocket *socketAfterError; +}; + +class RedisClusterMultiPipeline { private: friend class UREDISClusterMaster; @@ -1338,59 +1281,497 @@ private: struct Span { uint8_t commandCount; - int16_t hashslot; + uint16_t hashslot; size_t beginning, end, index; + RedisReadReport report; Span(uint8_t _commandCount, uint16_t _hashslot, size_t _beginning, size_t _end, size_t _index) : commandCount(_commandCount), hashslot(_hashslot), beginning(_beginning), end(_end), index(_index) {} }; -public: - UString pipeline; std::vector spans; - - size_t size() - { - return pipeline.size(); - } - + +public: + void setEmpty() { pipeline.setEmpty(); spans.clear(); } + + size_t size() const + { + return pipeline.size(); + } template - void append(A&& hashableKey, const UString& command, uint8_t commandCount = 1) + void append(A&& hashableKey, const UString& command, uint8_t commandCount) { - U_TRACE_NO_PARAM(0, "AnonymousClusterPipeline::append(ustring)") - size_t beginning = pipeline.size(); pipeline.reserve(pipeline.size() + command.size()); pipeline.append(command); - spans.emplace_back(commandCount, UREDISClusterMaster::hashslotForKey(std::forward(hashableKey)), beginning, pipeline.size(), spans.size()); - - U_DUMP("appended, %.*s", pipeline.size() - beginning, pipeline.data()); + spans.emplace_back(commandCount, hashslotForKey(std::forward(hashableKey)), beginning, pipeline.size(), spans.size()); } template void append(A&& hashableKey, Ts&&... ts) { - U_TRACE_NO_PARAM(0, "AnonymousClusterPipeline::append(variadic)") - size_t beginning = pipeline.size(); size_t commandCount = UCompileTimeRESPEncoder::encode_add(pipeline, std::forward(ts)...); - spans.emplace_back(commandCount, UREDISClusterMaster::hashslotForKey(std::forward(hashableKey)), beginning, pipeline.size(), spans.size()); - - U_DUMP("appended, %.*s", pipeline.size() - beginning, pipeline.data()); + spans.emplace_back(commandCount, hashslotForKey(std::forward(hashableKey)), beginning, pipeline.size(), spans.size()); } - AnonymousClusterPipeline() : pipeline(300U) {} + RedisClusterMultiPipeline() : pipeline(300U) {} +}; + +class U_EXPORT UREDISClusterMaster : public UEventFd { +private: + + struct RedisClusterNode { + + U_MEMORY_TEST + U_MEMORY_ALLOCATOR + U_MEMORY_DEALLOCATOR + + UString ipAddress; + USocket *socket; + uint16_t port, lowHashSlot, highHashSlot; + + RedisClusterNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot) : ipAddress(U_STRING_FROM_CONSTANT("3.3.0.3")), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot) + { + U_NEW(USocket, socket, USocket); + socket->connectServer(ipAddress, port, 1000); + } + + #if defined(DEBUG) + const char* dump(bool _reset) const { return ""; } + #endif + }; + + friend class RedisClusterMultiPipeline; + + UString workingString, subscriptionString; + UVector parsed; + + USocket *subscriptionSocket; + USocket *managementSocket; + UHashMap *clusterNodes; + // speed at the cost of memory, worth it + std::unordered_map hashslotToSocket; + UHashMap* pchannelCallbackMap; + + virtual int handlerRead() U_DECL_FINAL; + + template + USocket* socketForHashableKey(A&& hashableKey) const { return hashslotToSocket[hashslotForKey(std::forward(hashableKey))]; } + + // this might delete cluster nodes so be careful of client pointers after + void cloneClusterTopology(); + + // inline-able when in header, so we can achieve generic-ness without paying cost of stack build up and teardown over each function call + + RedisReadReport read(USocket* socket, size_t marker, size_t depth) + { + RedisReadReport report; + report.start = marker; + report.error = RedisClusterError::none; + + const char *pointer1, *pointer2, *pend = workingString.pend(); + pointer1 = pointer2 = workingString.c_pointer(marker); + + auto readAndRevalidatePointers = [&] (void) -> void { + + size_t index1 = pointer1 - workingString.data(); + size_t index2 = pointer2 - workingString.data(); + USocketExt::read(socket, workingString, U_SINGLE_READ, 1000); + pointer1 = workingString.c_pointer(index1); + pointer2 = workingString.c_pointer(index2); + pend = workingString.pend(); + }; + + do + { + if (pointer1 >= pend) readAndRevalidatePointers(); + + while (*pointer2 != '\r') + { + // no knowing where the TCP packets might have been sliced + if (UNLIKELY(++pointer2 == pend)) readAndRevalidatePointers(); + } + + switch (*pointer1++) + { + // :0\r\n + case U_RC_INT: + // +OK\r\n + case U_RC_INLINE: + case U_RC_ERROR: + { + // only handle the error if the depth left is 1.... so that it touches the last error.... + if (depth == 1) + { + size_t prefixAt = (pointer1 - 1 - workingString.data()); + + // -MOVED 3999 127.0.0.1:6381 => the hashslot has been moved to another master node + if (workingString.find("-MOVED", prefixAt, 6) != U_NOT_FOUND) report.error = RedisClusterError::moved; + + // -ASK 3999 127.0.0.1:6381 => this means that one of the hash slots is being migrated to another server + else if (workingString.find("-ASK", prefixAt, 4) != U_NOT_FOUND) + { + // every ASK must be replied to as ASKING \r\n GET {ABC}.a \r\n + // even if the migration has completed by the time this is received... we still gucci + + size_t ipStart = workingString.find(' ', prefixAt + 5) + 1; + size_t ipEnd = workingString.find(':', ipStart); + + const UString& ip = workingString.substr(ipStart, ipEnd - ipStart); + const UString& port = workingString.substr(ipEnd + 1, workingString.find('\r', ipEnd) - ipEnd - 1); + + UString compositeAddress(30U); + UCompileTimeStringFormatter::snprintf<"{}.{}"_ctv>(compositeAddress, ip, port); + + RedisClusterNode *workingNode = clusterNodes->at(compositeAddress); + + // this will happen if the new node we're pointed to is a new node that's migrating hashslots to itself + if (UNLIKELY(!workingNode)) + { + // max Redis hashslot is 16384, so these won't get confused... and they'll be corrected once we receive a -MOVED and reclone the topology + U_NEW(RedisClusterNode, workingNode, RedisClusterNode(ip, port.strtoul(), 17000, 17000)); + clusterNodes->insert(compositeAddress, workingNode); + } + + report.socketAfterError = workingNode->socket; + report.error = RedisClusterError::ask; + } + + // during a resharding the multi-key operations targeting keys that all exist and are all still in the same node (either the source or destination node) are still available. Operations on keys that don't exist or are - during the resharding - split between the source and destination nodes, will generate a -TRYAGAIN error. The client can try the operation after some time, or report back the error. As soon as migration of the specified hash slot has terminated, all multi-key operations are available again for that hash slot + else if (workingString.find("-TRYAGAIN", prefixAt, 9) != U_NOT_FOUND) report.error = RedisClusterError::tryagain; + } + break; + } + // $-1\r\n + // $15\r\nmy-value-tester\r\n + case U_RC_BULK: + { + if (*pointer1 != '-') pointer2 += (u_strtol(pointer1, pointer2) + U_CONSTANT_SIZE(U_CRLF)); + + // in cases of very large data, redis will break up the response into multiple packets... + // and event might trigger before all were read into the response buffer + while ((pointer2 + U_CONSTANT_SIZE(U_CRLF)) > pend) readAndRevalidatePointers(); + + break; + } + // *2\r\n$10\r\n1439822796\r\n$6\r\n311090\r\n + case U_RC_MULTIBULK: + { + depth += u_strtoul(pointer1, pointer2); + break; + } + default: + break; + } + + --depth; + + pointer1 = (pointer2 += U_CONSTANT_SIZE(U_CRLF)); + + } while (depth > 0); + + report.end = pointer2 - workingString.data();; + + return report; + } + + bool handleErrors(RedisReadReport& report, uint16_t hashslot, UStringType&& pipeline, size_t commandCount, bool skipRecloning = false) + { + bool recloned = skipRecloning; + + do + { + switch (report.error) + { + case RedisClusterError::moved: + { + if (!skipRecloning) + { + cloneClusterTopology(); + recloned = true; + } + + skipRecloning = false; + report.socketAfterError = hashslotToSocket[hashslot]; + } + case RedisClusterError::ask: + case RedisClusterError::tryagain: + { + USocketExt::write(report.socketAfterError, U_STRING_TO_PARAM(pipeline), 1000); + } + default: + break; + } + + report = read(report.socketAfterError, workingString.size(), commandCount); + + } while (report.error != RedisClusterError::none); + + return recloned; + } + + // we need the command count so that we know how many responses to ensure we read back + template + void talkToCluster(uint16_t hashslot, A&& pipeline, size_t commandCount) + { + U_TRACE(0, "UREDISClusterMaster::talkToCluster(commandCount = %lu)", commandCount); + + U_DUMP("talkToCluster -> pipeline = %.*s", pipeline.size() > 500 ? 500 : pipeline.size(), pipeline.data()); + + USocket* workingSocket = hashslotToSocket[hashslot]; + + // pipeline is either workingString, the string of a pipeline object, or a string that was fed in (which could be a compile time string) + + size_t pipelineLength = (pipeline.data() == workingString.data()) ? pipeline.size() : 0; + // U_DUMP("pipelineLength = %lu", pipelineLength); + USocketExt::write(workingSocket, U_STRING_TO_PARAM(pipeline), 1000); + + RedisReadReport report = read(workingSocket, workingString.size(), commandCount); + + // if there are errors... they'll have overwritten our pipeline...rrrr + if (UNLIKELY(report.error != RedisClusterError::none)) + { + if constexpr (is_ctv_v) + { + handleErrors(report, hashslot, pipeline, commandCount, false); + } + else handleErrors(report, hashslot, pipelineLength ? pipeline.substr(pipeline.data(), pipelineLength) : pipeline, commandCount, false); + } + + if constexpr (!(options & RedisOptions::silenced)) UREDISClient_Base::parseResponse(workingString, parsed, pipelineLength, workingString.size()); + } + + template + const decltype(auto) handleReturn() + { + if (workingString.size()) workingString.setEmpty(); + + if constexpr (options & RedisOptions::one) + { + if (parsed.size()) + { + if constexpr (options & RedisOptions::copy) + { + return parsed[0].copy(); + } + else + { + return parsed[0]; + } + } + else + { + return UString::getStringNull().copy(); + } + } + else if constexpr (options & RedisOptions::many) + { + if constexpr (options & RedisOptions::copy) + { + return UVector(parsed); + } + else + { + return (parsed); + } + } + } + + // options + // + // 1) we could make the policy such that if you don't copy, then your response WILL ALWAYS get erased upon the next call + // 2) MIGHT get deleted at any future time.... that isn't really useful though because you can't reason about it + + template + const decltype(auto) routeToCluster(uint16_t hashslot, A&& pipeline, size_t commandCount) + { + U_TRACE_NO_PARAM(0, "RedisCluterMaster::routeToCluster"); + + // if return not copied, garaunteed to be cleared upon next call + parsed.clear(); + + // we could let these send + process we dont care + talkToCluster(hashslot, std::forward(pipeline), commandCount); + + return handleReturn(); + } + +public: + + U_MEMORY_TEST + U_MEMORY_ALLOCATOR + U_MEMORY_DEALLOCATOR + + bool connect(const UString& host, uint16_t port); + + // clusterSingle -> all commands must target a single hashslot + // clusterMulti -> commands can target many hashslots over many nodes + + template + const decltype(auto) clusterSingle(A&& hashableKey, B&& pipeline, size_t commandCount) + { + U_TRACE_NO_PARAM(0, "RedisCluterMaster::clusterSingle(ustring)") + + return routeToCluster(hashslotForKey(std::forward(hashableKey)), std::forward(pipeline), commandCount); + } + + // if we ever clear the workingString, then anything that wasn't "copied" will be cleared upon resetting the workingString... + + template + const decltype(auto) clusterSingle(const RedisClusterPipeline& pipeline) + { + U_TRACE_NO_PARAM(0, "RedisCluterMaster::clusterSingle(pipeline)") + + return routeToCluster(pipeline.hashslot, pipeline.pipeline, pipeline.commandCount); + } + + template + const decltype(auto) clusterSingle(A&& hashableKey, Ts&&... ts) + { + U_TRACE_NO_PARAM(0, "RedisCluterMaster::clusterSingle(variadic)") + + return routeToCluster(hashslotForKey(std::forward(hashableKey)), workingString, UCompileTimeRESPEncoder::encode(workingString, std::forward(ts)...)); + } + + template + const decltype(auto) clusterMulti(RedisClusterMultiPipeline& pipeline) + { + U_TRACE_NO_PARAM(0, "UREDISClusterMaster::clusterMulti"); + + U_DUMP("clusterMulti -> pipeline = %.*s", pipeline.pipeline.size() > 500 ? 500 : pipeline.pipeline.size(), pipeline.pipeline.data()); + + // if return not copied, garaunteed to be cleared upon next call + parsed.clear(); + if (workingString.size()) workingString.setEmpty(); + workingString.reserve(pipeline.size()); + + // if reorderable == true, we are able to group and push commands BY NODE regardless of sequence. we assume the fast-path 99.999% occurance that -MOVED and other errors did not occur, and push commands to redis as rapidly as possible without waiting on responses. we same a copy of all commands, and then at the end, check for -MOVED or other errors, and correct those if need be, while we process all resposnes. + if constexpr (options & RedisOptions::reorderable) + { + U_DUMP("reorderable == true"); + + std::sort(pipeline.spans.begin(), pipeline.spans.end(), [] (const auto& a, const auto& b) { return a.hashslot > b.hashslot; }); + + // write + + USocket *workingSocket; + auto it = pipeline.spans.begin(); + + do + { + workingSocket = hashslotToSocket[it->hashslot]; + + do + { + workingString.append(pipeline.pipeline.data() + it->beginning, it->end - it->beginning); + } + while (++it != pipeline.spans.end() && workingSocket == hashslotToSocket[it->hashslot]); + + USocketExt::write(workingSocket, U_STRING_TO_PARAM(workingString), 1000); + workingString.setEmpty(); + + } while (it != pipeline.spans.end()); + + + // read + + it = pipeline.spans.begin(); + size_t marker = 0; + + do + { + workingSocket = hashslotToSocket[it->hashslot]; + + do + { + it->report = read(workingSocket, marker, it->commandCount); + marker = it->report.end; + + } while (++it != pipeline.spans.end() && workingSocket == hashslotToSocket[it->hashslot]); + + } while (it != pipeline.spans.end()); + + // map in responses + + std::sort(pipeline.spans.begin(), pipeline.spans.end(), [] (const auto& a, const auto& b) { return a.index > b.index; }); + + bool recloned = false; + + for (auto& span : pipeline.spans) + { + if (UNLIKELY(span.report.error != RedisClusterError::none)) + { + recloned = handleErrors(span.report, span.hashslot, pipeline.pipeline.substr(span.beginning, span.end), span.commandCount, recloned); + } + + if constexpr (!(options & RedisOptions::silenced)) UREDISClient_Base::parseResponse(workingString, parsed, span.report.start, span.report.end); + } + } + // if reorderable == false, commands are grouped and pushed SEQUENTIALLY BY HASHSLOT. even if other commands point to hashslots on the same cluster node, we are unable to garuntee ordering since Redis only checks for -MOVED etc errors command by command as it executes them, and does not fail upon reaching a -MOVED etc error. this requires waiting for each response, to ensure no errors occured, before moving onto the next batch of commands. + else + { + U_DUMP("reorderable == false"); + auto it = pipeline.spans.begin(); + + do + { + uint16_t workingHashslot = it->hashslot; + uint16_t workingCommandCount = 0; + + do + { + workingString.append(pipeline.pipeline.data() + it->beginning, it->end - it->beginning); + workingCommandCount += it->commandCount; + + } while (++it != pipeline.spans.end() && workingHashslot == it->hashslot); + + talkToCluster(workingHashslot, workingString, workingCommandCount); + workingString.setEmpty(); + + } while (it != pipeline.spans.end()); + + if constexpr (!(options & RedisOptions::silenced)) + { + UREDISClient_Base::parseResponse(workingString, parsed, 0, workingString.size()); + } + } + + return handleReturn(); + } + + void clusterUnsubscribe(const UString& channel); + void clusterSubscribe( const UString& channel, vPFcscs callback); + + UREDISClusterMaster() + { + clusterNodes = U_NULLPTR; + U_NEW(USocket, managementSocket, USocket); + U_NEW(USocket, subscriptionSocket, USocket); + } + + ~UREDISClusterMaster() + { + U_DELETE(subscriptionSocket); + U_DELETE(managementSocket); + if (clusterNodes) U_DELETE(clusterNodes); + if (pchannelCallbackMap) U_DELETE(pchannelCallbackMap); + } + +#if defined(DEBUG) + const char* dump(bool _reset) const { return ""; } +#endif }; #endif