From 28a3a852a80f5b79d544c124de349028076f959a Mon Sep 17 00:00:00 2001 From: stefanocasazza Date: Tue, 3 Sep 2019 18:34:39 +0200 Subject: [PATCH] sync --- include/ulib/net/client/redis.h | 150 +++++++------ include/ulib/net/client/websocket.h | 4 +- include/ulib/utility/websocket.h | 12 +- src/ulib/net/client/redis.cpp | 214 +++++++++---------- src/ulib/net/server/client_image.cpp | 2 +- src/ulib/net/server/plugin/mod_proxy.cpp | 2 +- src/ulib/net/server/plugin/mod_socket.cpp | 4 + src/ulib/net/server/plugin/usp/modsocket.usp | 2 +- src/ulib/utility/websocket.cpp | 155 ++++++++++---- tests/examples/TSA/tsaserial | 2 +- 10 files changed, 304 insertions(+), 243 deletions(-) diff --git a/include/ulib/net/client/redis.h b/include/ulib/net/client/redis.h index 49fb3620..8e5a18ec 100644 --- a/include/ulib/net/client/redis.h +++ b/include/ulib/net/client/redis.h @@ -63,7 +63,7 @@ typedef void (*vPFcs) (const UString&); typedef void (*vPFcscs)(const UString&,const UString&); -class UREDISClusterClient; +class UREDISClusterMaster; class U_EXPORT UREDISClient_Base : public UClient_Base, UEventFd { public: @@ -822,7 +822,7 @@ protected: } void init(); - virtual void processResponse(); + void processResponse(); bool processRequest(char recvtype); bool sendRequest(const UString& pipeline) @@ -898,7 +898,7 @@ protected: private: bool getResponseItem() U_NO_EXPORT; - friend class UREDISClusterClient; + friend class UREDISClusterMaster; // U_DISALLOW_COPY_AND_ASSIGN(UREDISClient_Base) }; @@ -925,6 +925,7 @@ public: #endif private: + // U_DISALLOW_COPY_AND_ASSIGN(UREDISClient) }; @@ -978,45 +979,56 @@ private: // by Victor Stewart #if defined(U_STDCPP_ENABLE) && defined(HAVE_CXX17) -# include -class U_EXPORT UREDISClusterClient : protected UREDISClient { +class U_EXPORT UREDISClusterClient : public UREDISClient { private: - struct RedisNode { - UString ipAddress; - UREDISClient client; - uint16_t port, lowHashSlot, highHashSlot; - - RedisNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot) : - ipAddress(_ipAddress), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot) - { - client.connect(ipAddress.data(), port); - } + UREDISClusterMaster *master; - // DEBUG +public: + + void processResponse(); + UREDISClusterClient(UREDISClusterMaster *_master) : UREDISClient(), master(_master) {} +}; + +struct RedisClusterNode { + + UString ipAddress; + UREDISClusterClient client; + uint16_t port, lowHashSlot, highHashSlot; + + RedisClusterNode(const UString& _ipAddress, uint16_t _port, uint16_t _lowHashSlot, uint16_t _highHashSlot, UREDISClusterMaster *master) : ipAddress(_ipAddress), client(master), port(_port), lowHashSlot(_lowHashSlot), highHashSlot(_highHashSlot) + { + client.connect(ipAddress.data(), port); + } #if defined(U_STDCPP_ENABLE) && defined(DEBUG) const char* dump(bool _reset) const { return ""; } #endif - }; +}; - enum class ClusterError : uint8_t { - none, - moved, - ask, - tryagain - }; +enum class ClusterError : uint8_t { + none, + moved, + ask, + tryagain +}; + +class U_EXPORT UREDISClusterMaster { +private: + + friend class UREDISClusterClient; ClusterError error; UString temporaryASKip; - UHashMap redisNodes; - - uint16_t hashslotForKey(const UString& hashableKey) { return u_crc16(U_STRING_TO_PARAM(hashableKey)); } + UREDISClusterClient subscriptionClient; + UHashMap clusterNodes; // when these call they need to be processed... also when MOVED... we need to set up and recalculate + uint16_t hashslotForKey(const UString& hashableKey) { return u_crc16(U_STRING_TO_PARAM(hashableKey)); } + uint16_t hashslotFromCommand(const UString& command) - { - U_TRACE(0, "UREDISClusterClient::hashslotFromCommand(%V)", command.rep) + { + U_TRACE(0, "UREDISClusterMaster::hashslotFromCommand(%V)", command.rep) // expects hashable keys to be delivered as abc{hashableKey}xyz value blah \r\n @@ -1024,58 +1036,48 @@ private: end = command.find('}', beginning) - 1; return hashslotForKey(command.substr(beginning, end - beginning)); + } + + UREDISClusterClient& clientForHashslot(uint16_t hashslot) + { + U_TRACE(0, "UREDISClusterMaster::clientForHashslot(%u)", hashslot) + + for (UHashMapNode *node : clusterNodes) + { + RedisClusterNode* workingNode = (RedisClusterNode *)(node->elem); + + if ((workingNode->lowHashSlot <= hashslot) && (workingNode->highHashSlot >= hashslot)) return workingNode->client; } - UREDISClient& clientForHashslot(uint16_t hashslot) + return subscriptionClient; // never reached + } + + UREDISClusterClient& clientForASKip() + { + for (UHashMapNode *node : clusterNodes) { - U_TRACE(0, "UREDISClusterClient::clientForHashslot(%u)", hashslot) - - for (UHashMapNode *node : redisNodes) - { - RedisNode* workingNode = (RedisNode *)(node->elem); - - if ((workingNode->lowHashSlot <= hashslot) || (workingNode->highHashSlot >= hashslot)) return workingNode->client; - } - - return *this; // never reached - } - - UREDISClient& clientForASKip() - { - for (UHashMapNode *node : redisNodes) - { - RedisNode* workingNode = (RedisNode *)(node->elem); + RedisClusterNode* workingNode = (RedisClusterNode *)(node->elem); if (temporaryASKip == workingNode->ipAddress) return workingNode->client; - } - - return *this; // never reached } - UREDISClient& clientForHashableKey(const UString& hashableKey) { return clientForHashslot(hashslotForKey(hashableKey)); } - -public: - UREDISClusterClient() : UREDISClient() - { - U_TRACE_CTOR(0, UREDISClusterClient, "") - } - - ~UREDISClusterClient() - { - U_TRACE_DTOR(0, UREDISClusterClient) - } - - void processResponse() final; - void calculateNodeMap(); - - bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379); + return subscriptionClient; // never reached + } + UREDISClusterClient& clientForHashableKey(const UString& hashableKey) { return clientForHashslot(hashslotForKey(hashableKey)); } + template const UVector& processPipeline(UString& pipeline, bool reorderable); + void calculateNodeMap(); + +public: + + bool connect(const char* host = U_NULLPTR, unsigned int _port = 6379); + // all of these multis require all keys to exist within a single hash slot (on the same node isn't good enough) - UString clusterSingle(const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey).single(pipeline); } + UString clusterSingle(const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey).single(pipeline); } const UVector& clusterMulti( const UString& hashableKey, const UString& pipeline) { return clientForHashableKey(hashableKey).multi(pipeline); } void clusterSilencedMulti( const UString& hashableKey, UString& pipeline) { clientForHashableKey(hashableKey).silencedMulti(pipeline); } @@ -1093,18 +1095,10 @@ public: bool clusterUnsubscribe(const UString& channel); bool clusterSubscribe( const UString& channel, vPFcscs callback); - // DEBUG - -#if defined(U_STDCPP_ENABLE) && defined(DEBUG) - const char* dump(bool _reset) const { return UREDISClient_Base::dump(_reset); } -#endif - -private: - U_DISALLOW_COPY_AND_ASSIGN(UREDISClusterClient) + UREDISClusterMaster() : subscriptionClient(this) {} }; -extern template const UVector& UREDISClusterClient::processPipeline(UString& pipeline, bool reorderable); -extern template const UVector& UREDISClusterClient::processPipeline(UString& pipeline, bool reorderable); - +extern template const UVector& UREDISClusterMaster::processPipeline(UString& pipeline, bool reorderable); +extern template const UVector& UREDISClusterMaster::processPipeline(UString& pipeline, bool reorderable); #endif #endif diff --git a/include/ulib/net/client/websocket.h b/include/ulib/net/client/websocket.h index 3ebc89d6..5fd16717 100644 --- a/include/ulib/net/client/websocket.h +++ b/include/ulib/net/client/websocket.h @@ -73,7 +73,7 @@ public: U_INTERNAL_ASSERT_POINTER(client) - return UWebSocket::sendData(client->UClient_Base::socket, type, msg); + return UWebSocket::sendData(false, client->UClient_Base::socket, type, msg); } void close() @@ -82,7 +82,7 @@ public: U_INTERNAL_ASSERT_POINTER(client) - (void) UWebSocket::sendClose(client->UClient_Base::socket); + (void) UWebSocket::sendClose(false, client->UClient_Base::socket); client->UClient_Base::close(); } diff --git a/include/ulib/utility/websocket.h b/include/ulib/utility/websocket.h index c9675c93..b853fba2 100644 --- a/include/ulib/utility/websocket.h +++ b/include/ulib/utility/websocket.h @@ -72,13 +72,13 @@ public: } WebSocketFrameData; static bool checkForInitialData(); - static bool sendData(USocket* socket, int type, const char* data, uint32_t len); + static bool sendData(const bool isServer, USocket* socket, int type, const char* data, uint32_t len); - static bool sendData(USocket* socket, int type, const UString& data) { return sendData(socket, type, U_STRING_TO_PARAM(data)); } + static bool sendData(const bool isServer, USocket* socket, int type, const UString& data) { return sendData(isServer, socket, type, U_STRING_TO_PARAM(data)); } - static bool sendClose(USocket* socket) + static bool sendClose(const bool isServer, USocket* socket) { - U_TRACE(0, "UWebSocket::sendClose(%p)", socket) + U_TRACE(0, "UWebSocket::sendClose(%b,%p)", isServer, socket) // Send server-side closing handshake @@ -87,7 +87,7 @@ public: unsigned char status_code_buffer[2] = { (unsigned char)((status_code >> 8) & 0xFF), (unsigned char)( status_code & 0xFF) }; - if (sendControlFrame(socket, U_WS_OPCODE_CLOSE, status_code_buffer, sizeof(status_code_buffer))) U_RETURN(true); + if (sendControlFrame(isServer, socket, U_WS_OPCODE_CLOSE, status_code_buffer, sizeof(status_code_buffer))) U_RETURN(true); U_RETURN(false); } @@ -208,7 +208,7 @@ private: static bool sendAccept(USocket* socket); static RETSIGTYPE handlerForSigTERM(int signo); static int handleDataFraming(UString* pbuffer, USocket* socket); - static bool sendControlFrame(USocket* socket, int opcode, const unsigned char* payload, uint32_t payload_length); + static bool sendControlFrame(const bool isServer, USocket* socket, int opcode, const unsigned char* payload, uint32_t payload_length); U_DISALLOW_COPY_AND_ASSIGN(UWebSocket) diff --git a/src/ulib/net/client/redis.cpp b/src/ulib/net/client/redis.cpp index 903f5d49..0694bc23 100644 --- a/src/ulib/net/client/redis.cpp +++ b/src/ulib/net/client/redis.cpp @@ -549,15 +549,57 @@ int UREDISClient_Base::handlerRead() U_RETURN(U_NOTIFIER_OK); } -#if defined(U_STDCPP_ENABLE) - // by Victor Stewart # if defined(HAVE_CXX17) -void UREDISClusterClient::calculateNodeMap() -{ - U_TRACE_NO_PARAM(0, "UREDISClusterClient::calculateNodeMap()") +void UREDISClusterClient::processResponse() +{ + U_TRACE_NO_PARAM(0, "UREDISClusterClient::processResponse()") + + if (UClient_Base::response.find("-MOVED", 0, 6) != U_NOT_FOUND) + { + // MOVED 3999 127.0.0.1:6381 => the hashslot has been moved to another master node + + master->error = ClusterError::moved; + + master->calculateNodeMap(); + } + else if (UClient_Base::response.find("-ASK", 1, 4) != U_NOT_FOUND) + { + // ASK 3999 127.0.0.1:6381 => this means that one of the hash slots is being migrated to another server + + master->error = ClusterError::ask; + + uint32_t _start = UClient_Base::response.find(' ', 8) + 1, + end = UClient_Base::response.find(':', _start); + + (void)master->temporaryASKip.assign(UClient_Base::response.substr(_start, end - _start)); + } + else if (UClient_Base::response.find("-TRYAGAIN", 0, 9) != U_NOT_FOUND) + { + // + // * during a resharding the multi-key operations targeting keys that all exist and are all still in the same node (either the source or destination node) are still available. + // * Operations on keys that don't exist or are - during the resharding - split between the source and destination nodes, will generate a -TRYAGAIN error. The client can try + // * the operation after some time, or report back the error. As soon as migration of the specified hash slot has terminated, all multi-key operations are available again for + // * that hash slot + + + master->error = ClusterError::tryagain; + + UTimeVal(0L, 1000L).nanosleep(); // 0 sec, 1000 microsec = 1ms + } + else + { + master->error = ClusterError::none; + + UREDISClient_Base::processResponse(); + } +} + +void UREDISClusterMaster::calculateNodeMap() +{ + U_TRACE_NO_PARAM(0, "UREDISClusterMaster::calculateNodeMap()") /* 127.0.0.1:30001> cluster slots 1) 1) (integer) 0 @@ -586,14 +628,16 @@ void UREDISClusterClient::calculateNodeMap() 3) "58e6e48d41228013e5d9c1c37c5060693925e97e" */ + // the first node in each array is the master + bool findHashSlots = true; uint16_t workingLowHashSlot; uint16_t workingHighHashSlot; - (void) UREDISClient_Base::processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS")); - - UHashMap newNodes; - const UVector& rawNodes = UREDISClient_Base::vitem; + (void) subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("CLUSTER SLOTS")); + + UHashMap newNodes; + const UVector& rawNodes = subscriptionClient.vitem; for (uint32_t a = 0, b = rawNodes.size(); a < b; a+=2) { @@ -612,12 +656,10 @@ void UREDISClusterClient::calculateNodeMap() } else { - // first node in the array is the master - UString compositeAddress(50U); compositeAddress.snprintf(U_CONSTANT_TO_PARAM("%v.%v"), first.rep, second.rep); - RedisNode *workingNode = redisNodes.erase(compositeAddress); + RedisClusterNode *workingNode = clusterNodes.erase(compositeAddress); // in the case of MOVE some nodes will be new, but others we'll already be connected to if (workingNode) @@ -625,8 +667,8 @@ void UREDISClusterClient::calculateNodeMap() workingNode->lowHashSlot = workingLowHashSlot; workingNode->highHashSlot = workingHighHashSlot; } - else workingNode = new RedisNode(first, second.strtoul(), workingLowHashSlot, workingHighHashSlot); - + else workingNode = new RedisClusterNode(first, second.strtoul(), workingLowHashSlot, workingHighHashSlot, this); + newNodes.insert(compositeAddress, workingNode); workingNode = newNodes[compositeAddress]; @@ -636,121 +678,71 @@ void UREDISClusterClient::calculateNodeMap() } // if any nodes were taken offline, the clients would've disconnected by default - redisNodes.assign(newNodes); + clusterNodes.assign(newNodes); } -bool UREDISClusterClient::connect(const char* host, unsigned int _port) +bool UREDISClusterMaster::connect(const char* host, unsigned int _port) { - U_TRACE(0, "UREDISClusterClient::connect(%S,%u)", host, _port) + U_TRACE(0, "UREDISClusterMaster::connect(%S,%u)", host, _port) - if (UREDISClient::connect(host, _port)) - { + if (subscriptionClient.connect(host, _port)) + { calculateNodeMap(); - - // self handles the SUB/PUB traffic. Must be a dedicated client pre-Redis 6 - UEventFd::op_mask |= EPOLLET; - UEventFd::op_mask &= ~EPOLLRDHUP; - UNotifier::insert(this, EPOLLEXCLUSIVE | EPOLLROUNDROBIN); // NB: we ask to listen for events to a Redis publish channel... + subscriptionClient.UEventFd::fd = subscriptionClient.getFd(); + subscriptionClient.UEventFd::op_mask |= EPOLLET; + subscriptionClient.UEventFd::op_mask &= ~EPOLLRDHUP; + + UNotifier::insert(&subscriptionClient, EPOLLEXCLUSIVE | EPOLLROUNDROBIN); // NB: we ask to listen for events to a Redis publish channel... U_RETURN(true); - } + } U_RETURN(false); } -bool UREDISClusterClient::clusterUnsubscribe(const UString& channel) // unregister the callback for messages published to the given channels +bool UREDISClusterMaster::clusterUnsubscribe(const UString& channel) // unregister the callback for messages published to the given channels { - U_TRACE(0, "UREDISClusterClient::clusterUnsubscribe(%V)", channel.rep) + U_TRACE(0, "UREDISClusterMaster::clusterUnsubscribe(%V)", channel.rep) - if (processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel))) - { - if (pchannelCallbackMap == U_NULLPTR) - { - U_NEW(UHashMap, pchannelCallbackMap, UHashMap); - } - - (void) pchannelCallbackMap->erase(channel); + if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), U_STRING_TO_PARAM(channel))) + { + (void)subscriptionClient.UREDISClient_Base::pchannelCallbackMap->erase(channel); U_RETURN(true); - } + } U_RETURN(false); } -bool UREDISClusterClient::clusterSubscribe(const UString& channel, vPFcscs callback) // register the callback for messages published to the given channels +bool UREDISClusterMaster::clusterSubscribe(const UString& channel, vPFcscs callback) // register the callback for messages published to the given channels { - U_TRACE(0, "UREDISClusterClient::clusterSubscribe(%V,%p)", channel.rep, callback) + U_TRACE(0, "UREDISClusterMaster::clusterSubscribe(%V,%p)", channel.rep, callback) - if (processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel))) + if (subscriptionClient.processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), U_STRING_TO_PARAM(channel))) + { + if (subscriptionClient.UREDISClient_Base::pchannelCallbackMap == U_NULLPTR) { - if (pchannelCallbackMap == U_NULLPTR) - { - U_NEW(UHashMap, pchannelCallbackMap, UHashMap); - } + U_NEW(UHashMap, subscriptionClient.UREDISClient_Base::pchannelCallbackMap, UHashMap); + } - pchannelCallbackMap->insert(channel, (const void*)callback); + subscriptionClient.UREDISClient_Base::pchannelCallbackMap->insert(channel, (const void*)callback); U_RETURN(true); - } + } U_RETURN(false); } -void UREDISClusterClient::processResponse() -{ - U_TRACE_NO_PARAM(0, "UREDISClusterClient::processResponse()") - - if (UClient_Base::response.find("MOVED", 0, 5) != U_NOT_FOUND) - { - // MOVED 3999 127.0.0.1:6381 => the hashslot has been moved to another master node - - error = ClusterError::moved; - - calculateNodeMap(); - } - else if (UClient_Base::response.find("ASK", 0, 3) != U_NOT_FOUND) - { - // ASK 3999 127.0.0.1:6381 => this means that one of the hash slots is being migrated to another server - - error = ClusterError::ask; - - uint32_t _start = UClient_Base::response.find(' ', 8) + 1, - end = UClient_Base::response.find(':', _start); - - (void) temporaryASKip.replace(UClient_Base::response.substr(_start, end - _start)); - } - - else if (UClient_Base::response.find("TRYAGAIN", 0, 8) != U_NOT_FOUND) - { - /** - * during a resharding the multi-key operations targeting keys that all exist and are all still in the same node (either the source or destination node) are still available. - * Operations on keys that don't exist or are - during the resharding - split between the source and destination nodes, will generate a -TRYAGAIN error. The client can try - * the operation after some time, or report back the error. As soon as migration of the specified hash slot has terminated, all multi-key operations are available again for - * that hash slot - */ - - error = ClusterError::tryagain; - - UTimeVal(0L, 1000L).nanosleep(); // 0 sec, 1000 microsec = 1ms - } - else - { - error = ClusterError::none; - - UREDISClient::processResponse(); - } -} - -template const UVector& UREDISClusterClient::processPipeline(UString& pipeline, bool reorderable); -template const UVector& UREDISClusterClient::processPipeline(UString& pipeline, bool reorderable); +template const UVector& UREDISClusterMaster::processPipeline(UString& pipeline, bool reorderable); +template const UVector& UREDISClusterMaster::processPipeline(UString& pipeline, bool reorderable); template -const UVector& UREDISClusterClient::processPipeline(UString& pipeline, const bool reorderable) +const UVector& UREDISClusterMaster::processPipeline(UString& pipeline, const bool reorderable) { - U_TRACE(0, "UREDISClusterClient::processPipeline(%V,%b)", pipeline.rep, reorderable) + U_TRACE(0, "UREDISClusterMaster::processPipeline(%V,%b)", pipeline.rep, reorderable) - UString workingString(U_CAPACITY); + UString workingString(pipeline.size()); UVector commands(pipeline, "\r\n"); uint16_t hashslot, workingHashslot, count = 0; @@ -769,7 +761,7 @@ const UVector& UREDISClusterClient::processPipeline(UString& pipeline, } } - UREDISClient& client = clientForHashslot(hashslot); + UREDISClusterClient& client = clientForHashslot(hashslot); if constexpr (silence) (void) client.sendRequest(workingString); else @@ -787,7 +779,7 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS case ClusterError::ask: { - UREDISClient& temporaryClient = clientForASKip(); + UREDISClusterClient& temporaryClient = clientForASKip(); (void) temporaryClient.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingString)); } @@ -796,17 +788,15 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS case ClusterError::none: break; } - if constexpr (silence == false) vitem.move(client.vitem); + if constexpr (silence == false) subscriptionClient.vitem.move(client.vitem); count = 0; workingString.clear(); } }; - /* - */ - if (reorderable) { - + if (reorderable) + { for (UVectorStringIter it = commands.begin(); it != commands.end(); ) { if (it == commands.begin()) hashslot = hashslotFromCommand(*it); @@ -834,11 +824,10 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS if (commands.size() != 0) it = commands.begin(); } } - else { - + else + { for (uint32_t index = 0, n = commands.size(); index < n; index++) - { - + { if (index == 0) hashslot = hashslotFromCommand(commands[0]); UString command = commands[index]; @@ -848,8 +837,8 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS workingHashslot = hashslotFromCommand(command); - if (workingHashslot == hashslot) { - + if (workingHashslot == hashslot) + { ++count; // goto isADirective; @@ -863,14 +852,14 @@ replay: (void) client.processRequest(U_RC_MULTIBULK, U_STRING_TO_PARAM(workingS count = 0; } } - - return vitem; + + return subscriptionClient.vitem; } # endif // DEBUG -# if defined(DEBUG) +#if defined(U_STDCPP_ENABLE) && defined(DEBUG) const char* UREDISClient_Base::dump(bool _reset) const { UClient_Base::dump(false); @@ -888,5 +877,4 @@ const char* UREDISClient_Base::dump(bool _reset) const return U_NULLPTR; } -# endif -#endif +# endif diff --git a/src/ulib/net/server/client_image.cpp b/src/ulib/net/server/client_image.cpp index 25ee0eb8..0fddbe55 100644 --- a/src/ulib/net/server/client_image.cpp +++ b/src/ulib/net/server/client_image.cpp @@ -538,7 +538,7 @@ void UClientImage_Base::handlerDelete() if (U_ClientImage_http(this) == '0') { if (bsocket_open && - UWebSocket::sendClose(socket)) + UWebSocket::sendClose(true, socket)) { socket->close(); } diff --git a/src/ulib/net/server/plugin/mod_proxy.cpp b/src/ulib/net/server/plugin/mod_proxy.cpp index 37f84be3..ccb4fb0f 100644 --- a/src/ulib/net/server/plugin/mod_proxy.cpp +++ b/src/ulib/net/server/plugin/mod_proxy.cpp @@ -151,7 +151,7 @@ int UProxyPlugIn::handlerRequest() while (UWebSocket::handleDataFraming(UWebSocket::rbuffer, UServer_Base::csocket) == U_WS_STATUS_CODE_OK && (client_http->UClient_Base::prepareRequest(*UClientImage_Base::wbuffer), client_http->UClient_Base::sendRequestAndReadResponse()) && - UWebSocket::sendData(UServer_Base::csocket, UWebSocket::message_type, client_http->UClient_Base::response)) + UWebSocket::sendData(false, UServer_Base::csocket, UWebSocket::message_type, client_http->UClient_Base::response)) { client_http->UClient_Base::clearData(); diff --git a/src/ulib/net/server/plugin/mod_socket.cpp b/src/ulib/net/server/plugin/mod_socket.cpp index dd089347..c9cdd4c3 100644 --- a/src/ulib/net/server/plugin/mod_socket.cpp +++ b/src/ulib/net/server/plugin/mod_socket.cpp @@ -90,6 +90,10 @@ int UWebSocketPlugIn::handlerRun() U_INTERNAL_ASSERT_EQUALS(UWebSocket::rbuffer, U_NULLPTR) U_INTERNAL_ASSERT_EQUALS(UWebSocket::message, U_NULLPTR) +#ifndef USE_LIBSSL + U_ERROR("Sorry, I was compiled without SSL support so I can't use websocket"); +#endif + U_NEW_STRING(UWebSocket::rbuffer, UString(U_CAPACITY)); U_NEW_STRING(UWebSocket::message, UString(U_CAPACITY)); diff --git a/src/ulib/net/server/plugin/usp/modsocket.usp b/src/ulib/net/server/plugin/usp/modsocket.usp index d523f0a1..e1988636 100644 --- a/src/ulib/net/server/plugin/usp/modsocket.usp +++ b/src/ulib/net/server/plugin/usp/modsocket.usp @@ -80,7 +80,7 @@ else { // echo - if (UWebSocket::sendData(UServer_Base::csocket, UWebSocket::message_type, *UWebSocket::message) == false) U_http_info.nResponseCode = HTTP_INTERNAL_ERROR; + if (UWebSocket::sendData(false, UServer_Base::csocket, UWebSocket::message_type, *UWebSocket::message) == false) U_http_info.nResponseCode = HTTP_INTERNAL_ERROR; } #endif --> diff --git a/src/ulib/utility/websocket.cpp b/src/ulib/utility/websocket.cpp index c1f112c9..a7b0c981 100644 --- a/src/ulib/utility/websocket.cpp +++ b/src/ulib/utility/websocket.cpp @@ -520,7 +520,8 @@ loop: case U_WS_OPCODE_PING: { - if (sendControlFrame(socket, U_WS_OPCODE_PONG, application_data, application_data_offset) == false) + // implies only client initiates ping pong... so any pong is always server -> client + if (sendControlFrame(true, socket, U_WS_OPCODE_PONG, application_data, application_data_offset) == false) { U_RETURN(U_WS_STATUS_CODE_PROTOCOL_ERROR); } @@ -600,18 +601,32 @@ next: goto loop; } -bool UWebSocket::sendData(USocket* socket, int type, const char* data, uint32_t len) +bool UWebSocket::sendData(const bool isServer, USocket* socket, int type, const char* data, uint32_t len) { - U_TRACE(0, "UWebSocket::sendData(%p,%d,%.*S,%u)", socket, type, len, data, len) + U_TRACE(0, "UWebSocket::sendData(%b,%p,%d,%.*S,%u)", isServer, socket, type, len, data, len) + if (UNLIKELY(len > 0xffffffff)) + { + status_code = U_WS_STATUS_CODE_MESSAGE_TOO_LARGE; + + U_RETURN(false); + } + + uint32_t header_length = (len > 125U ? 2U : 0) + (len > 0xffff ? 8U : 0); uint8_t opcode, masking_key[4]; - uint32_t header_length = 6U + (len > 125U ? 2U : 0) + (len > 0xffff ? 8U : 0), ncount = header_length + len; + + if (isServer) header_length += 2U; + else + { + header_length += 6U; + *((uint32_t*)masking_key) = u_get_num_random(); + } + + uint32_t ncount = header_length + len; UString tmp(ncount), compressed; unsigned char* header = (unsigned char*)tmp.data(); - *((uint32_t*)masking_key) = u_get_num_random(); - switch (type) { case U_WS_MESSAGE_TYPE_TEXT: @@ -649,39 +664,86 @@ bool UWebSocket::sendData(USocket* socket, int type, const char* data, uint32_t header[0] = (opcode | 0x80); - if (len <= 125) - { - header[1] = (len | 0x80); + // possible client header lengths + // 2 4 12 + // possible server header lengths + // 6 8 16 - u_put_unalignedp32(header+2, *((uint32_t*)masking_key)); + switch (header_length) + { + // server + len < 125 + case 2: + { + header[1] = len; + break; } - else if (len > 125 && - len <= 0xffff) // 125 && 65535 + // server + (len > 125 && len <= 0xffff) // 125 && 65535 + case 4: { - header[1] = (126 | 0x80); - - u_put_unalignedp16(header+2, htons(len)); - u_put_unalignedp32(header+4, *((uint32_t*)masking_key)); + header[1] = 126; + u_put_unalignedp16(header+2, htons(len)); + break; } - else if (len > 0xffff && - len <= 0xffffffff) + case 12: { - header[1] = (127 | 0x80); - - u_put_unalignedp64(header+2, htonl(len)); - u_put_unalignedp32(header+10, *((uint32_t*)masking_key)); + header[1] = 127; + u_put_unalignedp64(header+2, htonl(len)); + break; } - else + // client + len < 125 + case 6: { - status_code = U_WS_STATUS_CODE_MESSAGE_TOO_LARGE; + header[1] = (len | 0x80); + u_put_unalignedp32(header+2, *((uint32_t*)masking_key)); + break; + } + // client + (len > 125 && len <= 0xffff) // 125 && 65535 + case 8: + { + header[1] = (126 | 0x80); - U_RETURN(false); + u_put_unalignedp16(header+2, htons(len)); + u_put_unalignedp32(header+4, *((uint32_t*)masking_key)); + break; + } + case 16: + { + header[1] = (127 | 0x80); + + u_put_unalignedp64(header+2, htonl(len)); + u_put_unalignedp32(header+10, *((uint32_t*)masking_key)); + break; } - for (uint32_t i = 0; i < len; ++i) + default: break; // never reached + } + switch (header_length) + { + // server + case 2: + case 4: + case 12: { - header[6+i] = (data[i] ^ masking_key[i % 4]) & 0xff; + for (uint32_t i = 0; i < len; ++i) + { + header[2+i] = data[i]; + } + break; } + // client + case 6: + case 8: + case 16: + { + for (uint32_t i = 0; i < len; ++i) + { + header[6+i] = (data[i] ^ masking_key[i % 4]) & 0xff; + } + break; + } + + default: break; // never reached + } U_SRV_LOG_WITH_ADDR("send websocket data (%u+%u bytes) %.*S to", header_length, len, len, data) @@ -690,27 +752,40 @@ bool UWebSocket::sendData(USocket* socket, int type, const char* data, uint32_t U_RETURN(false); } -bool UWebSocket::sendControlFrame(USocket* socket, int opcode, const unsigned char* payload, uint32_t payload_length) +bool UWebSocket::sendControlFrame(const bool isServer, USocket* socket, int opcode, const unsigned char* payload, uint32_t payload_length) { - U_TRACE(0, "UWebSocket::sendControlFrame(%p,%d,%.*S,%u)", socket, opcode, payload_length, payload, payload_length) + U_TRACE(0, "UWebSocket::sendControlFrame(%b,%p,%d,%.*S,%u)", isServer, socket, opcode, payload_length, payload, payload_length) - uint8_t masking_key[4]; - uint32_t ncount = 6U + payload_length; + uint32_t ncount = (isServer ? 2U : 6U) + payload_length; UString tmp(ncount); unsigned char* header = (unsigned char*)tmp.data(); - *((uint32_t*)masking_key) = u_get_num_random(); + header[0] = (opcode | 0x80); - header[0] = ( opcode | 0x80); - header[1] = (payload_length | 0x80); + if (isServer) + { + header[1] = payload_length; - u_put_unalignedp32(header+2, *((uint32_t*)masking_key)); - - for (uint32_t i = 0; i < payload_length; ++i) + for (uint32_t i = 0; i < payload_length; ++i) { - header[6+i] = (payload[i] ^ masking_key[i % 4]) & 0xff; + header[2+i] = payload[i]; } + } + else + { + header[1] = (payload_length | 0x80); + + uint8_t masking_key[4]; + *((uint32_t*)masking_key) = u_get_num_random(); + + u_put_unalignedp32(header+2, *((uint32_t*)masking_key)); + + for (uint32_t i = 0; i < payload_length; ++i) + { + header[6+i] = (payload[i] ^ masking_key[i % 4]) & 0xff; + } + } if (USocketExt::write(socket, (const char*)header, ncount, UServer_Base::timeoutMS) == ncount) { @@ -776,7 +851,7 @@ loop: rbuffer->setEmpty(); if (UServices::read(UProcess::filedes[2], *rbuffer) && - sendData(UServer_Base::csocket, message_type, *rbuffer)) + sendData(true, UServer_Base::csocket, message_type, *rbuffer)) { rbuffer->setEmpty(); @@ -811,7 +886,7 @@ data: if (handleDataFraming(rbuffer, UServer_Base::csocket) == U_WS_STATUS_CO // Send server-side closing handshake if (UServer_Base::csocket->isOpen() && - sendClose(UServer_Base::csocket)) + sendClose(true, UServer_Base::csocket)) { UClientImage_Base::close(); } diff --git a/tests/examples/TSA/tsaserial b/tests/examples/TSA/tsaserial index 306beef9..fb22411e 100644 --- a/tests/examples/TSA/tsaserial +++ b/tests/examples/TSA/tsaserial @@ -1 +1 @@ -05D2 +05D8