1
0
mirror of https://github.com/stefanocasazza/ULib.git synced 2025-09-28 19:05:55 +08:00
ULib/src/ulib/net/server/plugin/mod_stream.cpp
stefanocasazza d4dfd5d9a8 sync
2018-05-11 18:36:10 +02:00

250 lines
6.6 KiB
C++

// ============================================================================
//
// = LIBRARY
// ULib - c++ library
//
// = FILENAME
// mod_stream.cpp - distributing realtime input
//
// = AUTHOR
// Stefano Casazza
//
// ============================================================================
#include <ulib/command.h>
#include <ulib/file_config.h>
#include <ulib/utility/uhttp.h>
#include <ulib/utility/services.h>
#include <ulib/net/server/server.h>
#include <ulib/net/server/plugin/mod_stream.h>
U_CREAT_FUNC(server_plugin_stream, UStreamPlugIn)
pid_t UStreamPlugIn::pid = (pid_t)-1;
UString* UStreamPlugIn::uri_path;
UString* UStreamPlugIn::metadata;
UString* UStreamPlugIn::content_type;
UCommand* UStreamPlugIn::command;
URingBuffer* UStreamPlugIn::rbuf;
URingBuffer::rbuf_data* UStreamPlugIn::ptr;
// 1M size ring buffer
#define U_RING_BUFFER_SIZE (1 * 1024 * 1024)
RETSIGTYPE UStreamPlugIn::handlerForSigTERM(int signo)
{
U_TRACE(0, "[SIGTERM] UStreamPlugIn::handlerForSigTERM(%d)", signo)
if (pid != -1) UProcess::kill(pid, SIGTERM);
UInterrupt::sendOurselves(SIGTERM);
}
UStreamPlugIn::UStreamPlugIn()
{
U_TRACE_CTOR(0, UStreamPlugIn, "")
U_NEW_STRING(uri_path, UString);
U_NEW_STRING(content_type, UString);
}
UStreamPlugIn::~UStreamPlugIn()
{
U_TRACE_DTOR(0, UStreamPlugIn)
U_DELETE(uri_path)
U_DELETE(content_type)
if (command)
{
U_DELETE(command)
if (rbuf) U_DELETE(rbuf)
if (pid != -1) UProcess::kill(pid, SIGTERM);
}
if (metadata) U_DELETE(metadata)
}
// Server-wide hooks
int UStreamPlugIn::handlerConfig(UFileConfig& cfg)
{
U_TRACE(0, "UStreamPlugIn::handlerConfig(%p)", &cfg)
// stream - plugin parameters
// ------------------------------------------------------------------------------------------------------------------------
// URI_PATH specifies the local part of the URL path at which you would like the content to appear (Ex. /my/video.mjpeg)
// METADATA specifies the needs to have setup headers prepended for each codec stream (Ex. /my/audio.ogg)
// CONTENT_TYPE specifies the Internet media type of the stream, which will appear in the Content-Type HTTP response header
//
// COMMAND command to execute
// ENVIRONMENT environment for command to execute
// ------------------------------------------------------------------------------------------------------------------------
if (cfg.loadTable())
{
UString x = cfg.at(U_CONSTANT_TO_PARAM("METADATA"));
if (x) U_NEW_STRING(metadata, UString(x))
*uri_path = cfg.at(U_CONSTANT_TO_PARAM("URI_PATH"));
*content_type = cfg.at(U_CONSTANT_TO_PARAM("CONTENT_TYPE"));
command = UServer_Base::loadConfigCommand();
U_RETURN(U_PLUGIN_HANDLER_PROCESSED);
}
U_RETURN(U_PLUGIN_HANDLER_OK);
}
int UStreamPlugIn::handlerInit()
{
U_TRACE_NO_PARAM(0, "UStreamPlugIn::handlerInit()")
static int fd_stderr;
if (command == U_NULLPTR) U_RETURN(U_PLUGIN_HANDLER_ERROR);
if (fd_stderr == 0) fd_stderr = UServices::getDevNull("/tmp/UStreamPlugIn.err");
bool result = command->execute(U_NULLPTR, (UString*)-1, -1, fd_stderr);
U_SRV_LOG_CMD_MSG_ERR(*command, false);
if (result == false)
{
U_DELETE(command)
command = U_NULLPTR;
U_RETURN(U_PLUGIN_HANDLER_ERROR);
}
ptr = (URingBuffer::rbuf_data*) UServer_Base::getOffsetToDataShare(sizeof(URingBuffer::rbuf_data) + U_RING_BUFFER_SIZE);
(void) content_type->append(U_CONSTANT_TO_PARAM(U_CRLF));
U_RETURN(U_PLUGIN_HANDLER_OK);
}
int UStreamPlugIn::handlerRun()
{
U_TRACE_NO_PARAM(0, "UStreamPlugIn::handlerRun()")
U_INTERNAL_ASSERT_EQUALS(pid,-1)
U_NEW(URingBuffer, rbuf, URingBuffer((URingBuffer::rbuf_data*) UServer_Base::getPointerToDataShare(ptr), U_RING_BUFFER_SIZE));
// NB: we are feeding by a child of us...
UProcess proc;
if (proc.fork() &&
proc.parent())
{
pid = proc.pid();
/*
pid_t pgid = U_SYSCALL_NO_PARAM(getpgrp);
UProcess::setProcessGroup(pid, pgid);
*/
U_RETURN(U_PLUGIN_HANDLER_OK);
}
if (proc.child())
{
UTimeVal to_sleep(0L, 50 * 1000L);
pid = UCommand::pid;
UInterrupt::insert(SIGTERM, (sighandler_t)UStreamPlugIn::handlerForSigTERM); // async signal
while (UNotifier::waitForRead(UProcess::filedes[2]) >= 1)
{
int nread = rbuf->readFromFdAndWrite(UProcess::filedes[2]);
if (nread == 0) break; // EOF
if (nread < 0) to_sleep.nanosleep(); // EAGAIN
}
handlerForSigTERM(SIGTERM);
}
U_RETURN(U_PLUGIN_HANDLER_OK);
}
// Connection-wide hooks
int UStreamPlugIn::handlerRequest()
{
U_TRACE_NO_PARAM(0, "UStreamPlugIn::handlerRequest()")
if (U_HTTP_URI_EQUAL(*uri_path) == false) U_RETURN(U_PLUGIN_HANDLER_OK);
U_http_info.nResponseCode = HTTP_OK;
UHTTP::setResponse(*content_type, U_NULLPTR);
UClientImage_Base::setCloseConnection();
if (USocketExt::write(UServer_Base::csocket, *UClientImage_Base::wbuffer, UServer_Base::timeoutMS) &&
UHTTP::isHEAD() == false)
{
int readd = rbuf->open();
if (readd != -1)
{
if (UServer_Base::startParallelization())
{
// parent
rbuf->close(readd);
U_RETURN(U_PLUGIN_HANDLER_OK);
}
UTimeVal to_sleep(0L, 10 * 1000L);
if (metadata) (void) USocketExt::write(UServer_Base::csocket, *metadata, UServer_Base::timeoutMS);
while (UServer_Base::flag_loop)
{
if ( rbuf->isEmpty(readd) == false &&
(rbuf->readAndWriteToFd(readd, UServer_Base::csocket->iSockDesc) <= 0 && errno != EAGAIN)) break;
to_sleep.nanosleep();
}
rbuf->close(readd);
}
}
U_RETURN(U_PLUGIN_HANDLER_PROCESSED);
}
// DEBUG
#if defined(U_STDCPP_ENABLE) && defined(DEBUG)
const char* UStreamPlugIn::dump(bool reset) const
{
*UObjectIO::os << "pid " << pid << '\n'
<< "uri_path (UString " << (void*)uri_path << ")\n"
<< "content_type (UString " << (void*)content_type << ")\n"
<< "command (UCommand " << (void*)command << ")\n"
<< "rbuf (URingBuffer " << (void*)rbuf << ')';
if (reset)
{
UObjectIO::output();
return UObjectIO::buffer_output;
}
return U_NULLPTR;
}
#endif