// ============================================================================
//
// = LIBRARY
//    ULib - c++ library
//
// = FILENAME
//    notifier.cpp
//
// = AUTHOR
//    Stefano Casazza
//
// ============================================================================

#include <ulib/timer.h>
#include <ulib/notifier.h>
#include <ulib/net/socket.h>
#include <ulib/internal/chttp.h>
#include <ulib/utility/interrupt.h>
#include <ulib/net/server/server_plugin.h>

#ifdef HAVE_KQUEUE
# include <sys/event.h>
#endif

#ifndef HAVE_POLL_H
UEventTime* UNotifier::time_obj;
#else
# include <poll.h>
struct pollfd UNotifier::fds[1];
#endif

#ifdef USE_FSTACK
# include <ff_epoll.h>
#endif

/**
 * typedef union epoll_data {
 *    void* ptr;
 *    int fd;
 *    uint32_t u32;
 *    uint64_t u64;
 * } epoll_data_t;
 *
 * struct epoll_event {
 *    uint32_t events;    // Epoll events
 *    epoll_data_t data;  // User data variable
 * };
 *
 * struct epoll_ctl_cmd {
 *    int flags;          // Reserved flags for future extension, must be 0
 *    int op;             // The same as epoll_ctl() op parameter
 *    int fd;             // The same as epoll_ctl() fd parameter
 *    uint32_t events;    // The same as the "events" field in struct epoll_event
 *    uint64_t data;      // The same as the "data" field in struct epoll_event
 *    int error_hint;     // Output field, will be set to the return code after this command is executed by kernel
 * };
 */
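
/**
 * NB: for orientation, a minimal sketch (illustrative only, not part of ULib) of how these
 * structures are typically used; the code below follows the same pattern, carrying a UEventFd*
 * in data.ptr so the ready handler can be recovered directly from each returned event:
 *
 *    int efd = epoll_create1(EPOLL_CLOEXEC);
 *
 *    struct epoll_event ev = { EPOLLIN | EPOLLRDHUP, { handler } }; // data.ptr = handler (a UEventFd*)
 *
 *    (void) epoll_ctl(efd, EPOLL_CTL_ADD, handler->fd, &ev);
 *
 *    int n = epoll_wait(efd, events, max_connection, timeout_ms); // events[i].data.ptr yields the handler
 */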

int UNotifier::nfd_ready; // the number of file descriptors ready for the requested I/O
bool UNotifier::flag_sigterm;
long UNotifier::last_event;
uint32_t UNotifier::min_connection;
uint32_t UNotifier::num_connection;
uint32_t UNotifier::max_connection;
uint32_t UNotifier::lo_map_fd_len;
uint32_t UNotifier::bepollet_threshold = 10;
UEventFd* UNotifier::handler_event;
UEventFd** UNotifier::lo_map_fd;

UGenericHashMap<unsigned int,UEventFd*>* UNotifier::hi_map_fd; // maps a fd to a node pointer

#ifdef DEBUG
uint32_t UNotifier::nwatches;
uint32_t UNotifier::max_nfd_ready;
#endif

#if defined(ENABLE_THREAD) && defined(U_SERVER_THREAD_APPROACH_SUPPORT)
UThread* UNotifier::pthread;
# ifdef _MSWINDOWS_
CRITICAL_SECTION UNotifier::mutex;
# else
pthread_mutex_t UNotifier::mutex = PTHREAD_MUTEX_INITIALIZER;
# endif
#endif

#ifdef USE_LIBEVENT
void UEventFd::operator()(int _fd, short event)
{
   U_TRACE(0, "UEventFd::operator()(%d,%hd)", _fd, event)

   int ret = (event == EV_READ ? handlerRead() : handlerWrite());

   U_INTERNAL_DUMP("ret = %d", ret)

   if (ret == U_NOTIFIER_DELETE) UNotifier::handlerDelete(this);
}
#else
# ifdef HAVE_EPOLL_WAIT
int UNotifier::epollfd;
struct epoll_event* UNotifier::events;
struct epoll_event* UNotifier::pevents;
# ifdef U_EPOLLET_POSTPONE_STRATEGY
bool UNotifier::bepollet;
# endif
# ifdef HAVE_EPOLL_CTL_BATCH
int UNotifier::ctl_cmd_cnt;
struct epoll_ctl_cmd UNotifier::ctl_cmd[U_EPOLL_CTL_CMD_SIZE];

void UNotifier::batch(UEventFd* item)
{
   U_TRACE(0, "UNotifier::batch(%p)", item)

   U_INTERNAL_ASSERT_POINTER(item)

   U_INTERNAL_DUMP("item->fd = %d item->op_mask = %B num_connection = %d", item->fd, item->op_mask, num_connection)

   U_INTERNAL_ASSERT_EQUALS(ctl_cmd[ctl_cmd_cnt].op, EPOLL_CTL_ADD)
   U_INTERNAL_ASSERT_EQUALS(ctl_cmd[ctl_cmd_cnt].events, EPOLLIN | EPOLLRDHUP | EPOLLET)

   ctl_cmd[ctl_cmd_cnt].fd   = item->fd;
   ctl_cmd[ctl_cmd_cnt].data = item;

   if (++ctl_cmd_cnt >= U_EPOLL_CTL_CMD_SIZE) insertBatch();
}
# endif
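
/**
 * NB: batch() only stages EPOLL_CTL_ADD commands: the ctl_cmd[] slots are pre-filled in init()
 * with op = EPOLL_CTL_ADD and events = EPOLLIN | EPOLLRDHUP | EPOLLET (hence the asserts above),
 * so each call just fills in fd and data; once U_EPOLL_CTL_CMD_SIZE entries are staged the batch
 * is flushed via insertBatch(), submitting all pending registrations to the kernel at once
 */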
# elif defined(HAVE_KQUEUE)
int UNotifier::kq;
int UNotifier::nkqevents;
struct kevent* UNotifier::kqevents;
struct kevent* UNotifier::kqrevents;
# else
int UNotifier::fd_set_max;
int UNotifier::fd_read_cnt;
int UNotifier::fd_write_cnt;
fd_set UNotifier::fd_set_read;
fd_set UNotifier::fd_set_write;
# endif
bool UNotifier::bread;

U_NO_EXPORT void UNotifier::notifyHandlerEvent()
{
   U_TRACE_NO_PARAM(0, "UNotifier::notifyHandlerEvent()")

   U_INTERNAL_ASSERT_POINTER(handler_event)

   U_INTERNAL_DUMP("handler_event = %p bread = %b nfd_ready = %d fd = %d op_mask = %d %B",
                    handler_event, bread, nfd_ready, handler_event->fd, handler_event->op_mask, handler_event->op_mask)

   if ((bread ? handler_event->handlerRead()
              : handler_event->handlerWrite()) == U_NOTIFIER_DELETE)
      {
      handlerDelete(handler_event);
      }
}

void UNotifier::init()
{
   U_TRACE(0, "UNotifier::init()")

#ifdef HAVE_EPOLL_WAIT
   U_INTERNAL_ASSERT_DIFFERS(U_ClientImage_parallelization, U_PARALLELIZATION_CHILD)

   int old = epollfd;

   U_INTERNAL_DUMP("old = %d", old)

# if defined(HAVE_EPOLL_CREATE1) && !defined(USE_FSTACK)
   epollfd = U_SYSCALL(epoll_create1, "%d", EPOLL_CLOEXEC);

   if (epollfd != -1 || errno != ENOSYS) goto next;
# endif
   epollfd = U_FF_SYSCALL(epoll_create, "%u", max_connection);
# if defined(HAVE_EPOLL_CREATE1) && !defined(USE_FSTACK)
next:
# endif
   U_INTERNAL_ASSERT_DIFFERS(epollfd, -1)

   if (old)
      {
      U_INTERNAL_DUMP("num_connection = %u", num_connection)

      if (num_connection)
         {
         U_INTERNAL_ASSERT_POINTER(lo_map_fd)
         U_INTERNAL_ASSERT_POINTER(hi_map_fd)

         // NB: everything must be reinitialized after fork()...

         for (int fd = 1; fd < (int32_t)lo_map_fd_len; ++fd)
            {
            if ((handler_event = lo_map_fd[fd]))
               {
               U_INTERNAL_DUMP("fd = %d op_mask = %d %B", fd, handler_event->op_mask, handler_event->op_mask)

               (void) U_FF_SYSCALL(epoll_ctl, "%d,%d,%d,%p", old, EPOLL_CTL_DEL, fd, (struct epoll_event*)1);

               struct epoll_event _events = { handler_event->op_mask | EPOLLEXCLUSIVE | EPOLLROUNDROBIN, { handler_event } };

               (void) U_FF_SYSCALL(epoll_ctl, "%d,%d,%d,%p", epollfd, EPOLL_CTL_ADD, fd, &_events);
               }
            }

         if (hi_map_fd->first())
            {
            do {
               handler_event = hi_map_fd->elem();

               U_INTERNAL_DUMP("op_mask = %d %B", handler_event->op_mask, handler_event->op_mask)

               (void) U_FF_SYSCALL(epoll_ctl, "%d,%d,%d,%p", old, EPOLL_CTL_DEL, handler_event->fd, (struct epoll_event*)1);

               struct epoll_event _events = { handler_event->op_mask | EPOLLEXCLUSIVE | EPOLLROUNDROBIN, { handler_event } };

               (void) U_FF_SYSCALL(epoll_ctl, "%d,%d,%d,%p", epollfd, EPOLL_CTL_ADD, handler_event->fd, &_events);
               }
            while (hi_map_fd->next());
            }
         }

      (void) U_FF_SYSCALL(close, "%d", old);

      return;
      }
#elif defined(HAVE_KQUEUE)
   int old = kq;

   U_INTERNAL_DUMP("old = %d", old)

# ifdef HAVE_KQUEUE1
   kq = U_SYSCALL(kqueue1, "%d", O_CLOEXEC);

   if (kq != -1 || errno != ENOSYS) goto next;
# endif
   kq = U_SYSCALL_NO_PARAM(kqueue);
next:
   U_INTERNAL_ASSERT_DIFFERS(kq, -1)

   if (old)
      {
      U_INTERNAL_DUMP("num_connection = %u", num_connection)

      if (num_connection)
         {
         U_INTERNAL_ASSERT_POINTER(kqevents)
         U_INTERNAL_ASSERT_POINTER(kqrevents)
         U_INTERNAL_ASSERT_POINTER(lo_map_fd)
         U_INTERNAL_ASSERT_POINTER(hi_map_fd)

         // NB: everything must be reinitialized after fork()...

         for (int fd = 1; fd < (int32_t)lo_map_fd_len; ++fd)
            {
            if ((handler_event = lo_map_fd[fd]))
               {
               U_INTERNAL_DUMP("fd = %d op_mask = %d %B", fd, handler_event->op_mask, handler_event->op_mask)

               EV_SET(kqevents+nkqevents++, handler_event->fd,
                      ((handler_event->op_mask & (EPOLLIN | EPOLLRDHUP)) != 0) ? EVFILT_READ : EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, (void*)handler_event);
               }
            }

         if (hi_map_fd->first())
            {
            do {
               handler_event = hi_map_fd->elem();

               U_INTERNAL_DUMP("op_mask = %d %B", handler_event->op_mask, handler_event->op_mask)

               EV_SET(kqevents+nkqevents++, handler_event->fd,
                      ((handler_event->op_mask & (EPOLLIN | EPOLLRDHUP)) != 0) ? EVFILT_READ : EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, (void*)handler_event);
               }
            while (hi_map_fd->next());
            }
         }

      (void) U_FF_SYSCALL(close, "%d", old);

      return;
      }

   U_INTERNAL_ASSERT_EQUALS(kqevents, 0)
   U_INTERNAL_ASSERT_EQUALS(kqrevents, 0)

   kqevents  = (struct kevent*) UMemoryPool::_malloc(max_connection, sizeof(struct kevent), true);
   kqrevents = (struct kevent*) UMemoryPool::_malloc(max_connection, sizeof(struct kevent), true);

   // Check for the Mac OS X kqueue bug: if kqueue works, kevent() will succeed and stick an error in kqrevents[0];
   // if kqueue is broken, kevent() will fail (this detects an old bug in Mac OS X 10.4, fixed in 10.5)

   EV_SET(kqevents, -1, EVFILT_READ, EV_ADD, 0, 0, 0);

   if (U_SYSCALL(kevent, "%d,%p,%d,%p,%d,%p", kq, kqevents, 1, kqrevents, max_connection, 0) != 1 ||
       kqrevents[0].ident != -1 ||
       (kqrevents[0].flags & EV_ERROR) == 0)
      {
      U_ERROR("Detected broken kevent()...");
      }
#endif

   if (lo_map_fd == U_NULLPTR)
      {
      createMapFd();

#ifdef HAVE_EPOLL_WAIT
      U_INTERNAL_ASSERT_EQUALS(events, U_NULLPTR)

      events =
      pevents = (struct epoll_event*) UMemoryPool::_malloc(max_connection+1, sizeof(struct epoll_event), true);

# ifdef HAVE_EPOLL_CTL_BATCH
      for (int i = 0; i < U_EPOLL_CTL_CMD_SIZE; ++i)
         {
         ctl_cmd[i].op     = EPOLL_CTL_ADD;
         ctl_cmd[i].events = EPOLLIN | EPOLLRDHUP | EPOLLET;
         }
# endif
#endif
      }
}
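
/**
 * NB: init() may run again after fork(): the epoll/kqueue descriptor is inherited by the child,
 * so a fresh instance is created and every handler still present in lo_map_fd / hi_map_fd is
 * deregistered from the old descriptor and re-registered on the new one before the old
 * descriptor is closed
 */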

void UNotifier::resume(UEventFd* item, uint32_t flags)
{
   U_TRACE(0, "UNotifier::resume(%p,%u)", item, flags)

   U_INTERNAL_ASSERT_POINTER(item)

#ifdef HAVE_EPOLL_WAIT
   struct epoll_event _events = { flags, { item } };

   (void) U_FF_SYSCALL(epoll_ctl, "%d,%d,%d,%p", epollfd, EPOLL_CTL_ADD, item->fd, &_events);
#elif defined(HAVE_KQUEUE)
   U_INTERNAL_ASSERT_MAJOR(kq, 0)
   U_INTERNAL_ASSERT_MINOR(nkqevents, max_connection)

   EV_SET(kqevents+nkqevents++, item->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, (void*)item);
#else
   U_INTERNAL_ASSERT_EQUALS(FD_ISSET(item->fd, &fd_set_write), 0)

   FD_SET(item->fd, &fd_set_write);

# ifndef _MSWINDOWS_
   U_INTERNAL_DUMP("fd_set_write = %B", __FDS_BITS(&fd_set_write)[0])
# endif

   ++fd_write_cnt;

   U_INTERNAL_ASSERT(fd_write_cnt >= 0)
#endif
}

void UNotifier::suspend(UEventFd* item)
{
   U_TRACE(0, "UNotifier::suspend(%p)", item)

   U_INTERNAL_ASSERT_POINTER(item)

#ifdef HAVE_EPOLL_WAIT
   (void) U_FF_SYSCALL(epoll_ctl, "%d,%d,%d,%p", epollfd, EPOLL_CTL_DEL, item->fd, (struct epoll_event*)1);
#elif defined(HAVE_KQUEUE)
   U_INTERNAL_ASSERT_MAJOR(kq, 0)
   U_INTERNAL_ASSERT_MINOR(nkqevents, max_connection)

   EV_SET(kqevents+nkqevents++, item->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, (void*)item);
#else
   U_INTERNAL_ASSERT(FD_ISSET(item->fd, &fd_set_write))

   FD_CLR(item->fd, &fd_set_write);

# ifndef _MSWINDOWS_
   U_INTERNAL_DUMP("fd_set_write = %B", __FDS_BITS(&fd_set_write)[0])
# endif

   --fd_write_cnt;

   U_INTERNAL_ASSERT(fd_write_cnt >= 0)
#endif
}
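
/**
 * NB: resume()/suspend() toggle interest in write readiness for an already mapped handler;
 * a typical sequence (illustrative only) when an output buffer cannot be flushed in one shot:
 *
 *    UNotifier::resume(item, EPOLLOUT); // start watching for writability
 *                                       // ...handlerWrite() drains the pending output...
 *    UNotifier::suspend(item);          // stop watching once the buffer is empty
 */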

int UNotifier::waitForEvent(int fd_max, fd_set* read_set, fd_set* write_set, UEventTime* ptimeout)
{
   U_TRACE(1, "UNotifier::waitForEvent(%d,%p,%p,%p)", fd_max, read_set, write_set, ptimeout)

   U_INTERNAL_ASSERT_DIFFERS(U_ClientImage_parallelization, U_PARALLELIZATION_CHILD)

   int result;

#ifdef HAVE_EPOLL_WAIT
   U_INTERNAL_ASSERT_DIFFERS(U_ClientImage_parallelization, U_PARALLELIZATION_CHILD)

   result = U_FF_SYSCALL(epoll_wait, "%d,%p,%u,%d", epollfd, events, max_connection, UEventTime::getMilliSecond(ptimeout));
#elif defined(HAVE_KQUEUE)
   result = U_SYSCALL(kevent, "%d,%p,%d,%p,%d,%p", kq, kqevents, nkqevents, kqrevents, max_connection, UEventTime::getTimeSpec(ptimeout));
   nkqevents = 0;
#else
   // If both fields of the timeval structure are zero, then select() returns immediately
   // (this is useful for polling); if ptimeout is NULL (no timeout), select() can block indefinitely...
   //
   // On Linux, select() modifies timeout to reflect the amount of time not slept; most other implementations do not do this.
   // This causes problems both when Linux code which reads timeout is ported to other operating systems, and when code is ported to Linux
   // that reuses a struct timeval for multiple selects in a loop without reinitializing it. Consider timeout to be undefined after select() returns
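
   /**
    * A minimal sketch of the pitfall described above (illustrative only):
    *
    *    struct timeval tv;
    *
    *    for (;;)
    *       {
    *       tv.tv_sec  = 5; // must be reset on EVERY iteration:
    *       tv.tv_usec = 0; // on Linux select() may have decremented it in place
    *
    *       if (select(nfds, &rset, 0, 0, &tv) > 0) break;
    *       }
    */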

# if defined(DEBUG) && !defined(_MSWINDOWS_)
   if ( read_set) U_INTERNAL_DUMP(" read_set = %B", __FDS_BITS( read_set)[0])
   if (write_set) U_INTERNAL_DUMP("write_set = %B", __FDS_BITS(write_set)[0])
# endif

   result = U_FF_SYSCALL(select, "%d,%p,%p,%p,%p", fd_max, read_set, write_set, 0, UEventTime::getTimeVal(ptimeout));

# if defined(DEBUG) && !defined(_MSWINDOWS_)
   if ( read_set) U_INTERNAL_DUMP(" read_set = %B", __FDS_BITS( read_set)[0])
   if (write_set) U_INTERNAL_DUMP("write_set = %B", __FDS_BITS(write_set)[0])
# endif
#endif

   U_RETURN(result);
}

void UNotifier::waitForEvent(UEventTime* ptimeout)
{
   U_TRACE(0, "UNotifier::waitForEvent(%p)", ptimeout)

#if !defined(HAVE_EPOLL_WAIT) && !defined(HAVE_KQUEUE)
   fd_set read_set, write_set;
loop:
   U_INTERNAL_ASSERT(fd_read_cnt > 0 || fd_write_cnt > 0)

   if (fd_read_cnt)   read_set = fd_set_read;
   if (fd_write_cnt) write_set = fd_set_write;

   nfd_ready = waitForEvent(fd_set_max,
                            (fd_read_cnt  ? &read_set
                                          : 0),
                            (fd_write_cnt ? &write_set
                                          : 0),
                            ptimeout);
#elif defined(HAVE_KQUEUE)
loop:
   nfd_ready = U_SYSCALL(kevent, "%d,%p,%d,%p,%d,%p", kq, kqevents, nkqevents, kqrevents, max_connection, UEventTime::getTimeSpec(ptimeout));
# ifdef DEBUG
   if (nfd_ready == -1 &&
       errno     == EINVAL)
      {
      U_ERROR("kevent() returned EINVAL...");
      }
# endif
   nkqevents = 0;
#else
   U_INTERNAL_ASSERT_DIFFERS(U_ClientImage_parallelization, U_PARALLELIZATION_CHILD)
loop:
   nfd_ready = U_FF_SYSCALL(epoll_wait, "%d,%p,%u,%d", epollfd, events, max_connection, UEventTime::getMilliSecond(ptimeout));
#endif

   if (nfd_ready > 0)
      {
# ifdef DEBUG
      if (max_nfd_ready < (uint32_t)nfd_ready) max_nfd_ready = nfd_ready;

      U_INTERNAL_DUMP("max_nfd_ready = %u", max_nfd_ready)
# endif

      U_gettimeofday // NB: optimization when a time resolution of one second is enough...

      last_event = u_now->tv_sec;

# if !defined(HAVE_EPOLL_WAIT) && !defined(HAVE_KQUEUE)
      int fd = 1, fd_cnt = (fd_read_cnt + fd_write_cnt);

      U_INTERNAL_DUMP("fd_cnt = %d fd_set_max = %d", fd_cnt, fd_set_max)

      U_INTERNAL_ASSERT(nfd_ready <= fd_cnt)

      for (int i = nfd_ready; fd < fd_set_max; ++fd)
         {
         bread = (fd_read_cnt && FD_ISSET(fd, &read_set));

         if ((bread ||
              (fd_write_cnt && FD_ISSET(fd, &write_set)) != 0) &&
             setHandler(fd))
            {
            notifyHandlerEvent();

            if (--i == 0)
               {
               U_INTERNAL_DUMP("fd = %d: return", fd)

               if (fd_cnt > (fd_read_cnt + fd_write_cnt)) fd_set_max = getNFDS();

               return;
               }
            }
         }
# elif defined(HAVE_KQUEUE)
      int i = 0;
      struct kevent* pkqrevents = kqrevents;

loop0:
      U_INTERNAL_ASSERT_POINTER(pkqrevents->udata)

      handler_event = (UEventFd*)pkqrevents->udata;

      U_INTERNAL_DUMP("i = %d handler_event->fd = %d", i, handler_event->fd)
      U_INTERNAL_ASSERT_DIFFERS(handler_event->fd, -1)

   // if (handler_event->fd != -1)
      {
      U_INTERNAL_DUMP("bread = %b bwrite = %b", ((pkqrevents->flags & EVFILT_READ) != 0), ((pkqrevents->flags & EVFILT_WRITE) != 0))

      if (UNLIKELY((pkqrevents->flags & EV_ERROR) != 0) ||
          (LIKELY((pkqrevents->flags & EVFILT_READ) != 0) ? handler_event->handlerRead()
                                                          : handler_event->handlerWrite()) == U_NOTIFIER_DELETE)
         {
         handlerDelete(handler_event);
         }
      }

      if (++i < nfd_ready)
         {
         ++pkqrevents;

         goto loop0;
         }
# else
      int i = 0;
      bool bdelete;
      pevents = events;
# ifdef U_EPOLLET_POSTPONE_STRATEGY
      bool bloop1 = false;
      bepollet = ((uint32_t)nfd_ready >= bepollet_threshold);

      U_INTERNAL_DUMP("bepollet = %b nfd_ready = %d bepollet_threshold = %u", bepollet, nfd_ready, bepollet_threshold)
# endif

loop0:
      U_INTERNAL_ASSERT_POINTER(pevents->data.ptr)

      handler_event = (UEventFd*)pevents->data.ptr;

      U_INTERNAL_DUMP("i = %d handler_event->fd = %d", i, handler_event->fd)

   // U_INTERNAL_ASSERT_DIFFERS(handler_event->fd, -1)

      if (handler_event->fd != -1)
         {
         U_INTERNAL_DUMP("bread = %b bwrite = %b events[%d].events = %d %B", ((pevents->events & (EPOLLIN | EPOLLRDHUP)) != 0),
                         ((pevents->events & EPOLLOUT) != 0), (pevents - events), pevents->events, pevents->events)

         /**
          * EPOLLIN     = 0x0001
          * EPOLLPRI    = 0x0002
          * EPOLLOUT    = 0x0004
          * EPOLLERR    = 0x0008
          * EPOLLHUP    = 0x0010
          * EPOLLRDNORM = 0x0040
          * EPOLLRDBAND = 0x0080
          * EPOLLWRNORM = 0x0100
          * EPOLLWRBAND = 0x0200
          * EPOLLMSG    = 0x0400
          * EPOLLRDHUP  = 0x2000
          *
          * <10000000 00000100 00000000 00000000>
          *  EPOLLIN            EPOLLRDHUP
          */

         bdelete = (UNLIKELY((pevents->events & (EPOLLERR | EPOLLHUP)) != 0) ||
                    (LIKELY((pevents->events & (EPOLLIN | EPOLLRDHUP)) != 0) ? handler_event->handlerRead()
                                                                             : handler_event->handlerWrite()) == U_NOTIFIER_DELETE);

         U_INTERNAL_DUMP("bdelete = %b", bdelete)

         if (bdelete)
            {
            U_ClientImage_state = U_NOTIFY_DELETE;

            handlerDelete(handler_event);

# if defined(U_EPOLLET_POSTPONE_STRATEGY)
            if (bepollet) pevents->events = 0;
# endif
            }
# if defined(U_EPOLLET_POSTPONE_STRATEGY)
         else if (bepollet)
            {
            if (U_ClientImage_state != U_PLUGIN_HANDLER_AGAIN &&
                LIKELY((pevents->events & (EPOLLIN | EPOLLRDHUP)) != 0))
               {
               bloop1 = true;
               }
            else
               {
               pevents->events = 0;
               }
            }
# endif
         }

      if (++i < nfd_ready)
         {
         ++pevents;

         goto loop0;
         }

# ifdef U_EPOLLET_POSTPONE_STRATEGY
      if (bepollet)
         {
         if (bloop1 == false)
            {
            bepollet_threshold += bepollet_threshold / 2;

            U_INTERNAL_DUMP("bepollet_threshold = %u", bepollet_threshold)
            }
         else
            {
            do {
               i = 0;
               bloop1 = false;
               pevents = events;

loop2:         if (pevents->events)
                  {
                  handler_event = (UEventFd*)pevents->data.ptr;

                  U_INTERNAL_DUMP("i = %d handler_event->fd = %d", i, handler_event->fd)

               // U_INTERNAL_ASSERT_DIFFERS(handler_event->fd, -1)

                  if (handler_event->fd != -1)
                     {
                     if (handler_event->handlerRead() == U_NOTIFIER_DELETE)
                        {
                        handlerDelete(handler_event);

                        pevents->events = 0;
                        }
                     else
                        {
                        if (U_ClientImage_state != U_PLUGIN_HANDLER_AGAIN) bloop1 = true;
                        else                                               pevents->events = 0;
                        }
                     }
                  }

               if (++i < nfd_ready)
                  {
                  ++pevents;

                  goto loop2;
                  }
               }
            while (bloop1);
            }
         }
# endif
# endif
      }
   else if (nfd_ready == -1)
      {
      U_INTERNAL_DUMP("errno = %d num_connection = %u", errno, num_connection)

      if (errno == EINTR                   &&
          UInterrupt::event_signal_pending &&
          (UInterrupt::callHandlerSignal(), UInterrupt::exit_loop_wait_event_for_signal) == false)
         {
         goto loop;
         }

# if !defined(HAVE_EPOLL_WAIT) && !defined(HAVE_KQUEUE)
      if (errno == EBADF) // some descriptors may no longer be valid (possible after EPIPE)
         {
         removeBadFd();

         if (num_connection) goto loop;
         }
# endif
      }
}

void UNotifier::waitForEvent()
{
   U_TRACE_NO_PARAM(0, "UNotifier::waitForEvent()")

#ifdef DEBUG
   ++nwatches;
#endif

   UEventTime* ptimeout;

loop:
   U_gettimeofday // NB: optimization when a time resolution of one second is enough...

   last_event = u_now->tv_sec;

   waitForEvent(ptimeout = UTimer::getTimeout());

   if (nfd_ready == 0 &&
       ptimeout != U_NULLPTR)
      {
      U_INTERNAL_ASSERT_EQUALS(UTimer::first->alarm, ptimeout)

      U_gettimeofday // NB: optimization when a time resolution of one second is enough...

      last_event = u_now->tv_sec;

      UTimer::callHandlerTimeout();

      goto loop;
      }
}
#endif

void UNotifier::createMapFd()
{
   U_TRACE_NO_PARAM(0, "UNotifier::createMapFd()")

   U_INTERNAL_ASSERT_MAJOR(max_connection, 0)
   U_INTERNAL_ASSERT_EQUALS(lo_map_fd, U_NULLPTR)
   U_INTERNAL_ASSERT_EQUALS(hi_map_fd, U_NULLPTR)

   lo_map_fd_len = 20+max_connection;
   lo_map_fd     = (UEventFd**) UMemoryPool::_malloc(&lo_map_fd_len, sizeof(UEventFd*), true);

   U_INTERNAL_DUMP("lo_map_fd_len = %u", lo_map_fd_len)

   typedef UGenericHashMap<unsigned int,UEventFd*> umapfd;

   U_NEW(umapfd, hi_map_fd, umapfd);

   hi_map_fd->allocate(max_connection);
}
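
/**
 * NB: handlers are resolved through a two-tier map: descriptors below lo_map_fd_len use the O(1)
 * lo_map_fd[] array, higher ones go through the hi_map_fd hash map; the lookup path (mirroring
 * setHandler() below, illustrative only):
 *
 *    if (fd < (int32_t)lo_map_fd_len) handler = lo_map_fd[fd]; // direct index, no locking
 *    else
 *       {
 *       lock();
 *       if (hi_map_fd->find((unsigned int)fd)) handler = hi_map_fd->elem();
 *       unlock();
 *       }
 */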

bool UNotifier::isHandler(int fd)
{
   U_TRACE(0, "UNotifier::isHandler(%d)", fd)

   U_INTERNAL_ASSERT_DIFFERS(fd, -1)

   bool result = false;

   U_INTERNAL_DUMP("num_connection = %u min_connection = %u", num_connection, min_connection)

   if (num_connection > min_connection)
      {
      U_INTERNAL_ASSERT_POINTER(lo_map_fd)
      U_INTERNAL_ASSERT_POINTER(hi_map_fd)

      if (fd < (int32_t)lo_map_fd_len) result = (lo_map_fd[fd] != U_NULLPTR);
      else
         {
         lock();

         result = hi_map_fd->find((unsigned int)fd);

         unlock();
         }
      }

   U_RETURN(result);
}

bool UNotifier::setHandler(int fd)
{
   U_TRACE(0, "UNotifier::setHandler(%d)", fd)

   U_INTERNAL_ASSERT_DIFFERS(fd, -1)
   U_INTERNAL_ASSERT_POINTER(lo_map_fd)
   U_INTERNAL_ASSERT_POINTER(hi_map_fd)

   U_INTERNAL_DUMP("lo_map_fd_len = %u", lo_map_fd_len)

   if (fd < (int32_t)lo_map_fd_len)
      {
      if (lo_map_fd[fd])
         {
         handler_event = lo_map_fd[fd];

         U_INTERNAL_DUMP("handler_event = %p", handler_event)

         U_RETURN(true);
         }
      }

   bool result;

   lock();

   result = hi_map_fd->find((unsigned int)fd);

   unlock();

   if (result)
      {
      handler_event = hi_map_fd->elem();

      U_INTERNAL_DUMP("handler_event = %p", handler_event)

      U_RETURN(true);
      }

   U_RETURN(false);
}

void UNotifier::insert(UEventFd* item, int op)
{
   U_TRACE(0, "UNotifier::insert(%p,%d)", item, op)

   U_INTERNAL_ASSERT_POINTER(item)

   int fd = item->fd;

   U_INTERNAL_DUMP("fd = %d item->op_mask = %B num_connection = %d", fd, item->op_mask, num_connection)

   U_INTERNAL_ASSERT_DIFFERS(fd, -1)

   U_INTERNAL_DUMP("lo_map_fd_len = %u", lo_map_fd_len)

   if (fd < (int32_t)lo_map_fd_len) lo_map_fd[fd] = item;
   else
      {
      lock();

      hi_map_fd->insert((unsigned int)fd, item);

      unlock();
      }

#ifdef USE_LIBEVENT
   U_INTERNAL_ASSERT_POINTER(u_ev_base)

   int mask = EV_PERSIST | ((item->op_mask & (EPOLLIN | EPOLLRDHUP)) != 0 ? EV_READ : EV_WRITE);

   U_INTERNAL_DUMP("mask = %B", mask)

   U_NEW(UEvent<UEventFd>, item->pevent, UEvent<UEventFd>(fd, mask, *item));

   (void) UDispatcher::add(*(item->pevent));
#elif defined(HAVE_EPOLL_WAIT)
   U_INTERNAL_ASSERT_MAJOR(epollfd, 0)
   U_INTERNAL_ASSERT_DIFFERS(U_ClientImage_parallelization, U_PARALLELIZATION_CHILD)

# ifdef DEBUG
   if (op)
      {
      U_INTERNAL_ASSERT_EQUALS(item->op_mask & EPOLLRDHUP, 0)
      }
# endif

   struct epoll_event _events = { item->op_mask | op, { item } };

   (void) U_FF_SYSCALL(epoll_ctl, "%d,%d,%d,%p", epollfd, EPOLL_CTL_ADD, fd, &_events);
#elif defined(HAVE_KQUEUE)
   U_INTERNAL_ASSERT_MAJOR(kq, 0)
   U_INTERNAL_ASSERT_MINOR(nkqevents, max_connection)

   EV_SET(kqevents+nkqevents++, fd, ((item->op_mask & (EPOLLIN | EPOLLRDHUP)) != 0) ? EVFILT_READ : EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, (void*)item);
#else
   if ((item->op_mask & (EPOLLIN | EPOLLRDHUP)) != 0)
      {
      U_INTERNAL_ASSERT_EQUALS(FD_ISSET(fd, &fd_set_read), 0)
      U_INTERNAL_ASSERT_EQUALS(FD_ISSET(fd, &fd_set_write), 0)

      FD_SET(fd, &fd_set_read);

      U_INTERNAL_ASSERT(fd_read_cnt >= 0)

      ++fd_read_cnt;

# ifndef _MSWINDOWS_
      U_INTERNAL_DUMP("fd_set_read = %B", __FDS_BITS(&fd_set_read)[0])
# endif
      }
   else
      {
      U_INTERNAL_ASSERT_DIFFERS(item->op_mask & EPOLLOUT, 0)
      U_INTERNAL_ASSERT_EQUALS(FD_ISSET(fd, &fd_set_write), 0)

      FD_SET(fd, &fd_set_write);

      U_INTERNAL_ASSERT(fd_write_cnt >= 0)

      ++fd_write_cnt;

# ifndef _MSWINDOWS_
      U_INTERNAL_DUMP("fd_set_write = %B", __FDS_BITS(&fd_set_write)[0])
# endif
      }

   if (fd_set_max <= fd) fd_set_max = fd + 1;

   U_INTERNAL_DUMP("fd_set_max = %d", fd_set_max)
#endif
}

bool UNotifier::modify(UEventFd* item)
{
   U_TRACE(0, "UNotifier::modify(%p)", item)

   U_INTERNAL_ASSERT_POINTER(item)
   U_INTERNAL_ASSERT_DIFFERS(U_ClientImage_parallelization, U_PARALLELIZATION_CHILD)

   int fd = item->fd;

   U_INTERNAL_DUMP("fd = %d op_mask = %B", fd, item->op_mask)

#ifdef USE_LIBEVENT
   U_INTERNAL_ASSERT_POINTER(u_ev_base)

   int mask = EV_PERSIST | ((item->op_mask & (EPOLLIN | EPOLLRDHUP)) != 0 ? EV_READ : EV_WRITE);

   U_INTERNAL_DUMP("mask = %B", mask)

   UDispatcher::del(item->pevent);

   U_DELETE(item->pevent)

   U_NEW(UEvent<UEventFd>, item->pevent, UEvent<UEventFd>(fd, mask, *item));

   (void) UDispatcher::add(*(item->pevent));
#elif defined(HAVE_EPOLL_WAIT)
   U_INTERNAL_ASSERT_MAJOR(epollfd, 0)
   U_INTERNAL_ASSERT_DIFFERS(U_ClientImage_parallelization, U_PARALLELIZATION_CHILD)

   struct epoll_event _events = { item->op_mask, { item } };

   if (U_FF_SYSCALL(epoll_ctl, "%d,%d,%d,%p", epollfd, EPOLL_CTL_MOD, fd, &_events)) U_RETURN(false);
#elif defined(HAVE_KQUEUE)
   U_INTERNAL_ASSERT_MAJOR(kq, 0)
   U_INTERNAL_ASSERT_MINOR(nkqevents, max_connection)

   int read_flags, write_flags;

   if ((item->op_mask & (EPOLLIN | EPOLLRDHUP)) != 0)
      {
      read_flags  = EV_ADD | EV_ENABLE;
      write_flags = EV_DELETE | EV_DISABLE;
      }
   else
      {
      U_INTERNAL_DUMP("op_mask = %d", item->op_mask)

      U_INTERNAL_ASSERT_DIFFERS(item->op_mask & EPOLLOUT, 0)

      read_flags  = EV_DELETE | EV_DISABLE;
      write_flags = EV_ADD | EV_ENABLE;
      }

   EV_SET(kqevents+nkqevents++, fd, EVFILT_READ,  read_flags,  0, 0, (void*)item);
   EV_SET(kqevents+nkqevents++, fd, EVFILT_WRITE, write_flags, 0, 0, (void*)item);
#else
# ifndef _MSWINDOWS_
   U_INTERNAL_DUMP("fd_set_read = %B",  __FDS_BITS(&fd_set_read)[0])
   U_INTERNAL_DUMP("fd_set_write = %B", __FDS_BITS(&fd_set_write)[0])
#endif

   if ((item->op_mask & (EPOLLIN | EPOLLRDHUP)) != 0)
      {
      U_INTERNAL_ASSERT(FD_ISSET(fd, &fd_set_write))
      U_INTERNAL_ASSERT_EQUALS(FD_ISSET(fd, &fd_set_read), 0)

      FD_SET(fd, &fd_set_read);
      FD_CLR(fd, &fd_set_write);

      ++fd_read_cnt;
      --fd_write_cnt;
      }
   else
      {
      U_INTERNAL_DUMP("op_mask = %d", item->op_mask)

      U_INTERNAL_ASSERT(FD_ISSET(fd, &fd_set_read))
      U_INTERNAL_ASSERT_DIFFERS(item->op_mask & EPOLLOUT, 0)
      U_INTERNAL_ASSERT_EQUALS(FD_ISSET(fd, &fd_set_write), 0)

      FD_CLR(fd, &fd_set_read);
      FD_SET(fd, &fd_set_write);

      --fd_read_cnt;
      ++fd_write_cnt;
      }

# ifndef _MSWINDOWS_
   U_INTERNAL_DUMP("fd_set_read = %B",  __FDS_BITS(&fd_set_read)[0])
   U_INTERNAL_DUMP("fd_set_write = %B", __FDS_BITS(&fd_set_write)[0])
#endif

   U_INTERNAL_ASSERT(fd_read_cnt >= 0)
   U_INTERNAL_ASSERT(fd_write_cnt >= 0)
#endif

   U_RETURN(true);
}
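
/**
 * NB: unlike epoll, where EPOLL_CTL_MOD rewrites the whole interest mask in a single call, kqueue
 * keeps read and write interest as two separate filters (EVFILT_READ / EVFILT_WRITE), so modify()
 * must stage two kevent changes: one enabling the wanted filter and one deleting the other
 */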

void UNotifier::handlerDelete(unsigned int fd, int mask)
{
   U_TRACE(0, "UNotifier::handlerDelete(%u,%B)", fd, mask)

   U_INTERNAL_ASSERT_MAJOR(fd, 0)
   U_INTERNAL_ASSERT_DIFFERS(U_ClientImage_parallelization, U_PARALLELIZATION_CHILD)

   if (fd < lo_map_fd_len) lo_map_fd[fd] = U_NULLPTR;
   else
      {
      lock();

      (void) hi_map_fd->erase(fd);

      unlock();
      }

#if !defined(USE_LIBEVENT) && !defined(HAVE_EPOLL_WAIT) && !defined(HAVE_KQUEUE)
   if ((mask & (EPOLLIN | EPOLLRDHUP)) != 0)
      {
      U_INTERNAL_ASSERT(FD_ISSET(fd, &fd_set_read))
      U_INTERNAL_ASSERT_EQUALS(FD_ISSET(fd, &fd_set_write), 0)

      FD_CLR(fd, &fd_set_read);

# ifndef _MSWINDOWS_
      U_INTERNAL_DUMP("fd_set_read = %B", __FDS_BITS(&fd_set_read)[0])
# endif

      --fd_read_cnt;

      U_INTERNAL_ASSERT(fd_read_cnt >= 0)
      }
   else
      {
      U_INTERNAL_ASSERT_DIFFERS(mask & EPOLLOUT, 0)
      U_INTERNAL_ASSERT(FD_ISSET(fd, &fd_set_write))
      U_INTERNAL_ASSERT_EQUALS(FD_ISSET(fd, &fd_set_read), 0)

      FD_CLR(fd, &fd_set_write);

# ifndef _MSWINDOWS_
      U_INTERNAL_DUMP("fd_set_write = %B", __FDS_BITS(&fd_set_write)[0])
# endif

      --fd_write_cnt;

      U_INTERNAL_ASSERT(fd_write_cnt >= 0)
      }

   if (empty() == false) fd_set_max = getNFDS();
#endif
}

void UNotifier::callForAllEntryDynamic(bPFpv function)
{
   U_TRACE(0, "UNotifier::callForAllEntryDynamic(%p)", function)

   U_INTERNAL_DUMP("num_connection = %u", num_connection)

   U_INTERNAL_ASSERT_MAJOR(num_connection, min_connection)

   UEventFd* item;

   for (int fd = 1; fd < (int32_t)lo_map_fd_len; ++fd)
      {
      if ((item = lo_map_fd[fd]))
         {
# ifdef DEBUG
         if (UNLIKELY((const void*)item <= U_NULL_POINTER))
            {
            U_DEBUG("UNotifier::callForAllEntryDynamic(): lo_map_fd[%d] = %p", fd, item);

            return;
            }
# endif

         U_INTERNAL_DUMP("fd = %d op_mask = %B", item->fd, item->op_mask)

         if (item->fd != fd ||
             function(item))
            {
            handlerDelete(item);

            U_INTERNAL_DUMP("num_connection = %u", num_connection)

            if (num_connection == min_connection) return;
            }
         }
      }

   UGenericHashMap<unsigned int,UEventFd*>::UGenericHashMapNode* pnode;

   if ((pnode = hi_map_fd->first()))
      {
      do {
         item = hi_map_fd->elem();

         U_INTERNAL_ASSERT_MAJOR(item->fd, 0)

         if (function(item))
            {
            U_DEBUG("UNotifier::callForAllEntryDynamic(): hi_map_fd(%p) = %p %u", pnode, item, item->fd);

            handlerDelete(item);

            U_INTERNAL_DUMP("num_connection = %u", num_connection)

            if (num_connection == min_connection) return;
            }
         }
      while ((pnode = hi_map_fd->next(pnode)));
      }
}

void UNotifier::clear()
{
   U_TRACE_NO_PARAM(0, "UNotifier::clear()")

   U_INTERNAL_DUMP("lo_map_fd = %p", lo_map_fd)

   if (lo_map_fd == U_NULLPTR) return;

   U_INTERNAL_ASSERT_POINTER(hi_map_fd)
   U_INTERNAL_ASSERT_MAJOR(max_connection, 0)

   UEventFd* item;

   U_INTERNAL_DUMP("num_connection = %u", num_connection)

   if (num_connection)
      {
      for (int fd = 1; fd < (int32_t)lo_map_fd_len; ++fd)
         {
         if ((item = lo_map_fd[fd]))
            {
            U_INTERNAL_DUMP("fd = %d op_mask = %B", item->fd, item->op_mask)

            if (item->fd != -1)
               {
               U_INTERNAL_ASSERT_EQUALS(item->fd, fd)

               handlerDelete(item);
               }
            }
         }

      UGenericHashMap<unsigned int,UEventFd*>::UGenericHashMapNode* pnode;

      if ((pnode = hi_map_fd->first()))
         {
         do {
            item = hi_map_fd->elem();

            U_INTERNAL_DUMP("fd = %d op_mask = %B", item->fd, item->op_mask)

            if (item->fd != -1) handlerDelete(item);
            }
         while ((pnode = hi_map_fd->next(pnode)));
         }
      }

   UMemoryPool::_free(lo_map_fd, lo_map_fd_len, sizeof(UEventFd*));

   hi_map_fd->deallocate();

   U_DELETE(hi_map_fd)

#ifndef USE_LIBEVENT
# ifdef HAVE_EPOLL_WAIT
   U_INTERNAL_ASSERT_POINTER(events)

   UMemoryPool::_free(events, max_connection + 1, sizeof(struct epoll_event));

   (void) U_FF_SYSCALL(close, "%d", epollfd);
# elif defined(HAVE_KQUEUE)
   U_INTERNAL_ASSERT_MAJOR(kq, 0)
   U_INTERNAL_ASSERT_POINTER(kqevents)
   U_INTERNAL_ASSERT_POINTER(kqrevents)

   UMemoryPool::_free(kqevents,  max_connection, sizeof(struct kevent));
   UMemoryPool::_free(kqrevents, max_connection, sizeof(struct kevent));

   (void) U_FF_SYSCALL(close, "%d", kq);
# endif
#endif
}

#if !defined(USE_LIBEVENT) && !defined(HAVE_EPOLL_WAIT) && !defined(HAVE_KQUEUE)
int UNotifier::getNFDS() // nfds is the highest-numbered file descriptor in any of the three sets, plus 1
{
   U_TRACE_NO_PARAM(0, "UNotifier::getNFDS()")

   int nfds = 0;

   U_INTERNAL_DUMP("fd_set_max = %d", fd_set_max)

   for (int fd = 1; fd < fd_set_max; ++fd)
      {
      if (isHandler(fd))
         {
         U_INTERNAL_DUMP("fd = %d", fd)

         if (nfds < fd) nfds = fd;
         }
      }

   ++nfds;

   U_RETURN(nfds);
}

void UNotifier::removeBadFd()
{
   U_TRACE_NO_PARAM(1, "UNotifier::removeBadFd()")

   int nfd;
   bool bwrite;
   fd_set fdmask;
   fd_set* rmask;
   fd_set* wmask;
   struct timeval polling = { 0L, 0L };

   for (int fd = 1; fd < fd_set_max; ++fd)
      {
      if (setHandler(fd))
         {
         U_INTERNAL_DUMP("fd = %d op_mask = %B handler_event = %p", handler_event->fd, handler_event->op_mask, handler_event)

         bread  = (fd_read_cnt  && ((handler_event->op_mask & (EPOLLIN | EPOLLRDHUP)) != 0));
         bwrite = (fd_write_cnt && ((handler_event->op_mask & EPOLLOUT) != 0));

         FD_ZERO(&fdmask);
         FD_SET(fd, &fdmask);

         rmask = (bread  ? &fdmask : 0);
         wmask = (bwrite ? &fdmask : 0);

         U_INTERNAL_DUMP("fdmask = %B", __FDS_BITS(&fdmask)[0])

         nfd = U_FF_SYSCALL(select, "%d,%p,%p,%p,%p", fd+1, rmask, wmask, 0, &polling);

         U_INTERNAL_DUMP("fd = %d op_mask = %B ISSET(read) = %b ISSET(write) = %b", fd, handler_event->op_mask,
                         (rmask ? FD_ISSET(fd, rmask) : false),
                         (wmask ? FD_ISSET(fd, wmask) : false))

         if (nfd)
            {
            if (nfd == -1 &&
                (bread || bwrite))
               {
               handlerDelete(handler_event);
               }
            else
               {
               notifyHandlerEvent();
               }
            }
         }
      }
}
#endif

// param timeoutMS specifies the timeout value, in milliseconds;
// a negative value indicates no timeout, i.e. an infinite wait

#ifdef HAVE_POLL_H
int UNotifier::waitForEvent(int timeoutMS)
{
   U_TRACE(1, "UNotifier::waitForEvent(%d)", timeoutMS)

   int ret;

#ifdef DEBUG
   if (timeoutMS > 0) U_INTERNAL_ASSERT(timeoutMS >= 100)
#endif

loop:
   /**
    * The timeout argument specifies the minimum number of milliseconds that poll() will block (this interval will be rounded up
    * to the system clock granularity, and kernel scheduling delays mean that the blocking interval may overrun by a small amount).
    *
    * Specifying a negative value in timeout means an infinite timeout.
    * Specifying a timeout of zero causes poll() to return immediately, even if no file descriptors are ready
    */
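
   /**
    * For reference, the single-entry poll performed below is equivalent to (illustrative only):
    *
    *    struct pollfd p = { fd, POLLIN, 0 }; // fds[0] is prepared by the callers (waitForRead/waitForWrite)
    *
    *    int n = poll(&p, 1, timeoutMS); // n > 0: ready, n == 0: timed out, n < 0: error (check errno)
    */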

   ret = U_FF_SYSCALL(poll, "%p,%d,%d", fds, 1, timeoutMS);

   U_INTERNAL_DUMP("errno = %d", errno)

   if (ret > 0) U_RETURN(ret);
   if (ret == 0)
      {
      errno = EAGAIN;

      U_RETURN(0);
      }

   if (errno == EINTR)
      {
      UInterrupt::checkForEventSignalPending();

      U_INTERNAL_DUMP("flag_sigterm = %b", flag_sigterm)

      if (flag_sigterm == false) goto loop;
      }

   U_RETURN(-1);
}
#endif

// param timeoutMS specifies the timeout value, in milliseconds;
// a negative value indicates no timeout, i.e. an infinite wait

int UNotifier::waitForRead(int fd, int timeoutMS)
{
   U_TRACE(0, "UNotifier::waitForRead(%d,%d)", fd, timeoutMS)

   U_INTERNAL_ASSERT_RANGE(0,fd,64*1024)

#ifdef DEBUG
   if (timeoutMS > 0) U_INTERNAL_ASSERT(timeoutMS >= 100)
#endif

#ifdef HAVE_POLL_H
   // NB: POLLRDHUP means the stream socket peer closed the connection, or shut down the writing half of the connection

   fds[0].fd     = fd;
   fds[0].events = POLLIN;

   int ret = waitForEvent(timeoutMS);

   // NB: we don't check for POLLERR or POLLHUP because it causes problems in some cases (command.test)

   U_INTERNAL_DUMP("revents = %d %B", fds[0].revents, fds[0].revents)
#else
# ifdef _MSWINDOWS_
   HANDLE h = is_pipe(fd);

   if (h != INVALID_HANDLE_VALUE)
      {
      DWORD count = 0;

      while (U_SYSCALL(PeekNamedPipe, "%p,%p,%ld,%p,%p,%p", h, 0, 0, 0, &count, 0) &&
             count == 0                                                            &&
             timeoutMS > 0)
         {
         Sleep(1000);

         timeoutMS -= 1000;
         }

      U_RETURN(count);
      }
# endif

   // If both fields of the timeval structure are zero, then select() returns immediately
   // (this is useful for polling); if ptime is NULL (no timeout), select() can block indefinitely...

   UEventTime* ptime;

   if (timeoutMS < 0) ptime = 0;
   else              (ptime = time_obj)->setTimeToExpireMS(timeoutMS);

   fd_set fdmask;
   FD_ZERO(&fdmask);
   FD_SET(fd, &fdmask);

   int ret = waitForEvent(fd + 1, &fdmask, 0, ptime);
#endif

   U_RETURN(ret);
}

int UNotifier::waitForWrite(int fd, int timeoutMS)
{
   U_TRACE(0, "UNotifier::waitForWrite(%d,%d)", fd, timeoutMS)

   U_INTERNAL_ASSERT_RANGE(0, fd, 64*1024)
   U_INTERNAL_ASSERT_DIFFERS(timeoutMS, 0)

#ifdef DEBUG
   if (timeoutMS != -1) U_INTERNAL_ASSERT(timeoutMS >= 100)
#endif

#ifdef HAVE_POLL_H // NB: POLLRDHUP means the stream socket peer closed the connection, or shut down the writing half of the connection
   fds[0].fd      = fd;
   fds[0].events  = POLLOUT;
   fds[0].revents = 0;

   int ret = waitForEvent(timeoutMS);

   // NB: POLLERR Error condition (output only)
   //     POLLHUP Hang up (output only)

   U_INTERNAL_DUMP("revents = %d %B", fds[0].revents, fds[0].revents)

   if (ret == 1 &&
       (fds[0].revents & (POLLERR | POLLHUP)) != 0)
      {
      U_INTERNAL_ASSERT_EQUALS(::write(fd,fds,1), -1)

      U_RETURN(-1);
      }
#else
# ifdef _MSWINDOWS_
   if (is_pipe(fd) != INVALID_HANDLE_VALUE) U_RETURN(1);
# endif

   // If both fields of the timeval structure are zero, then select() returns immediately
   // (this is useful for polling); if ptime is NULL (no timeout), select() can block indefinitely...

   UEventTime* ptime;

   if (timeoutMS < 0) ptime = 0;
   else              (ptime = time_obj)->setTimeToExpireMS(timeoutMS);

   fd_set fdmask;
   FD_ZERO(&fdmask);
   FD_SET(fd, &fdmask);

   int ret = waitForEvent(fd + 1, 0, &fdmask, ptime);
#endif

   U_RETURN(ret);
}

// param timeoutMS specifies the timeout value, in milliseconds;
// a negative value indicates no timeout, i.e. an infinite wait

int UNotifier::read(int fd, char* buffer, int count, int timeoutMS)
{
   U_TRACE(1, "UNotifier::read(%d,%p,%d,%d)", fd, buffer, count, timeoutMS)

   U_INTERNAL_ASSERT_DIFFERS(fd, -1)

   if (timeoutMS != -1 &&
       waitForRead(fd, timeoutMS) != 1)
      {
      U_RETURN(-1);
      }

   int iBytesRead;

loop:
#ifdef _MSWINDOWS_
   (void) U_SYSCALL(ReadFile, "%p,%p,%lu,%p,%p", (HANDLE)_get_osfhandle(fd), buffer, count, (DWORD*)&iBytesRead, 0);
#else
   iBytesRead = U_SYSCALL(read, "%d,%p,%u", fd, buffer, count);
#endif

   if (iBytesRead > 0)
      {
      U_INTERNAL_DUMP("BytesRead(%d) = %#.*S", iBytesRead, iBytesRead, buffer)

      U_RETURN(iBytesRead);
      }

   if (iBytesRead == -1)
      {
      if (errno == EAGAIN &&
          timeoutMS != -1 &&
          waitForRead(fd, timeoutMS) == 1)
         {
         goto loop;
         }

      if (errno == EINTR)
         {
         UInterrupt::checkForEventSignalPending();

         U_INTERNAL_DUMP("flag_sigterm = %b", flag_sigterm)

         if (flag_sigterm == false) goto loop;
         }
      }

   U_RETURN(-1);
}

// param timeoutMS specifies the timeout value, in milliseconds;
// a negative value indicates no timeout, i.e. an infinite wait

uint32_t UNotifier::write(int fd, const char* str, int count, int timeoutMS)
{
   U_TRACE(1, "UNotifier::write(%d,%.*S,%d,%d)", fd, count, str, count, timeoutMS)

   U_INTERNAL_ASSERT_DIFFERS(fd, -1)

   ssize_t value;
   int byte_written = 0;

   do {
      if (timeoutMS >= 0)
         {
         if (fd < 0) U_RETURN(U_NOT_FOUND);

         if (waitForWrite(fd, timeoutMS) <= 0) break;

         timeoutMS = -1; // NB: in this way it is only for the first write...
         }

# ifdef _MSWINDOWS_
      (void) U_SYSCALL(WriteFile, "%p,%p,%lu,%p,%p", (HANDLE)_get_osfhandle(fd), str + byte_written, count - byte_written, (DWORD*)&value, 0);
# else
      value = U_SYSCALL(write, "%d,%S,%u", fd, str + byte_written, count - byte_written);
# endif

      if (value ==  0) break;
      if (value == -1)
         {
         if (errno == EINTR)
            {
            UInterrupt::checkForEventSignalPending();

            continue;
            }

         break;
         }

      U_INTERNAL_DUMP("BytesWritten(%d) = %#.*S", value, value, str + byte_written)

      byte_written += value;
      }
   while (byte_written < count);

   U_RETURN(byte_written);
}
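
/**
 * NB: write() loops because a single write(2) may accept only part of the buffer; byte_written
 * accumulates the progress, and since timeoutMS is reset to -1 after the first successful wait
 * the timeout only guards the first chunk, so a slow peer cannot stretch the total wait by
 * accepting one small chunk per timeout window
 */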

// STREAM

#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* UNotifier::dump(bool reset) const
{
#ifdef USE_LIBEVENT
   // nothing
#elif defined(HAVE_EPOLL_WAIT)
   *UObjectIO::os << "epollfd        " << epollfd << '\n';
#elif defined(HAVE_KQUEUE)
   *UObjectIO::os << "kq             " << kq << '\n';
#else
   *UObjectIO::os << "fd_set_max     " << fd_set_max   << '\n'
                  << "fd_read_cnt    " << fd_read_cnt  << '\n'
                  << "fd_write_cnt   " << fd_write_cnt << '\n';

   *UObjectIO::os << "fd_set_read    ";

   char buffer[70];

   UObjectIO::os->write(buffer, u__snprintf(buffer, U_CONSTANT_SIZE(buffer), U_CONSTANT_TO_PARAM("%B"), __FDS_BITS(&fd_set_read)[0]));

   UObjectIO::os->put('\n');

   *UObjectIO::os << "fd_set_write   ";

   UObjectIO::os->write(buffer, u__snprintf(buffer, U_CONSTANT_SIZE(buffer), U_CONSTANT_TO_PARAM("%B"), __FDS_BITS(&fd_set_write)[0]));

   UObjectIO::os->put('\n');
#endif

   *UObjectIO::os << "nfd_ready      " << nfd_ready      << '\n'
                  << "last_event     " << last_event     << '\n'
                  << "min_connection " << min_connection << '\n'
                  << "num_connection " << num_connection << '\n'
                  << "max_connection " << max_connection;

   if (reset)
      {
      UObjectIO::output();

      return UObjectIO::buffer_output;
      }

   return U_NULLPTR;
}
#endif