1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00

Update redis.cpp

This commit is contained in:
Victor Stewart 2019-11-29 19:38:08 -04:00 committed by GitHub
parent 9c1992942c
commit 743ce33ff9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -16,20 +16,8 @@
uint32_t UREDISClient_Base::start; uint32_t UREDISClient_Base::start;
ptrdiff_t UREDISClient_Base::diff; ptrdiff_t UREDISClient_Base::diff;
UHashMap<void*>* UREDISClient_Base::pchannelCallbackMap;
UREDISClient_Base* UREDISClient_Base::pthis; UREDISClient_Base* UREDISClient_Base::pthis;
UREDISClient_Base::~UREDISClient_Base()
{
U_TRACE_DTOR(0, UREDISClient_Base)
if (pchannelCallbackMap)
{
U_DELETE(pchannelCallbackMap)
pchannelCallbackMap = U_NULLPTR;
}
}
// Connect to REDIS server // Connect to REDIS server
void UREDISClient_Base::init() void UREDISClient_Base::init()
@ -439,13 +427,13 @@ bool UREDISClient_Base::deleteKeys(const char* pattern, uint32_t len) // Delete
#if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__) #if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__)
// this is called for subscribed channels // this is called for subscribed channels
int UREDISClient_Base::handlerRead() int UREDISClusterClient::handlerRead()
{ {
// BytesRead(100) = "*3\r\n$7\r\nmessage\r\n$19\r\n{ABC}.trafficSignal\r\n$1\r\n1\r\n*3\r\n$7\r\nmessage\r\n$19\r\n{DEF}.trafficSignal\r\n$1\r\n1\r\n" // BytesRead(100) = "*3\r\n$7\r\nmessage\r\n$19\r\n{ABC}.trafficSignal\r\n$1\r\n1\r\n*3\r\n$7\r\nmessage\r\n$19\r\n{DEF}.trafficSignal\r\n$1\r\n1\r\n"
U_TRACE_NO_PARAM(0, "UREDISClient_Base::handlerRead()") U_TRACE_NO_PARAM(0, "UREDISClusterClient::handlerRead()")
if ((clear(), UClient_Base::response.setEmpty(), UClient_Base::readResponse(U_SINGLE_READ))) if ((clear(), response.setEmpty(), readResponse(U_SINGLE_READ)))
{ {
processResponse(); processResponse();
@ -457,7 +445,7 @@ int UREDISClient_Base::handlerRead()
{ {
if (vitem[index] == "message"_ctv) if (vitem[index] == "message"_ctv)
{ {
vPFcscs callback = (vPFcscs)pchannelCallbackMap->at(vitem[index + 1]); vPFcscs callback = (vPFcscs)(master->pchannelCallbackMap->at(vitem[index + 1]));
if (callback) callback(vitem[index + 1], vitem[index + 2]); if (callback) callback(vitem[index + 1], vitem[index + 2]);
} }
} }
@ -468,8 +456,6 @@ int UREDISClient_Base::handlerRead()
ClusterError UREDISClusterMaster::checkResponseForClusterErrors(const UString& response, size_t offset) ClusterError UREDISClusterMaster::checkResponseForClusterErrors(const UString& response, size_t offset)
{ {
U_TRACE_NO_PARAM(0, "checkResponseForClusterErrors()")
// all of these errors are very rare, and only occur in the midst of cluster topology changes // all of these errors are very rare, and only occur in the midst of cluster topology changes
// -MOVED 3999 127.0.0.1:6381 => the hashslot has been moved to another master node // -MOVED 3999 127.0.0.1:6381 => the hashslot has been moved to another master node
@ -500,8 +486,6 @@ void UREDISClusterMaster::calculateNodeMap()
UString& response = managementClient->response; UString& response = managementClient->response;
U_WARNING("CLUSTER SLOTS response = %.*s", response.size(), response.data());
uint16_t lowHashSlot; uint16_t lowHashSlot;
uint16_t highHashSlot; uint16_t highHashSlot;
UString compositeAddress(50U); UString compositeAddress(50U);
@ -568,7 +552,7 @@ void UREDISClusterMaster::calculateNodeMap()
else else
{ {
U_NEW(RedisClusterNode, workingNode, RedisClusterNode(this, address, port, lowHashSlot, highHashSlot)); U_NEW(RedisClusterNode, workingNode, RedisClusterNode(this, address, port, lowHashSlot, highHashSlot));
} }
newNodes->insert(compositeAddress, workingNode); newNodes->insert(compositeAddress, workingNode);
@ -588,6 +572,8 @@ bool UREDISClusterMaster::connect(const char* host, unsigned int _port)
{ {
U_TRACE(0, "UREDISClusterMaster::connect(%S,%u)", host, _port) U_TRACE(0, "UREDISClusterMaster::connect(%S,%u)", host, _port)
managementClient->UEventFd::op_mask |= EPOLLET;
if (managementClient->connect(host, _port)) if (managementClient->connect(host, _port))
{ {
calculateNodeMap(); calculateNodeMap();
@ -597,19 +583,14 @@ bool UREDISClusterMaster::connect(const char* host, unsigned int _port)
if (randomNode) if (randomNode)
{ {
subscriptionClient->connect(randomNode->ipAddress.c_str(), randomNode->port); subscriptionClient->connect(randomNode->ipAddress.c_str(), randomNode->port);
U_INTERNAL_ASSERT_EQUALS(UREDISClient_Base::pchannelCallbackMap, U_NULLPTR)
U_NEW(UHashMap<void*>, UREDISClient_Base::pchannelCallbackMap, UHashMap<void*>()); U_NEW(UHashMap<void*>, pchannelCallbackMap, UHashMap<void*>());
subscriptionClient->UEventFd::fd = subscriptionClient->getFd(); subscriptionClient->UEventFd::fd = subscriptionClient->getFd();
subscriptionClient->UEventFd::op_mask |= EPOLLET; subscriptionClient->UEventFd::op_mask |= EPOLLET;
U_DUMP("subscriptionClient = %p", subscriptionClient);
UServer_Base::addHandlerEvent(subscriptionClient); UServer_Base::addHandlerEvent(subscriptionClient);
U_RETURN(true); U_RETURN(true);
} }
} }
@ -621,16 +602,20 @@ void UREDISClusterMaster::clusterUnsubscribe(const UString& channel) // unregist
{ {
U_TRACE(0, "UREDISClusterMaster::clusterUnsubscribe(%V)", channel.rep) U_TRACE(0, "UREDISClusterMaster::clusterUnsubscribe(%V)", channel.rep)
subscriptionClient->sendRequest(U_CTV_TO_PARAM("UNSUBSCRIBE "_ctv), channel); UCompileTimeRESPEncoder::encode<"UNSUBSCRIBE {}"_ctv>(subscriptionClient->response, channel);
(void)subscriptionClient->pchannelCallbackMap->erase(channel); subscriptionClient->sendRequest(subscriptionClient->response);
(void)pchannelCallbackMap->erase(channel);
} }
void UREDISClusterMaster::clusterSubscribe(const UString& channel, vPFcscs callback) // register the callback for messages published to the given channels void UREDISClusterMaster::clusterSubscribe(const UString& channel, vPFcscs callback) // register the callback for messages published to the given channels
{ {
U_TRACE(0, "UREDISClusterMaster::clusterSubscribe(%V,%p)", channel.rep, callback) U_TRACE(0, "UREDISClusterMaster::clusterSubscribe(%V,%p)", channel.rep, callback)
subscriptionClient->sendRequest(U_CTV_TO_PARAM("SUBSCRIBE "_ctv), channel); UCompileTimeRESPEncoder::encode<"SUBSCRIBE {}"_ctv>(subscriptionClient->response, channel);
subscriptionClient->pchannelCallbackMap->insert(channel, (const void*)callback); subscriptionClient->sendRequest(subscriptionClient->response);
UString channelCopy(U_STRING_TO_PARAM(channel));
pchannelCallbackMap->insert(channelCopy, (const void*)callback);
} }
static void getNextCommandResponse(const UString& string, size_t& marker) static void getNextCommandResponse(const UString& string, size_t& marker)