diff --git a/include/ulib/event/event_db.h b/include/ulib/event/event_db.h index 3d25d011..1ca2ec1c 100644 --- a/include/ulib/event/event_db.h +++ b/include/ulib/event/event_db.h @@ -16,11 +16,7 @@ #include -#ifdef DEBUG -# define U_QUERY_INFO_SZ 168 -#else -# define U_QUERY_INFO_SZ 200 -#endif +#define U_QUERY_INFO_SZ 168 class UServer_Base; class UClientImage_Base; @@ -71,17 +67,20 @@ protected: typedef struct query_info { vPFpvu handlerResult; UClientImage_Base* pClientImage; + uint16_t num_query; # ifdef DEBUG uint32_t timestamp; # endif - uint16_t num_query; } query_info; void* conn; uint8_t* pbusy; - query_info query[U_QUERY_INFO_SZ]; uint16_t num_result, num_handler; bool bsend, bnotifier; + query_info query[U_QUERY_INFO_SZ]; +#ifdef DEBUG + const void* pthis; +#endif #ifdef DEBUG static uint32_t max_num_handler; diff --git a/include/ulib/net/client/redis.h b/include/ulib/net/client/redis.h index ee01d7e9..6a77a9d3 100644 --- a/include/ulib/net/client/redis.h +++ b/include/ulib/net/client/redis.h @@ -14,6 +14,7 @@ #ifndef ULIB_REDIS_H #define ULIB_REDIS_H 1 +#include #include /** @@ -59,7 +60,7 @@ typedef void (*vPFcs)(const UString&); -class U_EXPORT UREDISClient_Base : public UClient_Base { +class U_EXPORT UREDISClient_Base : public UClient_Base, UEventFd { public: ~UREDISClient_Base() @@ -670,31 +671,6 @@ public: host, port, keylen, key, destination_db, timeout_ms, COPY ? "COPY" : "", REPLACE ? "REPLACE" : "")); } - // PUB/SUB (@see http://redis.io/pubsub) - - bool publish(const char* channel, uint32_t channel_len, const char* msg, uint32_t msg_len) // Posts a message to the given channel - { - U_TRACE(0, "UREDISClient_Base::publish(%.*S,%u,%.*S,%u)", channel_len, channel, channel_len, msg_len, msg, msg_len) - - if (processRequest(U_RC_INT, U_CONSTANT_TO_PARAM("PUBLISH"), channel, channel_len, msg, msg_len)) return getBool(); - - U_RETURN(false); - } - - bool subscribe(const char* param, uint32_t len) // Listen for messages published to the given channels - { - U_TRACE(0, "UREDISClient_Base::subscribe(%.*S,%u)", len, param, len) - - return processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), param, len); - } - - bool unsubscribe(const char* param, uint32_t len) // Stop listening for messages posted to the given channels - { - U_TRACE(0, "UREDISClient_Base::unsubscribe(%.*S,%u)", len, param, len) - - return processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), param, len); - } - // LIST (@see http://redis.io/list) bool lrange(const char* param, uint32_t len) // Get a range of elements from a list @@ -723,6 +699,59 @@ public: (withPayloads ? U_CONSTANT_SIZE("WITHPAYLOADS") : 0), "WITHPAYLOADS")); } + // PUB/SUB (@see http://redis.io/pubsub) + + bool publish(const char* channel, uint32_t channel_len, const char* msg, uint32_t msg_len) // Posts a message to the given channel + { + U_TRACE(0, "UREDISClient_Base::publish(%.*S,%u,%.*S,%u)", channel_len, channel, channel_len, msg_len, msg, msg_len) + + if (processRequest(U_RC_INT, U_CONSTANT_TO_PARAM("PUBLISH"), channel, channel_len, msg, msg_len)) return getBool(); + + U_RETURN(false); + } + + bool subscribe(const char* param, uint32_t len) // Listen for messages published to the given channels + { + U_TRACE(0, "UREDISClient_Base::subscribe(%.*S,%u)", len, param, len) + + return processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), param, len); + } + + bool unsubscribe(const char* param, uint32_t len) // Stop listening for messages posted to the given channels + { + U_TRACE(0, "UREDISClient_Base::unsubscribe(%.*S,%u)", len, param, len) + + return processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), param, len); + } + + // define method VIRTUAL of class UEventFd + + virtual int handlerRead() U_DECL_FINAL + { + U_TRACE_NO_PARAM(0, "UREDISClient_Base::handlerRead()") + + U_RETURN(U_NOTIFIER_OK); + } + + virtual void handlerDelete() U_DECL_FINAL + { + U_TRACE_NO_PARAM(0, "UREDISClient_Base::handlerDelete()") + + U_INTERNAL_DUMP("UEventFd::fd = %d", UEventFd::fd) + + UEventFd::fd = -1; + } + + void listenForEvents() + { + U_TRACE_NO_PARAM(0, "UREDISClient_Base::listenForEvents()") + + UEventFd::op_mask |= EPOLLET; + UEventFd::op_mask &= ~EPOLLRDHUP; + + UNotifier::insert(this, EPOLLEXCLUSIVE | EPOLLROUNDROBIN); // NB: we ask to listen for events to a Redis publish channel... + } + #if defined(U_STDCPP_ENABLE) && defined(DEBUG) const char* dump(bool reset) const; #endif diff --git a/src/ulib/event/event_db.cpp b/src/ulib/event/event_db.cpp index 6bc0ea54..7a1ff96f 100644 --- a/src/ulib/event/event_db.cpp +++ b/src/ulib/event/event_db.cpp @@ -36,6 +36,12 @@ UEventDB::UEventDB() bsend = false; bnotifier = true; + +#ifdef DEBUG + pthis = (void*)U_CHECK_MEMORY_SENTINEL; +#endif + +// U_WARNING("UEventDB::UEventDB(): sizeof(UEventDB) = %u sizeof(query_info) = %u", sizeof(UEventDB), sizeof(query_info)); // sizeof(UEventDB) = 4072|4088 sizeof(query_info) = 24 } UEventDB::~UEventDB() @@ -73,18 +79,18 @@ void UEventDB::reset() U_CHECK_MEMORY - U_INTERNAL_DUMP("num_handler = %u", num_handler) - - U_INTERNAL_ASSERT_MINOR(num_handler, U_QUERY_INFO_SZ) - #ifdef U_STATIC_ORM_DRIVER_PGSQL bsend = false; - num_result = 0; + U_INTERNAL_DUMP("num_handler = %u", num_handler) if (num_handler) { - U_DEBUG("UEventDB::reset(): num_handler = %u", num_handler); + U_DEBUG("UEventDB::reset(): num_result(%u), num_handler(%u)%s", num_result, num_handler, pthis == (void*)U_CHECK_MEMORY_SENTINEL ? "" : " ABW on query array"); + + U_INTERNAL_ASSERT_MINOR(num_handler, U_QUERY_INFO_SZ) + + U_SYSCALL_VOID(PQresetQueue, "%p", (PGconn*)conn); for (uint32_t i = 0; i < num_handler; ++i) { @@ -102,10 +108,12 @@ void UEventDB::reset() } } - num_handler = 0; + (void) U_SYSCALL(memset, "%p,%d,%u", query, 0, num_handler * sizeof(query_info)); - U_SYSCALL_VOID(PQresetQueue, "%p", (PGconn*)conn); + num_handler = 0; } + + num_result = 0; #endif } @@ -130,8 +138,11 @@ void UEventDB::handlerQuery(vPFpvu handler, uint32_t num_query) pquery->handlerResult = handler; pquery->pClientImage = UServer_Base::pClientImage; pquery->num_query = num_query; + #ifdef DEBUG - pquery->timestamp = u_now->tv_sec; + pquery->timestamp = u_now->tv_sec; + + U_ASSERT_MACRO(pthis == (void*)U_CHECK_MEMORY_SENTINEL, "ABW on query array", "") #endif num_result += num_query; @@ -168,6 +179,9 @@ void UEventDB::handlerQuery(vPFpvu handler, uint32_t num_query) *pbusy = false; + U_INTERNAL_ASSERT_EQUALS(num_result, 0) + U_INTERNAL_ASSERT_EQUALS(num_handler, 0) + return; } @@ -283,17 +297,20 @@ read: } } - U_DEBUG("UEventDB::handlerRead(): vresult_size = %u num_result = %u pid = %u conn->errorMessage = %S", vresult_size, num_result, pid, PQerrorMessage((PGconn*)conn)); + U_DEBUG("UEventDB::handlerRead(): vresult_size = %u num_result = %u num_handler = %u pid = %u conn->errorMessage = %S", + vresult_size, num_result, num_handler, pid, PQerrorMessage((PGconn*)conn)); next: *pbusy = false; - n = U_min(num_handler, vresult_size); + n = U_min(num_result, vresult_size); U_INTERNAL_DUMP("n = %u", n) for (k = i = 0; k < n; ++i) { + U_INTERNAL_ASSERT_MINOR(i, num_handler) + bopen = (pquery = (query+i))->pClientImage->isOpen(); # ifdef DEBUG @@ -337,7 +354,7 @@ next: } } - U_INTERNAL_DUMP("i = %u k = %u vresult_size = %u num_handler = %u", i+1, k, vresult_size, num_handler) + U_INTERNAL_DUMP("i = %u k = %u vresult_size = %u num_result = %u num_handler = %u", i+1, k, vresult_size, num_result, num_handler) } for (; i < num_handler; ++i) diff --git a/src/ulib/utility/uhttp.cpp b/src/ulib/utility/uhttp.cpp index 9c6a2ebd..5c2d8241 100644 --- a/src/ulib/utility/uhttp.cpp +++ b/src/ulib/utility/uhttp.cpp @@ -7527,15 +7527,26 @@ U_NO_EXPORT __pure uint32_t UHTTP::getPosPasswd(UString& fpasswd, const UString& { U_TRACE(0, "UHTTP::getPosPasswd(%V,%V)", fpasswd.rep, line.rep) + U_INTERNAL_ASSERT(fpasswd) + uint32_t pos = fpasswd.find(line); - if (pos == U_NOT_FOUND || - (pos > 0 && fpasswd[pos-1] != '\n')) + if (pos == U_NOT_FOUND) U_RETURN(U_NOT_FOUND); + + if (pos == 0 || + fpasswd[pos-1] == '\n') { - U_RETURN(U_NOT_FOUND); + U_RETURN(pos); } - U_RETURN(pos); + while (true) + { + pos = fpasswd.find(line, pos+1); + + if (pos == U_NOT_FOUND) U_RETURN(U_NOT_FOUND); + + if (fpasswd[pos-1] == '\n') U_RETURN(pos); + } } U_NO_EXPORT uint32_t UHTTP::checkPasswd(UHTTP::UFileCacheData* ptr_file_data, UString& fpasswd, const UString& line) @@ -7715,37 +7726,41 @@ void UHTTP::setPasswdUser(UString& fpasswd, const UString& username, const UStri { U_TRACE(0, "UHTTP::setPasswdUser(%V,%V,%V)", fpasswd.rep, username.rep, password.rep) - UString buffer(U_CAPACITY), hash(1000U), user_token(U_CAPACITY); - - if (digest_authentication) + if (username && + password) { - // s.casazza:Protected Area:b9ee2af50be37...........\n + UString buffer(U_CAPACITY), hash(1000U), user_token(U_CAPACITY); - buffer.snprintf(U_CONSTANT_TO_PARAM("%v:" U_HTTP_REALM ":"), username.rep); + if (digest_authentication) + { + // s.casazza:Protected Area:b9ee2af50be37...........\n - UServices::generateDigest(U_HASH_MD5, 0, buffer+password, hash); + buffer.snprintf(U_CONSTANT_TO_PARAM("%v:" U_HTTP_REALM ":"), username.rep); - user_token.snprintf(U_CONSTANT_TO_PARAM("%v\n"), buffer.rep, hash.rep); - } - else - { - // s.casazza:{SHA}Lkii1ZE7k.....\n + UServices::generateDigest(U_HASH_MD5, 0, buffer+password, hash); - buffer.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}"), username.rep); + user_token.snprintf(U_CONSTANT_TO_PARAM("%v%v\n"), buffer.rep, hash.rep); + } + else + { + // s.casazza:{SHA}Lkii1ZE7k.....\n - UServices::generateDigest(U_HASH_SHA1, 0, password, hash, true); + buffer.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}"), username.rep); - user_token.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}%v\n"), username.rep, hash.rep); - } + UServices::generateDigest(U_HASH_SHA1, 0, password, hash, true); - uint32_t pos_begin = getPosPasswd(fpasswd, buffer); + user_token.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}%v\n"), username.rep, hash.rep); + } - if (pos_begin == U_NOT_FOUND) (void) fpasswd.append(user_token); - else - { - uint32_t pos_end = fpasswd.find('\n', pos_begin+1) - pos_begin; + uint32_t pos_begin = getPosPasswd(fpasswd, buffer); - (void) fpasswd.replace(pos_begin, pos_end, user_token); + if (pos_begin == U_NOT_FOUND) (void) fpasswd.append(user_token); + else + { + uint32_t pos_end = fpasswd.find('\n', pos_begin+1) - pos_begin+1; + + (void) fpasswd.replace(pos_begin, pos_end, user_token); + } } } @@ -7753,20 +7768,24 @@ bool UHTTP::revokePasswdUser(UString& fpasswd, const UString& username) // Remov { U_TRACE(0, "UHTTP::revokePasswdUser(%V,%V)", fpasswd.rep, username.rep) - UString buffer(U_CAPACITY); - - if (digest_authentication) buffer.snprintf(U_CONSTANT_TO_PARAM("%v:" U_HTTP_REALM ":"), username.rep); // s.casazza:Protected Area:b9ee2af50be37...........\n - else buffer.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}"), username.rep); // s.casazza:{SHA}Lkii1ZE7k.....\n - - uint32_t pos_begin = getPosPasswd(fpasswd, buffer); - - if (pos_begin != U_NOT_FOUND) + if (fpasswd && + username) { - uint32_t pos_end = fpasswd.find('\n', pos_begin+1) - pos_begin; + UString buffer(U_CAPACITY); - (void) fpasswd.erase(pos_begin, pos_end); + if (digest_authentication) buffer.snprintf(U_CONSTANT_TO_PARAM("%v:" U_HTTP_REALM ":"), username.rep); // s.casazza:Protected Area:b9ee2af50be37...........\n + else buffer.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}"), username.rep); // s.casazza:{SHA}Lkii1ZE7k.....\n - U_RETURN(true); + uint32_t pos_begin = getPosPasswd(fpasswd, buffer); + + if (pos_begin != U_NOT_FOUND) + { + uint32_t pos_end = fpasswd.find('\n', pos_begin+1) - pos_begin; + + (void) fpasswd.erase(pos_begin, pos_end); + + U_RETURN(true); + } } U_RETURN(false);