1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00

update redis

This commit is contained in:
stefanocasazza 2019-09-13 18:45:05 +02:00
parent 369cbf4c8f
commit fc2eb21419
3 changed files with 52 additions and 37 deletions

View File

@ -990,19 +990,30 @@ private:
public: public:
U_MEMORY_TEST
U_MEMORY_ALLOCATOR
U_MEMORY_DEALLOCATOR
void processResponse(); void processResponse();
UREDISClusterClient(UREDISClusterMaster *_master) : UREDISClient<UTCPSocket>(), master(_master) {}
UREDISClusterClient() = delete;
UREDISClusterClient(UREDISClusterMaster *_master) : master(_master) {}
}; };
struct RedisClusterNode { struct RedisClusterNode {
U_MEMORY_TEST
U_MEMORY_ALLOCATOR
U_MEMORY_DEALLOCATOR
UString ipAddress; UString ipAddress;
UREDISClusterClient client; UREDISClusterClient *client;
uint16_t port, lowHashSlot, highHashSlot; uint16_t port, lowHashSlot, highHashSlot;
RedisClusterNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot, UREDISClusterMaster *master) : ipAddress(_ipAddress), client(master), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot) RedisClusterNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot, UREDISClusterMaster *master) : ipAddress(_ipAddress), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot)
{ {
client.connect(ipAddress.c_str(), port); U_NEW(UREDISClusterClient, client, UREDISClusterClient(master));
client->connect(ipAddress.c_str(), port);
} }
#if defined(U_STDCPP_ENABLE) && defined(DEBUG) #if defined(U_STDCPP_ENABLE) && defined(DEBUG)
@ -1020,14 +1031,11 @@ enum class ClusterError : uint8_t {
class U_EXPORT UREDISClusterMaster { class U_EXPORT UREDISClusterMaster {
private: private:
U_MEMORY_ALLOCATOR
U_MEMORY_DEALLOCATOR
friend class UREDISClusterClient; friend class UREDISClusterClient;
ClusterError error; ClusterError error;
UString temporaryASKip; UString temporaryASKip;
UREDISClusterClient subscriptionClient; UREDISClusterClient *subscriptionClient;
UHashMap<RedisClusterNode *> clusterNodes; // when these call they need to be processed... also when MOVED... we need to set up and recalculate UHashMap<RedisClusterNode *> clusterNodes; // when these call they need to be processed... also when MOVED... we need to set up and recalculate
uint16_t hashslotForKey(const UString& hashableKey) { return u_crc16(U_STRING_TO_PARAM(hashableKey)); } uint16_t hashslotForKey(const UString& hashableKey) { return u_crc16(U_STRING_TO_PARAM(hashableKey)); }
@ -1044,7 +1052,7 @@ private:
return hashslotForKey(command.substr(beginning, end - beginning)); return hashslotForKey(command.substr(beginning, end - beginning));
} }
UREDISClusterClient& clientForHashslot(uint16_t hashslot) UREDISClusterClient* clientForHashslot(uint16_t hashslot)
{ {
U_TRACE(0, "UREDISClusterMaster::clientForHashslot(%u)", hashslot) U_TRACE(0, "UREDISClusterMaster::clientForHashslot(%u)", hashslot)
@ -1058,7 +1066,7 @@ private:
return subscriptionClient; // never reached return subscriptionClient; // never reached
} }
UREDISClusterClient& clientForASKip() UREDISClusterClient* clientForASKip()
{ {
for (UHashMapNode *node : clusterNodes) for (UHashMapNode *node : clusterNodes)
{ {
@ -1070,7 +1078,7 @@ private:
return subscriptionClient; // never reached return subscriptionClient; // never reached
} }
UREDISClusterClient& clientForHashableKey(const UString& hashableKey) { return clientForHashslot(hashslotForKey(hashableKey)); } UREDISClusterClient* clientForHashableKey(const UString& hashableKey) { return clientForHashslot(hashslotForKey(hashableKey)); }
template<bool silence> template<bool silence>
const UVector<UString>& processPipeline(UString& pipeline, bool reorderable); const UVector<UString>& processPipeline(UString& pipeline, bool reorderable);
@ -1079,15 +1087,19 @@ private:
public: public:
U_MEMORY_TEST
U_MEMORY_ALLOCATOR
U_MEMORY_DEALLOCATOR
bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379); bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379);
// all of these multis require all keys to exist within a single hash slot (on the same node isn't good enough) // all of these multis require all keys to exist within a single hash slot (on the same node isn't good enough)
UString clusterSingle(const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey).single(pipeline); } UString clusterSingle(const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey)->single(pipeline); }
const UVector<UString>& clusterMulti( const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey).multi(pipeline); } const UVector<UString>& clusterMulti( const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey)->multi(pipeline); }
void clusterSilencedMulti( const UString& hashableKey, UString& pipeline) { clientForHashableKey(hashableKey).silencedMulti(pipeline); } void clusterSilencedMulti( const UString& hashableKey, UString& pipeline) { clientForHashableKey(hashableKey)->silencedMulti(pipeline); }
void clusterSilencedSingle(const UString& hashableKey, UString& pipeline) { clientForHashableKey(hashableKey).silencedSingle(pipeline); } void clusterSilencedSingle(const UString& hashableKey, UString& pipeline) { clientForHashableKey(hashableKey)->silencedSingle(pipeline); }
// anon multis are pipelined commands of various keys that might belong to many nodes. always processed in order. Commands always delimined by \r\n // anon multis are pipelined commands of various keys that might belong to many nodes. always processed in order. Commands always delimined by \r\n
// example -> SET {abc}xyz 5 \r\n GET abc{xyz} \r\n SET xyz{abc} 9 \r\n // example -> SET {abc}xyz 5 \r\n GET abc{xyz} \r\n SET xyz{abc} 9 \r\n
@ -1101,10 +1113,13 @@ public:
bool clusterUnsubscribe(const UString& channel); bool clusterUnsubscribe(const UString& channel);
bool clusterSubscribe( const UString& channel, vPFcscs callback); bool clusterSubscribe( const UString& channel, vPFcscs callback);
UREDISClusterMaster() : subscriptionClient(this) {} UREDISClusterMaster()
{
U_NEW(UREDISClusterClient, subscriptionClient, UREDISClusterClient(this));
}
#if defined(U_STDCPP_ENABLE) && defined(DEBUG) #if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* dump(bool _reset) const { return subscriptionClient.UREDISClient_Base::dump(_reset); } const char* dump(bool _reset) const { return subscriptionClient->UREDISClient_Base::dump(_reset); }
#endif #endif
}; };

View File

@ -631,10 +631,10 @@ void UREDISClusterMaster::calculateNodeMap()
uint16_t workingLowHashSlot; uint16_t workingLowHashSlot;
uint16_t workingHighHashSlot; uint16_t workingHighHashSlot;
(void) subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS")); (void) subscriptionClient->processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS"));
UHashMap<RedisClusterNode *> newNodes; UHashMap<RedisClusterNode *> newNodes;
const UVector<UString>& rawNodes = subscriptionClient.vitem; const UVector<UString>& rawNodes = subscriptionClient->vitem;
for (uint32_t a = 0, b = rawNodes.size(); a < b; a+=2) for (uint32_t a = 0, b = rawNodes.size(); a < b; a+=2)
{ {
@ -682,11 +682,11 @@ bool UREDISClusterMaster::connect(const char* host, unsigned int _port)
{ {
U_TRACE(0, "UREDISClusterMaster::connect(%S,%u)", host, _port) U_TRACE(0, "UREDISClusterMaster::connect(%S,%u)", host, _port)
if (subscriptionClient.connect(host, _port)) if (subscriptionClient->connect(host, _port))
{ {
calculateNodeMap(); calculateNodeMap();
UServer_Base::addHandlerEvent(&subscriptionClient); // NB: we ask to listen for events to a Redis publish channel... UServer_Base::addHandlerEvent(subscriptionClient); // NB: we ask to listen for events to a Redis publish channel...
U_RETURN(true); U_RETURN(true);
} }
@ -698,9 +698,9 @@ bool UREDISClusterMaster::clusterUnsubscribe(const UString& channel) // unregist
{ {
U_TRACE(0, "UREDISClusterMaster::clusterUnsubscribe(%V)", channel.rep) U_TRACE(0, "UREDISClusterMaster::clusterUnsubscribe(%V)", channel.rep)
if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel))) if (subscriptionClient->processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel)))
{ {
(void)subscriptionClient.UREDISClient_Base::pchannelCallbackMap->erase(channel); (void)subscriptionClient->UREDISClient_Base::pchannelCallbackMap->erase(channel);
U_RETURN(true); U_RETURN(true);
} }
@ -712,14 +712,14 @@ bool UREDISClusterMaster::clusterSubscribe(const UString& channel, vPFcscs callb
{ {
U_TRACE(0, "UREDISClusterMaster::clusterSubscribe(%V,%p)", channel.rep, callback) U_TRACE(0, "UREDISClusterMaster::clusterSubscribe(%V,%p)", channel.rep, callback)
if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel))) if (subscriptionClient->processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel)))
{ {
if (subscriptionClient.UREDISClient_Base::pchannelCallbackMap == U_NULLPTR) if (subscriptionClient->UREDISClient_Base::pchannelCallbackMap == U_NULLPTR)
{ {
U_NEW(UHashMap<void*>, subscriptionClient.UREDISClient_Base::pchannelCallbackMap, UHashMap<void*>); U_NEW(UHashMap<void*>, subscriptionClient->UREDISClient_Base::pchannelCallbackMap, UHashMap<void*>);
} }
subscriptionClient.UREDISClient_Base::pchannelCallbackMap->insert(channel, (const void*)callback); subscriptionClient->UREDISClient_Base::pchannelCallbackMap->insert(channel, (const void*)callback);
U_RETURN(true); U_RETURN(true);
} }
@ -754,12 +754,12 @@ const UVector<UString>& UREDISClusterMaster::processPipeline(UString& pipeline,
} }
} }
UREDISClusterClient& client = clientForHashslot(hashslot); UREDISClusterClient* client = clientForHashslot(hashslot);
if constexpr (silence) (void) client.sendRequest(workingString); if constexpr (silence) (void) client->sendRequest(workingString);
else else
{ {
replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString)); replay: (void) client->processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString));
switch (error) switch (error)
{ {
@ -772,16 +772,16 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS
case ClusterError::ask: case ClusterError::ask:
{ {
UREDISClusterClient& temporaryClient = clientForASKip(); UREDISClusterClient* temporaryClient = clientForASKip();
(void) temporaryClient.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString)); (void) temporaryClient->processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString));
} }
break; break;
case ClusterError::none: break; case ClusterError::none: break;
} }
if constexpr (silence == false) subscriptionClient.vitem.move(client.vitem); if constexpr (silence == false) subscriptionClient->vitem.move(client->vitem);
count = 0; count = 0;
workingString.clear(); workingString.clear();
@ -846,7 +846,7 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS
} }
} }
return subscriptionClient.vitem; return subscriptionClient->vitem;
} }
# endif # endif

View File

@ -1 +1 @@
05FA 0601