1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00
This commit is contained in:
stefanocasazza 2019-11-25 17:24:02 +01:00
parent 11d415d1ce
commit 1c482602c9
12 changed files with 729 additions and 531 deletions

View File

@ -935,6 +935,7 @@ public:
// these three methods form the basis of an iterator for use with a range-based for loop
bool operator==(const UVectorStringIterBase& other) const { return (_pos == other._pos); }
bool operator<(const UVectorStringIterBase& other) const { return (_pos < other._pos); }
bool operator!=(const UVectorStringIterBase& other) const { return (_pos != other._pos); }
UVectorStringIterBase& operator++()

View File

@ -789,12 +789,15 @@ public:
// define method VIRTUAL of class UEventFd
#if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__)
virtual int handlerRead() U_DECL_FINAL;
#endif
virtual void handlerDelete() U_DECL_FINAL
virtual void handlerDelete()
{
U_TRACE_NO_PARAM(0, "UREDISClient_Base::handlerDelete()")
U_INTERNAL_DUMP("UREDISClient_Base::handlerDelete() -> client = %p", this);
U_INTERNAL_DUMP("UEventFd::fd = %d", UEventFd::fd)
UEventFd::fd = -1;
@ -809,7 +812,6 @@ protected:
static uint32_t start;
static ptrdiff_t diff;
static UVector<UString>* pvec;
static UREDISClient_Base* pthis;
static UHashMap<void*>* pchannelCallbackMap;
@ -825,7 +827,7 @@ protected:
void processResponse();
bool processRequest(char recvtype);
#if defined(U_STDCPP_ENABLE) && defined(U_LINUX) && defined(HAVE_CXX20) && !defined(__clang__)
#if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__)
bool sendRequest(UStringType&& pipeline)
#else
bool sendRequest(const UString& pipeline)
@ -995,9 +997,25 @@ private:
// by Victor Stewart
#if defined(U_STDCPP_ENABLE) && defined(U_LINUX) && defined(HAVE_CXX20) && !defined(__clang__)
#if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__)
typedef UREDISClient<UTCPSocket> UREDISClusterClient;
class UREDISClusterClient : public UREDISClient<UTCPSocket> {
public:
enum class ClientType : uint8_t {
subscription,
management,
node
};
const ClientType type;
UREDISClusterMaster *master;
//virtual void handlerDelete() U_DECL_FINAL;
UREDISClusterClient(UREDISClusterMaster *_master, const ClientType _type) : UREDISClient<UTCPSocket>(), type(_type), master(_master) {}
};
struct RedisClusterNode {
@ -1009,9 +1027,9 @@ struct RedisClusterNode {
UREDISClusterClient *client;
uint16_t port, lowHashSlot, highHashSlot;
RedisClusterNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot) : ipAddress(_ipAddress), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot)
RedisClusterNode(UREDISClusterMaster *master, const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot) : ipAddress(U_STRING_FROM_CONSTANT("3.3.0.3")), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot)
{
U_NEW(UREDISClusterClient, client, UREDISClusterClient);
U_NEW(UREDISClusterClient, client, UREDISClusterClient(master, UREDISClusterClient::ClientType::node));
client->setHostPort(ipAddress, _port);
client->connect(ipAddress.c_str(), port);
}
@ -1034,6 +1052,7 @@ class U_EXPORT UREDISClusterMaster {
private:
friend class AnonymousClusterPipeline;
friend class UREDISClusterClient;
UREDISClusterClient *subscriptionClient;
UREDISClusterClient *managementClient;
@ -1043,16 +1062,17 @@ private:
UREDISClusterClient* clientForHashslot(uint16_t hashslot)
{
U_TRACE(0, "UREDISClusterMaster::clientForHashslot(%u)", hashslot)
for (UHashMapNode *node : *clusterNodes)
{
RedisClusterNode* workingNode = (RedisClusterNode *)(node->elem);
if ((workingNode->lowHashSlot <= hashslot) && (workingNode->highHashSlot >= hashslot)) return workingNode->client;
if ((workingNode->lowHashSlot <= hashslot) && (workingNode->highHashSlot >= hashslot))
{
return workingNode->client;
}
}
return managementClient; // never reached
return U_NULLPTR; // never reached
}
UREDISClusterClient* clientForIP(const UString& ip)
@ -1064,7 +1084,7 @@ private:
if (ip == workingNode->ipAddress) return workingNode->client;
}
return managementClient; // never reached
return U_NULLPTR; // never reached
}
template <UStringType A>
@ -1075,22 +1095,20 @@ private:
static ClusterError checkResponseForClusterErrors(const UString& response, size_t offset);
template<UStringType A, UStringType B>
UREDISClusterClient* sendToCluster(A&& hashableKey, B&& pipeline)
template<bool psuedoSilence>
UREDISClusterClient* sendToCluster(uint16_t hashslot, UStringType&& pipeline, UREDISClusterClient* workingClient)
{
ClusterError error;
UREDISClusterClient* workingClient = clientForHashableKey(std::forward<A>(hashableKey));
retry:
workingClient->clear();
retry:
workingClient->UREDISClient_Base::sendRequest(pipeline);
workingClient->response.setEmpty();
workingClient->vitem.clear();
workingClient->UClient_Base::response.setEmpty();
workingClient->UClient_Base::readResponse(U_SINGLE_READ);
workingClient->sendRequest(pipeline);
workingClient->readResponse(U_SINGLE_READ);
error = checkResponseForClusterErrors(workingClient->UClient_Base::response, 0);
error = checkResponseForClusterErrors(workingClient->response, 0);
while (error != ClusterError::none)
{
@ -1098,16 +1116,17 @@ private:
{
case ClusterError::moved:
{
// U_DUMP("(D) calling calculateNodeMap");
calculateNodeMap();
workingClient = clientForHashableKey(std::forward<A>(hashableKey));
workingClient = clientForHashslot(hashslot);
break;
}
case ClusterError::ask:
{
uint32_t _start = workingClient->UClient_Base::response.find(' ', U_CONSTANT_SIZE("-ASK 3999")) + 1,
end = workingClient->UClient_Base::response.find(':', _start);
uint32_t _start = workingClient->response.find(' ', U_CONSTANT_SIZE("-ASK 3999")) + 1,
end = workingClient->response.find(':', _start);
workingClient = clientForIP(workingClient->UClient_Base::response.substr(_start, end - _start));
workingClient = clientForIP(workingClient->response.substr(_start, end - _start));
break;
}
case ClusterError::tryagain:
@ -1121,13 +1140,20 @@ private:
goto retry;
}
workingClient->UREDISClient_Base::processResponse();
if constexpr (!psuedoSilence) workingClient->processResponse();
return workingClient;
}
template<bool psuedoSilence, UStringType A, UStringType B>
inline UREDISClusterClient* routeToCluster(A&& hashableKey, B&& pipeline)
{
uint16_t hashslot = UREDISClusterMaster::hashslotForKey(std::forward<A>(hashableKey));
return sendToCluster<psuedoSilence>(hashslot, std::forward<B>(pipeline), clientForHashslot(hashslot));
}
public:
U_MEMORY_TEST
U_MEMORY_ALLOCATOR
U_MEMORY_DEALLOCATOR
@ -1135,31 +1161,31 @@ public:
bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379);
template<UStringType A, UStringType B>
const UString clusterSingle(A&& hashableKey, B&& pipeline)
{
return sendToCluster(std::forward<A>(hashableKey), std::forward<B>(pipeline))->vitem[0];
}
const UString clusterSingle(A&& hashableKey, B&& pipeline) {return routeToCluster<false>(std::forward<A>(hashableKey), std::forward<B>(pipeline))->vitem[0];}
// both of these multis require all keys to exist within a single hash slot (on the same node isn't good enough)
template<UStringType A, UStringType B>
const UVector<UString>& clusterMulti(A&& hashableKey, B&& pipeline)
{
return sendToCluster(std::forward<A>(hashableKey), std::forward<B>(pipeline))->vitem;
}
const UVector<UString>& clusterMulti(A&& hashableKey, B&& pipeline) {return routeToCluster<false>(std::forward<A>(hashableKey), std::forward<B>(pipeline))->vitem;}
// these are "psuedo-silenced", aka we wait on responses to ensure no cluster errors, but don't waste resources processing the responses
template<UStringType A, UStringType B>
void clusterSilencedSingle(A&& hashableKey, B&& pipeline) {routeToCluster<true>(std::forward<A>(hashableKey), std::forward<B>(pipeline));}
template<UStringType A, UStringType B>
void clusterSilencedMulti(A&& hashableKey, B&& pipeline) {routeToCluster<true>(std::forward<A>(hashableKey), std::forward<B>(pipeline));}
// if reorderable == false, commands are grouped and pushed SEQUENTIALLY BY HASHSLOT. even if other commands point to hashslots on the same cluster node, we are unable to garuntee ordering since Redis only checks for -MOVED etc errors command by command as it executes them, and does not fail upon reaching a -MOVED etc error. this requires waiting for each response, to ensure no errors occured, before moving onto the next batch of commands.
// if reorderable == true, we are able to group and push commands BY NODE regardless of sequence. we assume the fast-path 99.999% occurance that -MOVED and other errors did not occur, and push commands to redis as rapidly as possible without waiting on responses. we same a copy of all commands, and then at the end, check for -MOVED or other errors, and correct those if need be, while we process all resposnes.
const UVector<UString>& clusterAnonMulti(const AnonymousClusterPipeline& pipeline, bool reorderable);
bool clusterUnsubscribe(const UString& channel);
bool clusterSubscribe( const UString& channel, vPFcscs callback);
void clusterUnsubscribe(const UString& channel);
void clusterSubscribe( const UString& channel, vPFcscs callback);
UREDISClusterMaster()
{
clusterNodes = U_NULLPTR;
U_NEW(UREDISClusterClient, managementClient, UREDISClusterClient);
U_NEW(UREDISClusterClient, subscriptionClient, UREDISClusterClient);
U_NEW(UREDISClusterClient, managementClient, UREDISClusterClient(this, UREDISClusterClient::ClientType::management));
U_NEW(UREDISClusterClient, subscriptionClient, UREDISClusterClient(this, UREDISClusterClient::ClientType::subscription));
}
~UREDISClusterMaster()
@ -1177,7 +1203,7 @@ class UCompileTimeRESPEncoder : public UCompileTimeStringFormatter {
private:
template<bool isPartial, size_t workingIndex = 0, size_t workingSegmentCount = 0, typename StringClass, typename... Xs, typename T, typename... Ts>
static constexpr auto generateSegments(StringClass format, size_t& outputSegmentCount, std::tuple<Xs...>&& workingCommand, T&& t, Ts&&... ts)
static constexpr auto generateSegments(StringClass format, size_t& outputCount, std::tuple<Xs...>&& workingCommand, T&& t, Ts&&... ts)
{
constexpr size_t segmentStart = StringClass::instance.find(workingIndex, " "_ctv, StringClass::notChars);
@ -1185,7 +1211,7 @@ private:
{
if constexpr (isPartial)
{
outputSegmentCount = workingSegmentCount;
outputCount = workingSegmentCount;
return workingCommand;
}
else
@ -1193,11 +1219,13 @@ private:
constexpr auto segmentCountString = "*"_ctv + integerToString<workingSegmentCount>() + "\r\n"_ctv;
constexpr size_t nextCommand = StringClass::instance.find(segmentStart, " \r\n"_ctv, StringClass::notChars);
outputCount += 1;
if constexpr (nextCommand < StringClass::length)
{
return std::apply([&] (auto... params) {
return generateSegments<isPartial, nextCommand>(format, outputSegmentCount, std::tuple(), std::forward<T>(t), std::forward<Ts>(ts)..., segmentCountString, params...);
return generateSegments<isPartial, nextCommand>(format, outputCount, std::tuple(), std::forward<T>(t), std::forward<Ts>(ts)..., segmentCountString, params...);
}, workingCommand);
}
@ -1212,30 +1240,36 @@ private:
if constexpr (formatStart < segmentEnd)
{
constexpr size_t formatTermination = formatStart + 1;
return generateSegments<isPartial, segmentEnd, workingSegmentCount + 1>(format, outputSegmentCount, std::tuple_cat(workingCommand, std::make_tuple("$"_ctv, LengthSurplusPackage<T>{(segmentEnd + formatStart) - (segmentStart + formatTermination) - 1, std::forward<T>(t)}, "\r\n"_ctv, StringClass::instance.template substr<segmentStart, formatStart>(), std::forward<T>(t), StringClass::instance.template substr<(std::min(formatTermination + 1, segmentEnd)), segmentEnd>() + "\r\n"_ctv)), std::forward<Ts>(ts)...);
constexpr size_t lengthSurplus = (segmentEnd + formatStart) - (segmentStart + formatTermination) - 1;
// some ocassional bug where the length surplus ctor seems to pull references and the first number value in overflowing, so bypass it for now
return generateSegments<isPartial, segmentEnd, workingSegmentCount + 1>(format, outputCount, std::tuple_cat(workingCommand, std::make_tuple("$"_ctv, getLength(t) + lengthSurplus, "\r\n"_ctv, StringClass::instance.template substr<segmentStart, formatStart>(), std::forward<T>(t), StringClass::instance.template substr<(std::min(formatTermination + 1, segmentEnd)), segmentEnd>() + "\r\n"_ctv)), std::forward<Ts>(ts)...);
}
else
{
constexpr auto segmentString = "$"_ctv + integerToString<segmentEnd - segmentStart>() + "\r\n"_ctv + StringClass::instance.template substr<segmentStart, segmentEnd>() + "\r\n"_ctv;
return generateSegments<isPartial, segmentEnd, workingSegmentCount + 1>(format, outputSegmentCount, std::tuple_cat(workingCommand, std::tie(segmentString)), std::forward<T>(t), std::forward<Ts>(ts)...);
return generateSegments<isPartial, segmentEnd, workingSegmentCount + 1>(format, outputCount, std::tuple_cat(workingCommand, std::tie(segmentString)), std::forward<T>(t), std::forward<Ts>(ts)...);
}
}
}
template<bool isPartial, bool overwrite, auto format, typename... Ts>
static size_t encode_impl(size_t writePosition, UString& workingString, Ts&&... ts)
{
size_t segmentCount = 0;
{
U_TRACE_NO_PARAM(0, "UCompileTimeRESPEncoder::encode_impl()");
// if partial will output segment count
// if full, will output command count
size_t count = 0;
std::apply([&] (auto... params) {
UCompileTimeStringFormatter::snprintf_impl<overwrite>(writePosition, workingString, params...);
}, generateSegments<isPartial>(format, segmentCount, std::tuple(), std::forward<Ts>(ts)..., ""_ctv));
return segmentCount;
}, generateSegments<isPartial>(format, count, std::tuple(), std::forward<Ts>(ts)..., ""_ctv));
return count;
}
public:
@ -1249,21 +1283,21 @@ public:
// fulls
template<auto format, typename ... Ts>
static void encode(UString& workingString, Ts&&... ts)
static size_t encode(UString& workingString, Ts&&... ts)
{
(void)encode_impl<false, true, format>(0, workingString, std::forward<Ts>(ts)...);
return encode_impl<false, true, format>(0, workingString, std::forward<Ts>(ts)...);
}
template<auto format, typename ... Ts>
static void encode_add(UString& workingString, Ts&&... ts)
static size_t encode_add(UString& workingString, Ts&&... ts)
{
(void)encode_impl<false, false, format>(workingString.size(), workingString, std::forward<Ts>(ts)...);
return encode_impl<false, false, format>(workingString.size(), workingString, std::forward<Ts>(ts)...);
}
template<auto format, typename ... Ts>
static void encode_pos(size_t writePosition, UString& workingString, Ts&&... ts)
static size_t encode_pos(size_t writePosition, UString& workingString, Ts&&... ts)
{
(void)encode_impl<false, false, format>(writePosition, workingString, std::forward<Ts>(ts)...);
return encode_impl<false, false, format>(writePosition, workingString, std::forward<Ts>(ts)...);
}
// partials
@ -1292,16 +1326,17 @@ private:
struct Span {
uint8_t commandCount;
int16_t hashslot;
size_t beginning, end, index;
Span(uint16_t _hashslot, size_t _beginning, size_t _end, size_t _index) : hashslot(_hashslot), beginning(_beginning), end(_end), index(_index) {}
Span(uint8_t _commandCount, uint16_t _hashslot, size_t _beginning, size_t _end, size_t _index) : commandCount(_commandCount), hashslot(_hashslot), beginning(_beginning), end(_end), index(_index) {}
};
public:
UString pipeline;
std::vector<Span> spans;
public:
size_t size()
{
@ -1315,28 +1350,33 @@ public:
}
template <UStringType A>
void append(A&& hashableKey, const UString& command)
void append(A&& hashableKey, const UString& command, uint8_t commandCount = 1)
{
U_TRACE_NO_PARAM(0, "AnonymousClusterPipeline::append(ustring)")
size_t beginning = pipeline.size();
pipeline.reserve(pipeline.size() + command.size());
pipeline.append(command);
spans.emplace_back(UREDISClusterMaster::hashslotForKey(std::forward<A>(hashableKey)), beginning, pipeline.size(), spans.size());
spans.emplace_back(commandCount, UREDISClusterMaster::hashslotForKey(std::forward<A>(hashableKey)), beginning, pipeline.size(), spans.size());
}
template <auto format, UStringType A, typename... Ts>
void append(A&& hashableKey, Ts&&... ts)
{
U_TRACE_NO_PARAM(0, "AnonymousClusterPipeline::append(variadic)")
size_t beginning = pipeline.size();
UCompileTimeRESPEncoder::encode_add<format>(pipeline, std::forward<Ts>(ts)...);
size_t commandCount = UCompileTimeRESPEncoder::encode_add<format>(pipeline, std::forward<Ts>(ts)...);
spans.emplace_back(UREDISClusterMaster::hashslotForKey(std::forward<A>(hashableKey)), beginning, pipeline.size(), spans.size());
spans.emplace_back(commandCount, UREDISClusterMaster::hashslotForKey(std::forward<A>(hashableKey)), beginning, pipeline.size(), spans.size());
}
AnonymousClusterPipeline() : pipeline(300U) {}
};
#endif
#endif

View File

@ -677,6 +677,9 @@ public:
// INIT
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
static void setStack(uint8_t* ptr, uint32_t len)
{
U_TRACE(0, "UFlatBuffer::setStack(%p,%u)", ptr, len)
@ -684,6 +687,8 @@ public:
stack_str = ptr;
stack_max = len / UFlatBufferValue::size();
}
#pragma GCC diagnostic pop
static void setBuffer(uint8_t* ptr, uint32_t len)
{

View File

@ -2854,7 +2854,8 @@ namespace std {
};
}
# endif
# if defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__)
#if defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__)
# include <utility> // std::index_sequence
template <char... Chars>
class UCompileTimeStringView {
@ -2892,6 +2893,9 @@ public:
else return substr<From>(std::make_index_sequence<To - From>{});
}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-but-set-parameter"
template<size_t index = 0>
constexpr bool contains(char value) const
{
@ -2900,6 +2904,8 @@ public:
else return contains<index+1>(value);
}
#pragma GCC diagnostic pop
// so that CTV is transparently a UString to U_STRING_TO_PARAM
char const *data() const noexcept { return string; }
size_t size() const noexcept { return length; }
@ -2908,7 +2914,7 @@ public:
static constexpr uint8_t skipDoubles = 0x2;
template <class String>
constexpr ssize_t find(size_t index, String findTheseChars, uint8_t options = 0, size_t terminalIndex = length) const
constexpr size_t find(size_t index, String findTheseChars, uint8_t options = 0, size_t terminalIndex = length) const
{
while (index < terminalIndex)
{
@ -2923,7 +2929,7 @@ public:
++index;
}
return index;
return index;
}
};
@ -2933,8 +2939,6 @@ constexpr auto operator""_ctv() // compile time view
return UCompileTimeStringView<static_cast<char>(Chars)...>{};
}
#define U_CTV_TO_PARAM(ct_string) ct_string.string, ct_string.length
template <typename T, template <char... CharsB> class Template>
struct is_ctv : std::false_type {};
@ -2944,6 +2948,7 @@ struct is_ctv<Template<CharsA...>, Template> : std::true_type {};
template <typename T>
static inline constexpr bool is_ctv_v = is_ctv<T, UCompileTimeStringView>::value;
#define U_CTV_TO_PARAM(ct_string) ct_string.string, ct_string.length
template<typename Lambda, typename T>
static void snprintf_specialization(Lambda&& lambda, T t)
@ -2956,6 +2961,9 @@ static void snprintf_specialization(Lambda&& lambda, T t)
class UCompileTimeStringFormatter {
protected:
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-but-set-parameter"
template<size_t workingIndex, size_t argumentCount, typename StringClass, typename T, typename... Ts>
static constexpr auto generateSegments(StringClass format, T&& t, Ts&&... ts)
{
@ -2972,6 +2980,8 @@ protected:
}
}
#pragma GCC diagnostic pop
template <typename T, typename U>
struct decay_equiv : std::is_same<typename std::decay<T>::type, U>::type {};
@ -2981,28 +2991,17 @@ protected:
template <typename T>
static void writeBytes(char*& writeTo, T t)
{
if constexpr (is_ctv_v<T>)
{
writeTo = (char*)memcpy(writeTo, t.string, t.length) + t.length;
}
else if constexpr (decay_equiv_v<T, UString>)
{
writeTo = (char*)memcpy(writeTo, t.data(), t.size()) + t.size();
}
else if constexpr (std::is_integral_v<T>)
{
writeTo = u_num2str64s(t, writeTo);
}
else if constexpr (decay_equiv_v<T, char>) // assumes char pointer
{
size_t length = strlen(t);
writeTo = (char*)memcpy(writeTo, t, length) + length;
}
if constexpr (is_ctv_v<T>) writeTo = (char*)mempcpy(writeTo, t.data(), t.size());
else if constexpr (decay_equiv_v<T, UString>) writeTo = (char*)mempcpy(writeTo, t.data(), t.size());
else if constexpr (std::is_integral_v<T>) writeTo = u_num2str64s(t, writeTo);
else if constexpr (std::is_floating_point_v<T>) writeTo = u_dtoa(t, writeTo);
else if constexpr (decay_equiv_v<T, char>) writeTo = (char*)mempcpy(writeTo, t, strlen(t));
else
{
auto lambda = [&] (const void *buffer, size_t bufferSize)
{
writeTo = (char*)memcpy(writeTo, buffer, bufferSize) + bufferSize;
//writeTo = (char*)memcpy(writeTo, buffer, bufferSize) + bufferSize;
writeTo = (char*)mempcpy(writeTo, buffer, bufferSize);
};
snprintf_specialization(lambda, t);
@ -3039,10 +3038,22 @@ protected:
template <typename T>
static size_t getLength(T t)
{
if constexpr (is_ctv_v<T>) return t.length;
else if constexpr (decay_equiv_v<T, UString>) return t.size();
else if constexpr (std::is_integral_v<T>) return countDigits(t);
else if constexpr (decay_equiv_v<T, char>) return strlen(t);
if constexpr (is_ctv_v<T>) return t.length;
else if constexpr (decay_equiv_v<T, UString>) return t.size();
else if constexpr (std::is_integral_v<T>)
{
static char buffer[64];
return u_num2str64s(t, buffer) - buffer;
}
else if constexpr (std::is_floating_point_v<T>)
{
static char buffer[64];
return u_dtoa(t, buffer) - buffer;
}
else if constexpr (decay_equiv_v<T, char>)
{
return strlen(t);
}
else
{
size_t length = 0;
@ -3062,7 +3073,7 @@ protected:
struct LengthSurplusPackage
{
size_t lengthSurplus;
T binding;
const T& binding;
size_t size()
{
@ -3071,12 +3082,10 @@ protected:
};
template<bool overwrite, typename... Ts>
static void snprintf_impl(size_t writePosition, UString& workingString, Ts... ts)
static void snprintf_impl(size_t writePosition, UString& workingString, Ts&&... ts)
{
size_t lengths = (getLength(ts) + ...);
char* target = workingString.data() + writePosition;
// grow string to accomodate new size if necessary
if constexpr (overwrite) workingString.reserve(lengths);
else
@ -3084,9 +3093,11 @@ protected:
workingString.reserve(workingString.size() + lengths);
// shift over existing contents
if (writePosition < workingString.size()) (void) memcpy(target + lengths, target, workingString.size() - writePosition);
if (writePosition < workingString.size()) (void) memcpy(workingString.data() + writePosition + lengths, workingString.data() + writePosition, workingString.size() - writePosition);
}
char* target = workingString.data() + writePosition;
(writeBytes(target, ts), ...);
if constexpr (overwrite) workingString.size_adjust_force(lengths);
@ -3095,24 +3106,8 @@ protected:
public:
template <typename IntegralType, typename = std::enable_if_t<std::is_integral_v<IntegralType>>>
static constexpr size_t countDigits(IntegralType number)
{
size_t digits = 0;
if (number < 0)
{
++digits;
number *= -1;
}
while (number != 0) { number /= 10; digits++; }
return digits;
}
template <auto format, bool overwrite = false, typename... Ts>
static void snprintf_pos(size_t writePosition, UString& workingString, Ts... ts)
static void snprintf_pos(size_t writePosition, UString& workingString, Ts&&... ts)
{
std::apply([&] (auto... params) {
@ -3124,13 +3119,13 @@ public:
}
template <auto format, typename... Ts>
static void snprintf(UString& workingString, Ts... ts)
static void snprintf(UString& workingString, Ts&&... ts)
{
snprintf_pos<format, true>(0, workingString, std::forward<Ts>(ts)...);
}
template <auto format, typename... Ts>
static void snprintf_add(UString& workingString, Ts... ts)
static void snprintf_add(UString& workingString, Ts&&... ts)
{
snprintf_pos<format, false>(workingString.size(), workingString, std::forward<Ts>(ts)...);
}
@ -3139,11 +3134,8 @@ public:
template<typename Lambda, typename T>
static void snprintf_specialization(Lambda&& lambda, UCompileTimeStringFormatter::LengthSurplusPackage<T>& t)
{
static char working[UCompileTimeStringFormatter::countDigits(INT64_MAX)];
char *end = u_num2str64s(t.size(), working);
lambda(working, end - working);
static char working[64];
lambda(working, u_num2str64s(t.size(), working) - working);
}
template<typename T>
@ -3156,6 +3148,10 @@ concept bool UStringType = requires(T string)
{
(std::is_same_v<T, UString> || is_ctv_v<T>);
};
inline bool operator==(const UString& lhs, const UCompileTimeStringType& rhs){ return lhs.equal(rhs.string); }
inline bool operator==(const UCompileTimeStringType& lhs, const UString& rhs){ return rhs.equal(lhs.string); }
# endif
#endif
#endif

View File

@ -0,0 +1,285 @@
#pragma once
// Function Time (ns) Speedup
// sprintf 194.225 1.00x
// ...
// branchlut 8.430 23.04x
// sse2 7.614 25.51x
// sse2 only faster above 9 digits
#include <stdint.h>
const char gDigitsLut[200] = {
'0','0','0','1','0','2','0','3','0','4','0','5','0','6','0','7','0','8','0','9',
'1','0','1','1','1','2','1','3','1','4','1','5','1','6','1','7','1','8','1','9',
'2','0','2','1','2','2','2','3','2','4','2','5','2','6','2','7','2','8','2','9',
'3','0','3','1','3','2','3','3','3','4','3','5','3','6','3','7','3','8','3','9',
'4','0','4','1','4','2','4','3','4','4','4','5','4','6','4','7','4','8','4','9',
'5','0','5','1','5','2','5','3','5','4','5','5','5','6','5','7','5','8','5','9',
'6','0','6','1','6','2','6','3','6','4','6','5','6','6','6','7','6','8','6','9',
'7','0','7','1','7','2','7','3','7','4','7','5','7','6','7','7','7','8','7','9',
'8','0','8','1','8','2','8','3','8','4','8','5','8','6','8','7','8','8','8','9',
'9','0','9','1','9','2','9','3','9','4','9','5','9','6','9','7','9','8','9','9'
};
// Branching for different cases (forward)
// Use lookup table of two digits
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmissing-declarations"
char* u32toa_branchlut(uint32_t value, char* buffer) {
if (value < 10000) {
const uint32_t d1 = (value / 100) << 1;
const uint32_t d2 = (value % 100) << 1;
if (value >= 1000)
*buffer++ = gDigitsLut[d1];
if (value >= 100)
*buffer++ = gDigitsLut[d1 + 1];
if (value >= 10)
*buffer++ = gDigitsLut[d2];
*buffer++ = gDigitsLut[d2 + 1];
}
else if (value < 100000000) {
// value = bbbbcccc
const uint32_t b = value / 10000;
const uint32_t c = value % 10000;
const uint32_t d1 = (b / 100) << 1;
const uint32_t d2 = (b % 100) << 1;
const uint32_t d3 = (c / 100) << 1;
const uint32_t d4 = (c % 100) << 1;
if (value >= 10000000)
*buffer++ = gDigitsLut[d1];
if (value >= 1000000)
*buffer++ = gDigitsLut[d1 + 1];
if (value >= 100000)
*buffer++ = gDigitsLut[d2];
*buffer++ = gDigitsLut[d2 + 1];
*buffer++ = gDigitsLut[d3];
*buffer++ = gDigitsLut[d3 + 1];
*buffer++ = gDigitsLut[d4];
*buffer++ = gDigitsLut[d4 + 1];
}
else {
// value = aabbbbcccc in decimal
const uint32_t a = value / 100000000; // 1 to 42
value %= 100000000;
if (a >= 10) {
const unsigned i = a << 1;
*buffer++ = gDigitsLut[i];
*buffer++ = gDigitsLut[i + 1];
}
else
*buffer++ = '0' + static_cast<char>(a);
const uint32_t b = value / 10000; // 0 to 9999
const uint32_t c = value % 10000; // 0 to 9999
const uint32_t d1 = (b / 100) << 1;
const uint32_t d2 = (b % 100) << 1;
const uint32_t d3 = (c / 100) << 1;
const uint32_t d4 = (c % 100) << 1;
*buffer++ = gDigitsLut[d1];
*buffer++ = gDigitsLut[d1 + 1];
*buffer++ = gDigitsLut[d2];
*buffer++ = gDigitsLut[d2 + 1];
*buffer++ = gDigitsLut[d3];
*buffer++ = gDigitsLut[d3 + 1];
*buffer++ = gDigitsLut[d4];
*buffer++ = gDigitsLut[d4 + 1];
}
*buffer++ = '\0';
return buffer;
}
void i32toa_branchlut(int32_t value, char* buffer) {
uint32_t u = static_cast<uint32_t>(value);
if (value < 0) {
*buffer++ = '-';
u = ~u + 1;
}
u32toa_branchlut(u, buffer);
}
char* u64toa_branchlut(uint64_t value, char* buffer) {
if (value < 100000000) {
uint32_t v = static_cast<uint32_t>(value);
if (v < 10000) {
const uint32_t d1 = (v / 100) << 1;
const uint32_t d2 = (v % 100) << 1;
if (v >= 1000)
*buffer++ = gDigitsLut[d1];
if (v >= 100)
*buffer++ = gDigitsLut[d1 + 1];
if (v >= 10)
*buffer++ = gDigitsLut[d2];
*buffer++ = gDigitsLut[d2 + 1];
}
else {
// value = bbbbcccc
const uint32_t b = v / 10000;
const uint32_t c = v % 10000;
const uint32_t d1 = (b / 100) << 1;
const uint32_t d2 = (b % 100) << 1;
const uint32_t d3 = (c / 100) << 1;
const uint32_t d4 = (c % 100) << 1;
if (value >= 10000000)
*buffer++ = gDigitsLut[d1];
if (value >= 1000000)
*buffer++ = gDigitsLut[d1 + 1];
if (value >= 100000)
*buffer++ = gDigitsLut[d2];
*buffer++ = gDigitsLut[d2 + 1];
*buffer++ = gDigitsLut[d3];
*buffer++ = gDigitsLut[d3 + 1];
*buffer++ = gDigitsLut[d4];
*buffer++ = gDigitsLut[d4 + 1];
}
}
else if (value < 10000000000000000) {
const uint32_t v0 = static_cast<uint32_t>(value / 100000000);
const uint32_t v1 = static_cast<uint32_t>(value % 100000000);
const uint32_t b0 = v0 / 10000;
const uint32_t c0 = v0 % 10000;
const uint32_t d1 = (b0 / 100) << 1;
const uint32_t d2 = (b0 % 100) << 1;
const uint32_t d3 = (c0 / 100) << 1;
const uint32_t d4 = (c0 % 100) << 1;
const uint32_t b1 = v1 / 10000;
const uint32_t c1 = v1 % 10000;
const uint32_t d5 = (b1 / 100) << 1;
const uint32_t d6 = (b1 % 100) << 1;
const uint32_t d7 = (c1 / 100) << 1;
const uint32_t d8 = (c1 % 100) << 1;
if (value >= 1000000000000000)
*buffer++ = gDigitsLut[d1];
if (value >= 100000000000000)
*buffer++ = gDigitsLut[d1 + 1];
if (value >= 10000000000000)
*buffer++ = gDigitsLut[d2];
if (value >= 1000000000000)
*buffer++ = gDigitsLut[d2 + 1];
if (value >= 100000000000)
*buffer++ = gDigitsLut[d3];
if (value >= 10000000000)
*buffer++ = gDigitsLut[d3 + 1];
if (value >= 1000000000)
*buffer++ = gDigitsLut[d4];
if (value >= 100000000)
*buffer++ = gDigitsLut[d4 + 1];
*buffer++ = gDigitsLut[d5];
*buffer++ = gDigitsLut[d5 + 1];
*buffer++ = gDigitsLut[d6];
*buffer++ = gDigitsLut[d6 + 1];
*buffer++ = gDigitsLut[d7];
*buffer++ = gDigitsLut[d7 + 1];
*buffer++ = gDigitsLut[d8];
*buffer++ = gDigitsLut[d8 + 1];
}
else {
const uint32_t a = static_cast<uint32_t>(value / 10000000000000000); // 1 to 1844
value %= 10000000000000000;
if (a < 10)
*buffer++ = '0' + static_cast<char>(a);
else if (a < 100) {
const uint32_t i = a << 1;
*buffer++ = gDigitsLut[i];
*buffer++ = gDigitsLut[i + 1];
}
else if (a < 1000) {
*buffer++ = '0' + static_cast<char>(a / 100);
const uint32_t i = (a % 100) << 1;
*buffer++ = gDigitsLut[i];
*buffer++ = gDigitsLut[i + 1];
}
else {
const uint32_t i = (a / 100) << 1;
const uint32_t j = (a % 100) << 1;
*buffer++ = gDigitsLut[i];
*buffer++ = gDigitsLut[i + 1];
*buffer++ = gDigitsLut[j];
*buffer++ = gDigitsLut[j + 1];
}
const uint32_t v0 = static_cast<uint32_t>(value / 100000000);
const uint32_t v1 = static_cast<uint32_t>(value % 100000000);
const uint32_t b0 = v0 / 10000;
const uint32_t c0 = v0 % 10000;
const uint32_t d1 = (b0 / 100) << 1;
const uint32_t d2 = (b0 % 100) << 1;
const uint32_t d3 = (c0 / 100) << 1;
const uint32_t d4 = (c0 % 100) << 1;
const uint32_t b1 = v1 / 10000;
const uint32_t c1 = v1 % 10000;
const uint32_t d5 = (b1 / 100) << 1;
const uint32_t d6 = (b1 % 100) << 1;
const uint32_t d7 = (c1 / 100) << 1;
const uint32_t d8 = (c1 % 100) << 1;
*buffer++ = gDigitsLut[d1];
*buffer++ = gDigitsLut[d1 + 1];
*buffer++ = gDigitsLut[d2];
*buffer++ = gDigitsLut[d2 + 1];
*buffer++ = gDigitsLut[d3];
*buffer++ = gDigitsLut[d3 + 1];
*buffer++ = gDigitsLut[d4];
*buffer++ = gDigitsLut[d4 + 1];
*buffer++ = gDigitsLut[d5];
*buffer++ = gDigitsLut[d5 + 1];
*buffer++ = gDigitsLut[d6];
*buffer++ = gDigitsLut[d6 + 1];
*buffer++ = gDigitsLut[d7];
*buffer++ = gDigitsLut[d7 + 1];
*buffer++ = gDigitsLut[d8];
*buffer++ = gDigitsLut[d8 + 1];
}
*buffer = '\0';
return buffer;
}
void i64toa_branchlut(int64_t value, char* buffer) {
uint64_t u = static_cast<uint64_t>(value);
if (value < 0) {
*buffer++ = '-';
u = ~u + 1;
}
u64toa_branchlut(u, buffer);
}
#pragma GCC diagnostic pop

View File

@ -50,11 +50,7 @@
# include <ulib/event/event_time.h>
#endif
#ifdef U_STDCPP_ENABLE
# if defined(HAVE_CXX14) && GCC_VERSION_NUM > 60100 && defined(HAVE_ARCH64)
# include "./itoa.h"
# endif
#else
#ifndef U_STDCPP_ENABLE
U_EXPORT bool __cxa_guard_acquire() { return 1; }
U_EXPORT bool __cxa_guard_release() { return 1; }
@ -98,8 +94,12 @@ const double u_pow10[309] = { // 1e-0...1e308: 309 * 8 bytes = 2472 bytes
#endif
#ifndef HAVE_OLD_IOSTREAM
# include "./dtoa.h"
# include "./dtoa.h" // NB: dtoa_milo.h break serialize test...
#endif
# if defined(HAVE_CXX14) && GCC_VERSION_NUM > 60100 && defined(HAVE_ARCH64)
# include "./itoa.h"
# endif
// #include "./branchlut.h" // NB: break json test...
static struct ustringrep u_empty_string_rep_storage = {
# ifdef DEBUG
@ -151,6 +151,11 @@ void ULib::init(char** argv, const char* mempool)
#ifndef HAVE_OLD_IOSTREAM
u_dbl2str = dtoa_rapidjson;
#endif
#if defined(HAVE_CXX14) && GCC_VERSION_NUM > 60100 && defined(HAVE_ARCH64)
u_num2str32 = itoa_fwd;
u_num2str64 = itoa_fwd;
#endif
#ifdef DEBUG
char buffer[32];
@ -164,12 +169,7 @@ void ULib::init(char** argv, const char* mempool)
U_INTERNAL_ASSERT_EQUALS(u_num2str64(1234567890, buffer)-buffer, 10)
U_INTERNAL_DUMP("buffer = %.10S", buffer)
U_INTERNAL_ASSERT_EQUALS(memcmp(buffer, "1234567890", 10), 0)
#endif
#if defined(HAVE_CXX14) && GCC_VERSION_NUM > 60100 && defined(HAVE_ARCH64)
u_num2str32 = itoa_fwd;
u_num2str64 = itoa_fwd;
U_INTERNAL_ASSERT_EQUALS(u_num2str64(1234567890, buffer)-buffer, 10)
U_INTERNAL_DUMP("buffer = %.10S", buffer)
U_INTERNAL_ASSERT_EQUALS(memcmp(buffer, "1234567890", 10), 0)

View File

@ -525,7 +525,17 @@ void UMemoryPool::deallocate(void* ptr, uint32_t length)
if (UFile::nr_hugepages == 0) // NB: MADV_DONTNEED cannot be applied to locked pages, Huge TLB pages, or VM_PFNMAP pages...
# endif
{
(void) U_SYSCALL(madvise, "%p,%lu,%d", (void*)ptr, length, MADV_DONTNEED); // causes the kernel to reclaim the indicated pages immediately and drop their contents
/**
* MADV_PAGEOUT can be used by a process to mark a memory range as not expected to be used for a long time so that kernel reclaims *any LRU* pages instantly.
* The hint can help kernel in deciding which pages to evict proactively. Access in the range after successful operation could cause major page fault but never
* lose the up-to-date contents unlike MADV_DONTNEED
*/
# ifndef MADV_PAGEOUT
# define MADV_PAGEOUT MADV_DONTNEED
# endif
(void) U_SYSCALL(madvise, "%p,%lu,%d", (void*)ptr, length, MADV_PAGEOUT); // causes the kernel to reclaim the indicated pages immediately and drop their contents
return;
}

View File

@ -17,7 +17,6 @@
uint32_t UREDISClient_Base::start;
ptrdiff_t UREDISClient_Base::diff;
UHashMap<void*>* UREDISClient_Base::pchannelCallbackMap;
UVector<UString>* UREDISClient_Base::pvec;
UREDISClient_Base* UREDISClient_Base::pthis;
UREDISClient_Base::~UREDISClient_Base()
@ -173,155 +172,76 @@ void UREDISClient_Base::manageResponseBufferResize(uint32_t n)
}
}
U_NO_EXPORT bool UREDISClient_Base::getResponseItem()
{
U_TRACE_NO_PARAM(0, "UREDISClient_Base::getResponseItem()")
char prefix;
uint32_t len;
const char* ptr1;
const char* ptr2;
// this start == size guards against reading upon every loop
if (start == UClient_Base::response.size() &&
UClient_Base::readResponse() == false)
{
U_RETURN(false);
}
U_INTERNAL_DUMP("start = %u (%.20S)", start, UClient_Base::response.c_pointer(start))
U_INTERNAL_ASSERT(memcmp(UClient_Base::response.c_pointer(start), U_CRLF, U_CONSTANT_SIZE(U_CRLF)))
prefix = UClient_Base::response.c_char(start++);
U_INTERNAL_DUMP("prefix = %C", prefix)
ptr1 =
ptr2 = UClient_Base::response.c_pointer(start);
if (prefix != U_RC_BULK && // "$15\r\nmy-value-tester\r\n"
prefix != U_RC_MULTIBULK) // "*2\r\n$10\r\n1439822796\r\n$6\r\n311090\r\n"
{
U_INTERNAL_ASSERT(prefix == U_RC_ANY || // '?'
prefix == U_RC_INT || // ':'
prefix == U_RC_ERROR || // '-'
prefix == U_RC_INLINE) // '+'
while (*ptr2 != '\r') ++ptr2;
len = ptr2-ptr1;
// U_RC_INLINE example -> +OK\r\n
// U_RC_INT example -> :0\r\n
// U_RC_ERROR example -> -Error message\r\n
pvec->push_back(UClient_Base::response.substr(ptr1, len));
start += len + U_CONSTANT_SIZE(U_CRLF);
U_RETURN(false);
}
if (ptr2[0] == '-')
{
U_INTERNAL_ASSERT_EQUALS(ptr2[1], '1')
U_INTERNAL_ASSERT_EQUALS(prefix, U_RC_BULK) // "$-1\r\n" (Null Bulk String)
pvec->push_back(UString::getStringNull());
start += (ptr2-ptr1) + 2 + U_CONSTANT_SIZE(U_CRLF);
U_RETURN(false);
}
len = u_strtoulp(&ptr2);
++ptr2;
U_INTERNAL_DUMP("len = %u ptr2 = %#.2S", len, ptr2-2)
U_INTERNAL_ASSERT_EQUALS(memcmp(ptr2-2, U_CRLF, U_CONSTANT_SIZE(U_CRLF)), 0)
if (prefix == U_RC_BULK) // "$15\r\nmy-value-tester\r\n"
{
len += U_CONSTANT_SIZE(U_CRLF);
while (len > (uint32_t)UClient_Base::response.remain(ptr2))
{
uint32_t d = UClient_Base::response.distance(ptr2);
manageResponseBufferResize(len);
if (UClient_Base::readResponse() == false)
{
U_RETURN(false);
}
ptr1 = UClient_Base::response.c_pointer(start);
ptr2 = UClient_Base::response.c_pointer(d);
}
pvec->push_back(UClient_Base::response.substr(ptr2, len-U_CONSTANT_SIZE(U_CRLF)));
start += (ptr2-ptr1) + len;
U_RETURN(false);
}
/**
* Ex: "*2\r\n$10\r\n1439822796\r\n$6\r\n311090\r\n"
*
* Only certain commands (especially those returning list of values) return multi-bulk replies, you can try by using LRANGE for example
* but you can check the command reference for more details. Usually multi-bulk replies are only 1-level deep but some Redis commands can
* return nested multi-bulk replies, notably EXEC (depending on the commands executed while inside the transaction context) and both
* EVAL / EVALSHA (depending on the value returned by the Lua script). Here is an example using EXEC:
*
* redis 127.0.0.1:6379> MULTI
* OK
* redis 127.0.0.1:6379> LPUSH metavars foo foobar hoge
* QUEUED
* redis 127.0.0.1:6379> LRANGE metavars 0 -1
* QUEUED
* redis 127.0.0.1:6379> EXEC
* 1) (integer) 4
* 2) 1) "hoge"
* 2) "foobar"
* 3) "foo"
* 4) "metavars"
*
* The second element of the multi-bulk reply to EXEC is a multi-bulk itself
*/
U_INTERNAL_ASSERT_EQUALS(prefix, U_RC_MULTIBULK)
start += (ptr2-ptr1);
for (uint32_t i = 0; i < len; ++i)
{
getResponseItem();
}
U_RETURN(true);
}
void UREDISClient_Base::processResponse()
{
U_TRACE_NO_PARAM(0, "UREDISClient_Base::processResponse()")
U_INTERNAL_DUMP("err = %d", err)
U_DUMP_CONTAINER(vitem);
U_INTERNAL_ASSERT_EQUALS(err, U_RC_OK)
// for now alwasys process from beginning
size_t index = 0;
char prefix;
uint32_t len;
const char* ptr1;
const char* ptr2;
start = 0;
pvec = &vitem;
while (index < UClient_Base::response.size())
{
prefix = UClient_Base::response.c_char(index++);
len = UClient_Base::response.size() - index + 1;
do {
getResponseItem();
ptr1 =
ptr2 = UClient_Base::response.c_pointer(index);
U_DUMP_CONTAINER(vitem)
while (*ptr2 != '\r') ++ptr2;
//*3\r\n*4\r\n:5461\r\n:10922\r\n*3\r\n$9\r\n127.0.0.1\r\n:7001\r\n$40\r\ncd153c9c98419c156db43b91f20464d0feaed1b7\r\n*3\r\n$9\r\n127.0.0.1\r\n:7003\r\n$40\r\n5
switch (prefix)
{
// :0\r\n
case U_RC_INT:
// -Error message\r\n
case U_RC_ERROR:
// +OK\r\n
case U_RC_INLINE:
{
len = ptr2-ptr1;
vitem.push_back(UClient_Base::response.substr(ptr1, len));
index += len + U_CONSTANT_SIZE(U_CRLF);
break;
}
case U_RC_BULK:
{
// $-1\r\n (Null Bulk String)
if (ptr1[0] == '-')
{
vitem.push_back(UString::getStringNull());
index += 2 + U_CONSTANT_SIZE(U_CRLF);
}
else
{
len = u_strtoul(ptr1, ptr2);
ptr2 += U_CONSTANT_SIZE(U_CRLF);
vitem.push_back(UClient_Base::response.substr(ptr2, len));
index += (ptr2 - ptr1) + len + U_CONSTANT_SIZE(U_CRLF);
}
break;
}
// *2\r\n$10\r\n1439822796\r\n$6\r\n311090\r\n
case U_RC_MULTIBULK:
{
index += (ptr2 - ptr1) + U_CONSTANT_SIZE(U_CRLF);
break;
}
// never
default: break;
}
while (start < UClient_Base::response.size());
}
UClient_Base::response.setEmpty();
U_DUMP_CONTAINER(vitem);
}
bool UREDISClient_Base::processRequest(char recvtype)
@ -514,44 +434,38 @@ bool UREDISClient_Base::deleteKeys(const char* pattern, uint32_t len) // Delete
U_RETURN(true);
}
// define method VIRTUAL of class UEventFd
// by Victor Stewart
#if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX20) && defined(U_LINUX) && !defined(__clang__)
// this is called for subscribed channels
int UREDISClient_Base::handlerRead()
{
// BytesRead(100) = "*3\r\n$7\r\nmessage\r\n$19\r\n{ABC}.trafficSignal\r\n$1\r\n1\r\n*3\r\n$7\r\nmessage\r\n$19\r\n{DEF}.trafficSignal\r\n$1\r\n1\r\n"
U_TRACE_NO_PARAM(0, "UREDISClient_Base::handlerRead()")
if ((clear(), UClient_Base::response.setEmpty(), UClient_Base::readResponse(U_SINGLE_READ)))
{
char prefix = UClient_Base::response[0];
if (prefix != U_RC_MULTIBULK)
{
err = (prefix == U_RC_ERROR ? U_RC_ERROR
: U_RC_ERR_PROTOCOL);
U_RETURN(false);
}
err = U_RC_OK;
{
processResponse();
UString channel = vitem[1];
// [0] message
// [1] channel
// [2] payload
U_INTERNAL_ASSERT_POINTER(pchannelCallbackMap)
vPFcscs callback = (vPFcscs) pchannelCallbackMap->at(channel);
if (callback) callback(channel, vitem[2]);
for (size_t index = 0, n = vitem.size(); index < n; index += 3)
{
if (vitem[index] == "message"_ctv)
{
vPFcscs callback = (vPFcscs)pchannelCallbackMap->at(vitem[index + 1]);
if (callback) callback(vitem[index + 1], vitem[index + 2]);
}
}
}
U_RETURN(U_NOTIFIER_OK);
}
// by Victor Stewart
#if defined(U_STDCPP_ENABLE) && defined(U_LINUX) && defined(HAVE_CXX20) && !defined(__clang__)
ClusterError UREDISClusterMaster::checkResponseForClusterErrors(const UString& response, size_t offset)
{
U_TRACE_NO_PARAM(0, "checkResponseForClusterErrors()")
@ -577,91 +491,97 @@ void UREDISClusterMaster::calculateNodeMap()
{
U_TRACE_NO_PARAM(0, "UREDISClusterMaster::calculateNodeMap()")
/*
127.0.0.1:30001> cluster slots
1) 1) (integer) 0
2) (integer) 5460
3) 1) "127.0.0.1"
2) (integer) 30001
3) "09dbe9720cda62f7865eabc5fd8857c5d2678366"
4) 1) "127.0.0.1"
2) (integer) 30004
3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf"
2) 1) (integer) 5461
2) (integer) 10922
3) 1) "127.0.0.1"
2) (integer) 30002
3) "c9d93d9f2c0c524ff34cc11838c2003d8c29e013"
4) 1) "127.0.0.1"
2) (integer) 30005
3) "faadb3eb99009de4ab72ad6b6ed87634c7ee410f"
3) 1) (integer) 10923
2) (integer) 16383
3) 1) "127.0.0.1"
2) (integer) 30003
3) "044ec91f325b7595e76dbcb18cc688b6a5b434a1"
4) 1) "127.0.0.1"
2) (integer) 30006
3) "58e6e48d41228013e5d9c1c37c5060693925e97e"
*/
// the first node in each array is the master
// currently we ignore slaves
bool findHashSlots = true;
uint16_t workingLowHashSlot;
uint16_t workingHighHashSlot;
managementClient->sendRequest("*2\r\n$7\r\nCLUSTER\r\n$5\r\nSLOTS\r\n"_ctv);
managementClient->response.setEmpty();
managementClient->readResponse(U_SINGLE_READ);
UString& response = managementClient->response;
U_WARNING("CLUSTER SLOTS response = %.*s", response.size(), response.data());
uint16_t lowHashSlot;
uint16_t highHashSlot;
UString compositeAddress(50U);
(void) managementClient->processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS"));
UHashMap<RedisClusterNode *> *newNodes;
U_NEW(UHashMap<RedisClusterNode *>, newNodes, UHashMap<RedisClusterNode *>);
const UVector<UString>& rawNodes = managementClient->vitem;
size_t index = response.find('\r', 0) + 2, workingEnd;
for (uint32_t a = 0, b = rawNodes.size(); a < b; a+=2)
auto extractNumber = [&] (size_t startOfNumber) -> size_t
{
const UString& first = rawNodes[a];
const UString& second = rawNodes[a+1];
workingEnd = response.find('\r', startOfNumber);
index = workingEnd + 2; // lands on next prefix
return response.substr(startOfNumber, workingEnd - startOfNumber).strtoul();
};
if (findHashSlots)
{
if (first.isNumber() && second.isNumber())
{
workingLowHashSlot = first.strtoul();
workingHighHashSlot = second.strtoul();
while (index < response.size())
{
// *4
// :0
// :5460
// *3
// $9
// 127.0.0.1
// :7000
// $40
// c4869fd1346c4731d2980f46a3dd934627d5dbb4
// *3
// $9
// 127.0.0.1
// :7005
// $40
// 576ce7f8a927b00bb191344f6a167e6615569587
findHashSlots = false;
}
// *4
++index; // skip over prefix
int16_t slaveCount = extractNumber(index) - 3;
// :0
++index; // skip over prefix
lowHashSlot = extractNumber(index);
// :5460
++index; // skip over prefix
highHashSlot = extractNumber(index);
// skip to length of address
index = response.find('$', index) + 1;
size_t lengthOfAddress = extractNumber(index);
UString address = response.substr(index, lengthOfAddress);
uint16_t port = extractNumber(index + lengthOfAddress + 3);
UCompileTimeStringFormatter::snprintf<"{}.{}"_ctv>(compositeAddress, address, port);
RedisClusterNode *workingNode = clusterNodes ? clusterNodes->erase(compositeAddress) : U_NULLPTR;
// in the case of MOVE some nodes will be new, but others we'll already be connected to
if (workingNode)
{
workingNode->lowHashSlot = lowHashSlot;
workingNode->highHashSlot = highHashSlot;
}
else
{
UString compositeAddress(50U);
compositeAddress.snprintf(U_CONSTANT_TO_PARAM("%v.%v"), first.rep, second.rep);
RedisClusterNode *workingNode = clusterNodes ? clusterNodes->erase(compositeAddress) : U_NULLPTR;
// in the case of MOVE some nodes will be new, but others we'll already be connected to
if (workingNode)
{
workingNode->lowHashSlot = workingLowHashSlot;
workingNode->highHashSlot = workingHighHashSlot;
}
else
{
U_NEW(RedisClusterNode, workingNode, RedisClusterNode(first, second.strtoul(), workingLowHashSlot, workingHighHashSlot));
}
newNodes->insert(compositeAddress, workingNode);
findHashSlots = true;
{
U_NEW(RedisClusterNode, workingNode, RedisClusterNode(this, address, port, lowHashSlot, highHashSlot));
}
newNodes->insert(compositeAddress, workingNode);
// scan til we hit the beginning of the next node cluster
do { index = response.find('*', index + 1); } while ((--slaveCount) > -1);
}
// if any nodes were taken offline, the clients would've disconnected by default
if (clusterNodes) U_DELETE(clusterNodes);
clusterNodes = newNodes;
managementClient->UClient_Base::response.setEmpty();
response.setEmpty();
}
bool UREDISClusterMaster::connect(const char* host, unsigned int _port)
@ -671,11 +591,11 @@ bool UREDISClusterMaster::connect(const char* host, unsigned int _port)
if (managementClient->connect(host, _port))
{
calculateNodeMap();
RedisClusterNode *randomNode = clusterNodes->randomElement();
if (randomNode)
{
{
subscriptionClient->connect(randomNode->ipAddress.c_str(), randomNode->port);
U_INTERNAL_ASSERT_EQUALS(UREDISClient_Base::pchannelCallbackMap, U_NULLPTR)
@ -683,97 +603,77 @@ bool UREDISClusterMaster::connect(const char* host, unsigned int _port)
U_NEW(UHashMap<void*>, UREDISClient_Base::pchannelCallbackMap, UHashMap<void*>());
subscriptionClient->UEventFd::fd = subscriptionClient->getFd();
subscriptionClient->UEventFd::op_mask |= EPOLLET;
subscriptionClient->UEventFd::op_mask |= EPOLLET;
U_DUMP("subscriptionClient = %p", subscriptionClient);
UServer_Base::addHandlerEvent(subscriptionClient);
U_RETURN(true);
}
}
}
U_RETURN(false);
}
bool UREDISClusterMaster::clusterUnsubscribe(const UString& channel) // unregister the callback for messages published to the given channels
void UREDISClusterMaster::clusterUnsubscribe(const UString& channel) // unregister the callback for messages published to the given channels
{
U_TRACE(0, "UREDISClusterMaster::clusterUnsubscribe(%V)", channel.rep)
if (subscriptionClient->processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel)))
{
(void)subscriptionClient->UREDISClient_Base::pchannelCallbackMap->erase(channel);
U_RETURN(true);
}
U_RETURN(false);
subscriptionClient->sendRequest(U_CTV_TO_PARAM("UNSUBSCRIBE "_ctv), channel);
(void)subscriptionClient->pchannelCallbackMap->erase(channel);
}
bool UREDISClusterMaster::clusterSubscribe(const UString& channel, vPFcscs callback) // register the callback for messages published to the given channels
void UREDISClusterMaster::clusterSubscribe(const UString& channel, vPFcscs callback) // register the callback for messages published to the given channels
{
U_TRACE(0, "UREDISClusterMaster::clusterSubscribe(%V,%p)", channel.rep, callback)
if (subscriptionClient->processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel)))
{
subscriptionClient->UREDISClient_Base::pchannelCallbackMap->insert(channel, (const void*)callback);
U_RETURN(true);
}
U_RETURN(false);
subscriptionClient->sendRequest(U_CTV_TO_PARAM("SUBSCRIBE "_ctv), channel);
subscriptionClient->pchannelCallbackMap->insert(channel, (const void*)callback);
}
static void getNextMark(const UString& string, size_t& marker)
static void getNextCommandResponse(const UString& string, size_t& marker)
{
int8_t depth = 0;
char prefix;
const char *pointer1, *pointer2;
loop:
prefix = string.c_char(marker++);
pointer1 = pointer2 = string.c_pointer(marker);
switch (prefix)
do
{
// :0\r\n
case U_RC_INT:
// +OK\r\n
case U_RC_INLINE:
// -MOVED 3999 127.0.0.1:6381\r\n
case U_RC_ERROR:
{
while (*pointer2 != '\r') ++pointer2;
marker += (pointer2-pointer1) + U_CONSTANT_SIZE(U_CRLF);
break;
}
// $-1\r\n (Null Bulk String)
// $15\r\nmy-value-tester\r\n
case U_RC_BULK:
{
if (*pointer2 == '-')
{
marker += 2 + U_CONSTANT_SIZE(U_CRLF);
}
else
{
while (*pointer2 != '\r') ++pointer2;
marker += (pointer2-pointer1) + U_CONSTANT_SIZE(U_CRLF) + u_strtol(pointer1, pointer2) + U_CONSTANT_SIZE(U_CRLF);
}
break;
}
// *2\r\n$10\r\n1439822796\r\n$6\r\n311090\r\n
case U_RC_MULTIBULK:
{
while (*pointer2 != '\r') ++pointer2;
depth += u_strtol(pointer1, pointer2);
break;
}
}
prefix = string.c_char(marker++);
pointer1 = pointer2 = string.c_pointer(marker);
depth--;
if (depth > -1) goto loop;
while (*pointer2 != '\r') ++pointer2;
marker += (pointer2-pointer1) + U_CONSTANT_SIZE(U_CRLF);
switch (prefix)
{
// :0\r\n
case U_RC_INT:
// +OK\r\n
case U_RC_INLINE:
// -MOVED 3999 127.0.0.1:6381\r\n
case U_RC_ERROR: break;
// $-1\r\n
// $15\r\nmy-value-tester\r\n
case U_RC_BULK:
{
ssize_t length = u_strtol(pointer1, pointer2);
if (length > -1) marker += length + U_CONSTANT_SIZE(U_CRLF);
break;
}
// *2\r\n$10\r\n1439822796\r\n$6\r\n311090\r\n
case U_RC_MULTIBULK:
{
depth += u_strtol(pointer1, pointer2);
break;
}
}
depth--;
} while (depth > -1);
}
const UVector<UString>& UREDISClusterMaster::clusterAnonMulti(const AnonymousClusterPipeline& pipeline, const bool reorderable)
@ -782,7 +682,10 @@ const UVector<UString>& UREDISClusterMaster::clusterAnonMulti(const AnonymousClu
U_DUMP("pipeline = %v", pipeline.pipeline.rep);
managementClient->vitem.clear();
static UString workingString(500U);
workingString.setEmpty();
workingString.reserve(pipeline.pipeline.size() * 1.5);
// if reorderable == false, commands are grouped and pushed SEQUENTIALLY BY HASHSLOT. even if other commands point to hashslots on the same cluster node, we are unable to garuntee ordering since Redis only checks for -MOVED etc errors command by command as it executes them, and does not fail upon reaching a -MOVED etc error. this requires waiting for each response, to ensure no errors occured, before moving onto the next batch of commands.
@ -804,79 +707,32 @@ const UVector<UString>& UREDISClusterMaster::clusterAnonMulti(const AnonymousClu
it++;
// if end, fall through
if (UNLIKELY(it == pipeline.spans.end())) goto retryPush;
if (UNLIKELY(it == pipeline.spans.end())) goto push;
}
else
{
retryPush:
// push
workingClient->sendRequest(workingString);
workingClient->UClient_Base::response.setEmpty();
workingClient->UClient_Base::readResponse();
push:
// bweare: if the wrong hashable key is passed, this will loop forever.
switch (checkResponseForClusterErrors(workingClient->UClient_Base::response, 0))
// returns when no error. might change client
workingClient = sendToCluster<true>(workingHashslot, workingString, workingClient);
workingString.setEmpty();
UString& workingClientBuffer = workingClient->UClient_Base::response;
UString& masterBuffer = managementClient->UClient_Base::response;
masterBuffer.reserve(masterBuffer.size() + workingClientBuffer.size());
masterBuffer.append(workingClientBuffer);
workingClientBuffer.setEmpty();
if (LIKELY(it != pipeline.spans.end()))
{
case ClusterError::none:
{
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), ClusterError::none", reorderable);
workingString.setEmpty();
UString& workingClientBuffer = workingClient->UClient_Base::response;
UString& masterBuffer = managementClient->UClient_Base::response;
masterBuffer.reserve(masterBuffer.size() + workingClientBuffer.size());
masterBuffer.append(workingClientBuffer);
workingClientBuffer.setEmpty();
if (LIKELY(it != pipeline.spans.end()))
{
workingHashslot = it->hashslot;
workingClient = clientForHashslot(workingHashslot);
goto startNoReorder;
}
else goto finish;
break;
}
case ClusterError::moved:
{
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), ClusterError::moved", reorderable);
workingClient->UClient_Base::response.setEmpty();
calculateNodeMap();
workingClient = clientForHashslot(workingHashslot);
break;
}
case ClusterError::ask:
{
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), ClusterError::ask", reorderable);
// -ASK 3999 127.0.0.1:6381
uint32_t _start = workingClient->UClient_Base::response.find(' ', U_CONSTANT_SIZE("-ASK 3999")) + 1,
end = workingClient->UClient_Base::response.find(':', _start);
const UString& ip = workingClient->UClient_Base::response.substr(_start, end - _start);
workingClient->UClient_Base::response.setEmpty();
workingClient = clientForIP(ip);
break;
}
case ClusterError::tryagain:
{
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), ClusterError::tryagain", reorderable);
UTimeVal(0L, 1000L).nanosleep(); // 0 sec, 1000 microsec = 1ms
break;
}
workingHashslot = it->hashslot;
workingClient = clientForHashslot(workingHashslot);
goto startNoReorder;
}
goto retryPush;
else goto finish;
}
}
}
@ -889,8 +745,9 @@ const UVector<UString>& UREDISClusterMaster::clusterAnonMulti(const AnonymousClu
UREDISClusterClient *client;
size_t ordering;
uint8_t numberOfCommands;
ExecutionMapping(UREDISClusterClient *_client, size_t _ordering) : client(_client), ordering(_ordering) {}
ExecutionMapping(UREDISClusterClient *_client, size_t _ordering, uint8_t _numberOfCommands) : client(_client), ordering(_ordering), numberOfCommands(_numberOfCommands) {}
};
// copy spans
@ -910,13 +767,14 @@ const UVector<UString>& UREDISClusterMaster::clusterAnonMulti(const AnonymousClu
goto startReorder;
while (it != spans.end())
{
{
if (clientForHashslot(it->hashslot) == workingClient)
{
startReorder:
workingString.append(pipeline.pipeline.data() + it->beginning, it->end - it->beginning);
spanToExeuction.insert_or_assign(it->index, ExecutionMapping(workingClient, executionCount++));
spanToExeuction.insert_or_assign(it->index, ExecutionMapping(workingClient, executionCount, it->commandCount));
executionCount += it->commandCount;
it = spans.erase(it);
}
@ -924,41 +782,45 @@ const UVector<UString>& UREDISClusterMaster::clusterAnonMulti(const AnonymousClu
}
touchedClients.insert_or_assign(workingClient, true);
workingClient->sendRequest(workingString);
// we'll get here once having exhausted the commands for a given node
if (spans.size() > 0) goto rinseAndRepeat;
struct Mark {
struct CommandResponse {
size_t beginning, ending;
};
size_t totalBufferSize = 0;
std::unordered_map<UREDISClusterClient*, std::vector<Mark>> marksByClient;
std::unordered_map<UREDISClusterClient*, std::vector<CommandResponse>> commandResponsesByClient;
for (const auto& [touchedClient, meaninglessFlag] : touchedClients)
{
touchedClient->UClient_Base::readResponse();
{
UString& clientBuffer = touchedClient->UClient_Base::response;
touchedClient->UClient_Base::readResponse();
size_t clientBufferSize = clientBuffer.size();
totalBufferSize += clientBufferSize;
size_t marker = 0;
std::vector<Mark>& marks = marksByClient[touchedClient];
std::vector<CommandResponse> commandResponses;
while (marker < clientBufferSize)
{
size_t beginning = marker;
getNextMark(clientBuffer, marker);
getNextCommandResponse(clientBuffer, marker);
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), mark, [%lu, %lu)", reorderable, beginning, marker);
marks.push_back({beginning, marker});
commandResponses.push_back({beginning, marker});
}
(void)commandResponsesByClient.insert_or_assign(touchedClient, commandResponses);
}
UString& masterBuffer = managementClient->UClient_Base::response;
masterBuffer.reserve(totalBufferSize);
for (size_t index = 0; index < pipeline.spans.size(); index++)
@ -966,35 +828,31 @@ const UVector<UString>& UREDISClusterMaster::clusterAnonMulti(const AnonymousClu
const ExecutionMapping& mapping = spanToExeuction.at(index);
UString* clientBuffer = &mapping.client->UClient_Base::response;
Mark& mark = marksByClient[mapping.client][mapping.ordering];
size_t beginning = mark.beginning;
size_t ending = mark.ending;
std::vector<CommandResponse>& commandResponses = commandResponsesByClient[mapping.client];
size_t beginning = commandResponses[mapping.ordering].beginning;
size_t ending = commandResponses[(mapping.ordering + mapping.numberOfCommands - 1)].ending;
checkAgain:
workingClient = 0;
U_DUMP("response = %.*s", ending - beginning, clientBuffer->data() + beginning);
workingClient = 0;
switch (checkResponseForClusterErrors(*clientBuffer, beginning))
{
case ClusterError::none:
{
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), ClusterError::none", reorderable);
masterBuffer.append(clientBuffer->data() + beginning, ending - beginning);
continue;
}
case ClusterError::moved:
{
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), ClusterError::moved", reorderable);
calculateNodeMap();
break;
}
case ClusterError::ask:
{
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), ClusterError::ask", reorderable);
uint32_t _start = clientBuffer->find(' ', beginning + U_CONSTANT_SIZE("-ASK 3999")) + 1,
_end = clientBuffer->find(':', _start);
@ -1003,7 +861,6 @@ const UVector<UString>& UREDISClusterMaster::clusterAnonMulti(const AnonymousClu
}
case ClusterError::tryagain:
{
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), ClusterError::tryagain", reorderable);
UTimeVal(0L, 1000L).nanosleep(); // 0 sec, 1000 microsec = 1ms
break;
}
@ -1016,7 +873,6 @@ const UVector<UString>& UREDISClusterMaster::clusterAnonMulti(const AnonymousClu
clientBuffer = &workingClient->UClient_Base::response;
beginning = clientBuffer->size();
workingString.setEmpty();
workingString.append(pipeline.pipeline.data() + span.beginning, span.end - span.beginning);
@ -1030,7 +886,7 @@ const UVector<UString>& UREDISClusterMaster::clusterAnonMulti(const AnonymousClu
}
finish:
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), finished. masterBuffer = %v", reorderable, managementClient->UClient_Base::response.rep);
U_DUMP("UREDISClusterMaster::clusterAnonMulti(%b), masterBuffer = %v", reorderable, managementClient->UClient_Base::response.rep);
managementClient->UREDISClient_Base::processResponse();
@ -1058,4 +914,4 @@ const char* UREDISClient_Base::dump(bool _reset) const
return U_NULLPTR;
}
#endif
# endif

View File

@ -257,6 +257,9 @@ retry:
(void) U_SYSCALL(sigaction, "%d,%p,%p", SIGSEGV, old + SIGSEGV, U_NULLPTR);
}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wsuggest-attribute=noreturn"
RETSIGTYPE UInterrupt::handlerSegvWithInfo(int signo, siginfo_t* info, void* context)
{
if (u_recursion == false) // NB: maybe recursion occurs...
@ -292,6 +295,8 @@ RETSIGTYPE UInterrupt::handlerSegvWithInfo(int signo, siginfo_t* info, void* con
::abort();
}
#pragma GCC diagnostic pop
void UInterrupt::getSignalInfo(int signo, siginfo_t* info)
{
U_TRACE(0, "UInterrupt::getSignalInfo(%d,%p)", signo, info)

View File

@ -1 +1 @@
06B8
06D4

View File

@ -1,7 +1,7 @@
{ Certificate Revocation List (CRL):
Version 2 (0x1)
Signature Algorithm: sha1WithRSAEncryption
Issuer: /C=IT/ST=Italy/L=Sesto Fiorentino/O=Unirel spa/OU=CA/CN=localhost.localdomain/emailAddress=root@localhost.localdomain
Signature Algorithm: sha1WithRSAEncryption
Issuer: C=IT, ST=Italy, L=Sesto Fiorentino, O=Unirel spa, OU=CA, CN=localhost.localdomain/emailAddress=root@localhost.localdomain
Last Update: Aug 16 12:20:07 2010 GMT
Next Update: Jul 29 12:20:07 2021 GMT
CRL extensions:

View File

@ -1277,7 +1277,7 @@ static void test_stream_01()
char buffer[1024];
ostrstream ostrs01(buffer, sizeof(buffer));
ostrs01 << str01 << '\0';
U_ASSERT( str01 == ostrs01.str() )
U_ASSERT( str01.equal(ostrs01.str()) )
#endif
}
@ -1429,7 +1429,7 @@ static void test_stream_09()
#ifdef U_STDCPP_ENABLE
ostrstream oss1(buffer, sizeof(buffer));
oss1 << foo << '\0';
U_ASSERT( foo == oss1.str() )
U_ASSERT( foo.equal(oss1.str()) )
ostrstream oss2(buffer, sizeof(buffer));
oss2.width(20);