1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00
This commit is contained in:
stefanocasazza 2019-09-03 18:34:39 +02:00
parent 4f2fe33361
commit 28a3a852a8
10 changed files with 304 additions and 243 deletions

View File

@ -63,7 +63,7 @@
typedef void (*vPFcs) (const UString&); typedef void (*vPFcs) (const UString&);
typedef void (*vPFcscs)(const UString&,const UString&); typedef void (*vPFcscs)(const UString&,const UString&);
class UREDISClusterClient; class UREDISClusterMaster;
class U_EXPORT UREDISClient_Base : public UClient_Base, UEventFd { class U_EXPORT UREDISClient_Base : public UClient_Base, UEventFd {
public: public:
@ -822,7 +822,7 @@ protected:
} }
void init(); void init();
virtual void processResponse(); void processResponse();
bool processRequest(char recvtype); bool processRequest(char recvtype);
bool sendRequest(const UString& pipeline) bool sendRequest(const UString& pipeline)
@ -898,7 +898,7 @@ protected:
private: private:
bool getResponseItem() U_NO_EXPORT; bool getResponseItem() U_NO_EXPORT;
friend class UREDISClusterClient; friend class UREDISClusterMaster;
// U_DISALLOW_COPY_AND_ASSIGN(UREDISClient_Base) // U_DISALLOW_COPY_AND_ASSIGN(UREDISClient_Base)
}; };
@ -925,6 +925,7 @@ public:
#endif #endif
private: private:
// U_DISALLOW_COPY_AND_ASSIGN(UREDISClient) // U_DISALLOW_COPY_AND_ASSIGN(UREDISClient)
}; };
@ -978,24 +979,29 @@ private:
// by Victor Stewart // by Victor Stewart
#if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX17) #if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX17)
# include <vector>
class U_EXPORT UREDISClusterClient : protected UREDISClient<UTCPSocket> { class U_EXPORT UREDISClusterClient : public UREDISClient<UTCPSocket> {
private: private:
struct RedisNode { UREDISClusterMaster *master;
public:
void processResponse();
UREDISClusterClient(UREDISClusterMaster *_master) : UREDISClient<UTCPSocket>(), master(_master) {}
};
struct RedisClusterNode {
UString ipAddress; UString ipAddress;
UREDISClient<UTCPSocket> client; UREDISClusterClient client;
uint16_t port, lowHashSlot, highHashSlot; uint16_t port, lowHashSlot, highHashSlot;
RedisNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot) : RedisClusterNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot, UREDISClusterMaster *master) : ipAddress(_ipAddress), client(master), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot)
ipAddress(_ipAddress), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot)
{ {
client.connect(ipAddress.data(), port); client.connect(ipAddress.data(), port);
} }
// DEBUG
#if defined(U_STDCPP_ENABLE) && defined(DEBUG) #if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* dump(bool _reset) const { return ""; } const char* dump(bool _reset) const { return ""; }
#endif #endif
@ -1008,15 +1014,21 @@ private:
tryagain tryagain
}; };
class U_EXPORT UREDISClusterMaster {
private:
friend class UREDISClusterClient;
ClusterError error; ClusterError error;
UString temporaryASKip; UString temporaryASKip;
UHashMap<RedisNode *> redisNodes; UREDISClusterClient subscriptionClient;
UHashMap<RedisClusterNode *> clusterNodes; // when these call they need to be processed... also when MOVED... we need to set up and recalculate
uint16_t hashslotForKey(const UString& hashableKey) { return u_crc16(U_STRING_TO_PARAM(hashableKey)); } uint16_t hashslotForKey(const UString& hashableKey) { return u_crc16(U_STRING_TO_PARAM(hashableKey)); }
uint16_t hashslotFromCommand(const UString& command) uint16_t hashslotFromCommand(const UString& command)
{ {
U_TRACE(0, "UREDISClusterClient::hashslotFromCommand(%V)", command.rep) U_TRACE(0, "UREDISClusterMaster::hashslotFromCommand(%V)", command.rep)
// expects hashable keys to be delivered as abc{hashableKey}xyz value blah \r\n // expects hashable keys to be delivered as abc{hashableKey}xyz value blah \r\n
@ -1026,53 +1038,43 @@ private:
return hashslotForKey(command.substr(beginning, end - beginning)); return hashslotForKey(command.substr(beginning, end - beginning));
} }
UREDISClient<UTCPSocket>& clientForHashslot(uint16_t hashslot) UREDISClusterClient& clientForHashslot(uint16_t hashslot)
{ {
U_TRACE(0, "UREDISClusterClient::clientForHashslot(%u)", hashslot) U_TRACE(0, "UREDISClusterMaster::clientForHashslot(%u)", hashslot)
for (UHashMapNode *node : redisNodes) for (UHashMapNode *node : clusterNodes)
{ {
RedisNode* workingNode = (RedisNode *)(node->elem); RedisClusterNode* workingNode = (RedisClusterNode *)(node->elem);
if ((workingNode->lowHashSlot <= hashslot) || (workingNode->highHashSlot >= hashslot)) return workingNode->client; if ((workingNode->lowHashSlot <= hashslot) && (workingNode->highHashSlot >= hashslot)) return workingNode->client;
} }
return *this; // never reached return subscriptionClient; // never reached
} }
UREDISClient<UTCPSocket>& clientForASKip() UREDISClusterClient& clientForASKip()
{ {
for (UHashMapNode *node : redisNodes) for (UHashMapNode *node : clusterNodes)
{ {
RedisNode* workingNode = (RedisNode *)(node->elem); RedisClusterNode* workingNode = (RedisClusterNode *)(node->elem);
if (temporaryASKip == workingNode->ipAddress) return workingNode->client; if (temporaryASKip == workingNode->ipAddress) return workingNode->client;
} }
return *this; // never reached return subscriptionClient; // never reached
} }
UREDISClient<UTCPSocket>& clientForHashableKey(const UString& hashableKey) { return clientForHashslot(hashslotForKey(hashableKey)); } UREDISClusterClient& clientForHashableKey(const UString& hashableKey) { return clientForHashslot(hashslotForKey(hashableKey)); }
public:
UREDISClusterClient() : UREDISClient<UTCPSocket>()
{
U_TRACE_CTOR(0, UREDISClusterClient, "")
}
~UREDISClusterClient()
{
U_TRACE_DTOR(0, UREDISClusterClient)
}
void processResponse() final;
void calculateNodeMap();
bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379);
template<bool silence> template<bool silence>
const UVector<UString>& processPipeline(UString& pipeline, bool reorderable); const UVector<UString>& processPipeline(UString& pipeline, bool reorderable);
void calculateNodeMap();
public:
bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379);
// all of these multis require all keys to exist within a single hash slot (on the same node isn't good enough) // all of these multis require all keys to exist within a single hash slot (on the same node isn't good enough)
UString clusterSingle(const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey).single(pipeline); } UString clusterSingle(const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey).single(pipeline); }
@ -1093,18 +1095,10 @@ public:
bool clusterUnsubscribe(const UString& channel); bool clusterUnsubscribe(const UString& channel);
bool clusterSubscribe( const UString& channel, vPFcscs callback); bool clusterSubscribe( const UString& channel, vPFcscs callback);
// DEBUG UREDISClusterMaster() : subscriptionClient(this) {}
#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* dump(bool _reset) const { return UREDISClient_Base::dump(_reset); }
#endif
private:
U_DISALLOW_COPY_AND_ASSIGN(UREDISClusterClient)
}; };
extern template const UVector<UString>& UREDISClusterClient::processPipeline<true>(UString& pipeline, bool reorderable); extern template const UVector<UString>& UREDISClusterMaster::processPipeline<true>(UString& pipeline, bool reorderable);
extern template const UVector<UString>& UREDISClusterClient::processPipeline<false>(UString& pipeline, bool reorderable); extern template const UVector<UString>& UREDISClusterMaster::processPipeline<false>(UString& pipeline, bool reorderable);
#endif #endif
#endif #endif

View File

@ -73,7 +73,7 @@ public:
U_INTERNAL_ASSERT_POINTER(client) U_INTERNAL_ASSERT_POINTER(client)
return UWebSocket::sendData(client->UClient_Base::socket, type, msg); return UWebSocket::sendData(false, client->UClient_Base::socket, type, msg);
} }
void close() void close()
@ -82,7 +82,7 @@ public:
U_INTERNAL_ASSERT_POINTER(client) U_INTERNAL_ASSERT_POINTER(client)
(void) UWebSocket::sendClose(client->UClient_Base::socket); (void) UWebSocket::sendClose(false, client->UClient_Base::socket);
client->UClient_Base::close(); client->UClient_Base::close();
} }

View File

@ -72,13 +72,13 @@ public:
} WebSocketFrameData; } WebSocketFrameData;
static bool checkForInitialData(); static bool checkForInitialData();
static bool sendData(USocket* socket, int type, const char* data, uint32_t len); static bool sendData(const bool isServer, USocket* socket, int type, const char* data, uint32_t len);
static bool sendData(USocket* socket, int type, const UString& data) { return sendData(socket, type, U_STRING_TO_PARAM(data)); } static bool sendData(const bool isServer, USocket* socket, int type, const UString& data) { return sendData(isServer, socket, type, U_STRING_TO_PARAM(data)); }
static bool sendClose(USocket* socket) static bool sendClose(const bool isServer, USocket* socket)
{ {
U_TRACE(0, "UWebSocket::sendClose(%p)", socket) U_TRACE(0, "UWebSocket::sendClose(%b,%p)", isServer, socket)
// Send server-side closing handshake // Send server-side closing handshake
@ -87,7 +87,7 @@ public:
unsigned char status_code_buffer[2] = { (unsigned char)((status_code >> 8) & 0xFF), unsigned char status_code_buffer[2] = { (unsigned char)((status_code >> 8) & 0xFF),
(unsigned char)( status_code & 0xFF) }; (unsigned char)( status_code & 0xFF) };
if (sendControlFrame(socket, U_WS_OPCODE_CLOSE, status_code_buffer, sizeof(status_code_buffer))) U_RETURN(true); if (sendControlFrame(isServer, socket, U_WS_OPCODE_CLOSE, status_code_buffer, sizeof(status_code_buffer))) U_RETURN(true);
U_RETURN(false); U_RETURN(false);
} }
@ -208,7 +208,7 @@ private:
static bool sendAccept(USocket* socket); static bool sendAccept(USocket* socket);
static RETSIGTYPE handlerForSigTERM(int signo); static RETSIGTYPE handlerForSigTERM(int signo);
static int handleDataFraming(UString* pbuffer, USocket* socket); static int handleDataFraming(UString* pbuffer, USocket* socket);
static bool sendControlFrame(USocket* socket, int opcode, const unsigned char* payload, uint32_t payload_length); static bool sendControlFrame(const bool isServer, USocket* socket, int opcode, const unsigned char* payload, uint32_t payload_length);
U_DISALLOW_COPY_AND_ASSIGN(UWebSocket) U_DISALLOW_COPY_AND_ASSIGN(UWebSocket)

View File

@ -549,15 +549,57 @@ int UREDISClient_Base::handlerRead()
U_RETURN(U_NOTIFIER_OK); U_RETURN(U_NOTIFIER_OK);
} }
#if defined(U_STDCPP_ENABLE)
// by Victor Stewart // by Victor Stewart
# if defined(HAVE_CXX17) # if defined(HAVE_CXX17)
void UREDISClusterClient::calculateNodeMap()
{
U_TRACE_NO_PARAM(0, "UREDISClusterClient::calculateNodeMap()")
void UREDISClusterClient::processResponse()
{
U_TRACE_NO_PARAM(0, "UREDISClusterClient::processResponse()")
if (UClient_Base::response.find("-MOVED", 0, 6) != U_NOT_FOUND)
{
// MOVED 3999 127.0.0.1:6381 => the hashslot has been moved to another master node
master->error = ClusterError::moved;
master->calculateNodeMap();
}
else if (UClient_Base::response.find("-ASK", 1, 4) != U_NOT_FOUND)
{
// ASK 3999 127.0.0.1:6381 => this means that one of the hash slots is being migrated to another server
master->error = ClusterError::ask;
uint32_t _start = UClient_Base::response.find(' ', 8) + 1,
end = UClient_Base::response.find(':', _start);
(void)master->temporaryASKip.assign(UClient_Base::response.substr(_start, end - _start));
}
else if (UClient_Base::response.find("-TRYAGAIN", 0, 9) != U_NOT_FOUND)
{
//
// * during a resharding the multi-key operations targeting keys that all exist and are all still in the same node (either the source or destination node) are still available.
// * Operations on keys that don't exist or are - during the resharding - split between the source and destination nodes, will generate a -TRYAGAIN error. The client can try
// * the operation after some time, or report back the error. As soon as migration of the specified hash slot has terminated, all multi-key operations are available again for
// * that hash slot
master->error = ClusterError::tryagain;
UTimeVal(0L, 1000L).nanosleep(); // 0 sec, 1000 microsec = 1ms
}
else
{
master->error = ClusterError::none;
UREDISClient_Base::processResponse();
}
}
void UREDISClusterMaster::calculateNodeMap()
{
U_TRACE_NO_PARAM(0, "UREDISClusterMaster::calculateNodeMap()")
/* /*
127.0.0.1:30001> cluster slots 127.0.0.1:30001> cluster slots
1) 1) (integer) 0 1) 1) (integer) 0
@ -586,14 +628,16 @@ void UREDISClusterClient::calculateNodeMap()
3) "58e6e48d41228013e5d9c1c37c5060693925e97e" 3) "58e6e48d41228013e5d9c1c37c5060693925e97e"
*/ */
// the first node in each array is the master
bool findHashSlots = true; bool findHashSlots = true;
uint16_t workingLowHashSlot; uint16_t workingLowHashSlot;
uint16_t workingHighHashSlot; uint16_t workingHighHashSlot;
(void) UREDISClient_Base::processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS")); (void) subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS"));
UHashMap<RedisNode *> newNodes; UHashMap<RedisClusterNode *> newNodes;
const UVector<UString>& rawNodes = UREDISClient_Base::vitem; const UVector<UString>& rawNodes = subscriptionClient.vitem;
for (uint32_t a = 0, b = rawNodes.size(); a < b; a+=2) for (uint32_t a = 0, b = rawNodes.size(); a < b; a+=2)
{ {
@ -612,12 +656,10 @@ void UREDISClusterClient::calculateNodeMap()
} }
else else
{ {
// first node in the array is the master
UString compositeAddress(50U); UString compositeAddress(50U);
compositeAddress.snprintf(U_CONSTANT_TO_PARAM("%v.%v"), first.rep, second.rep); compositeAddress.snprintf(U_CONSTANT_TO_PARAM("%v.%v"), first.rep, second.rep);
RedisNode *workingNode = redisNodes.erase(compositeAddress); RedisClusterNode *workingNode = clusterNodes.erase(compositeAddress);
// in the case of MOVE some nodes will be new, but others we'll already be connected to // in the case of MOVE some nodes will be new, but others we'll already be connected to
if (workingNode) if (workingNode)
@ -625,7 +667,7 @@ void UREDISClusterClient::calculateNodeMap()
workingNode->lowHashSlot = workingLowHashSlot; workingNode->lowHashSlot = workingLowHashSlot;
workingNode->highHashSlot = workingHighHashSlot; workingNode->highHashSlot = workingHighHashSlot;
} }
else workingNode = new RedisNode(first, second.strtoul(), workingLowHashSlot, workingHighHashSlot); else workingNode = new RedisClusterNode(first, second.strtoul(), workingLowHashSlot, workingHighHashSlot, this);
newNodes.insert(compositeAddress, workingNode); newNodes.insert(compositeAddress, workingNode);
@ -636,22 +678,22 @@ void UREDISClusterClient::calculateNodeMap()
} }
// if any nodes were taken offline, the clients would've disconnected by default // if any nodes were taken offline, the clients would've disconnected by default
redisNodes.assign(newNodes); clusterNodes.assign(newNodes);
} }
bool UREDISClusterClient::connect(const char* host, unsigned int _port) bool UREDISClusterMaster::connect(const char* host, unsigned int _port)
{ {
U_TRACE(0, "UREDISClusterClient::connect(%S,%u)", host, _port) U_TRACE(0, "UREDISClusterMaster::connect(%S,%u)", host, _port)
if (UREDISClient<UTCPSocket>::connect(host, _port)) if (subscriptionClient.connect(host, _port))
{ {
calculateNodeMap(); calculateNodeMap();
// self handles the SUB/PUB traffic. Must be a dedicated client pre-Redis 6 subscriptionClient.UEventFd::fd = subscriptionClient.getFd();
UEventFd::op_mask |= EPOLLET; subscriptionClient.UEventFd::op_mask |= EPOLLET;
UEventFd::op_mask &= ~EPOLLRDHUP; subscriptionClient.UEventFd::op_mask &= ~EPOLLRDHUP;
UNotifier::insert(this, EPOLLEXCLUSIVE | EPOLLROUNDROBIN); // NB: we ask to listen for events to a Redis publish channel... UNotifier::insert(&subscriptionClient, EPOLLEXCLUSIVE | EPOLLROUNDROBIN); // NB: we ask to listen for events to a Redis publish channel...
U_RETURN(true); U_RETURN(true);
} }
@ -659,18 +701,13 @@ bool UREDISClusterClient::connect(const char* host, unsigned int _port)
U_RETURN(false); U_RETURN(false);
} }
bool UREDISClusterClient::clusterUnsubscribe(const UString& channel) // unregister the callback for messages published to the given channels bool UREDISClusterMaster::clusterUnsubscribe(const UString& channel) // unregister the callback for messages published to the given channels
{ {
U_TRACE(0, "UREDISClusterClient::clusterUnsubscribe(%V)", channel.rep) U_TRACE(0, "UREDISClusterMaster::clusterUnsubscribe(%V)", channel.rep)
if (processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel))) if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel)))
{ {
if (pchannelCallbackMap == U_NULLPTR) (void)subscriptionClient.UREDISClient_Base::pchannelCallbackMap->erase(channel);
{
U_NEW(UHashMap<void*>, pchannelCallbackMap, UHashMap<void*>);
}
(void) pchannelCallbackMap->erase(channel);
U_RETURN(true); U_RETURN(true);
} }
@ -678,18 +715,18 @@ bool UREDISClusterClient::clusterUnsubscribe(const UString& channel) // unregist
U_RETURN(false); U_RETURN(false);
} }
bool UREDISClusterClient::clusterSubscribe(const UString& channel, vPFcscs callback) // register the callback for messages published to the given channels bool UREDISClusterMaster::clusterSubscribe(const UString& channel, vPFcscs callback) // register the callback for messages published to the given channels
{ {
U_TRACE(0, "UREDISClusterClient::clusterSubscribe(%V,%p)", channel.rep, callback) U_TRACE(0, "UREDISClusterMaster::clusterSubscribe(%V,%p)", channel.rep, callback)
if (processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel))) if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel)))
{ {
if (pchannelCallbackMap == U_NULLPTR) if (subscriptionClient.UREDISClient_Base::pchannelCallbackMap == U_NULLPTR)
{ {
U_NEW(UHashMap<void*>, pchannelCallbackMap, UHashMap<void*>); U_NEW(UHashMap<void*>, subscriptionClient.UREDISClient_Base::pchannelCallbackMap, UHashMap<void*>);
} }
pchannelCallbackMap->insert(channel, (const void*)callback); subscriptionClient.UREDISClient_Base::pchannelCallbackMap->insert(channel, (const void*)callback);
U_RETURN(true); U_RETURN(true);
} }
@ -697,60 +734,15 @@ bool UREDISClusterClient::clusterSubscribe(const UString& channel, vPFcscs callb
U_RETURN(false); U_RETURN(false);
} }
void UREDISClusterClient::processResponse() template const UVector<UString>& UREDISClusterMaster::processPipeline<true>(UString& pipeline, bool reorderable);
{ template const UVector<UString>& UREDISClusterMaster::processPipeline<false>(UString& pipeline, bool reorderable);
U_TRACE_NO_PARAM(0, "UREDISClusterClient::processResponse()")
if (UClient_Base::response.find("MOVED", 0, 5) != U_NOT_FOUND)
{
// MOVED 3999 127.0.0.1:6381 => the hashslot has been moved to another master node
error = ClusterError::moved;
calculateNodeMap();
}
else if (UClient_Base::response.find("ASK", 0, 3) != U_NOT_FOUND)
{
// ASK 3999 127.0.0.1:6381 => this means that one of the hash slots is being migrated to another server
error = ClusterError::ask;
uint32_t _start = UClient_Base::response.find(' ', 8) + 1,
end = UClient_Base::response.find(':', _start);
(void) temporaryASKip.replace(UClient_Base::response.substr(_start, end - _start));
}
else if (UClient_Base::response.find("TRYAGAIN", 0, 8) != U_NOT_FOUND)
{
/**
* during a resharding the multi-key operations targeting keys that all exist and are all still in the same node (either the source or destination node) are still available.
* Operations on keys that don't exist or are - during the resharding - split between the source and destination nodes, will generate a -TRYAGAIN error. The client can try
* the operation after some time, or report back the error. As soon as migration of the specified hash slot has terminated, all multi-key operations are available again for
* that hash slot
*/
error = ClusterError::tryagain;
UTimeVal(0L, 1000L).nanosleep(); // 0 sec, 1000 microsec = 1ms
}
else
{
error = ClusterError::none;
UREDISClient<UTCPSocket>::processResponse();
}
}
template const UVector<UString>& UREDISClusterClient::processPipeline<true>(UString& pipeline, bool reorderable);
template const UVector<UString>& UREDISClusterClient::processPipeline<false>(UString& pipeline, bool reorderable);
template <bool silence> template <bool silence>
const UVector<UString>& UREDISClusterClient::processPipeline(UString& pipeline, const bool reorderable) const UVector<UString>& UREDISClusterMaster::processPipeline(UString& pipeline, const bool reorderable)
{ {
U_TRACE(0, "UREDISClusterClient::processPipeline(%V,%b)", pipeline.rep, reorderable) U_TRACE(0, "UREDISClusterMaster::processPipeline(%V,%b)", pipeline.rep, reorderable)
UString workingString(U_CAPACITY); UString workingString(pipeline.size());
UVector<UString> commands(pipeline, "\r\n"); UVector<UString> commands(pipeline, "\r\n");
uint16_t hashslot, workingHashslot, count = 0; uint16_t hashslot, workingHashslot, count = 0;
@ -769,7 +761,7 @@ const UVector<UString>& UREDISClusterClient::processPipeline(UString& pipeline,
} }
} }
UREDISClient<UTCPSocket>& client = clientForHashslot(hashslot); UREDISClusterClient& client = clientForHashslot(hashslot);
if constexpr (silence) (void) client.sendRequest(workingString); if constexpr (silence) (void) client.sendRequest(workingString);
else else
@ -787,7 +779,7 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS
case ClusterError::ask: case ClusterError::ask:
{ {
UREDISClient<UTCPSocket>& temporaryClient = clientForASKip(); UREDISClusterClient& temporaryClient = clientForASKip();
(void) temporaryClient.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString)); (void) temporaryClient.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString));
} }
@ -796,17 +788,15 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS
case ClusterError::none: break; case ClusterError::none: break;
} }
if constexpr (silence == false) vitem.move(client.vitem); if constexpr (silence == false) subscriptionClient.vitem.move(client.vitem);
count = 0; count = 0;
workingString.clear(); workingString.clear();
} }
}; };
/* if (reorderable)
*/ {
if (reorderable) {
for (UVectorStringIter it = commands.begin(); it != commands.end(); ) { for (UVectorStringIter it = commands.begin(); it != commands.end(); ) {
if (it == commands.begin()) hashslot = hashslotFromCommand(*it); if (it == commands.begin()) hashslot = hashslotFromCommand(*it);
@ -834,11 +824,10 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS
if (commands.size() != 0) it = commands.begin(); if (commands.size() != 0) it = commands.begin();
} }
} }
else { else
{
for (uint32_t index = 0, n = commands.size(); index < n; index++) for (uint32_t index = 0, n = commands.size(); index < n; index++)
{ {
if (index == 0) hashslot = hashslotFromCommand(commands[0]); if (index == 0) hashslot = hashslotFromCommand(commands[0]);
UString command = commands[index]; UString command = commands[index];
@ -848,8 +837,8 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS
workingHashslot = hashslotFromCommand(command); workingHashslot = hashslotFromCommand(command);
if (workingHashslot == hashslot) { if (workingHashslot == hashslot)
{
++count; ++count;
// goto isADirective; // goto isADirective;
@ -864,13 +853,13 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS
} }
} }
return vitem; return subscriptionClient.vitem;
} }
# endif # endif
// DEBUG // DEBUG
# if defined(DEBUG) #if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* UREDISClient_Base::dump(bool _reset) const const char* UREDISClient_Base::dump(bool _reset) const
{ {
UClient_Base::dump(false); UClient_Base::dump(false);
@ -889,4 +878,3 @@ const char* UREDISClient_Base::dump(bool _reset) const
return U_NULLPTR; return U_NULLPTR;
} }
# endif # endif
#endif

View File

@ -538,7 +538,7 @@ void UClientImage_Base::handlerDelete()
if (U_ClientImage_http(this) == '0') if (U_ClientImage_http(this) == '0')
{ {
if (bsocket_open && if (bsocket_open &&
UWebSocket::sendClose(socket)) UWebSocket::sendClose(true, socket))
{ {
socket->close(); socket->close();
} }

View File

@ -151,7 +151,7 @@ int UProxyPlugIn::handlerRequest()
while (UWebSocket::handleDataFraming(UWebSocket::rbuffer, UServer_Base::csocket) == U_WS_STATUS_CODE_OK && while (UWebSocket::handleDataFraming(UWebSocket::rbuffer, UServer_Base::csocket) == U_WS_STATUS_CODE_OK &&
(client_http->UClient_Base::prepareRequest(*UClientImage_Base::wbuffer), client_http->UClient_Base::sendRequestAndReadResponse()) && (client_http->UClient_Base::prepareRequest(*UClientImage_Base::wbuffer), client_http->UClient_Base::sendRequestAndReadResponse()) &&
UWebSocket::sendData(UServer_Base::csocket, UWebSocket::message_type, client_http->UClient_Base::response)) UWebSocket::sendData(false, UServer_Base::csocket, UWebSocket::message_type, client_http->UClient_Base::response))
{ {
client_http->UClient_Base::clearData(); client_http->UClient_Base::clearData();

View File

@ -90,6 +90,10 @@ int UWebSocketPlugIn::handlerRun()
U_INTERNAL_ASSERT_EQUALS(UWebSocket::rbuffer, U_NULLPTR) U_INTERNAL_ASSERT_EQUALS(UWebSocket::rbuffer, U_NULLPTR)
U_INTERNAL_ASSERT_EQUALS(UWebSocket::message, U_NULLPTR) U_INTERNAL_ASSERT_EQUALS(UWebSocket::message, U_NULLPTR)
#ifndef USE_LIBSSL
U_ERROR("Sorry, I was compiled without SSL support so I can't use websocket");
#endif
U_NEW_STRING(UWebSocket::rbuffer, UString(U_CAPACITY)); U_NEW_STRING(UWebSocket::rbuffer, UString(U_CAPACITY));
U_NEW_STRING(UWebSocket::message, UString(U_CAPACITY)); U_NEW_STRING(UWebSocket::message, UString(U_CAPACITY));

View File

@ -80,7 +80,7 @@ else
{ {
// echo // echo
if (UWebSocket::sendData(UServer_Base::csocket, UWebSocket::message_type, *UWebSocket::message) == false) U_http_info.nResponseCode = HTTP_INTERNAL_ERROR; if (UWebSocket::sendData(false, UServer_Base::csocket, UWebSocket::message_type, *UWebSocket::message) == false) U_http_info.nResponseCode = HTTP_INTERNAL_ERROR;
} }
#endif #endif
--> -->

View File

@ -520,7 +520,8 @@ loop:
case U_WS_OPCODE_PING: case U_WS_OPCODE_PING:
{ {
if (sendControlFrame(socket, U_WS_OPCODE_PONG, application_data, application_data_offset) == false) // implies only client initiates ping pong... so any pong is always server -> client
if (sendControlFrame(true, socket, U_WS_OPCODE_PONG, application_data, application_data_offset) == false)
{ {
U_RETURN(U_WS_STATUS_CODE_PROTOCOL_ERROR); U_RETURN(U_WS_STATUS_CODE_PROTOCOL_ERROR);
} }
@ -600,18 +601,32 @@ next:
goto loop; goto loop;
} }
bool UWebSocket::sendData(USocket* socket, int type, const char* data, uint32_t len) bool UWebSocket::sendData(const bool isServer, USocket* socket, int type, const char* data, uint32_t len)
{ {
U_TRACE(0, "UWebSocket::sendData(%p,%d,%.*S,%u)", socket, type, len, data, len) U_TRACE(0, "UWebSocket::sendData(%b,%p,%d,%.*S,%u)", isServer, socket, type, len, data, len)
if (UNLIKELY(len > 0xffffffff))
{
status_code = U_WS_STATUS_CODE_MESSAGE_TOO_LARGE;
U_RETURN(false);
}
uint32_t header_length = (len > 125U ? 2U : 0) + (len > 0xffff ? 8U : 0);
uint8_t opcode, masking_key[4]; uint8_t opcode, masking_key[4];
uint32_t header_length = 6U + (len > 125U ? 2U : 0) + (len > 0xffff ? 8U : 0), ncount = header_length + len;
if (isServer) header_length += 2U;
else
{
header_length += 6U;
*((uint32_t*)masking_key) = u_get_num_random();
}
uint32_t ncount = header_length + len;
UString tmp(ncount), compressed; UString tmp(ncount), compressed;
unsigned char* header = (unsigned char*)tmp.data(); unsigned char* header = (unsigned char*)tmp.data();
*((uint32_t*)masking_key) = u_get_num_random();
switch (type) switch (type)
{ {
case U_WS_MESSAGE_TYPE_TEXT: case U_WS_MESSAGE_TYPE_TEXT:
@ -649,39 +664,86 @@ bool UWebSocket::sendData(USocket* socket, int type, const char* data, uint32_t
header[0] = (opcode | 0x80); header[0] = (opcode | 0x80);
if (len <= 125) // possible client header lengths
// 2 4 12
// possible server header lengths
// 6 8 16
switch (header_length)
{
// server + len < 125
case 2:
{
header[1] = len;
break;
}
// server + (len > 125 && len <= 0xffff) // 125 && 65535
case 4:
{
header[1] = 126;
u_put_unalignedp16(header+2, htons(len));
break;
}
case 12:
{
header[1] = 127;
u_put_unalignedp64(header+2, htonl(len));
break;
}
// client + len < 125
case 6:
{ {
header[1] = (len | 0x80); header[1] = (len | 0x80);
u_put_unalignedp32(header+2, *((uint32_t*)masking_key)); u_put_unalignedp32(header+2, *((uint32_t*)masking_key));
break;
} }
else if (len > 125 && // client + (len > 125 && len <= 0xffff) // 125 && 65535
len <= 0xffff) // 125 && 65535 case 8:
{ {
header[1] = (126 | 0x80); header[1] = (126 | 0x80);
u_put_unalignedp16(header+2, htons(len)); u_put_unalignedp16(header+2, htons(len));
u_put_unalignedp32(header+4, *((uint32_t*)masking_key)); u_put_unalignedp32(header+4, *((uint32_t*)masking_key));
break;
} }
else if (len > 0xffff && case 16:
len <= 0xffffffff)
{ {
header[1] = (127 | 0x80); header[1] = (127 | 0x80);
u_put_unalignedp64(header+2, htonl(len)); u_put_unalignedp64(header+2, htonl(len));
u_put_unalignedp32(header+10, *((uint32_t*)masking_key)); u_put_unalignedp32(header+10, *((uint32_t*)masking_key));
break;
} }
else
default: break; // never reached
}
switch (header_length)
{ {
status_code = U_WS_STATUS_CODE_MESSAGE_TOO_LARGE; // server
case 2:
U_RETURN(false); case 4:
case 12:
{
for (uint32_t i = 0; i < len; ++i)
{
header[2+i] = data[i];
} }
break;
}
// client
case 6:
case 8:
case 16:
{
for (uint32_t i = 0; i < len; ++i) for (uint32_t i = 0; i < len; ++i)
{ {
header[6+i] = (data[i] ^ masking_key[i % 4]) & 0xff; header[6+i] = (data[i] ^ masking_key[i % 4]) & 0xff;
} }
break;
}
default: break; // never reached
}
U_SRV_LOG_WITH_ADDR("send websocket data (%u+%u bytes) %.*S to", header_length, len, len, data) U_SRV_LOG_WITH_ADDR("send websocket data (%u+%u bytes) %.*S to", header_length, len, len, data)
@ -690,27 +752,40 @@ bool UWebSocket::sendData(USocket* socket, int type, const char* data, uint32_t
U_RETURN(false); U_RETURN(false);
} }
bool UWebSocket::sendControlFrame(USocket* socket, int opcode, const unsigned char* payload, uint32_t payload_length) bool UWebSocket::sendControlFrame(const bool isServer, USocket* socket, int opcode, const unsigned char* payload, uint32_t payload_length)
{ {
U_TRACE(0, "UWebSocket::sendControlFrame(%p,%d,%.*S,%u)", socket, opcode, payload_length, payload, payload_length) U_TRACE(0, "UWebSocket::sendControlFrame(%b,%p,%d,%.*S,%u)", isServer, socket, opcode, payload_length, payload, payload_length)
uint8_t masking_key[4]; uint32_t ncount = (isServer ? 2U : 6U) + payload_length;
uint32_t ncount = 6U + payload_length;
UString tmp(ncount); UString tmp(ncount);
unsigned char* header = (unsigned char*)tmp.data(); unsigned char* header = (unsigned char*)tmp.data();
*((uint32_t*)masking_key) = u_get_num_random();
header[0] = (opcode | 0x80); header[0] = (opcode | 0x80);
if (isServer)
{
header[1] = payload_length;
for (uint32_t i = 0; i < payload_length; ++i)
{
header[2+i] = payload[i];
}
}
else
{
header[1] = (payload_length | 0x80); header[1] = (payload_length | 0x80);
uint8_t masking_key[4];
*((uint32_t*)masking_key) = u_get_num_random();
u_put_unalignedp32(header+2, *((uint32_t*)masking_key)); u_put_unalignedp32(header+2, *((uint32_t*)masking_key));
for (uint32_t i = 0; i < payload_length; ++i) for (uint32_t i = 0; i < payload_length; ++i)
{ {
header[6+i] = (payload[i] ^ masking_key[i % 4]) & 0xff; header[6+i] = (payload[i] ^ masking_key[i % 4]) & 0xff;
} }
}
if (USocketExt::write(socket, (const char*)header, ncount, UServer_Base::timeoutMS) == ncount) if (USocketExt::write(socket, (const char*)header, ncount, UServer_Base::timeoutMS) == ncount)
{ {
@ -776,7 +851,7 @@ loop:
rbuffer->setEmpty(); rbuffer->setEmpty();
if (UServices::read(UProcess::filedes[2], *rbuffer) && if (UServices::read(UProcess::filedes[2], *rbuffer) &&
sendData(UServer_Base::csocket, message_type, *rbuffer)) sendData(true, UServer_Base::csocket, message_type, *rbuffer))
{ {
rbuffer->setEmpty(); rbuffer->setEmpty();
@ -811,7 +886,7 @@ data: if (handleDataFraming(rbuffer, UServer_Base::csocket) == U_WS_STATUS_CO
// Send server-side closing handshake // Send server-side closing handshake
if (UServer_Base::csocket->isOpen() && if (UServer_Base::csocket->isOpen() &&
sendClose(UServer_Base::csocket)) sendClose(true, UServer_Base::csocket))
{ {
UClientImage_Base::close(); UClientImage_Base::close();
} }

View File

@ -1 +1 @@
05D2 05D8