1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00

Update redis.cpp

This commit is contained in:
Victor Stewart 2020-07-07 11:04:41 -04:00 committed by GitHub
parent d0e5b9ddad
commit 816c51cc3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -354,11 +354,11 @@ bool UREDISClient_Base::deleteKeys(const char* pattern, uint32_t len) // Delete
#if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__) && GCC_VERSION_NUM < 100100
int UREDISClusterMaster::handlerRead()
int RedisSubscriber::handlerRead()
{
// BytesRead(100) = "*3\r\n$7\r\nmessage\r\n$19\r\n{ABC}.trafficSignal\r\n$1\r\n1\r\n*3\r\n$7\r\nmessage\r\n$19\r\n{DEF}.trafficSignal\r\n$1\r\n1\r\n"
U_TRACE_NO_PARAM(0, "UREDISClusterMaster::handlerRead()")
U_TRACE_NO_PARAM(0, "RedisSubscriber::handlerRead()")
if (subscriptionString.size()) subscriptionString.setEmpty();
USocketExt::read(subscriptionSocket, subscriptionString, U_SINGLE_READ, 1000);
@ -399,28 +399,6 @@ int UREDISClusterMaster::handlerRead()
U_RETURN(U_NOTIFIER_OK);
}
void UREDISClusterMaster::clusterUnsubscribe(const UString& channel) // unregister the callback for messages published to the given channels
{
U_TRACE(0, "UREDISClusterMaster::clusterUnsubscribe(%V)", channel.rep)
if (subscriptionString.size()) subscriptionString.setEmpty();
UCompileTimeRESPEncoder::encode<"UNSUBSCRIBE {}"_ctv>(subscriptionString, channel);
USocketExt::write(subscriptionSocket, U_STRING_TO_PARAM(subscriptionString), 1000);
(void)pchannelCallbackMap->erase(channel);
}
void UREDISClusterMaster::clusterSubscribe(const UString& channel, vPFcscs callback) // register the callback for messages published to the given channels
{
U_TRACE(0, "UREDISClusterMaster::clusterSubscribe(%V,%p)", channel.rep, callback)
if (subscriptionString.size()) subscriptionString.setEmpty();
UCompileTimeRESPEncoder::encode<"SUBSCRIBE {}"_ctv>(subscriptionString, channel);
USocketExt::write(subscriptionSocket, U_STRING_TO_PARAM(subscriptionString), 1000);
UString channelCopy(U_STRING_TO_PARAM(channel));
pchannelCallbackMap->insert(channelCopy, (const void*)callback);
}
void UREDISClusterMaster::cloneClusterTopology()
{
U_TRACE_NO_PARAM(0, "UREDISClusterMaster::cloneClusterTopology()")
@ -535,24 +513,15 @@ bool UREDISClusterMaster::connect(const UString& host, uint16_t port)
{
U_TRACE(0, "UREDISClusterMaster::connect(%v,%hhu)", host.rep, port)
// const UString& singleTest = single()
if (managementSocket->connectServer(host, port, 1000))
{
cloneClusterTopology();
RedisClusterNode *randomNode = clusterNodes->randomElement();
if (randomNode)
if (randomNode)
{
subscriptionSocket->connectServer(randomNode->ipAddress, randomNode->port, 1000);
U_NEW(UHashMap<void*>, pchannelCallbackMap, UHashMap<void*>());
this->UEventFd::fd = subscriptionSocket->getFd();
this->UEventFd::op_mask |= EPOLLET;
UServer_Base::addHandlerEvent(this);
RedisSubscriber::connectForSubscriptions(randomNode->ipAddress, randomNode->port);
U_RETURN(true);
}
}
@ -561,8 +530,6 @@ bool UREDISClusterMaster::connect(const UString& host, uint16_t port)
}
#endif
// DEBUG
#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* UREDISClient_Base::dump(bool _reset) const
{