1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00
This commit is contained in:
stefanocasazza 2018-08-24 18:39:45 +02:00
parent c31819c15f
commit 8c411d9fb1
4 changed files with 145 additions and 81 deletions

View File

@ -16,11 +16,7 @@
#include <ulib/event/event_fd.h>
#ifdef DEBUG
# define U_QUERY_INFO_SZ 168
#else
# define U_QUERY_INFO_SZ 200
#endif
#define U_QUERY_INFO_SZ 168
class UServer_Base;
class UClientImage_Base;
@ -71,17 +67,20 @@ protected:
typedef struct query_info {
vPFpvu handlerResult;
UClientImage_Base* pClientImage;
uint16_t num_query;
# ifdef DEBUG
uint32_t timestamp;
# endif
uint16_t num_query;
} query_info;
void* conn;
uint8_t* pbusy;
query_info query[U_QUERY_INFO_SZ];
uint16_t num_result, num_handler;
bool bsend, bnotifier;
query_info query[U_QUERY_INFO_SZ];
#ifdef DEBUG
const void* pthis;
#endif
#ifdef DEBUG
static uint32_t max_num_handler;

View File

@ -14,6 +14,7 @@
#ifndef ULIB_REDIS_H
#define ULIB_REDIS_H 1
#include <ulib/notifier.h>
#include <ulib/net/client/client.h>
/**
@ -59,7 +60,7 @@
typedef void (*vPFcs)(const UString&);
class U_EXPORT UREDISClient_Base : public UClient_Base {
class U_EXPORT UREDISClient_Base : public UClient_Base, UEventFd {
public:
~UREDISClient_Base()
@ -670,31 +671,6 @@ public:
host, port, keylen, key, destination_db, timeout_ms, COPY ? "COPY" : "", REPLACE ? "REPLACE" : ""));
}
// PUB/SUB (@see http://redis.io/pubsub)
bool publish(const char* channel, uint32_t channel_len, const char* msg, uint32_t msg_len) // Posts a message to the given channel
{
U_TRACE(0, "UREDISClient_Base::publish(%.*S,%u,%.*S,%u)", channel_len, channel, channel_len, msg_len, msg, msg_len)
if (processRequest(U_RC_INT, U_CONSTANT_TO_PARAM("PUBLISH"), channel, channel_len, msg, msg_len)) return getBool();
U_RETURN(false);
}
bool subscribe(const char* param, uint32_t len) // Listen for messages published to the given channels
{
U_TRACE(0, "UREDISClient_Base::subscribe(%.*S,%u)", len, param, len)
return processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), param, len);
}
bool unsubscribe(const char* param, uint32_t len) // Stop listening for messages posted to the given channels
{
U_TRACE(0, "UREDISClient_Base::unsubscribe(%.*S,%u)", len, param, len)
return processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), param, len);
}
// LIST (@see http://redis.io/list)
bool lrange(const char* param, uint32_t len) // Get a range of elements from a list
@ -723,6 +699,59 @@ public:
(withPayloads ? U_CONSTANT_SIZE("WITHPAYLOADS") : 0), "WITHPAYLOADS"));
}
// PUB/SUB (@see http://redis.io/pubsub)
bool publish(const char* channel, uint32_t channel_len, const char* msg, uint32_t msg_len) // Posts a message to the given channel
{
U_TRACE(0, "UREDISClient_Base::publish(%.*S,%u,%.*S,%u)", channel_len, channel, channel_len, msg_len, msg, msg_len)
if (processRequest(U_RC_INT, U_CONSTANT_TO_PARAM("PUBLISH"), channel, channel_len, msg, msg_len)) return getBool();
U_RETURN(false);
}
bool subscribe(const char* param, uint32_t len) // Listen for messages published to the given channels
{
U_TRACE(0, "UREDISClient_Base::subscribe(%.*S,%u)", len, param, len)
return processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("SUBSCRIBE"), param, len);
}
bool unsubscribe(const char* param, uint32_t len) // Stop listening for messages posted to the given channels
{
U_TRACE(0, "UREDISClient_Base::unsubscribe(%.*S,%u)", len, param, len)
return processRequest(U_RC_MULTIBULK, U_CONSTANT_TO_PARAM("UNSUBSCRIBE"), param, len);
}
// define method VIRTUAL of class UEventFd
virtual int handlerRead() U_DECL_FINAL
{
U_TRACE_NO_PARAM(0, "UREDISClient_Base::handlerRead()")
U_RETURN(U_NOTIFIER_OK);
}
virtual void handlerDelete() U_DECL_FINAL
{
U_TRACE_NO_PARAM(0, "UREDISClient_Base::handlerDelete()")
U_INTERNAL_DUMP("UEventFd::fd = %d", UEventFd::fd)
UEventFd::fd = -1;
}
void listenForEvents()
{
U_TRACE_NO_PARAM(0, "UREDISClient_Base::listenForEvents()")
UEventFd::op_mask |= EPOLLET;
UEventFd::op_mask &= ~EPOLLRDHUP;
UNotifier::insert(this, EPOLLEXCLUSIVE | EPOLLROUNDROBIN); // NB: we ask to listen for events to a Redis publish channel...
}
#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* dump(bool reset) const;
#endif

View File

@ -36,6 +36,12 @@ UEventDB::UEventDB()
bsend = false;
bnotifier = true;
#ifdef DEBUG
pthis = (void*)U_CHECK_MEMORY_SENTINEL;
#endif
// U_WARNING("UEventDB::UEventDB(): sizeof(UEventDB) = %u sizeof(query_info) = %u", sizeof(UEventDB), sizeof(query_info)); // sizeof(UEventDB) = 4072|4088 sizeof(query_info) = 24
}
UEventDB::~UEventDB()
@ -73,18 +79,18 @@ void UEventDB::reset()
U_CHECK_MEMORY
U_INTERNAL_DUMP("num_handler = %u", num_handler)
U_INTERNAL_ASSERT_MINOR(num_handler, U_QUERY_INFO_SZ)
#ifdef U_STATIC_ORM_DRIVER_PGSQL
bsend = false;
num_result = 0;
U_INTERNAL_DUMP("num_handler = %u", num_handler)
if (num_handler)
{
U_DEBUG("UEventDB::reset(): num_handler = %u", num_handler);
U_DEBUG("UEventDB::reset(): num_result(%u), num_handler(%u)%s", num_result, num_handler, pthis == (void*)U_CHECK_MEMORY_SENTINEL ? "" : " ABW on query array");
U_INTERNAL_ASSERT_MINOR(num_handler, U_QUERY_INFO_SZ)
U_SYSCALL_VOID(PQresetQueue, "%p", (PGconn*)conn);
for (uint32_t i = 0; i < num_handler; ++i)
{
@ -102,10 +108,12 @@ void UEventDB::reset()
}
}
num_handler = 0;
(void) U_SYSCALL(memset, "%p,%d,%u", query, 0, num_handler * sizeof(query_info));
U_SYSCALL_VOID(PQresetQueue, "%p", (PGconn*)conn);
num_handler = 0;
}
num_result = 0;
#endif
}
@ -130,8 +138,11 @@ void UEventDB::handlerQuery(vPFpvu handler, uint32_t num_query)
pquery->handlerResult = handler;
pquery->pClientImage = UServer_Base::pClientImage;
pquery->num_query = num_query;
#ifdef DEBUG
pquery->timestamp = u_now->tv_sec;
pquery->timestamp = u_now->tv_sec;
U_ASSERT_MACRO(pthis == (void*)U_CHECK_MEMORY_SENTINEL, "ABW on query array", "")
#endif
num_result += num_query;
@ -168,6 +179,9 @@ void UEventDB::handlerQuery(vPFpvu handler, uint32_t num_query)
*pbusy = false;
U_INTERNAL_ASSERT_EQUALS(num_result, 0)
U_INTERNAL_ASSERT_EQUALS(num_handler, 0)
return;
}
@ -283,17 +297,20 @@ read:
}
}
U_DEBUG("UEventDB::handlerRead(): vresult_size = %u num_result = %u pid = %u conn->errorMessage = %S", vresult_size, num_result, pid, PQerrorMessage((PGconn*)conn));
U_DEBUG("UEventDB::handlerRead(): vresult_size = %u num_result = %u num_handler = %u pid = %u conn->errorMessage = %S",
vresult_size, num_result, num_handler, pid, PQerrorMessage((PGconn*)conn));
next:
*pbusy = false;
n = U_min(num_handler, vresult_size);
n = U_min(num_result, vresult_size);
U_INTERNAL_DUMP("n = %u", n)
for (k = i = 0; k < n; ++i)
{
U_INTERNAL_ASSERT_MINOR(i, num_handler)
bopen = (pquery = (query+i))->pClientImage->isOpen();
# ifdef DEBUG
@ -337,7 +354,7 @@ next:
}
}
U_INTERNAL_DUMP("i = %u k = %u vresult_size = %u num_handler = %u", i+1, k, vresult_size, num_handler)
U_INTERNAL_DUMP("i = %u k = %u vresult_size = %u num_result = %u num_handler = %u", i+1, k, vresult_size, num_result, num_handler)
}
for (; i < num_handler; ++i)

View File

@ -7527,15 +7527,26 @@ U_NO_EXPORT __pure uint32_t UHTTP::getPosPasswd(UString& fpasswd, const UString&
{
U_TRACE(0, "UHTTP::getPosPasswd(%V,%V)", fpasswd.rep, line.rep)
U_INTERNAL_ASSERT(fpasswd)
uint32_t pos = fpasswd.find(line);
if (pos == U_NOT_FOUND ||
(pos > 0 && fpasswd[pos-1] != '\n'))
if (pos == U_NOT_FOUND) U_RETURN(U_NOT_FOUND);
if (pos == 0 ||
fpasswd[pos-1] == '\n')
{
U_RETURN(U_NOT_FOUND);
U_RETURN(pos);
}
U_RETURN(pos);
while (true)
{
pos = fpasswd.find(line, pos+1);
if (pos == U_NOT_FOUND) U_RETURN(U_NOT_FOUND);
if (fpasswd[pos-1] == '\n') U_RETURN(pos);
}
}
U_NO_EXPORT uint32_t UHTTP::checkPasswd(UHTTP::UFileCacheData* ptr_file_data, UString& fpasswd, const UString& line)
@ -7715,37 +7726,41 @@ void UHTTP::setPasswdUser(UString& fpasswd, const UString& username, const UStri
{
U_TRACE(0, "UHTTP::setPasswdUser(%V,%V,%V)", fpasswd.rep, username.rep, password.rep)
UString buffer(U_CAPACITY), hash(1000U), user_token(U_CAPACITY);
if (digest_authentication)
if (username &&
password)
{
// s.casazza:Protected Area:b9ee2af50be37...........\n
UString buffer(U_CAPACITY), hash(1000U), user_token(U_CAPACITY);
buffer.snprintf(U_CONSTANT_TO_PARAM("%v:" U_HTTP_REALM ":"), username.rep);
if (digest_authentication)
{
// s.casazza:Protected Area:b9ee2af50be37...........\n
UServices::generateDigest(U_HASH_MD5, 0, buffer+password, hash);
buffer.snprintf(U_CONSTANT_TO_PARAM("%v:" U_HTTP_REALM ":"), username.rep);
user_token.snprintf(U_CONSTANT_TO_PARAM("%v\n"), buffer.rep, hash.rep);
}
else
{
// s.casazza:{SHA}Lkii1ZE7k.....\n
UServices::generateDigest(U_HASH_MD5, 0, buffer+password, hash);
buffer.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}"), username.rep);
user_token.snprintf(U_CONSTANT_TO_PARAM("%v%v\n"), buffer.rep, hash.rep);
}
else
{
// s.casazza:{SHA}Lkii1ZE7k.....\n
UServices::generateDigest(U_HASH_SHA1, 0, password, hash, true);
buffer.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}"), username.rep);
user_token.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}%v\n"), username.rep, hash.rep);
}
UServices::generateDigest(U_HASH_SHA1, 0, password, hash, true);
uint32_t pos_begin = getPosPasswd(fpasswd, buffer);
user_token.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}%v\n"), username.rep, hash.rep);
}
if (pos_begin == U_NOT_FOUND) (void) fpasswd.append(user_token);
else
{
uint32_t pos_end = fpasswd.find('\n', pos_begin+1) - pos_begin;
uint32_t pos_begin = getPosPasswd(fpasswd, buffer);
(void) fpasswd.replace(pos_begin, pos_end, user_token);
if (pos_begin == U_NOT_FOUND) (void) fpasswd.append(user_token);
else
{
uint32_t pos_end = fpasswd.find('\n', pos_begin+1) - pos_begin+1;
(void) fpasswd.replace(pos_begin, pos_end, user_token);
}
}
}
@ -7753,20 +7768,24 @@ bool UHTTP::revokePasswdUser(UString& fpasswd, const UString& username) // Remov
{
U_TRACE(0, "UHTTP::revokePasswdUser(%V,%V)", fpasswd.rep, username.rep)
UString buffer(U_CAPACITY);
if (digest_authentication) buffer.snprintf(U_CONSTANT_TO_PARAM("%v:" U_HTTP_REALM ":"), username.rep); // s.casazza:Protected Area:b9ee2af50be37...........\n
else buffer.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}"), username.rep); // s.casazza:{SHA}Lkii1ZE7k.....\n
uint32_t pos_begin = getPosPasswd(fpasswd, buffer);
if (pos_begin != U_NOT_FOUND)
if (fpasswd &&
username)
{
uint32_t pos_end = fpasswd.find('\n', pos_begin+1) - pos_begin;
UString buffer(U_CAPACITY);
(void) fpasswd.erase(pos_begin, pos_end);
if (digest_authentication) buffer.snprintf(U_CONSTANT_TO_PARAM("%v:" U_HTTP_REALM ":"), username.rep); // s.casazza:Protected Area:b9ee2af50be37...........\n
else buffer.snprintf(U_CONSTANT_TO_PARAM("%v:{SHA}"), username.rep); // s.casazza:{SHA}Lkii1ZE7k.....\n
U_RETURN(true);
uint32_t pos_begin = getPosPasswd(fpasswd, buffer);
if (pos_begin != U_NOT_FOUND)
{
uint32_t pos_end = fpasswd.find('\n', pos_begin+1) - pos_begin;
(void) fpasswd.erase(pos_begin, pos_end);
U_RETURN(true);
}
}
U_RETURN(false);