mirror of
https://github.com/stefanocasazza/ULib.git
synced 2025-09-28 19:05:55 +08:00
Update redis.h
This commit is contained in:
parent
2e931dfa95
commit
d0e5b9ddad
|
|
@ -192,45 +192,6 @@ public:
|
|||
|
||||
bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379);
|
||||
|
||||
// by Victor Stewart
|
||||
|
||||
UString single(const UString& pipeline)
|
||||
{
|
||||
U_TRACE(0, "UREDISClient_Base::single(%V)", pipeline.rep)
|
||||
|
||||
(void) processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(pipeline));
|
||||
|
||||
return vitem[0];
|
||||
}
|
||||
|
||||
bool silencedSingle(UString& pipeline)
|
||||
{
|
||||
U_TRACE(0, "UREDISClient_Base::silencedSingle(%V)", pipeline.rep)
|
||||
|
||||
return sendRequest(U_CONSTANT_TO_PARAM("CLIENT REPLY SKIP \r\n"), pipeline);
|
||||
}
|
||||
|
||||
const UVector<UString>& multi(const UString& pipeline)
|
||||
{
|
||||
U_TRACE(0, "UREDISClient_Base::multi(%V)", pipeline.rep)
|
||||
|
||||
(void) processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(pipeline));
|
||||
|
||||
return vitem;
|
||||
}
|
||||
|
||||
bool silencedMulti(UString& pipeline)
|
||||
{
|
||||
U_TRACE(0, "UREDISClient_Base::silencedMulti(%V)", pipeline.rep)
|
||||
|
||||
bool result = sendRequest(U_CONSTANT_TO_PARAM("CLIENT REPLY OFF \r\n"), pipeline + "CLIENT REPLY ON \r\n");
|
||||
|
||||
// CLIENT REPLY ON responds with "+OK\r\n" and no way to silence it
|
||||
UClient_Base::readResponse();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// STRING (@see http://redis.io/commands#string)
|
||||
|
||||
bool get(const char* key, uint32_t keylen) // Get the value of a key
|
||||
|
|
@ -807,7 +768,7 @@ protected:
|
|||
|
||||
void init();
|
||||
|
||||
static void parseResponse(UString& response, UVector<UString>& parsed, size_t index, size_t terminalIndex)
|
||||
static void parseResponse(UString& response, UVector<UString>& parsed, size_t index, size_t terminalIndex, ssize_t depth = 0)
|
||||
{
|
||||
U_TRACE(0, "UREDISClient_Base::parseResponse(index = %lu, terminalIndex = %lu)", index, terminalIndex);
|
||||
|
||||
|
|
@ -815,9 +776,12 @@ protected:
|
|||
|
||||
ptr1 = (ptr2 = response.c_pointer(index));
|
||||
|
||||
while (ptr2 < pend)
|
||||
while (ptr2 < pend && depth > 0)
|
||||
{
|
||||
while (*ptr2 != '\r') ++ptr2;
|
||||
while (*ptr2 != '\r')
|
||||
{
|
||||
++ptr2;
|
||||
}
|
||||
|
||||
switch (*ptr1++)
|
||||
{
|
||||
|
|
@ -838,6 +802,7 @@ protected:
|
|||
else
|
||||
{
|
||||
size_t length = u_strtoul(ptr1, ptr2);
|
||||
|
||||
parsed.push_back(response.substr((ptr2 += U_CONSTANT_SIZE(U_CRLF)), length));
|
||||
ptr2 += length;
|
||||
}
|
||||
|
|
@ -846,15 +811,17 @@ protected:
|
|||
}
|
||||
// *2\r\n$10\r\n1439822796\r\n$6\r\n311090\r\n
|
||||
case U_RC_MULTIBULK:
|
||||
// never
|
||||
default:
|
||||
break;
|
||||
{
|
||||
depth += u_strtoul(ptr1, ptr2);
|
||||
break;
|
||||
}
|
||||
default: break; // never
|
||||
}
|
||||
|
||||
--depth;
|
||||
|
||||
ptr1 = (ptr2 += U_CONSTANT_SIZE(U_CRLF));
|
||||
}
|
||||
|
||||
//U_DUMP_CONTAINER(parsed);
|
||||
}
|
||||
|
||||
void processResponse()
|
||||
|
|
@ -875,9 +842,7 @@ protected:
|
|||
|
||||
UClient_Base::iov[0].iov_base = (caddr_t)pipeline.data();
|
||||
UClient_Base::iov[0].iov_len = pipeline.size();
|
||||
UClient_Base::iov[1].iov_base = (caddr_t)U_CRLF;
|
||||
UClient_Base::iov[1].iov_len =
|
||||
UClient_Base::iovcnt = 2;
|
||||
UClient_Base::iovcnt = 1;
|
||||
|
||||
return UClient_Base::sendRequest(false);
|
||||
}
|
||||
|
|
@ -1039,6 +1004,39 @@ private:
|
|||
|
||||
#if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__) && GCC_VERSION_NUM < 100100
|
||||
|
||||
enum class RedisOptions : uint8_t {
|
||||
|
||||
one = 0b0000'0001, // return one item, of type const UString&
|
||||
many = 0b0000'0010, // return all items, of type const UVector<UString>&
|
||||
|
||||
// for cluster mode these are "psuedo-silenced", aka we wait on responses to ensure no cluster errors, but don't waste resources processing the responses
|
||||
silenced = 0b0000'0100, // void
|
||||
reorderable = 0b0000'1000, // only in cluster mode
|
||||
|
||||
copy = 0b0001'0000, // if data needs to persist through subsequent Redis calls, at the cost of copy operation
|
||||
asynchronous = 0b0010'0000
|
||||
};
|
||||
|
||||
constexpr RedisOptions operator |(RedisOptions lhs, RedisOptions rhs)
|
||||
{
|
||||
using underlying = typename std::underlying_type<RedisOptions>::type;
|
||||
return static_cast<RedisOptions>
|
||||
(
|
||||
static_cast<underlying>(lhs) |
|
||||
static_cast<underlying>(rhs)
|
||||
);
|
||||
}
|
||||
|
||||
constexpr bool operator &(RedisOptions lhs, RedisOptions rhs)
|
||||
{
|
||||
using underlying = typename std::underlying_type<RedisOptions>::type;
|
||||
return static_cast<bool>
|
||||
(
|
||||
static_cast<underlying>(lhs) &
|
||||
static_cast<underlying>(rhs)
|
||||
);
|
||||
}
|
||||
|
||||
class UCompileTimeRESPEncoder : public UCompileTimeStringFormatter {
|
||||
private:
|
||||
|
||||
|
|
@ -1179,37 +1177,229 @@ public:
|
|||
}
|
||||
};
|
||||
|
||||
enum class RedisOptions : uint8_t {
|
||||
class RedisSubscriber : public UEventFd {
|
||||
private:
|
||||
|
||||
UString subscriptionString;
|
||||
USocket *subscriptionSocket;
|
||||
UHashMap<void*>* pchannelCallbackMap;
|
||||
|
||||
public:
|
||||
|
||||
virtual int handlerRead() U_DECL_FINAL;
|
||||
|
||||
void subscribe(const UString& channel, vPFcscs callback)
|
||||
{
|
||||
subscriptionString.setEmpty();
|
||||
UCompileTimeRESPEncoder::encode<"SUBSCRIBE {}"_ctv>(subscriptionString, channel);
|
||||
USocketExt::write(subscriptionSocket, U_STRING_TO_PARAM(subscriptionString), 500);
|
||||
|
||||
UString channelCopy(U_STRING_TO_PARAM(channel));
|
||||
pchannelCallbackMap->insert(channelCopy, (const void*)callback);
|
||||
}
|
||||
|
||||
void unsubscribe(const UString& channel)
|
||||
{
|
||||
subscriptionString.setEmpty();
|
||||
UCompileTimeRESPEncoder::encode<"UNSUBSCRIBE {}"_ctv>(subscriptionString, channel);
|
||||
USocketExt::write(subscriptionSocket, U_STRING_TO_PARAM(subscriptionString), 500);
|
||||
(void)pchannelCallbackMap->erase(channel);
|
||||
}
|
||||
|
||||
void connectForSubscriptions(const UString& host, uint16_t port)
|
||||
{
|
||||
subscriptionSocket->connectServer(host, port, 500);
|
||||
|
||||
U_NEW(UHashMap<void*>, pchannelCallbackMap, UHashMap<void*>());
|
||||
|
||||
this->UEventFd::fd = subscriptionSocket->getFd();
|
||||
this->UEventFd::op_mask |= EPOLLET;
|
||||
UServer_Base::addHandlerEvent(this);
|
||||
}
|
||||
|
||||
RedisSubscriber()
|
||||
{
|
||||
U_NEW(USocket, subscriptionSocket, USocket);
|
||||
}
|
||||
|
||||
one = 0b0000'0001, // const UString&
|
||||
many = 0b0000'0010, // const UVector<UString>&
|
||||
|
||||
//these are "psuedo-silenced", aka we wait on responses to ensure no cluster errors, but don't waste resources processing the responses
|
||||
silenced = 0b0000'0100, // void
|
||||
reorderable = 0b0000'1000,
|
||||
|
||||
copy = 0b0001'0000 // if data needs to persist through subsequent Redis calls, at the cost of copy operation
|
||||
~RedisSubscriber()
|
||||
{
|
||||
U_DELETE(subscriptionSocket);
|
||||
if (pchannelCallbackMap) U_DELETE(pchannelCallbackMap);
|
||||
}
|
||||
};
|
||||
|
||||
constexpr RedisOptions operator |(RedisOptions lhs, RedisOptions rhs)
|
||||
{
|
||||
using underlying = typename std::underlying_type<RedisOptions>::type;
|
||||
return static_cast<RedisOptions>
|
||||
(
|
||||
static_cast<underlying>(lhs) |
|
||||
static_cast<underlying>(rhs)
|
||||
);
|
||||
}
|
||||
class URedisClient2 : public UEventFd {
|
||||
private:
|
||||
|
||||
constexpr bool operator &(RedisOptions lhs, RedisOptions rhs)
|
||||
{
|
||||
using underlying = typename std::underlying_type<RedisOptions>::type;
|
||||
return static_cast<bool>
|
||||
(
|
||||
static_cast<underlying>(lhs) &
|
||||
static_cast<underlying>(rhs)
|
||||
);
|
||||
}
|
||||
RedisSubscriber subscriber;
|
||||
USocket *socket;
|
||||
|
||||
UString workingString;
|
||||
UVector<UString> vitem;
|
||||
uint8_t pendingSilencedWrites;
|
||||
|
||||
virtual int handlerRead() U_DECL_FINAL
|
||||
{
|
||||
// currently if you want to read responses asynchronsouly, you must do so by explicitly calling read() before your USP returns, otherwise the response was assumed to have been silenced, and thus we read until empty then discard.
|
||||
|
||||
workingString.setEmpty();
|
||||
USocketExt::read(socket, workingString, U_SINGLE_READ, 100);
|
||||
|
||||
// every silenced response is +OK/r/n and we assume we only receive whole responses.. aka not chopped in the middle into differing TCP packets.
|
||||
// thus each response is 5 bytes
|
||||
pendingSilencedWrites -= (workingString.size() / 5);
|
||||
|
||||
return U_NOTIFIER_OK;
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
// only explicitly call this to read asynchronously
|
||||
template<RedisOptions options>
|
||||
const decltype(auto) read(size_t commandCount)
|
||||
{
|
||||
vitem.clear();
|
||||
workingString.setEmpty();
|
||||
|
||||
const char *pointer1, *pointer2, *pend;
|
||||
pointer1 = pointer2 = pend = workingString.data();
|
||||
|
||||
auto readAndRevalidatePointers = [&] (void) -> void {
|
||||
|
||||
size_t index1 = pointer1 - workingString.data();
|
||||
size_t index2 = pointer2 - workingString.data();
|
||||
USocketExt::read(socket, workingString, U_SINGLE_READ, 500);
|
||||
pointer1 = workingString.c_pointer(index1);
|
||||
pointer2 = workingString.c_pointer(index2);
|
||||
pend = workingString.pend();
|
||||
};
|
||||
|
||||
uint8_t depth = commandCount;
|
||||
|
||||
do
|
||||
{
|
||||
if (pointer1 >= pend) readAndRevalidatePointers();
|
||||
|
||||
while (*pointer2 != '\r')
|
||||
{
|
||||
// no knowing where the TCP packets might have been sliced
|
||||
if (UNLIKELY(++pointer2 == pend)) readAndRevalidatePointers();
|
||||
}
|
||||
|
||||
switch (*pointer1++)
|
||||
{
|
||||
// :0\r\n
|
||||
case U_RC_INT:
|
||||
// +OK\r\n
|
||||
case U_RC_INLINE:
|
||||
// -Error message\r\n
|
||||
case U_RC_ERROR: // only errors here are your fault
|
||||
{
|
||||
if (LIKELY(pendingSilencedWrites == 0)) vitem.push_back(workingString.substr(pointer1, pointer2 - pointer1));
|
||||
else
|
||||
{
|
||||
depth++; // to counterbalance the decrement after the switch
|
||||
pendingSilencedWrites--;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
// $-1\r\n
|
||||
// $15\r\nmy-value-tester\r\n
|
||||
case U_RC_BULK:
|
||||
{
|
||||
if (pointer1[0] == '-') vitem.push_back(UString::getStringNull());
|
||||
else
|
||||
{
|
||||
size_t length = u_strtoul(pointer1, pointer2);
|
||||
|
||||
if ((pointer2 + U_CONSTANT_SIZE(U_CRLF) + length + U_CONSTANT_SIZE(U_CRLF)) > pend) readAndRevalidatePointers();
|
||||
|
||||
vitem.push_back(workingString.substr((pointer2 += U_CONSTANT_SIZE(U_CRLF)), length));
|
||||
|
||||
pointer2 += length;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
// *2\r\n$10\r\n1439822796\r\n$6\r\n311090\r\n
|
||||
case U_RC_MULTIBULK:
|
||||
{
|
||||
depth += u_strtoul(pointer1, pointer2);
|
||||
break;
|
||||
}
|
||||
|
||||
default: break;
|
||||
}
|
||||
|
||||
--depth;
|
||||
|
||||
pointer1 = (pointer2 += U_CONSTANT_SIZE(U_CRLF));
|
||||
|
||||
} while (depth > 0);
|
||||
|
||||
if constexpr (options & RedisOptions::one)
|
||||
{
|
||||
if (vitem.size())
|
||||
{
|
||||
if constexpr (options & RedisOptions::copy) return vitem[0].copy();
|
||||
else return vitem[0];
|
||||
}
|
||||
else return UString::getStringNull().copy();
|
||||
}
|
||||
else if constexpr (options & RedisOptions::many)
|
||||
{
|
||||
if constexpr (options & RedisOptions::copy) return UVector<UString>(vitem);
|
||||
else return (vitem);
|
||||
}
|
||||
}
|
||||
|
||||
// single refers to the targeting of a single Redis node (versus multi as in multiple nodes in a Cluster config)
|
||||
|
||||
template<RedisOptions options, UStringType A>
|
||||
const decltype(auto) single(A&& pipeline, ssize_t commandCount = -1)
|
||||
{
|
||||
// this guarantees that no matter how many commands were pipelined, the only response will be +OK/r/n
|
||||
// which decomplicates scenarios where you push multiple silenced writes, then issue a read where we
|
||||
// would have to read out those silenced replies first before getting to the read in question.
|
||||
if constexpr (options & RedisOptions::silenced)
|
||||
{
|
||||
pendingSilencedWrites++;
|
||||
pipeline.insert(0, UCompileTimeRESPEncoder::CLIENTREPLYOFF);
|
||||
pipeline.append(UCompileTimeRESPEncoder::CLIENTREPLYON);
|
||||
}
|
||||
|
||||
USocketExt::write(socket, U_STRING_TO_PARAM(pipeline), 500);
|
||||
|
||||
if constexpr (options & RedisOptions::asynchronous || options & RedisOptions::silenced) return commandCount;
|
||||
else return read<options>(commandCount);
|
||||
}
|
||||
|
||||
template <auto format, RedisOptions options, typename... Ts>
|
||||
const decltype(auto) single(Ts&&... ts)
|
||||
{
|
||||
return single<options>(workingString, UCompileTimeRESPEncoder::encode<format>(workingString, std::forward<Ts>(ts)...));
|
||||
}
|
||||
|
||||
void subscribe(const UString& channel, vPFcscs callback) { subscriber.subscribe(channel, callback); }
|
||||
void unsubscribe(const UString& channel) { subscriber.unsubscribe(channel); }
|
||||
|
||||
URedisClient2(const UString& host, uint16_t port)
|
||||
{
|
||||
U_NEW(USocket, socket, USocket);
|
||||
socket->connectServer(host, port, 500);
|
||||
subscriber.connectForSubscriptions(host, port);
|
||||
}
|
||||
|
||||
~URedisClient2()
|
||||
{
|
||||
U_DELETE(socket);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
///// CLUSTER /////
|
||||
|
||||
static uint16_t hashslotForKey(UStringType&& hashableKey)
|
||||
{
|
||||
|
|
@ -1329,7 +1519,7 @@ public:
|
|||
RedisClusterMultiPipeline() : pipeline(300U) {}
|
||||
};
|
||||
|
||||
class U_EXPORT UREDISClusterMaster : public UEventFd {
|
||||
class U_EXPORT UREDISClusterMaster : public RedisSubscriber {
|
||||
private:
|
||||
|
||||
struct RedisClusterNode {
|
||||
|
|
@ -1345,7 +1535,7 @@ private:
|
|||
RedisClusterNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot) : ipAddress(_ipAddress), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot)
|
||||
{
|
||||
U_NEW(USocket, socket, USocket);
|
||||
socket->connectServer(ipAddress, port, 1000);
|
||||
socket->connectServer(ipAddress, port, 500);
|
||||
}
|
||||
|
||||
#if defined(DEBUG)
|
||||
|
|
@ -1355,17 +1545,13 @@ private:
|
|||
|
||||
friend class RedisClusterMultiPipeline;
|
||||
|
||||
UString workingString, subscriptionString;
|
||||
UString workingString;
|
||||
UVector<UString> parsed;
|
||||
|
||||
USocket *subscriptionSocket;
|
||||
USocket *managementSocket;
|
||||
UHashMap<RedisClusterNode *> *clusterNodes;
|
||||
// speed at the cost of memory, worth it
|
||||
std::unordered_map<uint16_t, USocket*> hashslotToSocket;
|
||||
UHashMap<void*>* pchannelCallbackMap;
|
||||
|
||||
virtual int handlerRead() U_DECL_FINAL;
|
||||
|
||||
template <UStringType A>
|
||||
USocket* socketForHashableKey(A&& hashableKey) const { return hashslotToSocket[hashslotForKey(std::forward<A>(hashableKey))]; }
|
||||
|
|
@ -1388,7 +1574,7 @@ private:
|
|||
|
||||
size_t index1 = pointer1 - workingString.data();
|
||||
size_t index2 = pointer2 - workingString.data();
|
||||
USocketExt::read(socket, workingString, U_SINGLE_READ, 1000);
|
||||
USocketExt::read(socket, workingString, U_SINGLE_READ, 500);
|
||||
pointer1 = workingString.c_pointer(index1);
|
||||
pointer2 = workingString.c_pointer(index2);
|
||||
pend = workingString.pend();
|
||||
|
|
@ -1482,7 +1668,7 @@ private:
|
|||
|
||||
} while (depth > 0);
|
||||
|
||||
report.end = pointer2 - workingString.data();;
|
||||
report.end = pointer2 - workingString.data();
|
||||
|
||||
return report;
|
||||
}
|
||||
|
|
@ -1509,7 +1695,7 @@ private:
|
|||
case RedisClusterError::ask:
|
||||
case RedisClusterError::tryagain:
|
||||
{
|
||||
USocketExt::write(report.socketAfterError, U_STRING_TO_PARAM(pipeline), 1000);
|
||||
USocketExt::write(report.socketAfterError, U_STRING_TO_PARAM(pipeline), 500);
|
||||
}
|
||||
default:
|
||||
break;
|
||||
|
|
@ -1536,7 +1722,7 @@ private:
|
|||
|
||||
size_t pipelineLength = (pipeline.data() == workingString.data()) ? pipeline.size() : 0;
|
||||
// U_DUMP("pipelineLength = %lu", pipelineLength);
|
||||
USocketExt::write(workingSocket, U_STRING_TO_PARAM(pipeline), 1000);
|
||||
USocketExt::write(workingSocket, U_STRING_TO_PARAM(pipeline), 500);
|
||||
|
||||
RedisReadReport report = read(workingSocket, workingString.size(), commandCount);
|
||||
|
||||
|
|
@ -1679,7 +1865,7 @@ public:
|
|||
}
|
||||
while (++it != pipeline.spans.end() && workingSocket == hashslotToSocket[it->hashslot]);
|
||||
|
||||
USocketExt::write(workingSocket, U_STRING_TO_PARAM(workingString), 1000);
|
||||
USocketExt::write(workingSocket, U_STRING_TO_PARAM(workingString), 500);
|
||||
workingString.setEmpty();
|
||||
|
||||
} while (it != pipeline.spans.end());
|
||||
|
|
@ -1751,22 +1937,16 @@ public:
|
|||
return handleReturn<options>();
|
||||
}
|
||||
|
||||
void clusterUnsubscribe(const UString& channel);
|
||||
void clusterSubscribe( const UString& channel, vPFcscs callback);
|
||||
|
||||
UREDISClusterMaster()
|
||||
{
|
||||
clusterNodes = U_NULLPTR;
|
||||
U_NEW(USocket, managementSocket, USocket);
|
||||
U_NEW(USocket, subscriptionSocket, USocket);
|
||||
}
|
||||
|
||||
~UREDISClusterMaster()
|
||||
{
|
||||
U_DELETE(subscriptionSocket);
|
||||
U_DELETE(managementSocket);
|
||||
if (clusterNodes) U_DELETE(clusterNodes);
|
||||
if (pchannelCallbackMap) U_DELETE(pchannelCallbackMap);
|
||||
}
|
||||
|
||||
#if defined(DEBUG)
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user