1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00
This commit is contained in:
stefanocasazza 2019-08-27 18:44:35 +02:00
parent bd4e058555
commit 18487034b5
4 changed files with 90 additions and 88 deletions

View File

@ -1044,7 +1044,10 @@ private:
#if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX11)
template <class T> class UHashMapAnonIter {
public:
explicit UHashMapAnonIter(UHashMap<T*>* m, uint32_t l) : map(m), length(l) {}
explicit UHashMapAnonIter(UHashMap<T*>* m, uint32_t l) : map(m), length(l)
{
if (length == 0) node = map->firstNode();
}
bool operator!=(const UHashMapAnonIter& other) const { return (length != other.length); }
@ -1052,8 +1055,8 @@ public:
UHashMapAnonIter& operator++()
{
node = (length++ ? map->nextNode() : map->firstNode());
length++;
node = map->nextNode();
return *this;
}

View File

@ -822,7 +822,7 @@ protected:
}
void init();
void processResponse();
virtual void processResponse();
bool processRequest(char recvtype);
bool sendRequest(const UString& pipeline)
@ -987,6 +987,18 @@ private:
UString ipAddress;
UREDISClient<UTCPSocket> client;
uint16_t port, lowHashSlot, highHashSlot;
RedisNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot) :
ipAddress(_ipAddress), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot)
{
client.connect(ipAddress.data(), port);
}
// DEBUG
#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* dump(bool _reset) const { return ""; }
#endif
};
enum class ClusterError : uint8_t {
@ -998,8 +1010,7 @@ private:
ClusterError error;
UString temporaryASKip;
std::vector<RedisNode> redisNodes;
UREDISClient<UTCPSocket> subscriptionClient;
UHashMap<RedisNode *> redisNodes;
uint16_t hashslotForKey(const UString& hashableKey) { return u_crc16(U_STRING_TO_PARAM(hashableKey)); }
@ -1019,22 +1030,26 @@ private:
{
U_TRACE(0, "UREDISClusterClient::clientForHashslot(%u)", hashslot)
for (RedisNode& workingNode : redisNodes)
for (UHashMapNode *node : redisNodes)
{
if ((workingNode.lowHashSlot <= hashslot) || (workingNode.highHashSlot >= hashslot)) return workingNode.client;
RedisNode* workingNode = (RedisNode *)(node->elem);
if ((workingNode->lowHashSlot <= hashslot) || (workingNode->highHashSlot >= hashslot)) return workingNode->client;
}
return redisNodes[0].client;
return *this; // never reached
}
UREDISClient<UTCPSocket>& clientForASKip()
{
for (RedisNode& workingNode : redisNodes)
for (UHashMapNode *node : redisNodes)
{
if (temporaryASKip == workingNode.ipAddress) return workingNode.client;
RedisNode* workingNode = (RedisNode *)(node->elem);
if (temporaryASKip == workingNode->ipAddress) return workingNode->client;
}
return redisNodes[0].client;
return *this; // never reached
}
UREDISClient<UTCPSocket>& clientForHashableKey(const UString& hashableKey) { return clientForHashslot(hashslotForKey(hashableKey)); }
@ -1050,7 +1065,7 @@ public:
U_TRACE_DTOR(0, UREDISClusterClient)
}
void processResponse();
void processResponse() final;
void calculateNodeMap();
bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379);
@ -1087,5 +1102,9 @@ public:
private:
U_DISALLOW_COPY_AND_ASSIGN(UREDISClusterClient)
};
extern template const UVector<UString>& UREDISClusterClient::processPipeline<true>(UString& pipeline, bool reorderable);
extern template const UVector<UString>& UREDISClusterClient::processPipeline<false>(UString& pipeline, bool reorderable);
#endif
#endif

View File

@ -209,13 +209,12 @@ U_NO_EXPORT bool UREDISClient_Base::getResponseItem()
while (*ptr2 != '\r') ++ptr2;
len = ptr2-ptr1;
// U_RC_INLINE example -> +OK\r\n
// U_RC_INT example -> :0\r\n
// U_RC_ERROR example -> -Error message\r\n
if (len != 1 ||
ptr1[0] != '0' ||
prefix != U_RC_INT)
{
pvec->push_back(UClient_Base::response.substr(ptr1, len));
}
pvec->push_back(UClient_Base::response.substr(ptr1, len));
start += len + U_CONSTANT_SIZE(U_CRLF);
@ -294,40 +293,13 @@ U_NO_EXPORT bool UREDISClient_Base::getResponseItem()
U_INTERNAL_ASSERT_EQUALS(prefix, U_RC_MULTIBULK)
UVector<UString> vec1(len);
UVector<UString>* pvec1 = pvec;
pvec = &vec1;
start += (ptr2-ptr1);
for (uint32_t i = 0; i < len; ++i)
{
if (getResponseItem() == false)
{
if (UClient_Base::isConnected() == false)
{
(void) UClient_Base::connect();
U_RETURN(false);
}
pvec1->move(vec1);
}
else
{
typedef UVector<UString> uvectorstring;
char buffer_output[64U * 1024U];
uint32_t buffer_output_len = UObject2String<uvectorstring>(vec1, buffer_output, sizeof(buffer_output));
pvec1->push_back(UStringRep::create(buffer_output_len, buffer_output_len, (const char*)buffer_output));
vec1.clear();
}
getResponseItem();
}
pvec = pvec1;
U_RETURN(true);
}
@ -620,41 +592,51 @@ void UREDISClusterClient::calculateNodeMap()
(void) UREDISClient_Base::processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS"));
UHashMap<RedisNode *> newNodes;
const UVector<UString>& rawNodes = UREDISClient_Base::vitem;
for (uint32_t a = 0, b = rawNodes.size(); a < b; ++a)
{
for (uint32_t a = 0, b = rawNodes.size(); a < b; a+=2)
{
const UString& first = rawNodes[a];
const UString& second = rawNodes[a+1];
if (findHashSlots)
{
if (first.isNumber() && second.isNumber())
{
if (rawNodes[a].isNumber() &&
rawNodes[a+1].isNumber())
{
workingLowHashSlot = rawNodes[a++].strtoul();
workingHighHashSlot = rawNodes[a].strtoul();
workingLowHashSlot = first.strtoul();
workingHighHashSlot = second.strtoul();
findHashSlots = false;
}
}
else
{
// the immediate next after hash slot is the master
RedisNode workingNode;
workingNode.lowHashSlot = workingLowHashSlot;
workingNode.highHashSlot = workingHighHashSlot;
(void) workingNode.ipAddress.replace(rawNodes[a]);
workingNode.port = rawNodes[++a].strtoul();
workingNode.client.connect(workingNode.ipAddress.data(), workingNode.port);
redisNodes.push_back(std::move(workingNode));
findHashSlots = true;
}
}
else
{
// first node in the array is the master
UString compositeAddress(50U);
compositeAddress.snprintf(U_CONSTANT_TO_PARAM("%v.%v"), first.rep, second.rep);
RedisNode *workingNode = redisNodes.erase(compositeAddress);
// in the case of MOVE some nodes will be new, but others we'll already be connected to
if (workingNode)
{
workingNode->lowHashSlot = workingLowHashSlot;
workingNode->highHashSlot = workingHighHashSlot;
}
else workingNode = new RedisNode(first, second.strtoul(), workingLowHashSlot, workingHighHashSlot);
newNodes.insert(compositeAddress, workingNode);
workingNode = newNodes[compositeAddress];
findHashSlots = true;
}
}
// if any nodes were taken offline, the clients would've disconnected by default
redisNodes.assign(newNodes);
}
bool UREDISClusterClient::connect(const char* host, unsigned int _port)
@ -664,17 +646,12 @@ bool UREDISClusterClient::connect(const char* host, unsigned int _port)
if (UREDISClient<UTCPSocket>::connect(host, _port))
{
calculateNodeMap();
// self handles the SUB/PUB traffic. Must be a dedicated client pre-Redis 6
UEventFd::op_mask |= EPOLLET;
UEventFd::op_mask &= ~EPOLLRDHUP;
// select random master node to be responsible for SUB/PUB traffic
const RedisNode& node = redisNodes[u_get_num_random_range0(redisNodes.size())];
subscriptionClient.connect(node.ipAddress.data(), node.port);
subscriptionClient.UEventFd::op_mask |= EPOLLET;
subscriptionClient.UEventFd::op_mask &= ~EPOLLRDHUP;
UNotifier::insert(&subscriptionClient, EPOLLEXCLUSIVE | EPOLLROUNDROBIN); // NB: we ask to listen for events to a Redis publish channel...
UNotifier::insert(this, EPOLLEXCLUSIVE | EPOLLROUNDROBIN); // NB: we ask to listen for events to a Redis publish channel...
U_RETURN(true);
}
@ -686,7 +663,7 @@ bool UREDISClusterClient::clusterUnsubscribe(const UString& channel) // unregist
{
U_TRACE(0, "UREDISClusterClient::clusterUnsubscribe(%V)", channel.rep)
if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel)))
if (processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel)))
{
if (pchannelCallbackMap == U_NULLPTR)
{
@ -705,7 +682,7 @@ bool UREDISClusterClient::clusterSubscribe(const UString& channel, vPFcscs callb
{
U_TRACE(0, "UREDISClusterClient::clusterSubscribe(%V,%p)", channel.rep, callback)
if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel)))
if (processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel)))
{
if (pchannelCallbackMap == U_NULLPTR)
{
@ -765,6 +742,9 @@ void UREDISClusterClient::processResponse()
}
}
template const UVector<UString>& UREDISClusterClient::processPipeline<true>(UString& pipeline, bool reorderable);
template const UVector<UString>& UREDISClusterClient::processPipeline<false>(UString& pipeline, bool reorderable);
template <bool silence>
const UVector<UString>& UREDISClusterClient::processPipeline(UString& pipeline, const bool reorderable)
{

View File

@ -1 +1 @@
05C5
05CB