1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00
This commit is contained in:
stefanocasazza 2018-04-30 16:11:09 +02:00
parent 1e58dc49d0
commit 07104f2479
18 changed files with 162 additions and 145 deletions

View File

@ -260,7 +260,7 @@ static void usp_init_wi_auth2()
if (rc->connect() == false)
{
U_ERROR("usp_fork_wi_auth2(): %V", rc->UClient_Base::getResponse().rep);
U_ERROR("usp_init_wi_auth2(): %V", rc->UClient_Base::getResponse().rep);
}
// ANAGRAFICA
@ -276,7 +276,10 @@ static void usp_fork_wi_auth2()
U_INTERNAL_ASSERT_POINTER(rc)
rc->UClient_Base::reOpen();
if (rc->UClient_Base::reConnect() == false)
{
U_ERROR("usp_fork_wi_auth2(): %V", rc->UClient_Base::getResponse().rep);
}
}
static void usp_end_wi_auth2()

View File

@ -17,11 +17,9 @@
#
# MIN_SIZE_FOR_SENDFILE for major size it is better to use sendfile() to serve static content
#
# LISTEN_BACKLOG max number of ready to be delivered connections to accept()
# SET_REALTIME_PRIORITY flag indicating that the preforked processes will be scheduled under the real-time policies SCHED_FIFO
#
# CLIENT_THRESHOLD min number of clients to active polling
# CLIENT_FOR_PARALLELIZATION min number of clients to active parallelization (dedicated process)
# LISTEN_BACKLOG max number of ready to be delivered connections to accept()
# CLIENT_THRESHOLD min number of clients to active polling
# SET_REALTIME_PRIORITY flag indicating that the preforked processes will be scheduled under the real-time policies SCHED_FIFO
#
# LOAD_BALANCE_CLUSTER list of comma separated IP address (IPADDR[/MASK]) to define the load balance cluster
# LOAD_BALANCE_DEVICE_NETWORK network interface name of cluster of physical server
@ -104,10 +102,8 @@ userver {
# MIN_SIZE_FOR_SENDFILE 500k
# LISTEN_BACKLOG 1024
# CLIENT_THRESHOLD 100
# SET_REALTIME_PRIORITY yes
# CLIENT_THRESHOLD 100
# CLIENT_FOR_PARALLELIZATION 10
# LOAD_BALANCE_CLUSTER 10.30.0.0/16
# LOAD_BALANCE_DEVICE_NETWORK eth1

View File

@ -40,9 +40,27 @@ class UProxyPlugIn;
class UNoCatPlugIn;
class UServer_Base;
class UHttpClient_Base;
class UClientImage_Base;
class UREDISClient_Base;
class UElasticSearchClient;
// manage client write to log
#ifdef U_LOG_DISABLE
# define U_CLIENT_LOG_RESPONSE() {}
# define U_CLIENT_LOG_REQUEST(n) {}
# define U_CLIENT_LOG( fmt,args...) {}
# define U_CLIENT_LOG_WITH_ADDR(fmt,args...) {}
#else
# define U_CLIENT_LOG( fmt,args...) { if (UClient_Base::log) UClient_Base::log->log(U_CONSTANT_TO_PARAM(fmt), ##args); }
# define U_CLIENT_LOG_WITH_ADDR(fmt,args...) { if (UClient_Base::log) UClient_Base::log->log(U_CONSTANT_TO_PARAM(fmt " %V%R"), ##args, UClient_Base::host_port.rep, 0); }
# define U_CLIENT_LOG_REQUEST(n) { if (UClient_Base::log) UClient_Base::log->log(UClient_Base::iov, "request", n, "", 0, U_CONSTANT_TO_PARAM(" to %V"), UClient_Base::host_port.rep); }
# define U_CLIENT_LOG_RESPONSE() { if (UClient_Base::log && UClient_Base::response) \
UClient_Base::log->logResponse(UClient_Base::response, U_CONSTANT_TO_PARAM(" from %V"), UClient_Base::host_port.rep); }
#endif
class U_EXPORT UClient_Base {
public:
@ -102,13 +120,15 @@ public:
if (isOpen()) socket->_close_socket();
}
void reOpen()
bool reConnect()
{
U_TRACE_NO_PARAM(0, "UClient_Base::reOpen()")
U_TRACE_NO_PARAM(0, "UClient_Base::reConnect()")
U_INTERNAL_ASSERT_POINTER(socket)
socket->reOpen();
return connect();
}
bool shutdown(int how = SHUT_WR)
@ -185,10 +205,23 @@ public:
unsigned int getPort() const { return port; }
bool connect();
bool remoteIPAddress(UIPAddress& addr);
bool readResponse(uint32_t count = U_SINGLE_READ);
bool setHostPort(const UString& host, unsigned int port);
bool remoteIPAddress(UIPAddress& addr)
{
U_TRACE(0, "UClient_Base::::remoteIPAddress(%p)", &addr)
if (socket->iRemotePort)
{
addr = socket->cRemoteAddress;
U_RETURN(true);
}
U_RETURN(false);
}
static void closeLog();
// NB: return if it has modified host or port...
@ -297,6 +330,7 @@ private:
friend class UNoCatPlugIn;
friend class UServer_Base;
friend class UHttpClient_Base;
friend class UClientImage_Base;
friend class UREDISClient_Base;
friend class UElasticSearchClient;
};

View File

@ -545,9 +545,9 @@ public:
# endif
}
// PARALLELIZATION (dedicated process for long-running task)
static uint32_t num_client_threshold;
static uint32_t num_client_for_parallelization, num_client_threshold;
// PARALLELIZATION (dedicated process for long-running task)
static void endNewChild() __noreturn;
static pid_t startNewChild();

View File

@ -2277,12 +2277,16 @@ case_R: /* extension: print msg - u_getSysError() */
*bp++ = ' ';
}
if (errno == 0) errno = u_errno;
if (errno == 0)
{
errno = u_errno;
u_errno = 0;
}
# ifdef _MSWINDOWS_
if (errno < 0)
{
errno = - errno;
errno = -errno;
cp = getSysError_w32((uint32_t*)&len);
@ -2797,7 +2801,8 @@ void u__printf(int fd, const char* format, uint32_t fmt_size, ...)
{
/* check if warning due to syscall */
if (u_flag_exit != 2 || errno == 0)
if (u_flag_exit != 2 ||
errno == 0)
{
struct iovec iov[1] = { { (caddr_t)buffer, bytes_written } };

View File

@ -290,65 +290,70 @@ time_t UTimeDate::getSecondFromDate(const char* str, bool gmt, struct tm* tm, co
tm->tm_sec = u__strtoul(str, 2);
}
else if ((tm->tm_mon = u_getMonth(str)))
{
/**
* Jan 25 11:54:00 2005 GMT
* | | | | | |
* 0 4 7 10 13 16
*/
str += 4;
tm->tm_mday = u_strtoulp(&str);
tm->tm_hour = u__strtoul(str, 2);
str += 3;
tm->tm_min = u__strtoul(str, 2);
str += 3;
tm->tm_sec = u__strtoul(str, 2);
str += 3;
tm->tm_year = u__strtoul(str, 4);
}
else
{
/**
* 100212124550Z (zulu time)
* | | | | | |
* 0 2 4 6 8 10
*/
if (u__isspace(*str)) while (u__isspace((*++str))) {}
tm->tm_mday = u__strtoul(str, 2);
if ((tm->tm_mon = u_getMonth(str)))
{
/**
* Jan 25 11:54:00 2005 GMT
* | | | | | |
* 0 4 7 10 13 16
*/
str += 2;
str += 4;
tm->tm_mon = u__strtoul(str, 2);
tm->tm_mday = u_strtoulp(&str);
str += 2;
tm->tm_hour = u__strtoul(str, 2);
tm->tm_year = u__strtoul(str, 2) + 2000; // ts.tm_year is number of years since 1900
str += 3;
// if (tm->tm_year > 2050) tm->tm_year -= 100;
tm->tm_min = u__strtoul(str, 2);
str += 2;
str += 3;
tm->tm_hour = u__strtoul(str, 2);
tm->tm_sec = u__strtoul(str, 2);
str += 2;
str += 3;
tm->tm_min = u__strtoul(str, 2);
tm->tm_year = u__strtoul(str, 4);
}
else
{
/**
* 100212124550Z (zulu time)
* | | | | | |
* 0 2 4 6 8 10
*/
str += 2;
tm->tm_mday = u__strtoul(str, 2);
tm->tm_sec = u__strtoul(str, 2);
str += 2;
// U_INTERNAL_ASSERT_EQUALS(str[2], 'Z')
tm->tm_mon = u__strtoul(str, 2);
str += 2;
tm->tm_year = u__strtoul(str, 2) + 2000; // ts.tm_year is number of years since 1900
// if (tm->tm_year > 2050) tm->tm_year -= 100;
str += 2;
tm->tm_hour = u__strtoul(str, 2);
str += 2;
tm->tm_min = u__strtoul(str, 2);
str += 2;
tm->tm_sec = u__strtoul(str, 2);
// U_INTERNAL_ASSERT_EQUALS(str[2], 'Z')
}
}
U_INTERNAL_DUMP("tm_year = %u tm_mon = %u tm_mday = %u tm_hour = %d tm_min = %d tm_sec = %d", tm->tm_year, tm->tm_mon, tm->tm_mday, tm->tm_hour, tm->tm_min, tm->tm_sec)

View File

@ -170,7 +170,7 @@ void UTrace::trace_syscall(const char* format, uint32_t fmt_size, ...)
#ifdef _MSWINDOWS_
SetLastError(0);
#endif
errno = u_errno = 0;
errno = 0;
}
void UTrace::trace_sysreturn(bool error, const char* format, uint32_t fmt_size, ...)

View File

@ -287,9 +287,7 @@ bool UClient_Base::connect()
response.snprintf(U_CONSTANT_TO_PARAM("Sorry, couldn't connect to server %v%R"), host_port.rep, 0); // NB: the last argument (0) is necessary...
#ifndef U_LOG_DISABLE
if (log) log->log(U_CONSTANT_TO_PARAM("%v"), response.rep);
#endif
U_CLIENT_LOG("%v", response.rep)
U_RETURN(false);
}
@ -326,20 +324,6 @@ bool UClient_Base::connectServer(const UString& _url)
U_RETURN(false);
}
bool UClient_Base::remoteIPAddress(UIPAddress& addr)
{
U_TRACE(0, "UClient_Base::::remoteIPAddress(%p)", &addr)
if (socket->iRemotePort)
{
addr = socket->cRemoteAddress;
U_RETURN(true);
}
U_RETURN(false);
}
bool UClient_Base::setUrl(const char* str, uint32_t len)
{
U_TRACE(0, "UClient_Base::setUrl(%.*S,%u)", len, str, len)
@ -435,9 +419,7 @@ bool UClient_Base::sendRequest(bool bread_response)
resend:
if (connect())
{
# ifndef U_LOG_DISABLE
if (log) log->log(iov, "request", ncount, "", 0, U_CONSTANT_TO_PARAM(" to %v"), host_port.rep);
# endif
U_CLIENT_LOG_REQUEST(ncount)
ok = (USocketExt::writev(socket, iov, iovcnt, ncount, timeoutMS, 1) == ncount);
@ -447,16 +429,12 @@ resend:
if (++counter <= 2)
{
# ifndef U_LOG_DISABLE
if (log) log->log(U_CONSTANT_TO_PARAM("failed attempts (%u) to sending data (%u bytes) to %V%R"), counter, ncount, host_port.rep, 0); // NB: the last argument (0) is necessary...
# endif
U_CLIENT_LOG_WITH_ADDR("failed attempts (%u) to sending data (%u bytes) to", counter, ncount)
goto resend;
}
# ifndef U_LOG_DISABLE
if (log) log->log(U_CONSTANT_TO_PARAM("error on sending data to %V%R"), host_port.rep, 0); // NB: the last argument (0) is necessary...
# endif
U_CLIENT_LOG_WITH_ADDR("error on sending data (%u bytes) to", ncount)
goto end;
}
@ -476,20 +454,12 @@ resend:
goto resend;
}
# ifndef U_LOG_DISABLE
if (log) log->log(U_CONSTANT_TO_PARAM("error on reading data from %V%R"), host_port.rep, 0); // NB: the last argument (0) is necessary...
# endif
U_CLIENT_LOG_WITH_ADDR("error on reading data from")
goto end;
}
# ifndef U_LOG_DISABLE
if (log &&
response)
{
log->logResponse(response, U_CONSTANT_TO_PARAM(" from %V"), host_port.rep);
}
# endif
U_CLIENT_LOG_RESPONSE()
reset();
}
@ -549,13 +519,7 @@ bool UClient_Base::readResponse(uint32_t count)
if (USocketExt::read(socket, response, count, timeoutMS))
{
# ifndef U_LOG_DISABLE
if (log &&
response)
{
log->logResponse(response, U_CONSTANT_TO_PARAM(" from %V"), host_port.rep);
}
# endif
U_CLIENT_LOG_RESPONSE()
U_RETURN(true);
}
@ -577,13 +541,7 @@ bool UClient_Base::readHTTPResponse()
{
if (UHTTP::readBodyResponse(socket, &buffer, response) == false) U_RETURN(false);
# ifndef U_LOG_DISABLE
if (log &&
response)
{
log->logResponse(response, U_CONSTANT_TO_PARAM(" from %V"), host_port.rep);
}
# endif
U_CLIENT_LOG_RESPONSE()
}
U_RETURN(true);

View File

@ -14,17 +14,18 @@
#include <ulib/utility/uhttp.h>
#include <ulib/utility/websocket.h>
#ifdef HAVE_SCHED_GETCPU
# include <sched.h>
#endif
#ifndef U_HTTP2_DISABLE
# include <ulib/utility/http2.h>
#endif
#ifdef U_SERVER_CHECK_TIME_BETWEEN_REQUEST
# include <ulib/net/client/client.h>
# define U_NUM_CLIENT_THRESHOLD 128
#endif
#ifdef HAVE_SCHED_GETCPU
# include <sched.h>
#endif
int UClientImage_Base::idx;
int UClientImage_Base::iovcnt;
bool UClientImage_Base::bIPv6;
@ -1161,7 +1162,8 @@ bool UClientImage_Base::genericRead()
#ifdef U_SERVER_CHECK_TIME_BETWEEN_REQUEST
if (U_ClientImage_advise_for_parallelization)
{
if (checkRequestToCache() == 2 &&
if (checkRequestToCache() == 2 &&
UClient_Base::csocket == U_NULLPTR &&
UServer_Base::startParallelization(U_NUM_CLIENT_THRESHOLD))
{
// parent

View File

@ -957,7 +957,7 @@ void UNoCatPlugIn::checkSystem()
end:
U_RESET_MODULE_NAME;
u_errno = errno = EINTR;
errno = EINTR;
flag_check_system = false;

View File

@ -118,7 +118,6 @@ uint32_t UServer_Base::client_address_len;
uint32_t UServer_Base::document_root_size;
uint32_t UServer_Base::num_client_threshold;
uint32_t UServer_Base::min_size_for_sendfile;
uint32_t UServer_Base::num_client_for_parallelization;
sigset_t UServer_Base::mask;
UString* UServer_Base::host;
UString* UServer_Base::server;
@ -2200,14 +2199,13 @@ void UServer_Base::loadConfigParam()
set_tcp_keep_alive = cfg->readBoolean(U_CONSTANT_TO_PARAM("TCP_KEEP_ALIVE"));
set_realtime_priority = cfg->readBoolean(U_CONSTANT_TO_PARAM("SET_REALTIME_PRIORITY"), true);
crash_count = cfg->readLong(U_CONSTANT_TO_PARAM("CRASH_COUNT"), 5);
tcp_linger_set = cfg->readLong(U_CONSTANT_TO_PARAM("TCP_LINGER_SET"), -2);
USocket::iBackLog = cfg->readLong(U_CONSTANT_TO_PARAM("LISTEN_BACKLOG"), SOMAXCONN);
min_size_for_sendfile = cfg->readLong(U_CONSTANT_TO_PARAM("MIN_SIZE_FOR_SENDFILE"), 500 * 1024); // 500k: for major size we assume is better to use sendfile()
num_client_threshold = cfg->readLong(U_CONSTANT_TO_PARAM("CLIENT_THRESHOLD"));
UNotifier::max_connection = cfg->readLong(U_CONSTANT_TO_PARAM("MAX_KEEP_ALIVE"));
u_printf_string_max_length = cfg->readLong(U_CONSTANT_TO_PARAM("LOG_MSG_SIZE"));
num_client_for_parallelization = cfg->readLong(U_CONSTANT_TO_PARAM("CLIENT_FOR_PARALLELIZATION"));
crash_count = cfg->readLong(U_CONSTANT_TO_PARAM("CRASH_COUNT"), 5);
tcp_linger_set = cfg->readLong(U_CONSTANT_TO_PARAM("TCP_LINGER_SET"), -2);
USocket::iBackLog = cfg->readLong(U_CONSTANT_TO_PARAM("LISTEN_BACKLOG"), SOMAXCONN);
min_size_for_sendfile = cfg->readLong(U_CONSTANT_TO_PARAM("MIN_SIZE_FOR_SENDFILE"), 500 * 1024); // 500k: for major size we assume is better to use sendfile()
num_client_threshold = cfg->readLong(U_CONSTANT_TO_PARAM("CLIENT_THRESHOLD"));
UNotifier::max_connection = cfg->readLong(U_CONSTANT_TO_PARAM("MAX_KEEP_ALIVE"));
u_printf_string_max_length = cfg->readLong(U_CONSTANT_TO_PARAM("LOG_MSG_SIZE"));
#ifdef USERVER_UDP
if (budp &&
@ -3459,12 +3457,10 @@ next:
UNotifier::max_connection = (UNotifier::max_connection ? UNotifier::max_connection : USocket::iBackLog) + (UNotifier::num_connection = UNotifier::min_connection);
if (num_client_threshold == 0) num_client_threshold = U_NOT_FOUND;
if (num_client_for_parallelization == 0) num_client_for_parallelization = UNotifier::max_connection / 2;
}
U_INTERNAL_DUMP("UNotifier::max_connection = %u UNotifier::min_connection = %u num_client_for_parallelization = %u num_client_threshold = %u",
UNotifier::max_connection, UNotifier::min_connection, num_client_for_parallelization, num_client_threshold)
U_INTERNAL_DUMP("UNotifier::max_connection = %u UNotifier::min_connection = %u num_client_threshold = %u",
UNotifier::max_connection, UNotifier::min_connection, num_client_threshold)
pthis->preallocate();
@ -4946,6 +4942,13 @@ bool UServer_Base::startParallelization(uint32_t nclient)
{
if (isParallelizationGoingToStart(nclient))
{
# ifdef DEBUG
if (UClient_Base::csocket)
{
U_WARNING("after forking you can have problem with the shared db connection...");
}
# endif
pid_t pid = startNewChild();
if (pid > 0)

View File

@ -1009,7 +1009,7 @@ void USocket::setMsgError()
if (isSysError())
{
u_errno = errno = -iState;
errno = -iState;
(void) u__snprintf(u_buffer, U_BUFFER_SIZE, U_CONSTANT_TO_PARAM("%#R"), 0); // NB: the last argument (0) is necessary...
}
@ -1083,7 +1083,7 @@ ok: setLocal();
if (error == 0) goto ok;
iState = -(u_errno = errno = error);
iState = -(errno = error);
}
else if (result == 0)
{
@ -1091,9 +1091,8 @@ ok: setLocal();
_close_socket();
iState = TIMEOUT;
errno =
u_errno = ETIMEDOUT;
errno = ETIMEDOUT;
iState = TIMEOUT;
}
U_RETURN(false);

View File

@ -1275,8 +1275,7 @@ loop:
if (ret > 0) U_RETURN(ret);
if (ret == 0)
{
errno =
u_errno = EAGAIN;
errno = EAGAIN;
U_RETURN(0);
}

View File

@ -303,12 +303,14 @@ int USocketExt::write(USocket* sk, const char* ptr, uint32_t count, int timeoutM
int byte_written = 0;
write:
/*
if (sk->isBlocking() &&
timeoutMS != 0 &&
(errno = 0, UNotifier::waitForWrite(sk->iSockDesc, timeoutMS) != 1))
{
goto error;
}
*/
value = sk->send(ptr + byte_written, count);
@ -316,7 +318,8 @@ write:
{
if (value == -1)
{
error: U_INTERNAL_DUMP("errno = %d", errno)
//error:
U_INTERNAL_DUMP("errno = %d", errno)
if (errno != EAGAIN) sk->abortive_close();
else if (timeoutMS != 0)
@ -373,12 +376,14 @@ int USocketExt::sendfile(USocket* sk, int in_fd, off_t* poffset, off_t count, in
int byte_written = 0;
loop:
/*
if (sk->isBlocking() &&
timeoutMS != 0 &&
(errno = 0, UNotifier::waitForWrite(sk->iSockDesc, timeoutMS) != 1))
{
goto error;
}
*/
#if defined(HAVE_MACOSX_SENDFILE)
/**
@ -408,7 +413,8 @@ loop:
{
if (value == -1)
{
error: U_INTERNAL_DUMP("errno = %d", errno)
//error:
U_INTERNAL_DUMP("errno = %d", errno)
if (errno != EAGAIN)
{
@ -499,12 +505,14 @@ int USocketExt::_writev(USocket* sk, struct iovec* iov, int iovcnt, uint32_t cou
#endif
loop:
/*
if (sk->isBlocking() &&
timeoutMS != 0 &&
(errno = 0, UNotifier::waitForWrite(sk->iSockDesc, timeoutMS) != 1))
{
goto error;
}
*/
#if defined(USE_LIBSSL) && !defined(_MSWINDOWS_)
if (sk->isSSLActive())
@ -527,7 +535,8 @@ check:
{
if (value == -1)
{
error: U_INTERNAL_DUMP("errno = %d", errno)
//error:
U_INTERNAL_DUMP("errno = %d", errno)
if (errno != EAGAIN)
{

View File

@ -2440,7 +2440,11 @@ U_NO_EXPORT bool UHTTP::readBodyRequest()
U_RETURN(false);
}
if (UServer_Base::startParallelization()) U_RETURN(false); // parent
if (UClient_Base::csocket == U_NULLPTR && // NB: after forking we can have problem with the shared db connection...
UServer_Base::startParallelization())
{
U_RETURN(false); // parent
}
// NB: wait for other data to complete the read of the request...

View File

@ -13,7 +13,7 @@ rm -f /tmp/uclient.log \
trace.*userver_*.[0-9]* object.*userver_*.[0-9]* stack.*userver_*.[0-9]* mempool.*userver_*.[0-9]* \
$DOC_ROOT/trace.*userver_*.[0-9]* $DOC_ROOT/object.*userver_*.[0-9]* $DOC_ROOT/stack.*userver_*.[0-9]* $DOC_ROOT/mempool.*userver_*.[0-9]*
UTRACE="0 100M -1"
#UTRACE="0 100M -1"
UTRACE_FOLDER=/tmp
TMPDIR=/tmp
#UOBJDUMP="0 10M 100"

View File

@ -12,7 +12,7 @@ rm -f out/userver_tcp.out err/wi-auth2.err err/uclient.err \
trace.*userver_*.[0-9]* object.*userver_*.[0-9]* stack.*userver_*.[0-9]* mempool.*userver_*.[0-9]* \
$DOC_ROOT/trace.*userver_*.[0-9]* $DOC_ROOT/object.*userver_*.[0-9]* $DOC_ROOT/stack.*userver_*.[0-9]* $DOC_ROOT/mempool.*userver_*.[0-9]*
#UTRACE="0 100M -1"
UTRACE="0 100M -1"
UTRACE_FOLDER=/tmp
TMPDIR=/tmp
#UOBJDUMP="0 10M 100"