diff --git a/Makefile.am b/Makefile.am index b2a13e69..5bc168a8 100644 --- a/Makefile.am +++ b/Makefile.am @@ -51,6 +51,7 @@ coverage_html: @genhtml --ignore-errors source -o html coverage.info install-data-local: + @cd $(top_builddir)/src/ulib/net/server/plugin/usp; $(MAKE) $(AM_MAKEFLAGS) usp_compile.sh @${INSTALL} -m 777 $(top_builddir)/libtool $(DESTDIR)${bindir}/usp_libtool.sh; \ ${INSTALL} -m 777 $(top_builddir)/src/ulib/net/server/plugin/usp/usp_compile.sh $(DESTDIR)${bindir} diff --git a/Makefile.in b/Makefile.in index 9d477e02..babbdeac 100644 --- a/Makefile.in +++ b/Makefile.in @@ -957,6 +957,7 @@ coverage_html: @genhtml --ignore-errors source -o html coverage.info install-data-local: + @cd $(top_builddir)/src/ulib/net/server/plugin/usp; $(MAKE) $(AM_MAKEFLAGS) usp_compile.sh @${INSTALL} -m 777 $(top_builddir)/libtool $(DESTDIR)${bindir}/usp_libtool.sh; \ ${INSTALL} -m 777 $(top_builddir)/src/ulib/net/server/plugin/usp/usp_compile.sh $(DESTDIR)${bindir} diff --git a/include/ulib/base/base.h b/include/ulib/base/base.h index 7462fdfc..1cbc365b 100644 --- a/include/ulib/base/base.h +++ b/include/ulib/base/base.h @@ -123,29 +123,28 @@ extern "C" { #endif -typedef int (*iPF) (void); -typedef int (*iPFpv) (void*); -typedef unsigned (*uPFpv) (void*); -typedef int (*iPFpvpv) ( void*, void*); -typedef int (*qcompare)(const void*,const void*); -typedef bool (*bPFi) (int); -typedef bool (*bPF) (void); -typedef bool (*bPFpv) (void*); -typedef bool (*bPFpvpv) (void*,void*); -typedef bool (*bPFpc) (const char*); -typedef bool (*bPFpcu) (const char*, uint32_t); -typedef bool (*bPFpcpc) (const char*,const char*); -typedef bool (*bPFpcpv) (const char*,const void*); -typedef void (*vPF) (void); -typedef void (*vPFi) (int); -typedef void (*vPFpv) (void*); -typedef void (*vPFpc) (const char*); -typedef void (*vPFpvpc) (void*,char*); -typedef void (*vPFpvpv) (void*,void*); -typedef void (*vPFpvu) (void*,uint32_t); -typedef void* (*pvPF) (void); -typedef void* (*pvPFpv) (void*); -typedef void* (*pvPFpvpb)(void*,bool*); +typedef int (*iPF) (void); +typedef int (*iPFpv) (void*); +typedef int (*iPFpvpv) ( void*, void*); +typedef int (*qcompare)(const void*,const void*); +typedef bool (*bPFi) (int); +typedef bool (*bPF) (void); +typedef bool (*bPFpv) (void*); +typedef bool (*bPFpvpv) (void*,void*); +typedef bool (*bPFpc) (const char*); +typedef bool (*bPFpcu) (const char*, uint32_t); +typedef bool (*bPFpcpc) (const char*,const char*); +typedef bool (*bPFpcpv) (const char*,const void*); +typedef void (*vPF) (void); +typedef void (*vPFi) (int); +typedef void (*vPFpv) (void*); +typedef void (*vPFpc) (const char*); +typedef void (*vPFpvpc) (void*,char*); +typedef void (*vPFpvpv) (void*,void*); +typedef void (*vPFpvu) (void*,uint32_t); +typedef void* (*pvPF) (void); +typedef void* (*pvPFpv) (void*); +typedef void* (*pvPFpvpb)(void*,bool*); typedef struct U_DATA { unsigned char* dptr; diff --git a/include/ulib/base/trace.h b/include/ulib/base/trace.h index 117274bf..26b58d6d 100644 --- a/include/ulib/base/trace.h +++ b/include/ulib/base/trace.h @@ -25,7 +25,6 @@ extern "C" { #endif -extern U_EXPORT void* u_plock; extern U_EXPORT int u_trace_fd; extern U_EXPORT int u_trace_signal; extern U_EXPORT int u_trace_suspend; /* on-off */ diff --git a/include/ulib/notifier.h b/include/ulib/notifier.h index 0eb74318..3180df39 100644 --- a/include/ulib/notifier.h +++ b/include/ulib/notifier.h @@ -14,7 +14,7 @@ #ifndef ULIB_NOTIFIER_H #define ULIB_NOTIFIER_H -#include +#include #include /** @@ -40,7 +40,6 @@ #include #include -class UThread; class USocket; class USocketExt; class UHttpPlugIn; @@ -130,9 +129,6 @@ protected: #ifdef USE_LIBEVENT // nothing #elif defined(HAVE_EPOLL_WAIT) -# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - static void* pthread; -# endif static int epollfd; static struct epoll_event* events; #else @@ -174,21 +170,20 @@ private: static void handlerDelete(int fd, int mask); #ifndef USE_LIBEVENT - static void notifyHandlerEvent() - { - U_TRACE(0, "UNotifier::notifyHandlerEvent()") - - U_INTERNAL_ASSERT_POINTER(handler_event) - - U_INTERNAL_DUMP("handler_event = %p bread = %b nfd_ready = %d fd = %d op_mask = %d %B", - handler_event, bread, nfd_ready, handler_event->fd, handler_event->op_mask, handler_event->op_mask) - - if ((bread ? handler_event->handlerRead() - : handler_event->handlerWrite()) == U_NOTIFIER_DELETE) - { - handlerDelete(handler_event); - } - } + static void notifyHandlerEvent() U_NO_EXPORT; +# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) + static UThread* pthread; +# ifdef _MSWINDOWS_ + static CRITICAL_SECTION mutex; +# else + static pthread_mutex_t mutex; +# endif + static void lock() { if (pthread) UThread::lock(&mutex); } + static void unlock() { if (pthread) UThread::unlock(&mutex); } +# else + static void lock() {} + static void unlock() {} +# endif #endif #ifdef U_COMPILER_DELETE_MEMBERS @@ -205,5 +200,4 @@ private: friend class UServer_Base; friend class UClientImage_Base; }; - #endif diff --git a/include/ulib/process.h b/include/ulib/process.h index 0e67a304..07491eba 100644 --- a/include/ulib/process.h +++ b/include/ulib/process.h @@ -50,7 +50,7 @@ public: { U_TRACE_REGISTER_OBJECT(0, UProcess, "", 0) - _pid = (pid_t) -1; + _pid = (pid_t)-1; status = 0; running = false; } diff --git a/include/ulib/thread.h b/include/ulib/thread.h index af3cdf14..255ac9a8 100644 --- a/include/ulib/thread.h +++ b/include/ulib/thread.h @@ -15,6 +15,7 @@ #define ULIB_THREAD_H #include +#include #ifdef _MSWINDOWS_ # undef sleep @@ -26,6 +27,7 @@ #endif class UNotifier; +class UServer_Base; class U_EXPORT UThread { public: @@ -38,44 +40,44 @@ public: // COSTRUTTORI - UThread(int _detachstate); - - virtual ~UThread() - { - U_TRACE_UNREGISTER_OBJECT(0, UThread) - - if (tid) close(); - } + UThread(int detachstate); + virtual ~UThread(); // SERVICES #ifdef _MSWINDOWS_ static DWORD getTID(); -#else - static pthread_t getTID(); -#endif - // Inter Process Communication + static void lock(CRITICAL_SECTION* pmutex) + { + U_TRACE(0, "UThread::lock(%p)", pmutex) + + EnterCriticalSection(pmutex); + } + + static void unlock(CRITICAL_SECTION* pmutex) + { + U_TRACE(0, "UThread::unlock(%p)", pmutex) + + LeaveCriticalSection(pmutex); + } +#else + static pid_t getTID(); static void lock(pthread_mutex_t* pmutex) { U_TRACE(1, "UThread::lock(%p)", pmutex) -# ifndef _MSWINDOWS_ (void) U_SYSCALL(pthread_mutex_lock, "%p", pmutex); -# endif } static void unlock(pthread_mutex_t* pmutex) { U_TRACE(1, "UThread::unlock(%p)", pmutex) -# ifndef _MSWINDOWS_ (void) U_SYSCALL(pthread_mutex_unlock, "%p", pmutex); -# endif } -#ifndef _MSWINDOWS_ static bool initRwLock(pthread_rwlock_t* prwlock) { U_TRACE(1, "UThread::initRwLock(%p)", prwlock) @@ -92,6 +94,8 @@ public: U_RETURN(true); } + // Inter Process Communication + static bool initIPC(pthread_mutex_t* mutex, pthread_cond_t* cond); static void doIPC(pthread_mutex_t* mutex, pthread_cond_t* cond, vPF function, bool wait); #endif @@ -163,37 +167,48 @@ public: * with SIGSTOP, but this behaviour has now been fixed to conform to the Posix standard (so it stops all threads in the process) */ +#ifdef _MSWINDOWS_ + void resume() {} + void suspend() {} +#else void resume() { U_TRACE(0, "UThread::resume()") -# ifndef _MSWINDOWS_ - resume(tid); -# ifndef HAVE_PTHREAD_SUSPEND + U_ASSERT_EQUALS(isCurrentThread(tid), false) + +# ifdef HAVE_PTHREAD_SUSPEND + (void) U_SYSCALL(pthread_resume, "%p", tid); +# else + (void) U_SYSCALL(pthread_kill, "%p,%d", tid, U_SIGCONT); + yield(); // give the signal a time to kick in # endif -# endif } void suspend() { U_TRACE(0, "UThread::suspend()") -# ifndef _MSWINDOWS_ - suspend(tid); -# ifndef HAVE_PTHREAD_SUSPEND + U_ASSERT_EQUALS(isCurrentThread(tid), false) + +# ifdef HAVE_PTHREAD_SUSPEND + (void) U_SYSCALL(pthread_suspend, "%p", tid); +# else + (void) U_SYSCALL(pthread_kill, "%p,%d", tid, U_SIGSTOP); + yield(); // give the signal a time to kick in # endif -# endif } +#endif // Cancellation enum Cancel { - cancelInitial, /* used internally, do not use */ - cancelDeferred, /* exit thread on cancellation pointsuch as yield */ - cancelImmediate, /* exit befor cancellation */ - cancelDisabled, /* ignore cancellation */ + cancelInitial, /* used internally, do not use */ + cancelDeferred, /* exit thread on cancellation pointsuch as yield */ + cancelImmediate, /* exit befor cancellation */ + cancelDisabled, /* ignore cancellation */ cancelManual }; @@ -265,6 +280,8 @@ public: protected: UThread* next; int detachstate, cancel; + pid_t sid; + #ifdef _MSWINDOWS_ DWORD tid; HANDLE cancellation; @@ -272,31 +289,35 @@ protected: pthread_t tid; pthread_attr_t attr; int suspendCount; - - static pthread_mutex_t mlock; #endif static UThread* first; void close(); + static void threadStart(UThread* th) + { + U_TRACE(0, "UThread::threadStart(%p)", th) + + U_INTERNAL_ASSERT_POINTER(th) + + U_INTERNAL_DUMP("th->tid = %p th->sid = %u", th->tid, th->sid) + + th->setCancel(cancelDeferred); + + th->run(); + + U_INTERNAL_DUMP("th->tid = %p th->sid = %u", th->tid, th->sid) + + if (th->tid) th->close(); + } + #ifdef _MSWINDOWS_ - static void lock() {} - static void unlock() {} - - static void resume(pthread_t _tid) {} - static void suspend(pthread_t _tid) {} - static unsigned __stdcall execHandler(void* th); #else void sigInstall(int signo); void manageSignal(int signo); - static void lock() { lock(&mlock); } - static void unlock() { unlock(&mlock); } - - static void execHandler(UThread* th); - static void sigHandler(int signo) { U_TRACE(0, "UThread::sigHandler(%d)", signo) @@ -306,73 +327,41 @@ protected: if (th) th->manageSignal(signo); } - static bool isDetached(pthread_attr_t* pattr) + static void execHandler(UThread* th); + + static void threadCleanup(UThread* th) { - U_TRACE(1, "UThread::isDetached(%p)", pattr) + U_TRACE(0, "UThread::threadCleanup(%p)", th) - int state; + U_INTERNAL_ASSERT_POINTER(th) - (void) U_SYSCALL(pthread_attr_getdetachstate, "%p,%p", pattr, &state); + U_INTERNAL_DUMP("th->tid = %p th->sid = %u", th->tid, th->sid) - if (state == PTHREAD_CREATE_DETACHED) U_RETURN(true); - - U_RETURN(false); + if (th->tid) th->close(); } static void stop(pthread_t _tid, pthread_attr_t* pattr) { U_TRACE(1, "UThread::stop(%p,%p)", _tid, pattr) + int state; + # ifdef HAVE_PTHREAD_CANCEL (void) U_SYSCALL(pthread_cancel, "%p", _tid); # endif - if (isDetached(pattr) == false) (void) U_SYSCALL(pthread_join, "%p,%p", _tid, 0); + (void) U_SYSCALL(pthread_attr_getdetachstate, "%p,%p", pattr, &state); + + if (state != PTHREAD_CREATE_DETACHED) (void) U_SYSCALL(pthread_join, "%p,%p", _tid, 0); # ifdef HAVE_PTHREAD_YIELD else (void) U_SYSCALL_NO_PARAM(pthread_yield); # endif } - - static void suspend(pthread_t _tid) - { - U_TRACE(1, "UThread::suspend(%p)", _tid) - - U_ASSERT_EQUALS(isCurrentThread(_tid), false) - -# ifdef HAVE_PTHREAD_SUSPEND - (void) U_SYSCALL(pthread_suspend, "%p", _tid); -# else - (void) U_SYSCALL(pthread_kill, "%p,%d", _tid, U_SIGSTOP); -# endif - } - - static void resume(pthread_t _tid) - { - U_TRACE(1, "UThread::resume(%p)", _tid) - - U_ASSERT_EQUALS(isCurrentThread(_tid), false) - -# ifdef HAVE_PTHREAD_SUSPEND - (void) U_SYSCALL(pthread_resume, "%p", _tid); -# else - (void) U_SYSCALL(pthread_kill, "%p,%d", _tid, U_SIGCONT); -# endif - } - - static void threadCleanup(void* th) - { - U_TRACE(0, "UThread::threadCleanup(%p)", th) - - U_INTERNAL_ASSERT_POINTER(th) - - U_INTERNAL_DUMP("th->tid = %p", ((UThread*)th)->tid) - - if (((UThread*)th)->tid) ((UThread*)th)->close(); - } #endif private: friend class UNotifier; + friend class UServer_Base; #ifdef U_COMPILER_DELETE_MEMBERS UThread(const UThread&) = delete; @@ -383,7 +372,7 @@ private: #endif }; -class U_EXPORT UThreadPool { +class U_EXPORT UThreadPool : public UThread { public: // Check for memory error U_MEMORY_TEST @@ -391,6 +380,56 @@ public: // Allocator e Deallocator U_MEMORY_ALLOCATOR U_MEMORY_DEALLOCATOR -}; + // COSTRUTTORI + + UThreadPool(uint32_t size); + ~UThreadPool(); + + // define method VIRTUAL of class UThread + + virtual void run() U_DECL_FINAL + { + U_TRACE(0, "UThreadPool::run()") + + /* + for (task in queue) + { + if (task == STOP_WORKING) break; + + do work; + } + */ + } + + // SERVICES + + // DEBUG + +#if defined(U_STDCPP_ENABLE) && defined(DEBUG) + const char* dump(bool reset) const; +#endif + +protected: + UVector pool; // Thread pool storage + UVector queue; // Queue to keep track of incoming tasks + bool active; + +#ifdef _MSWINDOWS_ + CRITICAL_SECTION tasksMutex; // Task queue mutex + CONDITION_VARIABLE condition; // Condition variable +#else + pthread_mutex_t tasksMutex; // Task queue mutex + pthread_cond_t condition; // Condition variable +#endif + +private: +#ifdef U_COMPILER_DELETE_MEMBERS + UThreadPool(const UThreadPool&) = delete; + UThreadPool& operator=(const UThreadPool&) = delete; +#else + UThreadPool(const UThreadPool&) : UThread(PTHREAD_CREATE_DETACHED) {} + UThreadPool& operator=(const UThreadPool&) { return *this; } +#endif +}; #endif diff --git a/src/ulib/base/base_trace.c b/src/ulib/base/base_trace.c index f0a83aa7..d1e4d9da 100644 --- a/src/ulib/base/base_trace.c +++ b/src/ulib/base/base_trace.c @@ -22,17 +22,24 @@ # include #endif -int u_trace_fd = -1; -int u_trace_signal; -int u_trace_suspend; -void* u_plock; -void* u_trace_mask_level; - +int u_trace_fd = -1; +int u_trace_signal; +int u_trace_suspend; +void* u_trace_mask_level; char u_trace_tab[256]; /* 256 max indent */ uint32_t u_trace_num_tab; -static int level_active; +static int level_active; static uint32_t file_size; +#ifdef ENABLE_THREAD +# ifdef _MSWINDOWS_ +static DWORD old_tid; +static CRITICAL_SECTION mutex; +# else +static pthread_t old_tid; +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +# endif +#endif static void printInfo(void) { @@ -77,32 +84,36 @@ void u_trace_lock(void) { U_INTERNAL_TRACE("u_trace_lock()") -#if defined(HAVE_SYS_SYSCALL_H) && defined(ENABLE_THREAD) - if (u_plock) +#ifdef ENABLE_THREAD +# ifdef _MSWINDOWS_ + if (old_tid == 0) InitializeCriticalSection(&mutex); + + DWORD tid = GetCurrentThreadId(); + + EnterCriticalSection(&mutex); +# else + pthread_t tid; +# ifdef HAVE_SYS_SYSCALL_H + tid = syscall(SYS_gettid); +# endif + (void) pthread_mutex_lock(&mutex); +# endif + + if (old_tid != tid) { - static pid_t old_tid; + char tid_buffer[32]; + int sz = snprintf(tid_buffer, sizeof(tid_buffer), "[tid %ld]<--\n[tid %ld]-->\n", old_tid, tid); - pid_t tid = syscall(SYS_gettid); + old_tid = tid; - (void) pthread_mutex_lock((pthread_mutex_t*)u_plock); - - if (old_tid != tid) + if (file_size == 0) (void) write(u_trace_fd, tid_buffer, sz); + else { - char tid_buffer[32]; + if ((file_ptr + sz) > file_limit) file_ptr = file_mem; - int sz = snprintf(tid_buffer, sizeof(tid_buffer), "[tid %d]<--\n[tid %d]-->\n", old_tid, tid); + u__memcpy(file_ptr, tid_buffer, sz, __PRETTY_FUNCTION__); - old_tid = tid; - - if (file_size == 0) (void) write(u_trace_fd, tid_buffer, sz); - else - { - if ((file_ptr + sz) > file_limit) file_ptr = file_mem; - - u__memcpy(file_ptr, tid_buffer, sz, __PRETTY_FUNCTION__); - - file_ptr += sz; - } + file_ptr += sz; } } #endif @@ -112,8 +123,12 @@ void u_trace_unlock(void) { U_INTERNAL_TRACE("u_trace_unlock()") -#if defined(HAVE_SYS_SYSCALL_H) && defined(ENABLE_THREAD) - if (u_plock) (void) pthread_mutex_unlock((pthread_mutex_t*)u_plock); +#ifdef ENABLE_THREAD +# ifdef _MSWINDOWS_ + LeaveCriticalSection(&mutex); +# else + (void) pthread_mutex_unlock(&mutex); +# endif #endif } diff --git a/src/ulib/net/server/server.cpp b/src/ulib/net/server/server.cpp index db4a371e..902f0830 100644 --- a/src/ulib/net/server/server.cpp +++ b/src/ulib/net/server/server.cpp @@ -166,9 +166,23 @@ UString* UServer_Base::allow_IP_prv; UVector* UServer_Base::vallow_IP_prv; #endif -#if defined(ENABLE_THREAD) && !defined(_MSWINDOWS_) -# include +#ifdef ENABLE_THREAD +#include +class UClientThread U_DECL_FINAL : public UThread { +public: + + UClientThread() : UThread(PTHREAD_CREATE_DETACHED) {} + + virtual void run() + { + U_TRACE(0, "UClientThread::run()") + + while (UServer_Base::flag_loop) UNotifier::waitForEvent(UServer_Base::ptime); + } +}; + +# ifndef _MSWINDOWS_ class UTimeThread U_DECL_FINAL : public UThread { public: @@ -231,22 +245,9 @@ public: } }; -class UClientThread U_DECL_FINAL : public UThread { -public: - - UClientThread() : UThread(PTHREAD_CREATE_DETACHED) {} - - virtual void run() - { - U_TRACE(0, "UClientThread::run()") - - while (UServer_Base::flag_loop) UNotifier::waitForEvent(UServer_Base::ptime); - } -}; - -#if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB) -#include -#include +# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB) +# include +# include class UOCSPStapling U_DECL_FINAL : public UThread { public: @@ -285,6 +286,7 @@ ULock* UServer_Base::lock_ocsp_staple; UOCSPStapling* UServer_Base::pthread_ocsp; #endif #endif +#endif #ifndef _MSWINDOWS_ static int sysctl_somaxconn, tcp_abort_on_overflow, sysctl_max_syn_backlog, tcp_fin_timeout; @@ -352,30 +354,35 @@ UServer_Base::~UServer_Base() U_INTERNAL_ASSERT_POINTER(socket) U_INTERNAL_ASSERT_POINTER(vplugin) -#if defined(ENABLE_THREAD) && !defined(_MSWINDOWS_) +#ifdef ENABLE_THREAD + +# if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) + if (preforked_num_kids == -1) + { + U_INTERNAL_ASSERT_POINTER(UNotifier::pthread) + + delete UNotifier::pthread; + } +# endif + +# ifndef _MSWINDOWS_ if (u_pthread_time) { - ((UTimeThread*)u_pthread_time)->suspend(); - - delete (UTimeThread*)u_pthread_time; // delete to join + delete (UTimeThread*)u_pthread_time; (void) U_SYSCALL(pthread_rwlock_destroy, "%p", ULog::prwlock); } -# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB) +# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB) if (bssl) { - if (pthread_ocsp) - { - pthread_ocsp->suspend(); - - delete pthread_ocsp; // delete to join - } + if (pthread_ocsp) delete pthread_ocsp; USSLSocket::cleanupStapling(); if (UServer_Base::lock_ocsp_staple) delete UServer_Base::lock_ocsp_staple; } +# endif # endif #endif @@ -1836,7 +1843,7 @@ RETSIGTYPE UServer_Base::handlerForSigHUP(int signo) #if defined(ENABLE_THREAD) && !defined(_MSWINDOWS_) # if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB) - if (pthread_ocsp) pthread_ocsp->suspend(); + if ( pthread_ocsp) pthread_ocsp->suspend(); # endif if (u_pthread_time) ((UTimeThread*)u_pthread_time)->suspend(); #endif @@ -1857,7 +1864,7 @@ RETSIGTYPE UServer_Base::handlerForSigHUP(int signo) #if defined(ENABLE_THREAD) && !defined(_MSWINDOWS_) # if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB) - if (pthread_ocsp) pthread_ocsp->resume(); + if ( pthread_ocsp) pthread_ocsp->resume(); # endif if (u_pthread_time) ((UTimeThread*)u_pthread_time)->resume(); #endif @@ -1932,10 +1939,10 @@ RETSIGTYPE UServer_Base::handlerForSigTERM(int signo) if (preforked_num_kids) { -# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (preforked_num_kids == -1) ((UThread*)UNotifier::pthread)->suspend(); -# if defined(HAVE_SYS_SYSCALL_H) && defined(DEBUG) - if (u_plock) (void) pthread_mutex_unlock((pthread_mutex_t*)u_plock); +# if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) + if (preforked_num_kids == -1) UNotifier::pthread->suspend(); +# ifdef DEBUG + u_trace_unlock(); # endif # endif @@ -2595,19 +2602,25 @@ void UServer_Base::runLoop(const char* user) if (isLog()) ULog::log("Waiting for connection on port %u", port); #endif -#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) +#if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) if (preforked_num_kids == -1) { + U_INTERNAL_ASSERT_EQUALS(UNotifier::pthread, 0) + UNotifier::pthread = U_NEW(UClientThread); - ((UThread*)UNotifier::pthread)->start(50); +# ifdef _MSWINDOWS_ + InitializeCriticalSection(&UNotifier::mutex); +# endif - proc->_pid = ((UThread*)UNotifier::pthread)->getTID(); + UNotifier::pthread->start(50); + + proc->_pid = UNotifier::pthread->sid; U_ASSERT(proc->parent()) } # ifdef DEBUG - else + else { U_INTERNAL_ASSERT_EQUALS(UNotifier::pthread, 0) } diff --git a/src/ulib/notifier.cpp b/src/ulib/notifier.cpp index a873383d..2dbc24fb 100644 --- a/src/ulib/notifier.cpp +++ b/src/ulib/notifier.cpp @@ -17,10 +17,6 @@ #include #include -#if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) -# include "ulib/thread.h" -#endif - #ifdef HAVE_POLL_H # include struct pollfd UNotifier::fds[1]; @@ -70,9 +66,6 @@ void UEventFd::operator()(int _fd, short event) if (ret == U_NOTIFIER_DELETE) UNotifier::erase(this); } #elif defined(HAVE_EPOLL_WAIT) -# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) -void* UNotifier::pthread; -# endif int UNotifier::epollfd; struct epoll_event* UNotifier::events; struct epoll_event* UNotifier::pevents; @@ -98,6 +91,32 @@ UEventFd** UNotifier::lo_map_fd; UGenericHashMap* UNotifier::hi_map_fd; // maps a fd to a node pointer +#ifndef USE_LIBEVENT +U_NO_EXPORT void UNotifier::notifyHandlerEvent() +{ + U_TRACE(0, "UNotifier::notifyHandlerEvent()") + + U_INTERNAL_ASSERT_POINTER(handler_event) + + U_INTERNAL_DUMP("handler_event = %p bread = %b nfd_ready = %d fd = %d op_mask = %d %B", + handler_event, bread, nfd_ready, handler_event->fd, handler_event->op_mask, handler_event->op_mask) + + if ((bread ? handler_event->handlerRead() + : handler_event->handlerWrite()) == U_NOTIFIER_DELETE) + { + handlerDelete(handler_event); + } +} +# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) +UThread* UNotifier::pthread; +# ifdef _MSWINDOWS_ +CRITICAL_SECTION UNotifier::mutex; +# else +pthread_mutex_t UNotifier::mutex = PTHREAD_MUTEX_INITIALIZER; +# endif +# endif +#endif + void UNotifier::init(bool bacquisition) { U_TRACE(0, "UNotifier::init(%b)", bacquisition) @@ -222,15 +241,15 @@ bool UNotifier::isHandler(int fd) U_RETURN(false); } -# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (pthread) ((UThread*)pthread)->lock(); -# endif + bool result; - if (hi_map_fd->find(fd)) U_RETURN(true); + lock(); -# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (pthread) ((UThread*)pthread)->unlock(); -# endif + result = hi_map_fd->find(fd); + + unlock(); + + if (result) U_RETURN(true); } U_RETURN(false); @@ -243,15 +262,11 @@ void UNotifier::resetHandler(int fd) if (fd < (int32_t)max_connection) lo_map_fd[fd] = 0; else { -# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (pthread) ((UThread*)pthread)->lock(); -# endif + lock(); (void) hi_map_fd->erase(fd); -# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (pthread) ((UThread*)pthread)->unlock(); -# endif + unlock(); } } @@ -275,11 +290,15 @@ bool UNotifier::setHandler(int fd) } } -#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (pthread) ((UThread*)pthread)->lock(); -#endif + bool result; - if (hi_map_fd->find(fd)) + lock(); + + result = hi_map_fd->find(fd); + + unlock(); + + if (result) { handler_event = hi_map_fd->elem(); @@ -288,10 +307,6 @@ bool UNotifier::setHandler(int fd) U_RETURN(true); } -#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (pthread) ((UThread*)pthread)->unlock(); -#endif - U_RETURN(false); } @@ -310,15 +325,11 @@ void UNotifier::insert(UEventFd* item) if (fd < (int32_t)max_connection) lo_map_fd[fd] = item; else { -# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (pthread) ((UThread*)pthread)->lock(); -# endif + lock(); hi_map_fd->insert(fd, item); -# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (pthread) ((UThread*)pthread)->unlock(); -# endif + unlock(); } #ifdef USE_LIBEVENT @@ -707,7 +718,9 @@ loop: U_INTERNAL_ASSERT_POINTER(pevents->data.ptr) : handler_event->handlerWrite()) == U_NOTIFIER_DELETE) { handlerDelete(handler_event); +# ifndef U_COVERITY_FALSE_POSITIVE // Improper use of negative value (NEGATIVE_RETURNS) handler_event->fd = -1; +# endif # ifdef U_EPOLLET_POSTPONE_STRATEGY if (bepollet) pevents->events = 0; @@ -763,7 +776,9 @@ loop2: if (pevents->events) if (handler_event->handlerRead() == U_NOTIFIER_DELETE) { handlerDelete(handler_event); +# ifndef U_COVERITY_FALSE_POSITIVE // Improper use of negative value (NEGATIVE_RETURNS) handler_event->fd = -1; +# endif pevents->events = 0; } @@ -890,23 +905,15 @@ void UNotifier::clear() } } -#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (pthread == 0) -#endif - { - UMemoryPool::_free(lo_map_fd, max_connection, sizeof(UEventFd*)); + UMemoryPool::_free(lo_map_fd, max_connection, sizeof(UEventFd*)); - hi_map_fd->deallocate(); + hi_map_fd->deallocate(); - delete hi_map_fd; - } + delete hi_map_fd; #if defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) U_INTERNAL_ASSERT_POINTER(events) -# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT) - if (pthread == 0) -# endif UMemoryPool::_free(events, max_connection + 1, sizeof(struct epoll_event)); (void) U_SYSCALL(close, "%d", epollfd); diff --git a/src/ulib/thread.cpp b/src/ulib/thread.cpp index 3058b160..6f7cd08d 100644 --- a/src/ulib/thread.cpp +++ b/src/ulib/thread.cpp @@ -34,10 +34,6 @@ extern "C" { int nanosleep (const struct timespec* requested_time, UThread* UThread::first; -#ifndef _MSWINDOWS_ -pthread_mutex_t UThread::mlock = PTHREAD_MUTEX_INITIALIZER; -#endif - UThread::UThread(int _detachstate) { U_TRACE_REGISTER_OBJECT(0, UThread, "%d", _detachstate) @@ -61,6 +57,20 @@ UThread::UThread(int _detachstate) #endif } +UThread::~UThread() +{ + U_TRACE_UNREGISTER_OBJECT(0, UThread) + + if (tid) + { +# ifndef _MSWINDOWS_ + if (isDetached()) suspend(); +# endif + + close(); + } +} + void UThread::close() { U_TRACE(0, "UThread::close()") @@ -138,26 +148,6 @@ void UThread::close() #endif } -#ifdef _MSWINDOWS_ -DWORD UThread::getTID() -{ - U_TRACE(0, "UThread::getTID()") - - DWORD _tid = GetCurrentThreadId(); - - U_RETURN(_tid); -} -#else -pthread_t UThread::getTID() -{ - U_TRACE(0, "UThread::getTID()") - - pthread_t _tid = syscall(SYS_gettid); - - U_RETURN(_tid); -} -#endif - void UThread::yield() { U_TRACE(1, "UThread::yield()") @@ -201,7 +191,38 @@ void UThread::yield() #endif } -#ifndef _MSWINDOWS_ +#ifdef _MSWINDOWS_ +DWORD UThread::getTID() +{ + U_TRACE(0, "UThread::getTID()") + + DWORD tid = GetCurrentThreadId(); + + U_RETURN(tid); +} + +unsigned __stdcall UThread::execHandler(void* th) +{ + U_TRACE(0, "UThread::::execHandler(%p)", th) + + threadStart((UThread*)th); + + U_RETURN(0); +} +#else +pid_t UThread::getTID() +{ + U_TRACE(0, "UThread::getTID()") + + pid_t tid; + +#ifdef HAVE_SYS_SYSCALL_H + tid = syscall(SYS_gettid); +#endif + + U_RETURN(tid); +} + void UThread::sigInstall(int signo) { U_TRACE(1, "UThread::sigInstall(%d)", signo) @@ -232,16 +253,17 @@ void UThread::manageSignal(int signo) { U_TRACE(1, "UThread::manageSignal(%d)", signo) + static pthread_cond_t mcond = PTHREAD_COND_INITIALIZER; + static pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER; + // Mutexes are used to ensure the exclusive access, where as condition variables are used to synchronize threads based on the events. // We need Mutexes to ensure that condition variables dont end up in an infinite wait. One thing to remember is Mutex operation of lock // and unlock are guaranteed to be atomic, but the condition variables need not be. i.e The thread can get scheduled out while the condition variable wait is half way - static pthread_cond_t mcond = PTHREAD_COND_INITIALIZER; + lock(&mlock); U_INTERNAL_DUMP("suspendCount = %d", suspendCount) - (void) U_SYSCALL(pthread_mutex_lock, "%p", &mlock); - if (signo == U_SIGSTOP && suspendCount++ == 0) { @@ -264,36 +286,20 @@ void UThread::manageSignal(int signo) (void) U_SYSCALL(pthread_cond_signal, "%p", &mcond); } - (void) U_SYSCALL(pthread_mutex_unlock, "%p", &mlock); + unlock(&mlock); } -#endif -#ifdef _MSWINDOWS_ -unsigned __stdcall UThread::execHandler(void* th) -{ - U_TRACE(0, "UThread::::execHandler(%p)", th) - - U_INTERNAL_DUMP("th->tid = %p", ((UThread*)th)->tid) - - ((UThread*)th)->setCancel(cancelDeferred); - - ((UThread*)th)->run(); - - ((UThread*)th)->close(); - - U_RETURN(0); -} -#else void UThread::execHandler(UThread* th) { U_TRACE(1, "UThread::execHandler(%p)", th) +#ifdef HAVE_SYS_SYSCALL_H + th->sid = syscall(SYS_gettid); +#endif + + U_INTERNAL_ASSERT_EQUALS(pthread_self(), th->tid) + sigset_t mask; - - th->tid = (pthread_t) U_SYSCALL_NO_PARAM(pthread_self); - - U_INTERNAL_DUMP("th->tid = %p", th->tid) - #ifdef sigemptyset sigemptyset(&mask); #else @@ -337,17 +343,7 @@ void UThread::execHandler(UThread* th) th->sigInstall(U_SIGCONT); #endif - pthread_cleanup_push(threadCleanup, th); - - th->setCancel(cancelDeferred); - - th->run(); - - pthread_cleanup_pop(0); - - U_INTERNAL_DUMP("th->tid = %p", th->tid) - - if (th->tid) th->close(); + threadStart(th); } #endif @@ -357,22 +353,13 @@ bool UThread::start(uint32_t timeoutMS) U_INTERNAL_ASSERT_EQUALS(tid, 0) -#if defined(DEBUG) && !defined(_MSWINDOWS_) - if (u_plock == 0) - { - static pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER; - - u_plock = &plock; - } -#endif - next = first; first = this; U_INTERNAL_DUMP("first = %p next = %p", first, next) #ifdef _MSWINDOWS_ - (void) _beginthreadex(NULL, 0, execHandler, this, CREATE_SUSPENDED, (unsigned*)&tid); + (void) _beginthreadex(NULL, 0, execHandler, this, CREATE_SUSPENDED, (unsigned*)&sid); if (tid == 0) { @@ -416,8 +403,8 @@ void UThread::setCancel(int mode) switch (mode) { - case cancelImmediate: case cancelDeferred: + case cancelImmediate: { # ifdef HAVE_PTHREAD_CANCEL (void) U_SYSCALL(pthread_setcancelstate, "%d,%p", PTHREAD_CANCEL_ENABLE, &old); @@ -483,6 +470,8 @@ void UThread::sleep(time_t timeoutMS) { U_TRACE(1, "UThread::sleep(%ld)", timeoutMS) + U_ASSERT(isCurrentThread(tid)) + struct timespec ts = { timeoutMS / 1000L, (timeoutMS % 1000L) * 1000000L }; U_INTERNAL_ASSERT(ts.tv_sec >= 0L) @@ -560,28 +549,67 @@ bool UThread::initIPC(pthread_mutex_t* pmutex, pthread_cond_t* pcond) U_RETURN(true); } -void UThread::doIPC(pthread_mutex_t* pmutex, pthread_cond_t* pcond, vPF function, bool wait) +void UThread::doIPC(pthread_mutex_t* plock, pthread_cond_t* pcond, vPF function, bool wait) { - U_TRACE(0, "UThread::doIPC(%p,%p,%p,%b)", pmutex, pcond, function, wait) + U_TRACE(0, "UThread::doIPC(%p,%p,%p,%b)", plock, pcond, function, wait) - lock(pmutex); + lock(plock); - if (wait) (void) U_SYSCALL(pthread_cond_wait, "%p,%p", pcond, pmutex); // block until we are signalled from other... + if (wait) (void) U_SYSCALL(pthread_cond_wait, "%p,%p", pcond, plock); // block until we are signalled from other... function(); // ...than call function - unlock(pmutex); + unlock(plock); if (wait == false) (void) U_SYSCALL(pthread_cond_signal, "%p", pcond); // signal to waiting thread... } + +// THREAD POOL + +UThreadPool::UThreadPool(uint32_t size) : UThread(PTHREAD_CREATE_DETACHED), pool(size) +{ + U_TRACE_REGISTER_OBJECT(0, UThreadPool, "%u", size) + +#ifdef _MSWINDOWS_ + InitializeCriticalSection(&tasksMutex); // Task queue mutex + InitializeConditionVariable(&condition); // Condition variable +#else + tasksMutex = PTHREAD_MUTEX_INITIALIZER; // Task queue mutex + condition = PTHREAD_COND_INITIALIZER; // Condition variable +#endif +} + +UThreadPool::~UThreadPool() +{ + U_TRACE_UNREGISTER_OBJECT(0, UThread) +} + +#if defined(U_STDCPP_ENABLE) && defined(DEBUG) +const char* UThreadPool::dump(bool reset) const +{ + *UObjectIO::os << "active " << active << '\n' + << "pool (UVector " << (void*)&pool << ")\n" + << "queue (UVector " << (void*)&queue << ')'; + + if (reset) + { + UObjectIO::output(); + + return UObjectIO::buffer_output; + } + + return 0; +} +#endif #endif #if defined(U_STDCPP_ENABLE) && defined(DEBUG) const char* UThread::dump(bool reset) const { *UObjectIO::os << "tid " << tid << '\n' + << "sid " << sid << '\n' << "cancel " << cancel << '\n' - << "detachstate " << detachstate << '\n' + << "detachstate " << detachstate << '\n' << "next (UThread " << (void*)next << ")\n" << "first (UThread " << (void*)first << ')'; diff --git a/tests/debug/ok/memerror.ok b/tests/debug/ok/memerror.ok index bfb02b4e..a878bb2f 100644 --- a/tests/debug/ok/memerror.ok +++ b/tests/debug/ok/memerror.ok @@ -1,19 +1,21 @@ -test_memerror: WARNING: 30/05/15 16:03:57 (pid 19575) we are going to allocate 64 MB (pid 19575) - address space usage: 122.48 MBytes - rss usage: 7.78 MBytes -{Call main(1,0x7fffd71b9218) +[tid 0]<-- +[tid 32088]--> +test_memerror: WARNING: 28/08/15 15:35:12 (pid 32088) we are going to allocate 64 MB (pid 32088) - address space usage: 130.66 MBytes - rss usage: 8.08 MBytes +{Call main(1,0x7ffccf732938) test_memerror: ERROR ON MEMORY ------------------------------------- - pid: 19575 + pid: 32088 file: test_memerror.cpp line: 26 function: UInt::operator int() const - assertion: "((this)->memory.invariant())" [pobj = 0x24717a0 _this = (nil) - FMR] + assertion: "((this)->memory.invariant())" [pobj = 0x1762a50 _this = (nil) - FMR] ------------------------------------- test_memerror: ERROR ON MEMORY ------------------------------------- - pid: 19575 + pid: 32088 file: ../../include/ulib/debug/error_memory.h line: 28 function: UMemoryError::~UMemoryError() - assertion: "(invariant())" [pobj = 0x7fffd71b8900 _this = 0xa1b2c3d000000ff - ABW] + assertion: "(invariant())" [pobj = 0x7ffccf732020 _this = 0xa1b2c3d000000ff - ABW] ------------------------------------- -}Return main(1,0x7fffd71b9218) = 0 +}Return main(1,0x7ffccf732938) = 0 diff --git a/tests/debug/ok/objectIO.ok b/tests/debug/ok/objectIO.ok index f8221fbc..ecf17c07 100644 --- a/tests/debug/ok/objectIO.ok +++ b/tests/debug/ok/objectIO.ok @@ -1,3 +1,5 @@ +[tid 0]<-- +[tid 32397]--> {Call main(1) Prima="io_sono_la_classe_Prima" Seconda="io_sono_la_classe_Seconda" diff --git a/tests/debug/ok/simerr.ok b/tests/debug/ok/simerr.ok index 23007503..513cd933 100644 --- a/tests/debug/ok/simerr.ok +++ b/tests/debug/ok/simerr.ok @@ -1,16 +1,18 @@ -test_trace: WARNING: 30/05/15 16:03:58 (pid 19663) we are going to allocate 64 MB (pid 19663) - address space usage: 122.40 MBytes - rss usage: 7.77 MBytes +[tid 0]<-- +[tid 32744]--> +test_trace: WARNING: 28/08/15 15:37:35 (pid 32744) we are going to allocate 64 MB (pid 32744) - address space usage: 130.58 MBytes - rss usage: 8.21 MBytes {Call main(2) {Call routine1(2,3) }Return routine1(2,3) = 6 c = 6 - ::signal(4,0x401710) = (nil) + ::signal(4,0x401660) = (nil) {Call manage_sigpipe(4) ::open("/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp"...,2,438) = -1 - ENOENT (2, No such file or directory) ::open("tmp/prova",66,438) = 5 }Return manage_sigpipe(4) ::raise(4) = 0 result raise() = 0 - test_stat() = -1 + test_stat() = 0 test_stat() = -1 test_stat() = 0 test_stat() = 0 diff --git a/tests/debug/ok/trace.ok b/tests/debug/ok/trace.ok index efc89af7..043527eb 100644 --- a/tests/debug/ok/trace.ok +++ b/tests/debug/ok/trace.ok @@ -1,3 +1,5 @@ +[tid 0]<-- +[tid 31949]--> {Call routine1(2,3) }Return routine1(2,3) = 6 {Call manage_sigpipe(4) diff --git a/tests/ulib/test_thread.cpp b/tests/ulib/test_thread.cpp index 3ea2364a..4b309abf 100644 --- a/tests/ulib/test_thread.cpp +++ b/tests/ulib/test_thread.cpp @@ -2,8 +2,6 @@ #include -#include - #undef OK #define OK {printf("ok\n");} #undef ERROR @@ -141,7 +139,7 @@ public: ch->start(); - ch->sleep(100); + sleep(100); U_INTERNAL_DUMP("DELETING CHILD THREAD = %p", ch)