diff --git a/include/ulib/debug/trace.h b/include/ulib/debug/trace.h index 11b11717..15f3364d 100644 --- a/include/ulib/debug/trace.h +++ b/include/ulib/debug/trace.h @@ -135,5 +135,4 @@ private: void set(int level) U_NO_EXPORT; }; - #endif diff --git a/include/ulib/file.h b/include/ulib/file.h index c807f42e..0f517c94 100644 --- a/include/ulib/file.h +++ b/include/ulib/file.h @@ -49,6 +49,7 @@ #define U_FILE_TO_PARAM(file) (file).getPathRelativ(),(file).getPathRelativLen() #define U_FILE_TO_TRACE(file) (file).getPathRelativLen(),(file).getPathRelativ() #define U_FILE_TO_STRING(file) (file).getPath().substr((file).getPath().distance((file).getPathRelativ()),(file).getPathRelativLen()) +#define U_FILE_STREQ(file,str) U_STREQ((file).getPathRelativ(),(file).getPathRelativLen(),str) class URDB; class UHTTP; diff --git a/include/ulib/net/server/server.h b/include/ulib/net/server/server.h index 17f4dd9a..02f434e5 100644 --- a/include/ulib/net/server/server.h +++ b/include/ulib/net/server/server.h @@ -746,7 +746,7 @@ public: va_end(argp); - lockSSE(); + // lockSSE(); (void) U_SYSCALL(write, "%u,%S,%u", sse_event_fd, buffer, len); } @@ -755,13 +755,26 @@ public: { U_TRACE(0, "UServer_Base::sendToIdSSE(%V,%V)", id.rep, data.rep) + U_ASSERT_EQUALS(u_find(U_STRING_TO_PARAM(data),"\n",1), U_NULLPTR) + eventSSE(U_CONSTANT_TO_PARAM("MSG %v %v\n"), id.rep, data.rep); } + static void sendToAllSSE(const UString& subscribe, const UString& data) + { + U_TRACE(0, "UServer_Base::sendToAllSSE(%V,%V)", subscribe.rep, data.rep) + + U_ASSERT_EQUALS(u_find(U_STRING_TO_PARAM(data),"\n",1), U_NULLPTR) + + eventSSE(U_CONSTANT_TO_PARAM("%v=%v\n"), subscribe.rep, data.rep); + } + static void sendToAllSSE(const UString& data) { U_TRACE(0, "UServer_Base::sendToAllSSE(%V)", data.rep) + U_ASSERT_EQUALS(u_find(U_STRING_TO_PARAM(data),"\n",1), U_NULLPTR) + eventSSE(U_CONSTANT_TO_PARAM("*=%v\n"), data.rep); } @@ -769,6 +782,8 @@ public: { U_TRACE(0, "UServer_Base::sendToAllExceptSSE(%V)", data.rep) + U_ASSERT_EQUALS(u_find(U_STRING_TO_PARAM(data),"\n",1), U_NULLPTR) + eventSSE(U_CONSTANT_TO_PARAM("%v-%v=%v\n"), (sse_event ? sse_event : str_asterisk)->rep, sse_id->rep, data.rep); } #endif diff --git a/include/ulib/utility/uhttp.h b/include/ulib/utility/uhttp.h index dc4344e8..479eb956 100644 --- a/include/ulib/utility/uhttp.h +++ b/include/ulib/utility/uhttp.h @@ -685,6 +685,11 @@ public: static int sse_pipe_fd; static const char* sse_corsbase; +# ifdef USE_LIBSSL + static UString SSE_event() { return UString::getStringNull(); } +# endif + + static void manageSSE(); static void readSSE(int timeoutMS) __noreturn; static void sendSSE(const UString& data) { diff --git a/src/ulib/base/base_trace.c b/src/ulib/base/base_trace.c index 1437a448..863bbb0b 100644 --- a/src/ulib/base/base_trace.c +++ b/src/ulib/base/base_trace.c @@ -117,14 +117,17 @@ void u_trace_writev(const struct iovec* restrict iov, int n) U_INTERNAL_ASSERT_MINOR(u_trace_num_tab, sizeof(u_trace_tab)) - u_trace_lock(); - - if (file_size == 0) (void) writev(u_trace_fd, iov, n); + if (file_size == 0) + { + if (u_trace_fd != -1) (void) writev(u_trace_fd, iov, n); + } else { - int i = 0; + int i; - for (; i < n; ++i) + u_trace_lock(); + + for (i = 0; i < n; ++i) { /* U_INTERNAL_PRINT("iov[%d].iov_len = %d iov[%d].iov_base = %p", i, iov[i].iov_len, i, iov[i].iov_base) */ @@ -139,9 +142,9 @@ void u_trace_writev(const struct iovec* restrict iov, int n) file_ptr += iov[i].iov_len; } } - } - u_trace_unlock(); + u_trace_unlock(); + } } void u_trace_write(const char* restrict t, uint32_t tlen) @@ -280,11 +283,13 @@ void u_trace_init(bool bsignal) /* NB: O_RDWR is needed for mmap(MAP_SHARED)... */ - u_trace_fd = open(name, O_CREAT | O_RDWR | O_BINARY | (u_fork_called ? O_APPEND : 0), 0666); + u_trace_fd = open(name, O_CREAT | O_TRUNC | O_RDWR | O_BINARY | O_APPEND, 0666); if (u_trace_fd == -1) { - U_WARNING("Failed to create file %S - current working directory: %.*S - UTRACE_FOLDER: %S", name, u_cwd_len, u_cwd, u_trace_folder); + U_WARNING("Failed to create file %S%R - current working directory: %.*S - UTRACE_FOLDER: %S", name, 0, u_cwd_len, u_cwd, u_trace_folder); + + file_size = 0; return; } @@ -387,6 +392,8 @@ int u_trace_check_if_active(int level) u_trace_handlerSignal(); u_print_status_trace(); + + if (u_trace_fd == -1) return 0; } U_INTERNAL_PRINT("u_trace_fd = %d level_active = %d u_trace_mask_level = %p", u_trace_fd, level_active, u_trace_mask_level) diff --git a/src/ulib/file.cpp b/src/ulib/file.cpp index a76eac8c..f9f6198c 100644 --- a/src/ulib/file.cpp +++ b/src/ulib/file.cpp @@ -21,6 +21,10 @@ # include #endif +#ifdef HAVE_NUMA +# include +#endif + char* UFile::cwd_save; char* UFile::pfree; long UFile::nr_hugepages; @@ -384,6 +388,9 @@ char* UFile::mmap_anon_huge(uint32_t* plength, int flags) { U_TRACE(1, "UFile::mmap_anon_huge(%p,%d,%u)", plength, flags) + char* ptr; + uint32_t length; + #ifdef U_LINUX if (nr_hugepages) { @@ -394,35 +401,51 @@ char* UFile::mmap_anon_huge(uint32_t* plength, int flags) # ifdef MAP_HUGE_1GB /* (since Linux 3.8) */ if (*plength >= U_1G) { - uint32_t length = (*plength + U_1G_MASK) & ~U_1G_MASK; // NB: munmap() length of MAP_HUGETLB memory must be hugepage aligned... + length = (*plength + U_1G_MASK) & ~U_1G_MASK; // NB: munmap() length of MAP_HUGETLB memory must be hugepage aligned... U_INTERNAL_ASSERT_EQUALS(length & U_1G_MASK, 0) U_DEBUG("We are going to allocate (%u GB - %u bytes) MAP_HUGE_1GB - nfree = %u flags = %B", length / U_1G, length, nfree, flags | U_MAP_ANON_HUGE | MAP_HUGE_1GB) - char* ptr = (char*) U_SYSCALL(mmap, "%p,%u,%d,%d,%d,%I", U_MAP_ANON_HUGE_ADDR, length, PROT_READ | PROT_WRITE, flags | U_MAP_ANON_HUGE | MAP_HUGE_1GB, -1, 0); + ptr = (char*) U_SYSCALL(mmap, "%p,%u,%d,%d,%d,%I", U_MAP_ANON_HUGE_ADDR, length, PROT_READ | PROT_WRITE, flags | U_MAP_ANON_HUGE | MAP_HUGE_1GB, -1, 0); if (ptr != (char*)MAP_FAILED) { *plength = length; +# ifdef HAVE_NUMA + if ((flags & MAP_SHARED) == 0 && + U_SYSCALL(mbind, "%p,%lu,%d,%p,%lu,%u", ptr, length, MPOL_PREFERRED, U_NULLPTR, 0UL, MPOL_MF_MOVE) == -1) + { + WARNING("unable to mbind memory; performance may suffer"); + } +# endif + return ptr; } } # endif # ifdef MAP_HUGE_2MB /* (since Linux 3.8) */ - uint32_t length = (*plength + U_2M_MASK) & ~U_2M_MASK; // NB: munmap() length of MAP_HUGETLB memory must be hugepage aligned... + length = (*plength + U_2M_MASK) & ~U_2M_MASK; // NB: munmap() length of MAP_HUGETLB memory must be hugepage aligned... U_INTERNAL_ASSERT_EQUALS(length & U_2M_MASK, 0) U_DEBUG("We are going to allocate (%u MB - %u bytes) MAP_HUGE_2MB - nfree = %u flags = %B", length / (1024U*1024U), length, nfree, flags | U_MAP_ANON_HUGE | MAP_HUGE_2MB) - char* ptr = (char*) U_SYSCALL(mmap, "%p,%u,%d,%d,%d,%I", U_MAP_ANON_HUGE_ADDR, length, PROT_READ | PROT_WRITE, flags | U_MAP_ANON_HUGE | MAP_HUGE_2MB, -1, 0); + ptr = (char*) U_SYSCALL(mmap, "%p,%u,%d,%d,%d,%I", U_MAP_ANON_HUGE_ADDR, length, PROT_READ | PROT_WRITE, flags | U_MAP_ANON_HUGE | MAP_HUGE_2MB, -1, 0); if (ptr != (char*)MAP_FAILED) { *plength = length; +# ifdef HAVE_NUMA + if ((flags & MAP_SHARED) == 0 && + U_SYSCALL(mbind, "%p,%lu,%d,%p,%lu,%u", ptr, length, MPOL_PREFERRED, U_NULLPTR, 0UL, MPOL_MF_MOVE) == -1) + { + WARNING("unable to mbind memory; performance may suffer"); + } +# endif + return ptr; } @@ -450,7 +473,17 @@ char* UFile::mmap_anon_huge(uint32_t* plength, int flags) U_DEBUG("We are going to allocate (%u KB - %u bytes) - nfree = %u flags = %B", *plength / 1024U, *plength, nfree, flags) - return (char*) U_SYSCALL(mmap, "%p,%u,%d,%d,%d,%I", U_NULLPTR, *plength, PROT_READ | PROT_WRITE, flags, -1, 0); + ptr = (char*) U_SYSCALL(mmap, "%p,%u,%d,%d,%d,%I", U_NULLPTR, *plength, PROT_READ | PROT_WRITE, flags, -1, 0); + +#ifdef HAVE_NUMA + if ((flags & MAP_SHARED) == 0 && + U_SYSCALL(mbind, "%p,%lu,%d,%p,%lu,%u", ptr, *plength, MPOL_PREFERRED, U_NULLPTR, 0UL, MPOL_MF_MOVE) == -1) + { + WARNING("unable to mbind memory; performance may suffer"); + } +#endif + + return ptr; } char* UFile::mmap(uint32_t* plength, int _fd, int prot, int flags, uint32_t offset) diff --git a/src/ulib/net/server/client_image.cpp b/src/ulib/net/server/client_image.cpp index 4b35661c..6079c8b1 100644 --- a/src/ulib/net/server/client_image.cpp +++ b/src/ulib/net/server/client_image.cpp @@ -1641,11 +1641,18 @@ bool UClientImage_Base::writeResponse() idx = 0; iovcnt = 4; +# ifdef U_SSE_ENABLE // SERVER SENT EVENTS (SSE) + U_INTERNAL_DUMP("UHTTP::sse_func = %p", UHTTP::sse_func) + + if (UHTTP::sse_func != (void*)1L) +# endif + { if (U_ClientImage_close && U_ClientImage_pipeline == false) { iov_vec[1].iov_len += 17+2; // Connection: close\r\n } + } ncount += iov_vec[0].iov_len + iov_vec[1].iov_len; diff --git a/src/ulib/net/server/plugin/mod_nocat.cpp b/src/ulib/net/server/plugin/mod_nocat.cpp index 83e0d7e4..1a4c6b18 100644 --- a/src/ulib/net/server/plugin/mod_nocat.cpp +++ b/src/ulib/net/server/plugin/mod_nocat.cpp @@ -2672,7 +2672,7 @@ next: (void) getARPCache(); goto end; } */ - + (void) buffer.assign(U_CONSTANT_TO_PARAM("IPHONE")); goto set_redirect_to_AUTH; diff --git a/src/ulib/net/server/plugin/usp/usp_translator.cpp b/src/ulib/net/server/plugin/usp/usp_translator.cpp index e7def9ed..fb275b04 100644 --- a/src/ulib/net/server/plugin/usp/usp_translator.cpp +++ b/src/ulib/net/server/plugin/usp/usp_translator.cpp @@ -119,7 +119,8 @@ public: pos = tmp.find('('); id = (pos == U_NOT_FOUND ? tmp : tmp.substr(0U, pos)); - output0.snprintf_add(U_CONSTANT_TO_PARAM("\n\tUString %v = %s(%u);\n"), id.rep, name, i+binc); + if (id.first_char() != '&') output0.snprintf_add(U_CONSTANT_TO_PARAM("\n\tUString %v = %s(%u);\n"), id.rep, name, i+binc); + else output0.snprintf_add(U_CONSTANT_TO_PARAM("\n\t%.*s = %s(%u);\n"), id.size()-1, id.c_pointer(1), name, i+binc); if (pos != U_NOT_FOUND) { @@ -295,7 +296,21 @@ public: setDirectiveItem(directive, U_CONSTANT_SIZE("args")); - (void) output0.append(U_CONSTANT_TO_PARAM("\t\n\tif (UHTTP::isGETorPOST()) (void) UHTTP::processForm();\n")); + if (token && + token.first_char() == ':') + { + char buffer[128]; + const char* data = token.c_pointer(1); + uint32_t sz = (*data == 'G' ? 3 : 4); // 3 => GET | 4 => POST + + (void) output0.append(buffer, u__snprintf(buffer, sizeof(buffer), U_CONSTANT_TO_PARAM("\t\n\tif (UHTTP::is%.*s()) (void) UHTTP::processForm();\n"), sz, data)); + + (void) token.erase(0, 1+sz+1); + } + else + { + (void) output0.append(U_CONSTANT_TO_PARAM("\t\n\tif (UHTTP::isGETorPOST()) (void) UHTTP::processForm();\n")); + } if (token) manageDirectiveArgsOrCpath("USP_FORM_VALUE", false); } diff --git a/src/ulib/net/server/server.cpp b/src/ulib/net/server/server.cpp index b31d72f6..5c57083d 100644 --- a/src/ulib/net/server/server.cpp +++ b/src/ulib/net/server/server.cpp @@ -1452,7 +1452,7 @@ public: UServices::read(UServer_Base::sse_event_fd, input)) { /** - * NEW + * NEW * DEL * * LIST - List clients to /tmp/SSE_list.txt @@ -1472,14 +1472,15 @@ public: { ptr = row.data(); - if (u_get_unalignedp32(ptr) == U_MULTICHAR_CONSTANT32('N','E','W',' ')) // NEW + if (u_get_unalignedp32(ptr) == U_MULTICHAR_CONSTANT32('N','E','W',' ')) // NEW { - _id = vec[k+1]; - token = vec[k+2]; - last_event_id = vec[k+3].strtoul(); - bprocess = vec[k+4].strtoul(); + _id = vec[k+1]; + token = vec[k+2]; + last_event_id = vec[k+3].strtoul(); + bprocess = ((fd = vec[k+4].strtol()) == -1); + + U_INTERNAL_DUMP("_id = %V token = %V fd = %d bprocess = %b last_event_id = %u U_SRV_SSE_CNT1 = %u", _id.rep, token.rep, fd, bprocess, last_event_id, U_SRV_SSE_CNT1) - U_INTERNAL_DUMP("_id = %V token = %V bprocess = %b last_event_id = %u U_SRV_SSE_CNT1 = %u", _id.rep, token.rep, bprocess, last_event_id, U_SRV_SSE_CNT1) if ((ball = token.equal(*UServer_Base::str_asterisk))) token = *UServer_Base::str_asterisk; else token.duplicate(); @@ -1513,7 +1514,16 @@ public: output.setEmpty(); } - if (bprocess == false) fd = (U_SYSCALL(recvmsg, "%u,%p,%u", UServer_Base::sse_socketpair[0], &UServer_Base::msg, 0) == 1 ? UServer_Base::cmsg.cmsg_data : -1); + if (bprocess == false) + { + // int rfd = fd; + + fd = (U_SYSCALL(recvmsg, "%u,%p,%u", UServer_Base::sse_socketpair[0], &UServer_Base::msg, 0) == 1 ? UServer_Base::cmsg.cmsg_data : -1); + + U_INTERNAL_DUMP("fd = %d", fd) + + // U_INTERNAL_ASSERT_EQUALS(fd, rfd) + } else { UServer_Base::setFIFOForSSE(_id); @@ -1655,7 +1665,7 @@ public: } } - UServer_Base::unlockSSE(); + // UServer_Base::unlockSSE(); vec.clear(); rID.clear(); diff --git a/src/ulib/process.cpp b/src/ulib/process.cpp index 08dab98c..e0df656b 100644 --- a/src/ulib/process.cpp +++ b/src/ulib/process.cpp @@ -81,11 +81,11 @@ U_NO_EXPORT void UProcess::setStdInOutErr(bool fd_stdin, bool fd_stdout, bool fd # else U_INTERNAL_ASSERT_MAJOR(filedes[0], STDERR_FILENO) -# ifndef HAVE_DUP3 +# ifndef HAVE_DUP3 (void) U_SYSCALL(dup2, "%d,%d", filedes[0], STDIN_FILENO); -# else +# else (void) U_SYSCALL(dup3, "%d,%d,%d", filedes[0], STDIN_FILENO, O_CLOEXEC); -# endif +# endif U_INTERNAL_ASSERT_EQUALS(::fcntl(STDIN_FILENO,F_GETFD,FD_CLOEXEC), 0) # endif @@ -113,11 +113,11 @@ U_NO_EXPORT void UProcess::setStdInOutErr(bool fd_stdin, bool fd_stdout, bool fd # else U_INTERNAL_ASSERT_MAJOR(filedes[3], STDOUT_FILENO) -# ifndef HAVE_DUP3 +# ifndef HAVE_DUP3 (void) U_SYSCALL(dup2, "%d,%d", filedes[3], STDOUT_FILENO); -# else +# else (void) U_SYSCALL(dup3, "%d,%d,%d", filedes[3], STDOUT_FILENO, O_CLOEXEC); -# endif +# endif U_INTERNAL_ASSERT_EQUALS(::fcntl(STDOUT_FILENO,F_GETFD,FD_CLOEXEC), 0) # endif @@ -145,11 +145,11 @@ U_NO_EXPORT void UProcess::setStdInOutErr(bool fd_stdin, bool fd_stdout, bool fd # else U_INTERNAL_ASSERT(filedes[5] >= STDIN_FILENO) -# ifndef HAVE_DUP3 +# ifndef HAVE_DUP3 (void) U_SYSCALL(dup2, "%d,%d", filedes[5], STDERR_FILENO); -# else +# else (void) U_SYSCALL(dup3, "%d,%d,%d", filedes[5], STDERR_FILENO, O_CLOEXEC); -# endif +# endif U_INTERNAL_ASSERT_EQUALS(::fcntl(STDERR_FILENO,F_GETFD,FD_CLOEXEC), 0) # endif diff --git a/src/ulib/utility/uhttp.cpp b/src/ulib/utility/uhttp.cpp index 032bd817..974d456a 100644 --- a/src/ulib/utility/uhttp.cpp +++ b/src/ulib/utility/uhttp.cpp @@ -4478,11 +4478,19 @@ file_in_cache: { U_INTERNAL_ASSERT_DIFFERS(U_ClientImage_parallelization, U_PARALLELIZATION_PARENT) +# ifdef U_SSE_ENABLE + U_INTERNAL_DUMP("sse_func = %p", sse_func) + + if (sse_func) manageSSE(); + else +# endif + { # ifdef USE_LOAD_BALANCE if (UClientImage_Base::isNoHeaderForResponse() == false) # endif setDynamicResponse(); } + } U_RESET_MODULE_NAME; @@ -4586,6 +4594,25 @@ from_cache: } #endif +#ifdef U_SSE_ENABLE + if (UClientImage_Base::isRequestNotFound() && // => 3) + U_FILE_STREQ(*file, "sse_event") && + u_get_unalignedp64(U_http_info.accept+8) == U_MULTICHAR_CONSTANT64('n','t','-','s','t','r','e','a')) + { + usp = U_NULLPTR; + +# ifdef USE_LIBSSL + if (UServer_Base::bssl) sse_func = SSE_event; + else +# endif + sse_func = (UHTTP::strPF)(void*)1L; + + manageSSE(); + + U_RETURN(U_PLUGIN_HANDLER_OK); + } +#endif + #ifndef U_SERVER_CAPTIVE_PORTAL U_INTERNAL_DUMP("U_http_websocket_len = %u", U_http_websocket_len) @@ -5318,18 +5345,14 @@ void UHTTP::setEndRequestProcessing() if (sse_func) { - if (UServer_Base::startParallelization()) + if (sse_func == (void*)1L || + UServer_Base::startParallelization()) // parent { - // parent - sse_func = U_NULLPTR; return; } - U_INTERNAL_ASSERT_POINTER(usp) - U_INTERNAL_ASSERT_POINTER(usp->runDynamicPage) - U_SET_MODULE_NAME(sse); UServer_Base::setFIFOForSSE(*UServer_Base::sse_id); @@ -5350,6 +5373,12 @@ loop: sse_pipe_fd = UFile::open(UServer_Base::sse_fifo_name, O_RDONLY, 0); U_ERROR("Error on opening SSE FIFO: %S", UServer_Base::sse_fifo_name); } + U_INTERNAL_DUMP("usp = %p", usp) + + if (usp == U_NULLPTR) readSSE(-1); + + U_INTERNAL_ASSERT_POINTER(usp->runDynamicPage) + usp->runDynamicPage(1); } #endif @@ -7142,6 +7171,114 @@ __noreturn void UHTTP::readSSE(int timeoutMS) } } } + +void UHTTP::manageSSE() +{ + U_TRACE_NO_PARAM(1, "UHTTP::manageSSE()") + + U_INTERNAL_DUMP("Accept: = %.*S", U_HTTP_ACCEPT_TO_TRACE) + + U_ASSERT(isGET()) + + /** + * we must insert: + * + * + * + * + */ + + UString subscribe; + bool bprocess = (sse_func != (void*)1L); + const char* ptr = getHeaderValuePtr(U_CONSTANT_TO_PARAM("last-event-id"), true); + uint32_t last_event_id = (ptr ? u_atoi(ptr) : 0); + + UServer_Base::sse_id->clear(); + + (void) UClientImage_Base::wbuffer->snprintf(U_CONSTANT_TO_PARAM("Access-Control-Allow-Origin: %s\r\n" + "Access-Control-Allow-Methods: GET\r\n" + "Access-Control-Allow-Headers: cache-control, last-event-id, X-Requested-With\r\n" + "Content-Type: text/event-stream\r\n" + "Cache-Control: no-cache\r\n\r\n"), sse_corsbase); + + if (processForm()) + { + getFormValue(subscribe, 1); + getFormValue(*UServer_Base::sse_id, 3); + } + + if (subscribe) UServer_Base::sse_event = &subscribe; + else + { + subscribe = *UServer_Base::str_asterisk; + UServer_Base::sse_event = U_NULLPTR; + } + + if (UServer_Base::sse_id->empty()) + { + U_SRV_SSE_CNT2++; + + U_INTERNAL_DUMP("U_SRV_SSE_CNT2 = %u", U_SRV_SSE_CNT2) + + *UServer_Base::sse_id = UStringExt::numberToString(U_SRV_SSE_CNT2); + } + + U_INTERNAL_DUMP("last_event_id = %u U_SRV_SSE_CNT1 = %u", last_event_id, U_SRV_SSE_CNT1) + + UServer_Base::eventSSE(U_CONSTANT_TO_PARAM("NEW %v %v %u %d\n"), UServer_Base::sse_id->rep, subscribe.rep, last_event_id, bprocess ? -1 : UServer_Base::csocket->getFd()); + + if (ptr && + last_event_id < U_SRV_SSE_CNT1) + { + UString buffer(U_CAPACITY); + + (void) UServices::read(UServer_Base::sse_event_fd, buffer); + + uint32_t sz = buffer.size(); + + if (sz > 1) + { + U_INTERNAL_ASSERT_EQUALS(u_get_unalignedp16(buffer.data()), U_MULTICHAR_CONSTANT16('i','d')) + + UClientImage_Base::wbuffer->append(buffer.data(), sz-1); + } + } + + if (bprocess) + { + UString sse_data = sse_func(); + + if (sse_data) + { + UServer_Base::sendToAllExceptSSE(sse_data); + + UClientImage_Base::wbuffer->append(UServer_Base::printSSE(U_SRV_SSE_CNT1, sse_data, UServer_Base::sse_event)); + } + } + else + { + UServer_Base::cmsg.cmsg_data = UServer_Base::csocket->getFd(); + + (void) U_SYSCALL(sendmsg, "%u,%p,%u", UServer_Base::sse_socketpair[1], &UServer_Base::msg, 0); + + UClientImage_Base::resetPipelineAndSetCloseConnection(); + } + + *ext = *UClientImage_Base::wbuffer; + + handlerResponse(); + + if (UServer_Base::bssl == false) (void) UServer_Base::csocket->shutdown(SHUT_RD); // disables input from the socket... +} #endif void UHTTP::setDynamicResponse() @@ -7153,121 +7290,6 @@ void UHTTP::setDynamicResponse() U_INTERNAL_ASSERT_MAJOR(U_http_info.nResponseCode, 0) -#ifdef U_SSE_ENABLE - U_INTERNAL_DUMP("sse_func = %p", sse_func) - - if (sse_func) - { - U_INTERNAL_DUMP("Accept: = %.*S", U_HTTP_ACCEPT_TO_TRACE) - - U_ASSERT(isGET()) - U_INTERNAL_ASSERT_EQUALS(u_get_unalignedp64(U_http_info.accept+8), U_MULTICHAR_CONSTANT64('n','t','-','s','t','r','e','a')) - - /** - * we must insert: - * - * - * - * - */ - - UString subscribe; - bool bprocess = (sse_func != (void*)1L); - const char* ptr = getHeaderValuePtr(U_CONSTANT_TO_PARAM("last-event-id"), true); - uint32_t last_event_id = (ptr ? u_atoi(ptr) : 0); - - UServer_Base::sse_id->clear(); - - if (UServer_Base::bssl == false) (void) UServer_Base::csocket->shutdown(SHUT_RD); - - (void) UClientImage_Base::wbuffer->snprintf(U_CONSTANT_TO_PARAM("Access-Control-Allow-Origin: %s\r\n" - "Access-Control-Allow-Methods: GET\r\n" - "Access-Control-Allow-Headers: cache-control, last-event-id, X-Requested-With\r\n" - "Content-Type: text/event-stream\r\n" - "Cache-Control: no-cache\r\n\r\n"), sse_corsbase); - - if (processForm()) - { - getFormValue(subscribe, 1); - getFormValue(*UServer_Base::sse_id, 3); - } - - if (subscribe) UServer_Base::sse_event = &subscribe; - else - { - subscribe = *UServer_Base::str_asterisk; - UServer_Base::sse_event = U_NULLPTR; - } - - if (UServer_Base::sse_id->empty()) - { - U_SRV_SSE_CNT2++; - - U_INTERNAL_DUMP("U_SRV_SSE_CNT2 = %u", U_SRV_SSE_CNT2) - - *UServer_Base::sse_id = UStringExt::numberToString(U_SRV_SSE_CNT2); - } - - U_INTERNAL_DUMP("last_event_id = %u U_SRV_SSE_CNT1 = %u", last_event_id, U_SRV_SSE_CNT1) - - UServer_Base::eventSSE(U_CONSTANT_TO_PARAM("NEW %v %v %u %u\n"), UServer_Base::sse_id->rep, subscribe.rep, last_event_id, bprocess); - - if (ptr && - last_event_id < U_SRV_SSE_CNT1) - { - UString buffer(U_CAPACITY); - - (void) UServices::read(UServer_Base::sse_event_fd, buffer); - - uint32_t sz = buffer.size(); - - if (sz > 1) - { - U_INTERNAL_ASSERT_EQUALS(u_get_unalignedp16(buffer.data()), U_MULTICHAR_CONSTANT16('i','d')) - - UClientImage_Base::wbuffer->append(buffer.data(), sz-1); - } - } - - if (bprocess) - { - UString sse_data = sse_func(); - - if (sse_data) - { - UServer_Base::sendToAllExceptSSE(sse_data); - - UClientImage_Base::wbuffer->append(UServer_Base::printSSE(U_SRV_SSE_CNT1, sse_data, UServer_Base::sse_event)); - } - } - else - { - sse_func = U_NULLPTR; - - UServer_Base::cmsg.cmsg_data = UServer_Base::csocket->getFd(); - - (void) U_SYSCALL(sendmsg, "%u,%p,%u", UServer_Base::sse_socketpair[1], &UServer_Base::msg, 0); - - UClientImage_Base::resetPipelineAndSetCloseConnection(); - } - - *ext = *UClientImage_Base::wbuffer; - - handlerResponse(); - - return; - } -#endif - char* ptr; const char* pEndHeader; uint32_t clength = UClientImage_Base::wbuffer->size(); diff --git a/tests/examples/sse_example/chat.usp b/tests/examples/sse_example/chat.usp new file mode 100644 index 00000000..b2405209 --- /dev/null +++ b/tests/examples/sse_example/chat.usp @@ -0,0 +1,84 @@ + + + + + + SSE (Server Sent Events) demo page + + + + + +

SSE (Server Sent Events) demo page

+ +

+ + diff --git a/tests/examples/sse_example/get_ticker1.usp b/tests/examples/sse_example/get_ticker1.usp deleted file mode 100644 index 49c7b49d..00000000 --- a/tests/examples/sse_example/get_ticker1.usp +++ /dev/null @@ -1,2 +0,0 @@ - diff --git a/tests/examples/sse_example/index1.html b/tests/examples/sse_example/index1.html index 3657b41b..8f51a3d8 100644 --- a/tests/examples/sse_example/index1.html +++ b/tests/examples/sse_example/index1.html @@ -8,7 +8,7 @@