1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00
This commit is contained in:
stefanocasazza 2015-08-28 18:44:48 +02:00
parent 61ad7266fc
commit 4fcfb40f89
16 changed files with 442 additions and 340 deletions

View File

@ -51,6 +51,7 @@ coverage_html:
@genhtml --ignore-errors source -o html coverage.info
install-data-local:
@cd $(top_builddir)/src/ulib/net/server/plugin/usp; $(MAKE) $(AM_MAKEFLAGS) usp_compile.sh
@${INSTALL} -m 777 $(top_builddir)/libtool $(DESTDIR)${bindir}/usp_libtool.sh; \
${INSTALL} -m 777 $(top_builddir)/src/ulib/net/server/plugin/usp/usp_compile.sh $(DESTDIR)${bindir}

View File

@ -957,6 +957,7 @@ coverage_html:
@genhtml --ignore-errors source -o html coverage.info
install-data-local:
@cd $(top_builddir)/src/ulib/net/server/plugin/usp; $(MAKE) $(AM_MAKEFLAGS) usp_compile.sh
@${INSTALL} -m 777 $(top_builddir)/libtool $(DESTDIR)${bindir}/usp_libtool.sh; \
${INSTALL} -m 777 $(top_builddir)/src/ulib/net/server/plugin/usp/usp_compile.sh $(DESTDIR)${bindir}

View File

@ -125,7 +125,6 @@ extern "C" {
typedef int (*iPF) (void);
typedef int (*iPFpv) (void*);
typedef unsigned (*uPFpv) (void*);
typedef int (*iPFpvpv) ( void*, void*);
typedef int (*qcompare)(const void*,const void*);
typedef bool (*bPFi) (int);

View File

@ -25,7 +25,6 @@
extern "C" {
#endif
extern U_EXPORT void* u_plock;
extern U_EXPORT int u_trace_fd;
extern U_EXPORT int u_trace_signal;
extern U_EXPORT int u_trace_suspend; /* on-off */

View File

@ -14,7 +14,7 @@
#ifndef ULIB_NOTIFIER_H
#define ULIB_NOTIFIER_H
#include <ulib/container/vector.h>
#include <ulib/thread.h>
#include <ulib/container/gen_hash_map.h>
/**
@ -40,7 +40,6 @@
#include <ulib/event/event_fd.h>
#include <ulib/event/event_time.h>
class UThread;
class USocket;
class USocketExt;
class UHttpPlugIn;
@ -130,9 +129,6 @@ protected:
#ifdef USE_LIBEVENT
// nothing
#elif defined(HAVE_EPOLL_WAIT)
# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
static void* pthread;
# endif
static int epollfd;
static struct epoll_event* events;
#else
@ -174,21 +170,20 @@ private:
static void handlerDelete(int fd, int mask);
#ifndef USE_LIBEVENT
static void notifyHandlerEvent()
{
U_TRACE(0, "UNotifier::notifyHandlerEvent()")
U_INTERNAL_ASSERT_POINTER(handler_event)
U_INTERNAL_DUMP("handler_event = %p bread = %b nfd_ready = %d fd = %d op_mask = %d %B",
handler_event, bread, nfd_ready, handler_event->fd, handler_event->op_mask, handler_event->op_mask)
if ((bread ? handler_event->handlerRead()
: handler_event->handlerWrite()) == U_NOTIFIER_DELETE)
{
handlerDelete(handler_event);
}
}
static void notifyHandlerEvent() U_NO_EXPORT;
# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
static UThread* pthread;
# ifdef _MSWINDOWS_
static CRITICAL_SECTION mutex;
# else
static pthread_mutex_t mutex;
# endif
static void lock() { if (pthread) UThread::lock(&mutex); }
static void unlock() { if (pthread) UThread::unlock(&mutex); }
# else
static void lock() {}
static void unlock() {}
# endif
#endif
#ifdef U_COMPILER_DELETE_MEMBERS
@ -205,5 +200,4 @@ private:
friend class UServer_Base;
friend class UClientImage_Base;
};
#endif

View File

@ -50,7 +50,7 @@ public:
{
U_TRACE_REGISTER_OBJECT(0, UProcess, "", 0)
_pid = (pid_t) -1;
_pid = (pid_t)-1;
status = 0;
running = false;
}

View File

@ -15,6 +15,7 @@
#define ULIB_THREAD_H
#include <ulib/timeval.h>
#include <ulib/container/vector.h>
#ifdef _MSWINDOWS_
# undef sleep
@ -26,6 +27,7 @@
#endif
class UNotifier;
class UServer_Base;
class U_EXPORT UThread {
public:
@ -38,44 +40,44 @@ public:
// COSTRUTTORI
UThread(int _detachstate);
virtual ~UThread()
{
U_TRACE_UNREGISTER_OBJECT(0, UThread)
if (tid) close();
}
UThread(int detachstate);
virtual ~UThread();
// SERVICES
#ifdef _MSWINDOWS_
static DWORD getTID();
#else
static pthread_t getTID();
#endif
// Inter Process Communication
static void lock(CRITICAL_SECTION* pmutex)
{
U_TRACE(0, "UThread::lock(%p)", pmutex)
EnterCriticalSection(pmutex);
}
static void unlock(CRITICAL_SECTION* pmutex)
{
U_TRACE(0, "UThread::unlock(%p)", pmutex)
LeaveCriticalSection(pmutex);
}
#else
static pid_t getTID();
static void lock(pthread_mutex_t* pmutex)
{
U_TRACE(1, "UThread::lock(%p)", pmutex)
# ifndef _MSWINDOWS_
(void) U_SYSCALL(pthread_mutex_lock, "%p", pmutex);
# endif
}
static void unlock(pthread_mutex_t* pmutex)
{
U_TRACE(1, "UThread::unlock(%p)", pmutex)
# ifndef _MSWINDOWS_
(void) U_SYSCALL(pthread_mutex_unlock, "%p", pmutex);
# endif
}
#ifndef _MSWINDOWS_
static bool initRwLock(pthread_rwlock_t* prwlock)
{
U_TRACE(1, "UThread::initRwLock(%p)", prwlock)
@ -92,6 +94,8 @@ public:
U_RETURN(true);
}
// Inter Process Communication
static bool initIPC(pthread_mutex_t* mutex, pthread_cond_t* cond);
static void doIPC(pthread_mutex_t* mutex, pthread_cond_t* cond, vPF function, bool wait);
#endif
@ -163,15 +167,22 @@ public:
* with SIGSTOP, but this behaviour has now been fixed to conform to the Posix standard (so it stops all threads in the process)
*/
#ifdef _MSWINDOWS_
void resume() {}
void suspend() {}
#else
void resume()
{
U_TRACE(0, "UThread::resume()")
# ifndef _MSWINDOWS_
resume(tid);
# ifndef HAVE_PTHREAD_SUSPEND
U_ASSERT_EQUALS(isCurrentThread(tid), false)
# ifdef HAVE_PTHREAD_SUSPEND
(void) U_SYSCALL(pthread_resume, "%p", tid);
# else
(void) U_SYSCALL(pthread_kill, "%p,%d", tid, U_SIGCONT);
yield(); // give the signal a time to kick in
# endif
# endif
}
@ -179,13 +190,17 @@ public:
{
U_TRACE(0, "UThread::suspend()")
# ifndef _MSWINDOWS_
suspend(tid);
# ifndef HAVE_PTHREAD_SUSPEND
U_ASSERT_EQUALS(isCurrentThread(tid), false)
# ifdef HAVE_PTHREAD_SUSPEND
(void) U_SYSCALL(pthread_suspend, "%p", tid);
# else
(void) U_SYSCALL(pthread_kill, "%p,%d", tid, U_SIGSTOP);
yield(); // give the signal a time to kick in
# endif
# endif
}
#endif
// Cancellation
@ -265,6 +280,8 @@ public:
protected:
UThread* next;
int detachstate, cancel;
pid_t sid;
#ifdef _MSWINDOWS_
DWORD tid;
HANDLE cancellation;
@ -272,31 +289,35 @@ protected:
pthread_t tid;
pthread_attr_t attr;
int suspendCount;
static pthread_mutex_t mlock;
#endif
static UThread* first;
void close();
static void threadStart(UThread* th)
{
U_TRACE(0, "UThread::threadStart(%p)", th)
U_INTERNAL_ASSERT_POINTER(th)
U_INTERNAL_DUMP("th->tid = %p th->sid = %u", th->tid, th->sid)
th->setCancel(cancelDeferred);
th->run();
U_INTERNAL_DUMP("th->tid = %p th->sid = %u", th->tid, th->sid)
if (th->tid) th->close();
}
#ifdef _MSWINDOWS_
static void lock() {}
static void unlock() {}
static void resume(pthread_t _tid) {}
static void suspend(pthread_t _tid) {}
static unsigned __stdcall execHandler(void* th);
#else
void sigInstall(int signo);
void manageSignal(int signo);
static void lock() { lock(&mlock); }
static void unlock() { unlock(&mlock); }
static void execHandler(UThread* th);
static void sigHandler(int signo)
{
U_TRACE(0, "UThread::sigHandler(%d)", signo)
@ -306,73 +327,41 @@ protected:
if (th) th->manageSignal(signo);
}
static bool isDetached(pthread_attr_t* pattr)
static void execHandler(UThread* th);
static void threadCleanup(UThread* th)
{
U_TRACE(1, "UThread::isDetached(%p)", pattr)
U_TRACE(0, "UThread::threadCleanup(%p)", th)
int state;
U_INTERNAL_ASSERT_POINTER(th)
(void) U_SYSCALL(pthread_attr_getdetachstate, "%p,%p", pattr, &state);
U_INTERNAL_DUMP("th->tid = %p th->sid = %u", th->tid, th->sid)
if (state == PTHREAD_CREATE_DETACHED) U_RETURN(true);
U_RETURN(false);
if (th->tid) th->close();
}
static void stop(pthread_t _tid, pthread_attr_t* pattr)
{
U_TRACE(1, "UThread::stop(%p,%p)", _tid, pattr)
int state;
# ifdef HAVE_PTHREAD_CANCEL
(void) U_SYSCALL(pthread_cancel, "%p", _tid);
# endif
if (isDetached(pattr) == false) (void) U_SYSCALL(pthread_join, "%p,%p", _tid, 0);
(void) U_SYSCALL(pthread_attr_getdetachstate, "%p,%p", pattr, &state);
if (state != PTHREAD_CREATE_DETACHED) (void) U_SYSCALL(pthread_join, "%p,%p", _tid, 0);
# ifdef HAVE_PTHREAD_YIELD
else (void) U_SYSCALL_NO_PARAM(pthread_yield);
# endif
}
static void suspend(pthread_t _tid)
{
U_TRACE(1, "UThread::suspend(%p)", _tid)
U_ASSERT_EQUALS(isCurrentThread(_tid), false)
# ifdef HAVE_PTHREAD_SUSPEND
(void) U_SYSCALL(pthread_suspend, "%p", _tid);
# else
(void) U_SYSCALL(pthread_kill, "%p,%d", _tid, U_SIGSTOP);
# endif
}
static void resume(pthread_t _tid)
{
U_TRACE(1, "UThread::resume(%p)", _tid)
U_ASSERT_EQUALS(isCurrentThread(_tid), false)
# ifdef HAVE_PTHREAD_SUSPEND
(void) U_SYSCALL(pthread_resume, "%p", _tid);
# else
(void) U_SYSCALL(pthread_kill, "%p,%d", _tid, U_SIGCONT);
# endif
}
static void threadCleanup(void* th)
{
U_TRACE(0, "UThread::threadCleanup(%p)", th)
U_INTERNAL_ASSERT_POINTER(th)
U_INTERNAL_DUMP("th->tid = %p", ((UThread*)th)->tid)
if (((UThread*)th)->tid) ((UThread*)th)->close();
}
#endif
private:
friend class UNotifier;
friend class UServer_Base;
#ifdef U_COMPILER_DELETE_MEMBERS
UThread(const UThread&) = delete;
@ -383,7 +372,7 @@ private:
#endif
};
class U_EXPORT UThreadPool {
class U_EXPORT UThreadPool : public UThread {
public:
// Check for memory error
U_MEMORY_TEST
@ -391,6 +380,56 @@ public:
// Allocator e Deallocator
U_MEMORY_ALLOCATOR
U_MEMORY_DEALLOCATOR
};
// COSTRUTTORI
UThreadPool(uint32_t size);
~UThreadPool();
// define method VIRTUAL of class UThread
virtual void run() U_DECL_FINAL
{
U_TRACE(0, "UThreadPool::run()")
/*
for (task in queue)
{
if (task == STOP_WORKING) break;
do work;
}
*/
}
// SERVICES
// DEBUG
#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* dump(bool reset) const;
#endif
protected:
UVector<UThread*> pool; // Thread pool storage
UVector<UThread*> queue; // Queue to keep track of incoming tasks
bool active;
#ifdef _MSWINDOWS_
CRITICAL_SECTION tasksMutex; // Task queue mutex
CONDITION_VARIABLE condition; // Condition variable
#else
pthread_mutex_t tasksMutex; // Task queue mutex
pthread_cond_t condition; // Condition variable
#endif
private:
#ifdef U_COMPILER_DELETE_MEMBERS
UThreadPool(const UThreadPool&) = delete;
UThreadPool& operator=(const UThreadPool&) = delete;
#else
UThreadPool(const UThreadPool&) : UThread(PTHREAD_CREATE_DETACHED) {}
UThreadPool& operator=(const UThreadPool&) { return *this; }
#endif
};
#endif

View File

@ -25,14 +25,21 @@
int u_trace_fd = -1;
int u_trace_signal;
int u_trace_suspend;
void* u_plock;
void* u_trace_mask_level;
char u_trace_tab[256]; /* 256 max indent */
uint32_t u_trace_num_tab;
static int level_active;
static uint32_t file_size;
#ifdef ENABLE_THREAD
# ifdef _MSWINDOWS_
static DWORD old_tid;
static CRITICAL_SECTION mutex;
# else
static pthread_t old_tid;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
# endif
#endif
static void printInfo(void)
{
@ -77,20 +84,25 @@ void u_trace_lock(void)
{
U_INTERNAL_TRACE("u_trace_lock()")
#if defined(HAVE_SYS_SYSCALL_H) && defined(ENABLE_THREAD)
if (u_plock)
{
static pid_t old_tid;
#ifdef ENABLE_THREAD
# ifdef _MSWINDOWS_
if (old_tid == 0) InitializeCriticalSection(&mutex);
pid_t tid = syscall(SYS_gettid);
DWORD tid = GetCurrentThreadId();
(void) pthread_mutex_lock((pthread_mutex_t*)u_plock);
EnterCriticalSection(&mutex);
# else
pthread_t tid;
# ifdef HAVE_SYS_SYSCALL_H
tid = syscall(SYS_gettid);
# endif
(void) pthread_mutex_lock(&mutex);
# endif
if (old_tid != tid)
{
char tid_buffer[32];
int sz = snprintf(tid_buffer, sizeof(tid_buffer), "[tid %d]<--\n[tid %d]-->\n", old_tid, tid);
int sz = snprintf(tid_buffer, sizeof(tid_buffer), "[tid %ld]<--\n[tid %ld]-->\n", old_tid, tid);
old_tid = tid;
@ -104,7 +116,6 @@ void u_trace_lock(void)
file_ptr += sz;
}
}
}
#endif
}
@ -112,8 +123,12 @@ void u_trace_unlock(void)
{
U_INTERNAL_TRACE("u_trace_unlock()")
#if defined(HAVE_SYS_SYSCALL_H) && defined(ENABLE_THREAD)
if (u_plock) (void) pthread_mutex_unlock((pthread_mutex_t*)u_plock);
#ifdef ENABLE_THREAD
# ifdef _MSWINDOWS_
LeaveCriticalSection(&mutex);
# else
(void) pthread_mutex_unlock(&mutex);
# endif
#endif
}

View File

@ -166,9 +166,23 @@ UString* UServer_Base::allow_IP_prv;
UVector<UIPAllow*>* UServer_Base::vallow_IP_prv;
#endif
#if defined(ENABLE_THREAD) && !defined(_MSWINDOWS_)
# include <ulib/thread.h>
#ifdef ENABLE_THREAD
#include <ulib/thread.h>
class UClientThread U_DECL_FINAL : public UThread {
public:
UClientThread() : UThread(PTHREAD_CREATE_DETACHED) {}
virtual void run()
{
U_TRACE(0, "UClientThread::run()")
while (UServer_Base::flag_loop) UNotifier::waitForEvent(UServer_Base::ptime);
}
};
# ifndef _MSWINDOWS_
class UTimeThread U_DECL_FINAL : public UThread {
public:
@ -231,22 +245,9 @@ public:
}
};
class UClientThread U_DECL_FINAL : public UThread {
public:
UClientThread() : UThread(PTHREAD_CREATE_DETACHED) {}
virtual void run()
{
U_TRACE(0, "UClientThread::run()")
while (UServer_Base::flag_loop) UNotifier::waitForEvent(UServer_Base::ptime);
}
};
#if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB)
#include <ulib/net/tcpsocket.h>
#include <ulib/net/client/client.h>
# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB)
# include <ulib/net/tcpsocket.h>
# include <ulib/net/client/client.h>
class UOCSPStapling U_DECL_FINAL : public UThread {
public:
@ -285,6 +286,7 @@ ULock* UServer_Base::lock_ocsp_staple;
UOCSPStapling* UServer_Base::pthread_ocsp;
#endif
#endif
#endif
#ifndef _MSWINDOWS_
static int sysctl_somaxconn, tcp_abort_on_overflow, sysctl_max_syn_backlog, tcp_fin_timeout;
@ -352,12 +354,21 @@ UServer_Base::~UServer_Base()
U_INTERNAL_ASSERT_POINTER(socket)
U_INTERNAL_ASSERT_POINTER(vplugin)
#if defined(ENABLE_THREAD) && !defined(_MSWINDOWS_)
#ifdef ENABLE_THREAD
# if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids == -1)
{
U_INTERNAL_ASSERT_POINTER(UNotifier::pthread)
delete UNotifier::pthread;
}
# endif
# ifndef _MSWINDOWS_
if (u_pthread_time)
{
((UTimeThread*)u_pthread_time)->suspend();
delete (UTimeThread*)u_pthread_time; // delete to join
delete (UTimeThread*)u_pthread_time;
(void) U_SYSCALL(pthread_rwlock_destroy, "%p", ULog::prwlock);
}
@ -365,18 +376,14 @@ UServer_Base::~UServer_Base()
# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB)
if (bssl)
{
if (pthread_ocsp)
{
pthread_ocsp->suspend();
delete pthread_ocsp; // delete to join
}
if (pthread_ocsp) delete pthread_ocsp;
USSLSocket::cleanupStapling();
if (UServer_Base::lock_ocsp_staple) delete UServer_Base::lock_ocsp_staple;
}
# endif
# endif
#endif
UClientImage_Base::clear();
@ -1836,7 +1843,7 @@ RETSIGTYPE UServer_Base::handlerForSigHUP(int signo)
#if defined(ENABLE_THREAD) && !defined(_MSWINDOWS_)
# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB)
if (pthread_ocsp) pthread_ocsp->suspend();
if ( pthread_ocsp) pthread_ocsp->suspend();
# endif
if (u_pthread_time) ((UTimeThread*)u_pthread_time)->suspend();
#endif
@ -1857,7 +1864,7 @@ RETSIGTYPE UServer_Base::handlerForSigHUP(int signo)
#if defined(ENABLE_THREAD) && !defined(_MSWINDOWS_)
# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB)
if (pthread_ocsp) pthread_ocsp->resume();
if ( pthread_ocsp) pthread_ocsp->resume();
# endif
if (u_pthread_time) ((UTimeThread*)u_pthread_time)->resume();
#endif
@ -1932,10 +1939,10 @@ RETSIGTYPE UServer_Base::handlerForSigTERM(int signo)
if (preforked_num_kids)
{
# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids == -1) ((UThread*)UNotifier::pthread)->suspend();
# if defined(HAVE_SYS_SYSCALL_H) && defined(DEBUG)
if (u_plock) (void) pthread_mutex_unlock((pthread_mutex_t*)u_plock);
# if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids == -1) UNotifier::pthread->suspend();
# ifdef DEBUG
u_trace_unlock();
# endif
# endif
@ -2595,14 +2602,20 @@ void UServer_Base::runLoop(const char* user)
if (isLog()) ULog::log("Waiting for connection on port %u", port);
#endif
#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
#if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids == -1)
{
U_INTERNAL_ASSERT_EQUALS(UNotifier::pthread, 0)
UNotifier::pthread = U_NEW(UClientThread);
((UThread*)UNotifier::pthread)->start(50);
# ifdef _MSWINDOWS_
InitializeCriticalSection(&UNotifier::mutex);
# endif
proc->_pid = ((UThread*)UNotifier::pthread)->getTID();
UNotifier::pthread->start(50);
proc->_pid = UNotifier::pthread->sid;
U_ASSERT(proc->parent())
}

View File

@ -17,10 +17,6 @@
#include <ulib/utility/interrupt.h>
#include <ulib/net/server/server_plugin.h>
#if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
# include "ulib/thread.h"
#endif
#ifdef HAVE_POLL_H
# include <poll.h>
struct pollfd UNotifier::fds[1];
@ -70,9 +66,6 @@ void UEventFd::operator()(int _fd, short event)
if (ret == U_NOTIFIER_DELETE) UNotifier::erase(this);
}
#elif defined(HAVE_EPOLL_WAIT)
# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
void* UNotifier::pthread;
# endif
int UNotifier::epollfd;
struct epoll_event* UNotifier::events;
struct epoll_event* UNotifier::pevents;
@ -98,6 +91,32 @@ UEventFd** UNotifier::lo_map_fd;
UGenericHashMap<int,UEventFd*>* UNotifier::hi_map_fd; // maps a fd to a node pointer
#ifndef USE_LIBEVENT
U_NO_EXPORT void UNotifier::notifyHandlerEvent()
{
U_TRACE(0, "UNotifier::notifyHandlerEvent()")
U_INTERNAL_ASSERT_POINTER(handler_event)
U_INTERNAL_DUMP("handler_event = %p bread = %b nfd_ready = %d fd = %d op_mask = %d %B",
handler_event, bread, nfd_ready, handler_event->fd, handler_event->op_mask, handler_event->op_mask)
if ((bread ? handler_event->handlerRead()
: handler_event->handlerWrite()) == U_NOTIFIER_DELETE)
{
handlerDelete(handler_event);
}
}
# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
UThread* UNotifier::pthread;
# ifdef _MSWINDOWS_
CRITICAL_SECTION UNotifier::mutex;
# else
pthread_mutex_t UNotifier::mutex = PTHREAD_MUTEX_INITIALIZER;
# endif
# endif
#endif
void UNotifier::init(bool bacquisition)
{
U_TRACE(0, "UNotifier::init(%b)", bacquisition)
@ -222,15 +241,15 @@ bool UNotifier::isHandler(int fd)
U_RETURN(false);
}
# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (pthread) ((UThread*)pthread)->lock();
# endif
bool result;
if (hi_map_fd->find(fd)) U_RETURN(true);
lock();
# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (pthread) ((UThread*)pthread)->unlock();
# endif
result = hi_map_fd->find(fd);
unlock();
if (result) U_RETURN(true);
}
U_RETURN(false);
@ -243,15 +262,11 @@ void UNotifier::resetHandler(int fd)
if (fd < (int32_t)max_connection) lo_map_fd[fd] = 0;
else
{
# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (pthread) ((UThread*)pthread)->lock();
# endif
lock();
(void) hi_map_fd->erase(fd);
# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (pthread) ((UThread*)pthread)->unlock();
# endif
unlock();
}
}
@ -275,11 +290,15 @@ bool UNotifier::setHandler(int fd)
}
}
#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (pthread) ((UThread*)pthread)->lock();
#endif
bool result;
if (hi_map_fd->find(fd))
lock();
result = hi_map_fd->find(fd);
unlock();
if (result)
{
handler_event = hi_map_fd->elem();
@ -288,10 +307,6 @@ bool UNotifier::setHandler(int fd)
U_RETURN(true);
}
#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (pthread) ((UThread*)pthread)->unlock();
#endif
U_RETURN(false);
}
@ -310,15 +325,11 @@ void UNotifier::insert(UEventFd* item)
if (fd < (int32_t)max_connection) lo_map_fd[fd] = item;
else
{
# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (pthread) ((UThread*)pthread)->lock();
# endif
lock();
hi_map_fd->insert(fd, item);
# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (pthread) ((UThread*)pthread)->unlock();
# endif
unlock();
}
#ifdef USE_LIBEVENT
@ -707,7 +718,9 @@ loop: U_INTERNAL_ASSERT_POINTER(pevents->data.ptr)
: handler_event->handlerWrite()) == U_NOTIFIER_DELETE)
{
handlerDelete(handler_event);
# ifndef U_COVERITY_FALSE_POSITIVE // Improper use of negative value (NEGATIVE_RETURNS)
handler_event->fd = -1;
# endif
# ifdef U_EPOLLET_POSTPONE_STRATEGY
if (bepollet) pevents->events = 0;
@ -763,7 +776,9 @@ loop2: if (pevents->events)
if (handler_event->handlerRead() == U_NOTIFIER_DELETE)
{
handlerDelete(handler_event);
# ifndef U_COVERITY_FALSE_POSITIVE // Improper use of negative value (NEGATIVE_RETURNS)
handler_event->fd = -1;
# endif
pevents->events = 0;
}
@ -890,23 +905,15 @@ void UNotifier::clear()
}
}
#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (pthread == 0)
#endif
{
UMemoryPool::_free(lo_map_fd, max_connection, sizeof(UEventFd*));
hi_map_fd->deallocate();
delete hi_map_fd;
}
#if defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT)
U_INTERNAL_ASSERT_POINTER(events)
# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (pthread == 0)
# endif
UMemoryPool::_free(events, max_connection + 1, sizeof(struct epoll_event));
(void) U_SYSCALL(close, "%d", epollfd);

View File

@ -34,10 +34,6 @@ extern "C" { int nanosleep (const struct timespec* requested_time,
UThread* UThread::first;
#ifndef _MSWINDOWS_
pthread_mutex_t UThread::mlock = PTHREAD_MUTEX_INITIALIZER;
#endif
UThread::UThread(int _detachstate)
{
U_TRACE_REGISTER_OBJECT(0, UThread, "%d", _detachstate)
@ -61,6 +57,20 @@ UThread::UThread(int _detachstate)
#endif
}
UThread::~UThread()
{
U_TRACE_UNREGISTER_OBJECT(0, UThread)
if (tid)
{
# ifndef _MSWINDOWS_
if (isDetached()) suspend();
# endif
close();
}
}
void UThread::close()
{
U_TRACE(0, "UThread::close()")
@ -138,26 +148,6 @@ void UThread::close()
#endif
}
#ifdef _MSWINDOWS_
DWORD UThread::getTID()
{
U_TRACE(0, "UThread::getTID()")
DWORD _tid = GetCurrentThreadId();
U_RETURN(_tid);
}
#else
pthread_t UThread::getTID()
{
U_TRACE(0, "UThread::getTID()")
pthread_t _tid = syscall(SYS_gettid);
U_RETURN(_tid);
}
#endif
void UThread::yield()
{
U_TRACE(1, "UThread::yield()")
@ -201,7 +191,38 @@ void UThread::yield()
#endif
}
#ifndef _MSWINDOWS_
#ifdef _MSWINDOWS_
DWORD UThread::getTID()
{
U_TRACE(0, "UThread::getTID()")
DWORD tid = GetCurrentThreadId();
U_RETURN(tid);
}
unsigned __stdcall UThread::execHandler(void* th)
{
U_TRACE(0, "UThread::::execHandler(%p)", th)
threadStart((UThread*)th);
U_RETURN(0);
}
#else
pid_t UThread::getTID()
{
U_TRACE(0, "UThread::getTID()")
pid_t tid;
#ifdef HAVE_SYS_SYSCALL_H
tid = syscall(SYS_gettid);
#endif
U_RETURN(tid);
}
void UThread::sigInstall(int signo)
{
U_TRACE(1, "UThread::sigInstall(%d)", signo)
@ -232,16 +253,17 @@ void UThread::manageSignal(int signo)
{
U_TRACE(1, "UThread::manageSignal(%d)", signo)
static pthread_cond_t mcond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER;
// Mutexes are used to ensure the exclusive access, where as condition variables are used to synchronize threads based on the events.
// We need Mutexes to ensure that condition variables dont end up in an infinite wait. One thing to remember is Mutex operation of lock
// and unlock are guaranteed to be atomic, but the condition variables need not be. i.e The thread can get scheduled out while the condition variable wait is half way
static pthread_cond_t mcond = PTHREAD_COND_INITIALIZER;
lock(&mlock);
U_INTERNAL_DUMP("suspendCount = %d", suspendCount)
(void) U_SYSCALL(pthread_mutex_lock, "%p", &mlock);
if (signo == U_SIGSTOP &&
suspendCount++ == 0)
{
@ -264,36 +286,20 @@ void UThread::manageSignal(int signo)
(void) U_SYSCALL(pthread_cond_signal, "%p", &mcond);
}
(void) U_SYSCALL(pthread_mutex_unlock, "%p", &mlock);
unlock(&mlock);
}
#endif
#ifdef _MSWINDOWS_
unsigned __stdcall UThread::execHandler(void* th)
{
U_TRACE(0, "UThread::::execHandler(%p)", th)
U_INTERNAL_DUMP("th->tid = %p", ((UThread*)th)->tid)
((UThread*)th)->setCancel(cancelDeferred);
((UThread*)th)->run();
((UThread*)th)->close();
U_RETURN(0);
}
#else
void UThread::execHandler(UThread* th)
{
U_TRACE(1, "UThread::execHandler(%p)", th)
#ifdef HAVE_SYS_SYSCALL_H
th->sid = syscall(SYS_gettid);
#endif
U_INTERNAL_ASSERT_EQUALS(pthread_self(), th->tid)
sigset_t mask;
th->tid = (pthread_t) U_SYSCALL_NO_PARAM(pthread_self);
U_INTERNAL_DUMP("th->tid = %p", th->tid)
#ifdef sigemptyset
sigemptyset(&mask);
#else
@ -337,17 +343,7 @@ void UThread::execHandler(UThread* th)
th->sigInstall(U_SIGCONT);
#endif
pthread_cleanup_push(threadCleanup, th);
th->setCancel(cancelDeferred);
th->run();
pthread_cleanup_pop(0);
U_INTERNAL_DUMP("th->tid = %p", th->tid)
if (th->tid) th->close();
threadStart(th);
}
#endif
@ -357,22 +353,13 @@ bool UThread::start(uint32_t timeoutMS)
U_INTERNAL_ASSERT_EQUALS(tid, 0)
#if defined(DEBUG) && !defined(_MSWINDOWS_)
if (u_plock == 0)
{
static pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER;
u_plock = &plock;
}
#endif
next = first;
first = this;
U_INTERNAL_DUMP("first = %p next = %p", first, next)
#ifdef _MSWINDOWS_
(void) _beginthreadex(NULL, 0, execHandler, this, CREATE_SUSPENDED, (unsigned*)&tid);
(void) _beginthreadex(NULL, 0, execHandler, this, CREATE_SUSPENDED, (unsigned*)&sid);
if (tid == 0)
{
@ -416,8 +403,8 @@ void UThread::setCancel(int mode)
switch (mode)
{
case cancelImmediate:
case cancelDeferred:
case cancelImmediate:
{
# ifdef HAVE_PTHREAD_CANCEL
(void) U_SYSCALL(pthread_setcancelstate, "%d,%p", PTHREAD_CANCEL_ENABLE, &old);
@ -483,6 +470,8 @@ void UThread::sleep(time_t timeoutMS)
{
U_TRACE(1, "UThread::sleep(%ld)", timeoutMS)
U_ASSERT(isCurrentThread(tid))
struct timespec ts = { timeoutMS / 1000L, (timeoutMS % 1000L) * 1000000L };
U_INTERNAL_ASSERT(ts.tv_sec >= 0L)
@ -560,26 +549,65 @@ bool UThread::initIPC(pthread_mutex_t* pmutex, pthread_cond_t* pcond)
U_RETURN(true);
}
void UThread::doIPC(pthread_mutex_t* pmutex, pthread_cond_t* pcond, vPF function, bool wait)
void UThread::doIPC(pthread_mutex_t* plock, pthread_cond_t* pcond, vPF function, bool wait)
{
U_TRACE(0, "UThread::doIPC(%p,%p,%p,%b)", pmutex, pcond, function, wait)
U_TRACE(0, "UThread::doIPC(%p,%p,%p,%b)", plock, pcond, function, wait)
lock(pmutex);
lock(plock);
if (wait) (void) U_SYSCALL(pthread_cond_wait, "%p,%p", pcond, pmutex); // block until we are signalled from other...
if (wait) (void) U_SYSCALL(pthread_cond_wait, "%p,%p", pcond, plock); // block until we are signalled from other...
function(); // ...than call function
unlock(pmutex);
unlock(plock);
if (wait == false) (void) U_SYSCALL(pthread_cond_signal, "%p", pcond); // signal to waiting thread...
}
// THREAD POOL
UThreadPool::UThreadPool(uint32_t size) : UThread(PTHREAD_CREATE_DETACHED), pool(size)
{
U_TRACE_REGISTER_OBJECT(0, UThreadPool, "%u", size)
#ifdef _MSWINDOWS_
InitializeCriticalSection(&tasksMutex); // Task queue mutex
InitializeConditionVariable(&condition); // Condition variable
#else
tasksMutex = PTHREAD_MUTEX_INITIALIZER; // Task queue mutex
condition = PTHREAD_COND_INITIALIZER; // Condition variable
#endif
}
UThreadPool::~UThreadPool()
{
U_TRACE_UNREGISTER_OBJECT(0, UThread)
}
#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* UThreadPool::dump(bool reset) const
{
*UObjectIO::os << "active " << active << '\n'
<< "pool (UVector " << (void*)&pool << ")\n"
<< "queue (UVector " << (void*)&queue << ')';
if (reset)
{
UObjectIO::output();
return UObjectIO::buffer_output;
}
return 0;
}
#endif
#endif
#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* UThread::dump(bool reset) const
{
*UObjectIO::os << "tid " << tid << '\n'
<< "sid " << sid << '\n'
<< "cancel " << cancel << '\n'
<< "detachstate " << detachstate << '\n'
<< "next (UThread " << (void*)next << ")\n"

View File

@ -1,19 +1,21 @@
test_memerror: WARNING: 30/05/15 16:03:57 (pid 19575) we are going to allocate 64 MB (pid 19575) - address space usage: 122.48 MBytes - rss usage: 7.78 MBytes
{Call main(1,0x7fffd71b9218)
[tid 0]<--
[tid 32088]-->
test_memerror: WARNING: 28/08/15 15:35:12 (pid 32088) we are going to allocate 64 MB (pid 32088) - address space usage: 130.66 MBytes - rss usage: 8.08 MBytes
{Call main(1,0x7ffccf732938)
test_memerror: ERROR ON MEMORY
-------------------------------------
pid: 19575
pid: 32088
file: test_memerror.cpp
line: 26
function: UInt::operator int() const
assertion: "((this)->memory.invariant())" [pobj = 0x24717a0 _this = (nil) - FMR]
assertion: "((this)->memory.invariant())" [pobj = 0x1762a50 _this = (nil) - FMR]
-------------------------------------
test_memerror: ERROR ON MEMORY
-------------------------------------
pid: 19575
pid: 32088
file: ../../include/ulib/debug/error_memory.h
line: 28
function: UMemoryError::~UMemoryError()
assertion: "(invariant())" [pobj = 0x7fffd71b8900 _this = 0xa1b2c3d000000ff - ABW]
assertion: "(invariant())" [pobj = 0x7ffccf732020 _this = 0xa1b2c3d000000ff - ABW]
-------------------------------------
}Return main(1,0x7fffd71b9218) = 0
}Return main(1,0x7ffccf732938) = 0

View File

@ -1,3 +1,5 @@
[tid 0]<--
[tid 32397]-->
{Call main(1)
Prima="io_sono_la_classe_Prima"
Seconda="io_sono_la_classe_Seconda"

View File

@ -1,16 +1,18 @@
test_trace: WARNING: 30/05/15 16:03:58 (pid 19663) we are going to allocate 64 MB (pid 19663) - address space usage: 122.40 MBytes - rss usage: 7.77 MBytes
[tid 0]<--
[tid 32744]-->
test_trace: WARNING: 28/08/15 15:37:35 (pid 32744) we are going to allocate 64 MB (pid 32744) - address space usage: 130.58 MBytes - rss usage: 8.21 MBytes
{Call main(2)
{Call routine1(2,3)
}Return routine1(2,3) = 6
c = 6
::signal(4,0x401710) = (nil)
::signal(4,0x401660) = (nil)
{Call manage_sigpipe(4)
::open("/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp/tmp"...,2,438) = -1 - ENOENT (2, No such file or directory)
::open("tmp/prova",66,438) = 5
}Return manage_sigpipe(4)
::raise(4) = 0
result raise() = 0
test_stat() = -1
test_stat() = 0
test_stat() = -1
test_stat() = 0
test_stat() = 0

View File

@ -1,3 +1,5 @@
[tid 0]<--
[tid 31949]-->
{Call routine1(2,3)
}Return routine1(2,3) = 6
{Call manage_sigpipe(4)

View File

@ -2,8 +2,6 @@
#include <ulib/thread.h>
#include <iostream>
#undef OK
#define OK {printf("ok\n");}
#undef ERROR
@ -141,7 +139,7 @@ public:
ch->start();
ch->sleep(100);
sleep(100);
U_INTERNAL_DUMP("DELETING CHILD THREAD = %p", ch)