1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00

thread fix

This commit is contained in:
stefanocasazza 2015-09-03 14:39:31 +02:00
parent 4fcfb40f89
commit c97f456323
16 changed files with 523 additions and 268 deletions

View File

@ -382,7 +382,6 @@ U_EXPORT int inet_aton(const char *cp, struct in_addr *addr);
U_EXPORT int setrlimit(int resource, const struct rlimit* rlim);
U_EXPORT int socketpair(int d, int type, int protocol, int sv[2]);
U_EXPORT ssize_t writev(int fd, const struct iovec* vector, int count);
U_EXPORT const char* inet_ntop(int af, const void* src, char* dst, size_t size);
U_EXPORT int sigprocmask(int how, const sigset_t* set, sigset_t* oldset);
U_EXPORT void* mmap(void* start, size_t length, int prot, int flags, int fd, off_t offset);
U_EXPORT int sigaction(int signum, const struct sigaction* act, struct sigaction* oldact);
@ -392,6 +391,7 @@ U_EXPORT int setitimer(int which, const struct itimerval* value, stru
*
* U_EXPORT int gettimeofday(struct timeval* tv, void* tz);
* U_EXPORT int truncate(const char* fname, off_t distance);
* U_EXPORT const char* inet_ntop(int af, const void* src, char* dst, size_t size);
*/
U_EXPORT int raise_w32(int nsig);

View File

@ -25,6 +25,7 @@
*/
class UHTTP;
class UThreadPool;
class UHttpPlugIn;
class UFileConfig;
class UNoCatPlugIn;
@ -392,6 +393,8 @@ private:
UVector<void*>& operator=(const UVector<void*>&) { return *this; }
#endif
friend class UThreadPool;
template <class T> friend class UOrmTypeHandler;
template <class T> friend class UJsonTypeHandler;
};
@ -904,6 +907,8 @@ public:
#endif
private:
friend class UThreadPool;
#ifdef U_COMPILER_DELETE_MEMBERS
UVector<T*>& operator=(const UVector<T*>&) = delete;
#else

View File

@ -210,7 +210,7 @@
# endif
/* Require for compiling with critical sections */
# ifndef _WIN32_WINNT
# define _WIN32_WINNT 0x0501
# define _WIN32_WINNT 0x0600
# endif
/* Make sure we're consistent with _WIN32_WINNT */
# ifndef WINVER

View File

@ -18,15 +18,20 @@
#include <ulib/container/vector.h>
#ifdef _MSWINDOWS_
# include <synchapi.h>
# undef sleep
# undef signal
# define PTHREAD_CREATE_DETACHED 1
#else
# ifdef HAVE_SYS_SYSCALL_H
# include <sys/syscall.h>
# endif
# define U_SIGSTOP (SIGRTMIN+5)
# define U_SIGCONT (SIGRTMIN+6)
#endif
class UNotifier;
class UThreadPool;
class UServer_Base;
class U_EXPORT UThread {
@ -40,8 +45,35 @@ public:
// COSTRUTTORI
UThread(int detachstate);
virtual ~UThread();
UThread(int _detachstate)
{
U_TRACE_REGISTER_OBJECT(0, UThread, "%d", _detachstate)
next = 0;
detachstate = _detachstate;
cancel = 0;
sid = 0;
tid = 0;
# ifdef _MSWINDOWS_
cancellation = 0;
# else
suspendCount = 0;
# endif
}
virtual ~UThread()
{
U_TRACE_UNREGISTER_OBJECT(0, UThread)
if (tid)
{
# ifndef _MSWINDOWS_
if (isDetached()) suspend();
# endif
close();
}
}
// SERVICES
@ -61,6 +93,27 @@ public:
LeaveCriticalSection(pmutex);
}
static void wait(CRITICAL_SECTION* pmutex, CONDITION_VARIABLE* pcond)
{
U_TRACE(0, "UThread::wait(%p,%p)", pmutex, pcond)
SleepConditionVariableCS(pcond, pmutex, INFINITE); // block until we are signalled from other...
}
static void signal(CONDITION_VARIABLE* pcond)
{
U_TRACE(0, "UThread::signal(%p)", pcond)
WakeConditionVariable(pcond); // signal to waiting thread...
}
static void signalAll(CONDITION_VARIABLE* pcond)
{
U_TRACE(0, "UThread::signalAll(%p)", pcond)
WakeAllConditionVariable(pcond); // signal to waiting thread...
}
#else
static pid_t getTID();
@ -78,6 +131,27 @@ public:
(void) U_SYSCALL(pthread_mutex_unlock, "%p", pmutex);
}
static void wait(pthread_mutex_t* pmutex, pthread_cond_t* pcond)
{
U_TRACE(0, "UThread::wait(%p,%p)", pmutex, pcond)
(void) U_SYSCALL(pthread_cond_wait, "%p,%p", pcond, pmutex); // block until we are signalled from other...
}
static void signal(pthread_cond_t* pcond)
{
U_TRACE(0, "UThread::signal(%p)", pcond)
(void) U_SYSCALL(pthread_cond_signal, "%p", pcond); // signal to waiting thread...
}
static void signalAll(pthread_cond_t* pcond)
{
U_TRACE(0, "UThread::signalAll(%p)", pcond)
(void) U_SYSCALL(pthread_cond_broadcast, "%p", pcond); // signal to waiting thread...
}
static bool initRwLock(pthread_rwlock_t* prwlock)
{
U_TRACE(1, "UThread::initRwLock(%p)", prwlock)
@ -151,7 +225,7 @@ public:
return;
}
UTimeVal(timeoutMS / 1000L, (timeoutMS % 1000L) * 1000L).nanosleep();
UTimeVal::nanosleep(timeoutMS);
}
/**
@ -281,13 +355,11 @@ protected:
UThread* next;
int detachstate, cancel;
pid_t sid;
#ifdef _MSWINDOWS_
DWORD tid;
HANDLE cancellation;
#else
pthread_t tid;
pthread_attr_t attr;
int suspendCount;
#endif
@ -295,26 +367,35 @@ protected:
void close();
static void threadStart(UThread* th)
void threadStart()
{
U_TRACE(0, "UThread::threadStart(%p)", th)
U_TRACE(0, "UThread::threadStart()")
U_INTERNAL_ASSERT_POINTER(th)
U_INTERNAL_DUMP("tid = %p sid = %u", tid, sid)
U_INTERNAL_DUMP("th->tid = %p th->sid = %u", th->tid, th->sid)
setCancel(cancelDeferred);
th->setCancel(cancelDeferred);
run();
th->run();
U_INTERNAL_DUMP("tid = %p sid = %u", tid, sid)
U_INTERNAL_DUMP("th->tid = %p th->sid = %u", th->tid, th->sid)
if (th->tid) th->close();
if (tid) close();
}
#ifdef _MSWINDOWS_
static unsigned __stdcall execHandler(void* th);
static unsigned __stdcall execHandler(void* th)
{
U_TRACE(0, "UThread::::execHandler(%p)", th)
U_INTERNAL_ASSERT_POINTER(th)
// U_INTERNAL_ASSERT_EQUALS(GetCurrentThreadId(), th->tid)
((UThread*)th)->threadStart();
U_RETURN(0);
}
#else
void maskSignal();
void sigInstall(int signo);
void manageSignal(int signo);
@ -327,7 +408,22 @@ protected:
if (th) th->manageSignal(signo);
}
static void execHandler(UThread* th);
static void execHandler(UThread* th)
{
U_TRACE(0, "UThread::execHandler(%p)", th)
U_INTERNAL_ASSERT_POINTER(th)
# ifdef HAVE_SYS_SYSCALL_H
th->sid = syscall(SYS_gettid);
# endif
// U_INTERNAL_ASSERT_EQUALS(pthread_self(), th->tid)
th->maskSignal();
th->threadStart();
}
static void threadCleanup(UThread* th)
{
@ -361,17 +457,19 @@ protected:
private:
friend class UNotifier;
friend class UThreadPool;
friend class UServer_Base;
#ifdef U_COMPILER_DELETE_MEMBERS
UThread(const UThread&) = delete;
UThread& operator=(const UThread&) = delete;
#else
UThread(const UThread&) {}
UThread& operator=(const UThread&) { return *this; }
#endif
};
// UThreadPool class manages all the UThreadPool related activities. This includes keeping track of idle threads and snchronizations between all threads.
// Using UThreadPool is advantageous only when the work to be done is really time consuming. (at least 1 or 2 seconds)
class U_EXPORT UThreadPool : public UThread {
public:
// Check for memory error
@ -386,23 +484,40 @@ public:
UThreadPool(uint32_t size);
~UThreadPool();
// define method VIRTUAL of class UThread
// SERVICES
virtual void run() U_DECL_FINAL
void addTask(UThread* task)
{
U_TRACE(0, "UThreadPool::run()")
U_TRACE(0, "UThreadPool::addTask(%p)", task)
/*
for (task in queue)
{
if (task == STOP_WORKING) break;
U_INTERNAL_ASSERT(active)
do work;
}
*/
lock(&tasks_mutex);
queue.push(task);
unlock(&tasks_mutex);
signal(&condition); // Waking up the threads so they will know there is a job to do
}
// SERVICES
// This function gives the user the ability to send 10 tasks to the thread pool then to wait till
// all the tasks completed, and give the next 10 which are dependand on the result of the previous ones
void waitForWorkToBeFinished()
{
U_TRACE(0, "UThreadPool::waitForWorkToBeFinished()")
lock(&tasks_mutex);
while (queue._length != 0) wait(&tasks_mutex, &condition_task_finished);
unlock(&tasks_mutex);
}
// define method VIRTUAL of class UThread
virtual void run() U_DECL_OVERRIDE;
// DEBUG
@ -416,11 +531,11 @@ protected:
bool active;
#ifdef _MSWINDOWS_
CRITICAL_SECTION tasksMutex; // Task queue mutex
CONDITION_VARIABLE condition; // Condition variable
CRITICAL_SECTION tasks_mutex; // Task queue mutex
CONDITION_VARIABLE condition, condition_task_finished; // Condition variable
#else
pthread_mutex_t tasksMutex; // Task queue mutex
pthread_cond_t condition; // Condition variable
pthread_mutex_t tasks_mutex; // Task queue mutex
pthread_cond_t condition, condition_task_finished; // Condition variable
#endif
private:

View File

@ -347,7 +347,8 @@ public:
// SERVICES
void nanosleep();
void nanosleep();
static void nanosleep(time_t timeoutMS) { UTimeVal(timeoutMS / 1000L, (timeoutMS % 1000L) * 1000L).nanosleep(); }
// CHRONOMETER

View File

@ -86,10 +86,10 @@ void u_trace_lock(void)
#ifdef ENABLE_THREAD
# ifdef _MSWINDOWS_
if (old_tid == 0) InitializeCriticalSection(&mutex);
DWORD tid = GetCurrentThreadId();
if (old_tid == 0) InitializeCriticalSection(&mutex);
EnterCriticalSection(&mutex);
# else
pthread_t tid;

View File

@ -86,6 +86,7 @@ int inet_aton(const char* src, struct in_addr* addr)
return 1;
}
/*
const char* inet_ntop(int af, const void* src, char* dst, size_t size)
{
U_INTERNAL_TRACE("inet_ntop(%d,%p,%s,%d)", af, src, dst, size)
@ -122,6 +123,7 @@ const char* inet_ntop(int af, const void* src, char* dst, size_t size)
return 0;
}
*/
#define isWindow9x() (version.dwPlatformId == VER_PLATFORM_WIN32_WINDOWS)
#define isWindowNT() (version.dwPlatformId == VER_PLATFORM_WIN32_NT)

View File

@ -378,7 +378,7 @@ U_NO_EXPORT bool UCommand::postCommand(UString* input, UString* output)
UProcess::kill(pid, SIGTERM);
UTimeVal(1L).nanosleep();
UTimeVal::nanosleep(1L);
UProcess::kill(pid, SIGKILL);
}

View File

@ -392,7 +392,7 @@ char* UIPAddress::resolveStrAddress(int iAddressType, const void* src, char* ip)
char* result = 0;
#ifdef HAVE_INET_NTOP
result = (char*) U_SYSCALL(inet_ntop, "%d,%p,%p,%u", iAddressType, src, ip, U_INET_ADDRSTRLEN);
result = (char*) U_SYSCALL(inet_ntop, "%d,%p,%p,%u", iAddressType, (void*)src, ip, U_INET_ADDRSTRLEN);
#else
result = U_SYSCALL(inet_ntoa, "%u", *((struct in_addr*)src));

View File

@ -367,7 +367,7 @@ fd_set* UPing::checkForPingAsyncCompletion(uint32_t nfds)
if (nfds &&
SHM_counter < nfds)
{
UTimeVal(1L).nanosleep();
UTimeVal::nanosleep(1L);
U_INTERNAL_DUMP("SHM_counter = %u addrmask = %B", SHM_counter, __FDS_BITS(addrmask)[0])

View File

@ -947,7 +947,7 @@ bool UClientImage_Base::genericRead()
if (advise_for_parallelization)
{
# ifdef DEBUG
U_MESSAGE("UClientImage_Base::genericRead(): time_between_request(%ld) < time_run(%ld)", time_between_request, time_run);
U_MESSAGE("%9D (pid %P) UClientImage_Base::genericRead(): time_between_request(%ld) < time_run(%ld)", time_between_request, time_run);
# endif
if (UServer_Base::startParallelization(UServer_Base::num_client_threshold))

View File

@ -73,8 +73,6 @@
# include <ulib/net/server/plugin/mod_http.h>
#endif
#define U_DEFAULT_PORT 80
int UServer_Base::rkids;
int UServer_Base::old_pid;
int UServer_Base::timeoutMS;
@ -169,12 +167,12 @@ UVector<UIPAllow*>* UServer_Base::vallow_IP_prv;
#ifdef ENABLE_THREAD
#include <ulib/thread.h>
class UClientThread U_DECL_FINAL : public UThread {
class UClientThread : public UThread {
public:
UClientThread() : UThread(PTHREAD_CREATE_DETACHED) {}
virtual void run()
virtual void run() U_DECL_FINAL
{
U_TRACE(0, "UClientThread::run()")
@ -183,12 +181,12 @@ public:
};
# ifndef _MSWINDOWS_
class UTimeThread U_DECL_FINAL : public UThread {
class UTimeThread : public UThread {
public:
UTimeThread() : UThread(PTHREAD_CREATE_DETACHED) {}
virtual void run()
virtual void run() U_DECL_FINAL
{
U_TRACE(0, "UTimeThread::run()")
@ -249,12 +247,12 @@ public:
# include <ulib/net/tcpsocket.h>
# include <ulib/net/client/client.h>
class UOCSPStapling U_DECL_FINAL : public UThread {
class UOCSPStapling : public UThread {
public:
UOCSPStapling() : UThread(PTHREAD_CREATE_DETACHED) {}
virtual void run()
virtual void run() U_DECL_FINAL
{
U_TRACE(0, "UOCSPStapling::run()")
@ -300,7 +298,7 @@ UServer_Base::UServer_Base(UFileConfig* pcfg)
U_INTERNAL_ASSERT_EQUALS(cenvironment, 0)
U_INTERNAL_ASSERT_EQUALS(senvironment, 0)
port = U_DEFAULT_PORT;
port = 80;
pthis = this;
as_user = U_NEW(UString);
@ -355,8 +353,7 @@ UServer_Base::~UServer_Base()
U_INTERNAL_ASSERT_POINTER(vplugin)
#ifdef ENABLE_THREAD
# if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
# if !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids == -1)
{
U_INTERNAL_ASSERT_POINTER(UNotifier::pthread)
@ -617,8 +614,8 @@ void UServer_Base::loadConfigParam()
// ENABLE_RFC1918_FILTER reject request from private IP to public server address
// MIN_SIZE_FOR_SENDFILE for major size it is better to use sendfile() to serve static content
//
// LISTEN_BACKLOG max number of ready to be delivered connections to accept()
// SET_REALTIME_PRIORITY flag indicating that the preforked processes will be scheduled under the real-time policies SCHED_FIFO
// LISTEN_BACKLOG max number of ready to be delivered connections to accept()
// SET_REALTIME_PRIORITY flag indicating that the preforked processes will be scheduled under the real-time policies SCHED_FIFO
//
// CLIENT_THRESHOLD min number of clients to active polling
// CLIENT_FOR_PARALLELIZATION min number of clients to active parallelization
@ -681,9 +678,9 @@ void UServer_Base::loadConfigParam()
if (timeoutMS > 0) timeoutMS *= 1000;
port = cfg->readLong(*UString::str_PORT, U_DEFAULT_PORT);
port = cfg->readLong(*UString::str_PORT, 80);
if (port == U_DEFAULT_PORT &&
if (port == 80 &&
UServices::isSetuidRoot() == false)
{
port = 8080;
@ -777,12 +774,12 @@ void UServer_Base::loadConfigParam()
enable_rfc1918_filter = cfg->readBoolean(U_CONSTANT_TO_PARAM("ENABLE_RFC1918_FILTER"));
#endif
// write pid on file...
x = (*cfg)[*UString::str_PID_FILE];
if (x)
{
// write pid on file
U_INTERNAL_ASSERT(x.isNullTerminated())
old_pid = UFile::getSysParam(x.data());
@ -834,7 +831,7 @@ void UServer_Base::loadConfigParam()
# endif
}
// DOCUMENT_ROOT: The directory out of which you will serve your documents
// DOCUMENT_ROOT: The directory out of which we will serve your documents
if (setDocumentRoot(cfg->at(U_CONSTANT_TO_PARAM("DOCUMENT_ROOT"))) == false)
{
@ -885,8 +882,6 @@ void UServer_Base::loadConfigParam()
if (loadPlugins(plugin_dir, plugin_list) == U_PLUGIN_HANDLER_ERROR) U_ERROR("Plugins stage load failed");
}
// load plugin modules and call server-wide hooks handlerConfig()...
U_NO_EXPORT void UServer_Base::loadStaticLinkedModules(const char* name)
{
U_TRACE(0, "UServer_Base::loadStaticLinkedModules(%S)", name)
@ -1061,7 +1056,7 @@ int UServer_Base::loadPlugins(UString& plugin_dir, const UString& plugin_list)
if (cfg)
{
// NB: we load configuration in reverse order respect to config var PLUGIN...
// NB: we load configuration in reverse order respect to the content of config var PLUGIN...
i = vplugin_size;
@ -1193,7 +1188,7 @@ int UServer_Base::pluginsHandler##xxx() \
U_PLUGIN_HANDLER(Request)
U_PLUGIN_HANDLER(Reset)
// NB: we call the various handlerXXX() in reverse order respect to config var PLUGIN...
// NB: we call the various handlerXXX() in reverse order respect to the content of config var PLUGIN...
#ifdef U_LOG_DISABLE
# define U_PLUGIN_HANDLER_REVERSE(xxx) \
@ -1274,14 +1269,14 @@ int UServer_Base::pluginsHandler##xxx() \
#endif
// Server-wide hooks
U_PLUGIN_HANDLER_REVERSE(Init) // NB: we call handlerInit() in reverse order respect to config var PLUGIN...
U_PLUGIN_HANDLER_REVERSE(Run) // NB: we call handlerRun() in reverse order respect to config var PLUGIN...
U_PLUGIN_HANDLER_REVERSE(Fork) // NB: we call handlerFork() in reverse order respect to config var PLUGIN...
U_PLUGIN_HANDLER_REVERSE(Stop) // NB: we call handlerStop() in reverse order respect to config var PLUGIN...
U_PLUGIN_HANDLER_REVERSE(Init) // NB: we call handlerInit() in reverse order respect to the content of config var PLUGIN...
U_PLUGIN_HANDLER_REVERSE(Run) // NB: we call handlerRun() in reverse order respect to the content of config var PLUGIN...
U_PLUGIN_HANDLER_REVERSE(Fork) // NB: we call handlerFork() in reverse order respect to the content of config var PLUGIN...
U_PLUGIN_HANDLER_REVERSE(Stop) // NB: we call handlerStop() in reverse order respect to the content of config var PLUGIN...
// Connection-wide hooks
U_PLUGIN_HANDLER_REVERSE(READ) // NB: we call handlerREAD() in reverse order respect to config var PLUGIN...
U_PLUGIN_HANDLER_REVERSE(READ) // NB: we call handlerREAD() in reverse order respect to the content of config var PLUGIN...
// SigHUP hook
U_PLUGIN_HANDLER_REVERSE(SigHUP) // NB: we call handlerSigHUP() in reverse order respect to config var PLUGIN...
U_PLUGIN_HANDLER_REVERSE(SigHUP) // NB: we call handlerSigHUP() in reverse order respect to the content of config var PLUGIN...
void UServer_Base::init()
{
@ -1341,7 +1336,7 @@ void UServer_Base::init()
host = U_NEW(UString(server ? *server : USocketExt::getNodeName()));
if (port != U_DEFAULT_PORT)
if (port != 80)
{
host->push_back(':');
@ -1463,7 +1458,7 @@ void UServer_Base::init()
delete str_preforked_num_kids;
str_preforked_num_kids = 0;
# if !defined(ENABLE_THREAD) || !defined(HAVE_EPOLL_WAIT) || !defined(U_SERVER_THREAD_APPROACH_SUPPORT) || defined(USE_LIBEVENT)
# if !defined(ENABLE_THREAD) || defined(USE_LIBEVENT) || !defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids == -1)
{
U_WARNING("Sorry, I was compiled without server thread approach so I can't accept PREFORK_CHILD == -1");
@ -1614,7 +1609,7 @@ void UServer_Base::init()
#ifdef U_LOG_ENABLE
if (isLog())
{
// NB: if log is mapped must be always shared because of possibility of fork() by parallelization...
// NB: if log is mapped must be always shared because the possibility of fork() by parallelization...
if (log->isMemoryMapped()) log->setShared(&(ptr_shared_data->log_data_shared), log_rotate_size);
@ -1633,17 +1628,19 @@ void UServer_Base::init()
// ---------------------------------------------------------------------------------------------------------
// init notifier event manager
// ---------------------------------------------------------------------------------------------------------
// NB: in the classic model we don't need to be notified for request of connection (loop: accept-fork)
// and the forked child don't accept new client, but maybe we need anyway the event manager because
// the forked child must feel the possibly timeout for request from the new client...
// ---------------------------------------------------------------------------------------------------------
socket_flags |= O_RDWR | O_CLOEXEC;
#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
#if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids != -1)
#endif
{
// ---------------------------------------------------------------------------------------------------------
// NB: in the classic model we don't need to be notified for request of connection (loop: accept-fork)
// and the forked child don't accept new client, but we need anyway the event manager because
// the forked child must feel the possibly timeout for request from the new client...
// ---------------------------------------------------------------------------------------------------------
if (timeoutMS > 0 ||
isClassic() == false)
{
@ -1671,12 +1668,12 @@ void UServer_Base::init()
UNotifier::max_connection = (UNotifier::max_connection ? UNotifier::max_connection : USocket::iBackLog / 2) + (UNotifier::num_connection = UNotifier::min_connection);
if (num_client_threshold == 0)
#ifdef U_SERVER_CHECK_TIME_BETWEEN_REQUEST
num_client_threshold = (UNotifier::max_connection * 2) / 3;
#else
num_client_threshold = U_NOT_FOUND;
#endif
if (num_client_threshold == 0) num_client_threshold =
# ifndef U_SERVER_CHECK_TIME_BETWEEN_REQUEST
U_NOT_FOUND;
# else
(UNotifier::max_connection * 2) / 3;
# endif
if (num_client_for_parallelization == 0) num_client_for_parallelization = UNotifier::max_connection / 2;
@ -1729,8 +1726,8 @@ void UServer_Base::init()
UInterrupt::exit_loop_wait_event_for_signal = true;
#if !defined(USE_LIBEVENT) && !defined(USE_RUBY)
UInterrupt::insert( SIGHUP, (sighandler_t)UServer_Base::handlerForSigHUP); // async signal
UInterrupt::insert(SIGTERM, (sighandler_t)UServer_Base::handlerForSigTERM); // async signal
UInterrupt::insert( SIGHUP, (sighandler_t)UServer_Base::handlerForSigHUP); // async signal
UInterrupt::insert( SIGTERM, (sighandler_t)UServer_Base::handlerForSigTERM); // async signal
#else
UInterrupt::setHandlerForSignal( SIGHUP, (sighandler_t)UServer_Base::handlerForSigHUP); // sync signal
UInterrupt::setHandlerForSignal(SIGTERM, (sighandler_t)UServer_Base::handlerForSigTERM); // sync signal
@ -1804,6 +1801,8 @@ RETSIGTYPE UServer_Base::handlerForSigCHLD(int signo)
{
U_TRACE(0, "[SIGCHLD] UServer_Base::handlerForSigCHLD(%d)", signo)
U_INTERNAL_ASSERT_POINTER(proc)
if (proc->parent()) proc->wait();
}
@ -1822,10 +1821,14 @@ RETSIGTYPE UServer_Base::handlerForSigHUP(int signo)
{
U_TRACE(0, "[SIGHUP] UServer_Base::handlerForSigHUP(%d)", signo)
U_INTERNAL_ASSERT_POINTER(proc)
U_INTERNAL_ASSERT_POINTER(pthis)
U_INTERNAL_ASSERT(proc->parent())
#if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids == -1) return;
#endif
// NB: for logrotate...
#ifdef U_LOG_ENABLE
@ -1842,10 +1845,11 @@ RETSIGTYPE UServer_Base::handlerForSigHUP(int signo)
(void) U_SYSCALL(gettimeofday, "%p,%p", u_now, 0);
#if defined(ENABLE_THREAD) && !defined(_MSWINDOWS_)
# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB)
if ( pthread_ocsp) pthread_ocsp->suspend();
# endif
if (u_pthread_time) ((UTimeThread*)u_pthread_time)->suspend();
# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB)
if (pthread_ocsp) pthread_ocsp->suspend();
# endif
#endif
pthis->handlerSignal(); // manage signal before we regenering the preforked pool of children...
@ -1863,10 +1867,11 @@ RETSIGTYPE UServer_Base::handlerForSigHUP(int signo)
#endif
#if defined(ENABLE_THREAD) && !defined(_MSWINDOWS_)
# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB)
if ( pthread_ocsp) pthread_ocsp->resume();
# endif
if (u_pthread_time) ((UTimeThread*)u_pthread_time)->resume();
# if defined(USE_LIBSSL) && !defined(OPENSSL_NO_OCSP) && defined(SSL_CTRL_SET_TLSEXT_STATUS_REQ_CB)
if (pthread_ocsp) pthread_ocsp->resume();
# endif
#endif
#ifdef U_LOG_ENABLE
@ -1979,7 +1984,7 @@ U_NO_EXPORT bool UServer_Base::clientImageHandlerRead()
U_RETURN(true);
}
#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
#if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
# define CSOCKET psocket
# define CLIENT_INDEX lClientIndex
# define CLIENT_ADDRESS lclient_address
@ -1994,7 +1999,7 @@ U_NO_EXPORT bool UServer_Base::clientImageHandlerRead()
# define CLIENT_IMAGE_HANDLER_READ clientImageHandlerRead()
#endif
int UServer_Base::handlerRead() // This method is called to accept a new connection on the server socket
int UServer_Base::handlerRead() // This method is called to accept a new connection on the server socket (listening)
{
U_TRACE(1, "UServer_Base::handlerRead()")
@ -2005,17 +2010,17 @@ int UServer_Base::handlerRead() // This method is called to accept a new connect
pClientImage = vClientImage + nClientIndex;
#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
#if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
USocket* psocket;
char* lclient_address;
uint32_t lclient_address_len;
UClientImage_Base* lClientIndex = pClientImage;
#endif
int cround = 0;
#ifdef DEBUG
uint32_t nothing = 0;
CLIENT_ADDRESS_LEN = 0;
#endif
int cround = 0;
loop:
U_INTERNAL_ASSERT_MINOR(CLIENT_INDEX, eClientImage)
@ -2187,13 +2192,13 @@ try_accept:
U_SRV_LOG("WARNING: new client connected from %.*S, connection denied by Access Control List", CLIENT_ADDRESS_LEN, CLIENT_ADDRESS);
# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
# if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids != -1)
# endif
{
U_INTERNAL_ASSERT_DIFFERS(socket_flags & O_NONBLOCK, 0)
# if defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT)
# ifndef USE_LIBEVENT
goto try_next;
# endif
}
@ -2214,13 +2219,13 @@ try_accept:
U_SRV_LOG("WARNING: new client connected from %.*S, connection denied by RFC1918 filtering (reject request from private IP to public server address)",
CLIENT_ADDRESS_LEN, CLIENT_ADDRESS);
# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
# if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids != -1)
# endif
{
U_INTERNAL_ASSERT_DIFFERS(socket_flags & O_NONBLOCK, 0)
# if defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT)
# ifndef USE_LIBEVENT
goto try_next;
# endif
}
@ -2311,7 +2316,7 @@ retry: pid = UProcess::waitpid(-1, &status, WNOHANG); // NB: to avoid too much
ULog::log("New client connected from %v, %.*s clients currently connected", CLIENT_INDEX->logbuf->rep, len, buffer);
# ifdef U_WELCOME_SUPPORT
if (msg_welcome) ULog::log("Send welcome message to %v", CLIENT_INDEX->logbuf->rep);
if (msg_welcome) ULog::log("Sending welcome message to %v", CLIENT_INDEX->logbuf->rep);
# endif
}
#endif
@ -2328,7 +2333,7 @@ retry: pid = UProcess::waitpid(-1, &status, WNOHANG); // NB: to avoid too much
}
#endif
#if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
#if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids == -1) lClientIndex->UEventFd::fd = psocket->iSockDesc;
else
#endif
@ -2350,10 +2355,13 @@ retry: pid = UProcess::waitpid(-1, &status, WNOHANG); // NB: to avoid too much
next:
last_event = u_now->tv_sec;
#if defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT)
# if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
#ifdef USE_LIBEVENT
goto end;
#endif
#if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids != -1)
# endif
#endif
{
U_INTERNAL_ASSERT_DIFFERS(socket_flags & O_NONBLOCK, 0)
@ -2374,7 +2382,6 @@ next:
goto loop;
}
#endif
end:
#if defined(HAVE_EPOLL_CTL_BATCH) && !defined(USE_LIBEVENT)
@ -2410,13 +2417,13 @@ uint32_t UServer_Base::getNumConnection(char* ptr)
{
char* start = ptr;
*ptr++ = '(';
ptr += u_num2str32(ptr, UNotifier::num_connection - UNotifier::min_connection - 1);
*ptr++ = '/';
ptr += u_num2str32(ptr, U_TOT_CONNECTION - flag_loop); // NB: check for SIGTERM event...
*ptr++ = ')';
*ptr = '(';
ptr += 1+u_num2str32(ptr+1, UNotifier::num_connection - UNotifier::min_connection - 1);
*ptr = '/';
ptr += 1+u_num2str32(ptr+1, U_TOT_CONNECTION - flag_loop); // NB: check for SIGTERM event...
*ptr = ')';
len = ptr - start;
len = ptr-start+1;
}
U_RETURN(len);
@ -2461,7 +2468,7 @@ bool UServer_Base::handlerTimeoutConnection(void* cimg)
}
# endif
U_RETURN(true); // NB: erase item...
U_RETURN(true); // NB: return true mean that we want erase the item...
}
U_RETURN(false);
@ -2594,7 +2601,7 @@ void UServer_Base::runLoop(const char* user)
if (UNotifier::min_connection)
{
if (binsert) UNotifier::insert(pthis); // NB: we ask to be notified for request of connection (=> accept)
if (handler_other) UNotifier::insert(handler_other); // NB: we ask to be notified for request from request
if (handler_other) UNotifier::insert(handler_other); // NB: we ask to be notified for request from generic system
if (handler_inotify) UNotifier::insert(handler_inotify); // NB: we ask to be notified for change of file system (=> inotify)
}
@ -2638,7 +2645,7 @@ void UServer_Base::runLoop(const char* user)
U_INTERNAL_DUMP("ptime = %p handler_other = %p handler_inotify = %p UNotifier::num_connection = %u UNotifier::min_connection = %u",
ptime, handler_other, handler_inotify, UNotifier::num_connection, UNotifier::min_connection)
# if defined(ENABLE_THREAD) && defined(HAVE_EPOLL_WAIT) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
# if defined(ENABLE_THREAD) && !defined(USE_LIBEVENT) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
if (preforked_num_kids != -1)
# endif
{
@ -2657,7 +2664,7 @@ void UServer_Base::runLoop(const char* user)
U_INTERNAL_ASSERT_EQUALS(socket_flags & O_NONBLOCK, 0)
# if !defined(ENABLE_THREAD) || !defined(HAVE_EPOLL_WAIT) || defined(USE_LIBEVENT) || !defined(U_SERVER_THREAD_APPROACH_SUPPORT)
# if !defined(ENABLE_THREAD) || defined(USE_LIBEVENT) || !defined(U_SERVER_THREAD_APPROACH_SUPPORT)
U_INTERNAL_ASSERT(UNotifier::min_connection == UNotifier::num_connection)
# endif

View File

@ -327,7 +327,7 @@ loop:
errno == EADDRINUSE &&
++counter <= 3)
{
UTimeVal(1L).nanosleep();
UTimeVal::nanosleep(1L);
goto loop;
}

View File

@ -15,10 +15,6 @@
#include <time.h>
#ifdef HAVE_SYS_SYSCALL_H
# include <sys/syscall.h>
#endif
#ifndef HAVE_PTHREAD_CANCEL
# ifdef SIGCANCEL
# define U_SIG_THREAD_CANCEL SIGCANCEL
@ -34,43 +30,6 @@ extern "C" { int nanosleep (const struct timespec* requested_time,
UThread* UThread::first;
UThread::UThread(int _detachstate)
{
U_TRACE_REGISTER_OBJECT(0, UThread, "%d", _detachstate)
next = 0;
tid = 0;
detachstate = _detachstate;
cancel = 0;
#ifdef _MSWINDOWS_
HANDLE process = GetCurrentProcess();
DuplicateHandle(process, GetCurrentThread(), process, (LPHANDLE)&tid, 0, FALSE, DUPLICATE_SAME_ACCESS);
cancellation = CreateEvent(NULL, TRUE, FALSE, NULL);
#else
suspendCount = 0;
(void) U_SYSCALL(pthread_attr_init, "%p", &attr);
(void) U_SYSCALL(pthread_attr_setdetachstate, "%p,%d", &attr, _detachstate);
#endif
}
UThread::~UThread()
{
U_TRACE_UNREGISTER_OBJECT(0, UThread)
if (tid)
{
# ifndef _MSWINDOWS_
if (isDetached()) suspend();
# endif
close();
}
}
void UThread::close()
{
U_TRACE(0, "UThread::close()")
@ -122,12 +81,12 @@ void UThread::close()
default: SetEvent(cancellation);
}
(void) ::WaitForSingleObject((HANDLE)_tid, INFINITE);
(void) WaitForSingleObject((HANDLE)_tid, INFINITE);
(void) U_SYSCALL(CloseHandle, "%p", cancellation);
(void) U_SYSCALL(CloseHandle, "%p", (HANDLE)_tid);
ExitThread(0);
_endthreadex(0);
# else
# ifdef HAVE_PTHREAD_CANCEL
(void) U_SYSCALL(pthread_cancel, "%p", _tid);
@ -142,10 +101,6 @@ void UThread::close()
# endif
# endif
}
#ifndef _MSWINDOWS_
(void) pthread_attr_destroy(&attr);
#endif
}
void UThread::yield()
@ -200,15 +155,6 @@ DWORD UThread::getTID()
U_RETURN(tid);
}
unsigned __stdcall UThread::execHandler(void* th)
{
U_TRACE(0, "UThread::::execHandler(%p)", th)
threadStart((UThread*)th);
U_RETURN(0);
}
#else
pid_t UThread::getTID()
{
@ -269,7 +215,7 @@ void UThread::manageSignal(int signo)
{
U_INTERNAL_DUMP("SUSPEND: start(%2D)")
// NB: A thread can wake up from pthread_cond_wait() at any name, not necessarily only when it is signalled.
// A thread can wake up from pthread_cond_wait() at any name, not necessarily only when it is signalled.
// This means that you need to pair pthread_cond_wait() with some shared state that encodes the condition that the thread is really waiting for
do {
@ -289,17 +235,12 @@ void UThread::manageSignal(int signo)
unlock(&mlock);
}
void UThread::execHandler(UThread* th)
void UThread::maskSignal()
{
U_TRACE(1, "UThread::execHandler(%p)", th)
#ifdef HAVE_SYS_SYSCALL_H
th->sid = syscall(SYS_gettid);
#endif
U_INTERNAL_ASSERT_EQUALS(pthread_self(), th->tid)
U_TRACE(1, "UThread::maskSignal()")
sigset_t mask;
#ifdef sigemptyset
sigemptyset(&mask);
#else
@ -339,11 +280,56 @@ void UThread::execHandler(UThread* th)
(void) U_SYSCALL(pthread_sigmask, "%d,%p,%p", SIG_UNBLOCK, &mask, 0);
th->sigInstall(U_SIGSTOP);
th->sigInstall(U_SIGCONT);
sigInstall(U_SIGSTOP);
sigInstall(U_SIGCONT);
#endif
}
threadStart(th);
bool UThread::initIPC(pthread_mutex_t* pmutex, pthread_cond_t* pcond)
{
U_TRACE(0, "UThread::initIPC(%p,%p)", pmutex, pcond)
if (pmutex) /* initialize mutex */
{
pthread_mutexattr_t mutexattr;
if (U_SYSCALL(pthread_mutexattr_init, "%p", &mutexattr) != 0 ||
U_SYSCALL(pthread_mutexattr_setrobust, "%p,%d", &mutexattr, PTHREAD_MUTEX_ROBUST) != 0 ||
U_SYSCALL(pthread_mutexattr_setpshared, "%p,%d", &mutexattr, PTHREAD_PROCESS_SHARED) != 0 ||
U_SYSCALL(pthread_mutex_init, "%p,%p", pmutex, &mutexattr) != 0)
{
U_RETURN(false);
}
}
if (pcond) /* initialize condition variable */
{
pthread_condattr_t condattr;
if (U_SYSCALL(pthread_condattr_init, "%p", &condattr) != 0 ||
U_SYSCALL(pthread_condattr_setpshared, "%p,%d", &condattr, PTHREAD_PROCESS_SHARED) != 0 ||
U_SYSCALL(pthread_cond_init, "%p,%p", pcond, &condattr) != 0)
{
U_RETURN(false);
}
}
U_RETURN(true);
}
void UThread::doIPC(pthread_mutex_t* plock, pthread_cond_t* pcond, vPF function, bool wait)
{
U_TRACE(0, "UThread::doIPC(%p,%p,%p,%b)", plock, pcond, function, wait)
lock(plock);
if (wait) (void) U_SYSCALL(pthread_cond_wait, "%p,%p", pcond, plock); // block until we are signalled from other...
function(); // ...than call function
unlock(plock);
if (wait == false) (void) U_SYSCALL(pthread_cond_signal, "%p", pcond); // signal to waiting thread...
}
#endif
@ -359,23 +345,53 @@ bool UThread::start(uint32_t timeoutMS)
U_INTERNAL_DUMP("first = %p next = %p", first, next)
#ifdef _MSWINDOWS_
(void) _beginthreadex(NULL, 0, execHandler, this, CREATE_SUSPENDED, (unsigned*)&sid);
HANDLE process = GetCurrentProcess();
if (tid == 0)
{
CloseHandle(cancellation);
cancellation = 0;
(void) DuplicateHandle(process, GetCurrentThread(), process, (LPHANDLE)&tid, 0, FALSE, DUPLICATE_SAME_ACCESS);
U_RETURN(false);
}
cancellation = CreateEvent(NULL, TRUE, FALSE, NULL);
// This starts a free standing procedure as a thread.
// That thread instantiates the class and calls its main method
void* NO_SECURITY_ATTRIBUTES = NULL;
const unsigned CREATE_IN_RUN_STATE = 0;
const unsigned USE_DEFAULT_STACK_SIZE = 0;
if (_beginthreadex(NO_SECURITY_ATTRIBUTES, USE_DEFAULT_STACK_SIZE, execHandler, this, CREATE_IN_RUN_STATE, (unsigned*)&sid) == 0)
{
int m_thread_start_error;
errno_t m_return_value = _get_errno(&m_thread_start_error);
if (m_return_value == 0)
{
U_WARNING("Create Thread Fail, error %d", m_thread_start_error);
}
else
{
U_WARNING("Create Thread Fail, get_errno fail, returned %d", m_return_value);
}
CloseHandle(cancellation);
cancellation = 0;
U_RETURN(false);
}
setCancel(cancelInitial);
SetThreadPriority((HANDLE)tid, THREAD_PRIORITY_NORMAL);
ResumeThread((HANDLE)tid);
#else
if (U_SYSCALL(pthread_create, "%p,%p,%p,%p", &tid, &attr, (pvPFpv)execHandler, this) == 0)
bool result;
pthread_attr_t attr;
(void) U_SYSCALL(pthread_attr_init, "%p", &attr);
(void) U_SYSCALL(pthread_attr_setdetachstate, "%p,%d", &attr, detachstate);
result = (U_SYSCALL(pthread_create, "%p,%p,%p,%p", &tid, &attr, (pvPFpv)execHandler, this) == 0);
(void) pthread_attr_destroy(&attr);
if (result)
{
if (timeoutMS)
{
@ -416,9 +432,11 @@ void UThread::setCancel(int mode)
case cancelInitial:
case cancelDisabled:
# ifdef HAVE_PTHREAD_CANCEL
{
# ifdef HAVE_PTHREAD_CANCEL
(void) U_SYSCALL(pthread_setcancelstate, "%d,%p", PTHREAD_CANCEL_DISABLE, &old);
# endif
# endif
}
break;
}
@ -470,7 +488,7 @@ void UThread::sleep(time_t timeoutMS)
{
U_TRACE(1, "UThread::sleep(%ld)", timeoutMS)
U_ASSERT(isCurrentThread(tid))
// U_ASSERT(isCurrentThread(tid))
struct timespec ts = { timeoutMS / 1000L, (timeoutMS % 1000L) * 1000000L };
@ -514,94 +532,150 @@ void UThread::sleep(time_t timeoutMS)
#endif
}
// Inter Process Communication
#ifndef _MSWINDOWS_
bool UThread::initIPC(pthread_mutex_t* pmutex, pthread_cond_t* pcond)
{
U_TRACE(0, "UThread::initIPC(%p,%p)", pmutex, pcond)
if (pmutex) /* initialize mutex */
{
pthread_mutexattr_t mutexattr;
if (U_SYSCALL(pthread_mutexattr_init, "%p", &mutexattr) != 0 ||
U_SYSCALL(pthread_mutexattr_setrobust, "%p,%d", &mutexattr, PTHREAD_MUTEX_ROBUST) != 0 ||
U_SYSCALL(pthread_mutexattr_setpshared, "%p,%d", &mutexattr, PTHREAD_PROCESS_SHARED) != 0 ||
U_SYSCALL(pthread_mutex_init, "%p,%p", pmutex, &mutexattr) != 0)
{
U_RETURN(false);
}
}
if (pcond) /* initialize condition variable */
{
pthread_condattr_t condattr;
if (U_SYSCALL(pthread_condattr_init, "%p", &condattr) != 0 ||
U_SYSCALL(pthread_condattr_setpshared, "%p,%d", &condattr, PTHREAD_PROCESS_SHARED) != 0 ||
U_SYSCALL(pthread_cond_init, "%p,%p", pcond, &condattr) != 0)
{
U_RETURN(false);
}
}
U_RETURN(true);
}
void UThread::doIPC(pthread_mutex_t* plock, pthread_cond_t* pcond, vPF function, bool wait)
{
U_TRACE(0, "UThread::doIPC(%p,%p,%p,%b)", plock, pcond, function, wait)
lock(plock);
if (wait) (void) U_SYSCALL(pthread_cond_wait, "%p,%p", pcond, plock); // block until we are signalled from other...
function(); // ...than call function
unlock(plock);
if (wait == false) (void) U_SYSCALL(pthread_cond_signal, "%p", pcond); // signal to waiting thread...
}
// THREAD POOL
UThreadPool::UThreadPool(uint32_t size) : UThread(PTHREAD_CREATE_DETACHED), pool(size)
{
U_TRACE_REGISTER_OBJECT(0, UThreadPool, "%u", size)
UThread* th;
active = true;
#ifdef _MSWINDOWS_
InitializeCriticalSection(&tasksMutex); // Task queue mutex
InitializeConditionVariable(&condition); // Condition variable
// Task queue mutex
InitializeCriticalSection(&tasks_mutex);
// Condition variable
InitializeConditionVariable(&condition);
InitializeConditionVariable(&condition_task_finished);
// This starts a free standing procedure as a thread.
// That thread instantiates the class and calls its main method
void* NO_SECURITY_ATTRIBUTES = NULL;
const unsigned CREATE_IN_RUN_STATE = 0;
const unsigned USE_DEFAULT_STACK_SIZE = 0;
for (uint32_t i = 0; i < size; ++i)
{
th = U_NEW(UThread(UThread::detachstate));
if (_beginthreadex(NO_SECURITY_ATTRIBUTES, USE_DEFAULT_STACK_SIZE, execHandler, this, CREATE_IN_RUN_STATE, (unsigned*)&sid) == 0)
{
int m_thread_start_error;
errno_t m_return_value = _get_errno(&m_thread_start_error);
if (m_return_value == 0)
{
U_WARNING("Create Thread Fail, error %d", m_thread_start_error);
}
else
{
U_WARNING("Create Thread Fail, get_errno fail, returned %d", m_return_value);
}
delete th;
continue;
}
pool.push_back(th);
}
#else
tasksMutex = PTHREAD_MUTEX_INITIALIZER; // Task queue mutex
condition = PTHREAD_COND_INITIALIZER; // Condition variable
pthread_attr_t attr;
// Task queue mutex
tasks_mutex = PTHREAD_MUTEX_INITIALIZER;
// Condition variable
condition =
condition_task_finished = PTHREAD_COND_INITIALIZER;
(void) U_SYSCALL(pthread_attr_init, "%p", &attr);
(void) U_SYSCALL(pthread_attr_setdetachstate, "%p,%d", &attr, UThread::detachstate);
for (uint32_t i = 0; i < size; ++i)
{
th = U_NEW(UThread(UThread::detachstate));
if (U_SYSCALL(pthread_create, "%p,%p,%p,%p", &(th->tid), &attr, (pvPFpv)execHandler, this))
{
delete th;
continue;
}
pool.push(th);
}
(void) pthread_attr_destroy(&attr);
#endif
UTimeVal::nanosleep(200); // wait for UThreadPool init completion
}
UThreadPool::~UThreadPool()
{
U_TRACE_UNREGISTER_OBJECT(0, UThread)
active = false;
lock(&tasks_mutex);
signalAll(&condition);
queue.clear();
unlock(&tasks_mutex);
#ifdef _MSWINDOWS_
DeleteCriticalSection(&tasks_mutex);
#endif
}
#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* UThreadPool::dump(bool reset) const
// define method VIRTUAL of class UThread
void UThreadPool::run()
{
*UObjectIO::os << "active " << active << '\n'
<< "pool (UVector " << (void*)&pool << ")\n"
<< "queue (UVector " << (void*)&queue << ')';
U_TRACE(0, "UThreadPool::run()")
if (reset)
{
UObjectIO::output();
UThread* current_task;
return UObjectIO::buffer_output;
do {
// We need to put pthread_cond_wait in a loop for two reasons:
//
// 1. There can be spurious wakeups (due to signal/ENITR)
//
// 2. When tasks_mutex is released for waiting, another thread can be waken up from a signal/broadcast and that
// thread can miss up the condition. So when the current thread wakes up the condition may no longer be actually true!
lock(&tasks_mutex);
while (queue._length == 0 && active)
{
// Wait until there is a task in the queue
wait(&tasks_mutex, &condition); // Unlock tasks_mutex while wait, then lock it back when signaled
}
if (active == false) // Destructor ordered on abort
{
unlock(&tasks_mutex);
return;
}
// If we got here, we successfully acquired the lock and the queue<Task> is not empty
current_task = queue.pop();
unlock(&tasks_mutex);
current_task->run(); // execute the task
delete current_task;
signal(&condition_task_finished);
}
return 0;
while (active);
}
#endif
#endif
#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* UThread::dump(bool reset) const
@ -622,4 +696,20 @@ const char* UThread::dump(bool reset) const
return 0;
}
const char* UThreadPool::dump(bool reset) const
{
*UObjectIO::os << "active " << active << '\n'
<< "pool (UVector " << (void*)&pool << ")\n"
<< "queue (UVector " << (void*)&queue << ')';
if (reset)
{
UObjectIO::output();
return UObjectIO::buffer_output;
}
return 0;
}
#endif

View File

@ -43,5 +43,13 @@ child end
father end
starting thread
thread delete
starting thread pool
Hello World!
Hello World!
Hello World!
Hello World!
thread pool finished
Now program should finish... :)

View File

@ -1,6 +1,7 @@
// test_thread.cpp
#include <ulib/thread.h>
#include <ulib/timeval.h>
#undef OK
#define OK {printf("ok\n");}
@ -175,6 +176,22 @@ public:
}
};
class Task : public UThread {
public:
Task() : UThread(PTHREAD_CREATE_JOINABLE) {}
~Task() {}
void run()
{
U_TRACE(5, "Task::run()")
printf("Hello World!\n");
sleep(1000);
}
};
int U_EXPORT main(int argc, char* argv[])
{
U_ULIB_INIT(argv);
@ -255,7 +272,7 @@ int U_EXPORT main(int argc, char* argv[])
th->start();
UThread::nanosleep(200);
UTimeVal::nanosleep(200);
U_INTERNAL_DUMP("FATHER DELETE = %p", th)
@ -269,10 +286,20 @@ int U_EXPORT main(int argc, char* argv[])
th->start();
UThread::nanosleep(100); // 150 millisecond
UTimeVal::nanosleep(100); // 150 millisecond
delete th; // delete to join
cout << "thread delete\n\nstarting thread pool" << endl;
UThreadPool tp(2);
for (int i = 0; i < 4; ++i) tp.addTask(U_NEW(Task));
tp.waitForWorkToBeFinished();
cout << "thread pool finished" << endl;
printf("\nNow program should finish... :)\n");
return 0;