Alive thread implemented

This commit is contained in:
2026-04-11 09:50:59 +02:00
parent 8e5381fda2
commit 52aa0cac10
10 changed files with 215 additions and 31 deletions

View File

@@ -26,13 +26,26 @@ public:
ShmQueue *result_queue; ShmQueue *result_queue;
ShmQueue *event_queue; ShmQueue *event_queue;
Rktwebview_qt *webview_handler; Rktwebview_qt *webview_handler;
bool quit;
// QThread interface // QThread interface
protected: protected:
void run(); void run();
}; };
class Alive : public QThread
{
public:
Handler *handler;
ShmQueue *alive_queue;
bool alive_timeout;
protected:
void run();
};
static Handler *_handler; static Handler *_handler;
static Alive *_alive;
static void event_cb(rkt_data_t *data) static void event_cb(rkt_data_t *data)
{ {
@@ -58,7 +71,7 @@ int main(int argc, char *argv[])
INFO1("Starting at %s", ctime(&my_time)); INFO1("Starting at %s", ctime(&my_time));
} }
if (argc < 6) { if (argc < 7) {
ERROR1("%s: wrong number of arguments\n", me); ERROR1("%s: wrong number of arguments\n", me);
exit(1); exit(1);
} }
@@ -68,14 +81,16 @@ int main(int argc, char *argv[])
const char *cmd_slot_str = argv[3]; const char *cmd_slot_str = argv[3];
const char *res_slot_str = argv[4]; const char *res_slot_str = argv[4];
const char *evt_slot_str = argv[5]; const char *evt_slot_str = argv[5];
const char *alive_slot_str = argv[6];
size_t shm_size = atoi(shm_size_str); size_t shm_size = atoi(shm_size_str);
int cmd_slot = atoi(cmd_slot_str); int cmd_slot = atoi(cmd_slot_str);
int res_slot = atoi(res_slot_str); int res_slot = atoi(res_slot_str);
int evt_slot = atoi(evt_slot_str); int evt_slot = atoi(evt_slot_str);
int alive_slot = atoi(alive_slot_str);
MKLOGSTMT(LOG_INFO, fprintf(stderr, "%s %s %s %s %s %s\n", me, shm_name, shm_size_str, cmd_slot_str, res_slot_str, evt_slot_str)); MKLOGSTMT(LOG_INFO, fprintf(stderr, "%s %s %s %s %s %s %s\n", me, shm_name, shm_size_str, cmd_slot_str, res_slot_str, evt_slot_str, alive_slot_str));
MKLOGSTMT(LOG_INFO, fprintf(stderr, "%s %s %ld %d %d %d\n", me, shm_name, shm_size, cmd_slot, res_slot, evt_slot)); MKLOGSTMT(LOG_INFO, fprintf(stderr, "%s %s %ld %d %d %d %d\n", me, shm_name, shm_size, cmd_slot, res_slot, evt_slot, alive_slot));
if (!(shm_size > 0 && cmd_slot > 0 && res_slot > 0 && evt_slot > 0)) { if (!(shm_size > 0 && cmd_slot > 0 && res_slot > 0 && evt_slot > 0)) {
ERROR1("%s: Invalid shm size or slots\n", me); ERROR1("%s: Invalid shm size or slots\n", me);
@@ -91,14 +106,38 @@ int main(int argc, char *argv[])
handler->webview_handler = new Rktwebview_qt(argc, argv); handler->webview_handler = new Rktwebview_qt(argc, argv);
handler->start(); handler->start();
Alive *alive = new Alive();
_alive = alive;
alive->handler = handler;
alive->alive_timeout = false;
alive->alive_queue = new ShmQueue(handler->shm, alive_slot, false);
alive->start();
handler->webview_handler->initApp(); handler->webview_handler->initApp();
handler->webview_handler->execApp(); handler->webview_handler->execApp();
INFO0("waiting for thread to end\n"); INFO0("waiting for thread to end\n");
handler->wait(); handler->wait();
INFO0("Handler thread stopped\n");
alive->wait();
INFO0("Alive thread stopped\n");
if (alive->alive_timeout) {
// take ownership over all queues and SHM, because we didn't get an alive message
// and probably the racket side has crashed because of that.
ERROR0("Taking ownership of shared memory, shared queues and shared semaphores\n");
handler->command_queue->takeOwnership();
handler->result_queue->takeOwnership();
handler->event_queue->takeOwnership();
handler->shm->takeOwnership();
alive->alive_queue->takeOwnership();
}
INFO0("cleaning up shm\n"); INFO0("cleaning up shm\n");
delete alive->alive_queue;
delete alive;
delete handler->webview_handler; delete handler->webview_handler;
delete handler->command_queue; delete handler->command_queue;
delete handler->result_queue; delete handler->result_queue;
@@ -116,14 +155,28 @@ int main(int argc, char *argv[])
void Handler::run() void Handler::run()
{ {
bool quit = false; int wait_ms = 10 * 1000; // 10 seconds.
while (!quit) { while (!quit) {
int cmd; int cmd;
std::string data; std::string data;
command_queue->dequeue(cmd, data, true); bool something_came = command_queue->dequeue(cmd, data, wait_ms);
if (!something_came) {
DEBUG1("No command received last %d seconds\n", wait_ms / 1000);
cmd = CMD_NOOP;
}
QJsonObject data_obj = QJsonDocument::fromJson(data.c_str()).object(); QJsonObject data_obj = QJsonDocument::fromJson(data.c_str()).object();
switch(cmd) { switch(cmd) {
case CMD_NOOP: {
if (quit) {
DEBUG1("Alive timeout, quit = %d\n", quit);
webview_handler->closeAllWindows();
DEBUG0("Closed all windows\n");
webview_handler->rktQuit();
DEBUG0("Quit application\n");
}
}
break;
case CMD_QUIT: { case CMD_QUIT: {
INFO0("Got quit message\n"); INFO0("Got quit message\n");
webview_handler->rktQuit(); webview_handler->rktQuit();
@@ -341,4 +394,31 @@ void Handler::run()
} }
} }
} }
DEBUG0("Exiting handler thread\n");
}
void Alive::run()
{
int wait_ms = 10 * 1000;
int ping_no;
std::string data;
bool go_on = true;
while (go_on) {
bool something_came = alive_queue->dequeue(ping_no, data, wait_ms);
if (!something_came) {
ERROR0("No alive message received, stopping alive loop and quitting rktwebview_prg\n");
handler->quit = true;
handler->command_queue->enqueue(CMD_NOOP);
alive_timeout = true;
go_on = false;
} else {
if (ping_no == CMD_ALIVE_QUIT) {
DEBUG0("Got Quit ping for alive thread\n");
go_on = false;
} else {
DEBUG1("Got alive ping: %d\n", ping_no);
}
}
}
DEBUG0("Exiting alive thread\n");
} }

View File

@@ -30,7 +30,9 @@
#define CMD_INFO 27 // arguments: none #define CMD_INFO 27 // arguments: none
#define CMD_SET_ICON 28 // arguments: wv: int, icon:string (absolute filename to icon in png/jpg/svg) #define CMD_SET_ICON 28 // arguments: wv: int, icon:string (absolute filename to icon in png/jpg/svg)
#define CMD_NOOP 33621
#define RESULT_QUIT 36379 #define RESULT_QUIT 36379
#define CMD_ALIVE_QUIT -38383
#endif // RKT_PROTOCOL_H #endif // RKT_PROTOCOL_H

View File

@@ -28,6 +28,7 @@
#define COMMAND_SLOT 1 #define COMMAND_SLOT 1
#define COMMAND_RESULT_SLOT 2 #define COMMAND_RESULT_SLOT 2
#define EVENT_SLOT 3 #define EVENT_SLOT 3
#define ALIVE_SLOT 4
//#define DEBUG //#define DEBUG
@@ -42,6 +43,7 @@ typedef struct {
ShmQueue *command_queue; ShmQueue *command_queue;
ShmQueue *command_result_queue; ShmQueue *command_result_queue;
ShmQueue *event_queue; ShmQueue *event_queue;
ShmQueue *alive_queue;
#ifdef _WIN32 #ifdef _WIN32
HANDLE rkt_webview_prg_pid; HANDLE rkt_webview_prg_pid;
#else #else
@@ -59,6 +61,8 @@ typedef struct {
std::thread *guard_thread; std::thread *guard_thread;
bool guard_go_on; bool guard_go_on;
bool alive_go_on;
std::thread *alive_thread;
} Handle_t; } Handle_t;
Handle_t *handler = nullptr; Handle_t *handler = nullptr;
@@ -96,11 +100,14 @@ bool runRktWebview(Handle_t *handler)
char command_slot[10]; char command_slot[10];
char command_result_slot[10]; char command_result_slot[10];
char event_slot[10]; char event_slot[10];
char alive_slot[10];
sprintf(shm_size_str, "%d", static_cast<int>(handler->shm_size)); sprintf(shm_size_str, "%d", static_cast<int>(handler->shm_size));
sprintf(command_slot, "%d", COMMAND_SLOT); sprintf(command_slot, "%d", COMMAND_SLOT);
sprintf(command_result_slot, "%d", COMMAND_RESULT_SLOT); sprintf(command_result_slot, "%d", COMMAND_RESULT_SLOT);
sprintf(event_slot, "%d", EVENT_SLOT); sprintf(event_slot, "%d", EVENT_SLOT);
sprintf(alive_slot, "%d", ALIVE_SLOT);
// run rktwebview_prg using the environment variable RKT_WEBVIEW_PRG // run rktwebview_prg using the environment variable RKT_WEBVIEW_PRG
#ifdef _WIN32 #ifdef _WIN32
@@ -137,7 +144,7 @@ bool runRktWebview(Handle_t *handler)
DWORD flags = CREATE_NO_WINDOW | NORMAL_PRIORITY_CLASS; DWORD flags = CREATE_NO_WINDOW | NORMAL_PRIORITY_CLASS;
std::string cmdargs = std::string("") + handler->name + " " + shm_size_str + " " + command_slot + " " + command_result_slot + " " + event_slot; std::string cmdargs = std::string("") + handler->name + " " + shm_size_str + " " + command_slot + " " + command_result_slot + " " + event_slot + " " + alive_slot;
std::string exe = std::string(rkt_webview_prg_path); std::string exe = std::string(rkt_webview_prg_path);
std::string dir = basedir(exe); std::string dir = basedir(exe);
@@ -178,13 +185,16 @@ bool runRktWebview(Handle_t *handler)
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
#define EVT_GUARD_STOP -93273 #define EVT_GUARD_STOP -93273
#define WAIT_ON_EVENT_MS (10 * 1000)
#define ALIVE_MESSAGE_INTERVAL_S 5 // Should be smaller than 10 seconds
#define MAX_WAIT_RESULT (10 * 1000) // Maximum wait in milliseconds for a result
void rkt_evt_guard(void) void rkt_evt_guard(void)
{ {
while(handler->guard_go_on) { while(handler->guard_go_on) {
int wv; int wv;
std::string data; std::string data;
if (handler->event_queue->dequeue(wv, data, true)) { if (handler->event_queue->dequeue(wv, data, WAIT_ON_EVENT_MS)) {
if (wv != EVT_GUARD_STOP) { if (wv != EVT_GUARD_STOP) {
handler->evt_queue->enqueue(wv, data); handler->evt_queue->enqueue(wv, data);
handler->evt_cb(1); handler->evt_cb(1);
@@ -193,6 +203,27 @@ void rkt_evt_guard(void)
} }
} }
void queue_alive_message()
{
int seconds = 0;
int ping = 0;
DEBUG0("Starting queue_alive thread\n");
while(handler->alive_go_on) {
std::this_thread::sleep_for(std::chrono::seconds(1)); // one second
seconds += 1;
//DEBUG2("seconds = %d, %d\n", seconds, ALIVE_MESSAGE_INTERVAL_S);
if (seconds >= ALIVE_MESSAGE_INTERVAL_S && handler->alive_go_on) {
seconds = 0;
++ping;
DEBUG1("Pinging with %d\n", ping);
handler->alive_queue->enqueue(ping);
if (ping > 1000000000) { ping = 0; }
}
}
handler->alive_queue->enqueue(CMD_ALIVE_QUIT);
}
void rkt_webview_cleanup() void rkt_webview_cleanup()
{ {
if (handler != nullptr) { if (handler != nullptr) {
@@ -212,15 +243,22 @@ void rkt_webview_cleanup()
handler->guard_thread = nullptr; handler->guard_thread = nullptr;
} }
handler->alive_go_on = false;
handler->alive_thread->join();
delete handler->alive_thread;
handler->alive_thread = nullptr;
INFO0("Sending quit message\n"); INFO0("Sending quit message\n");
handler->command_queue->enqueue(CMD_QUIT); handler->command_queue->enqueue(CMD_QUIT);
INFO0("Message sent\n"); INFO0("Message sent\n");
bool stopped = false; bool stopped = false;
while(!stopped) { while(!stopped) {
int cmd; int cmd = -1;
std::string s; std::string s;
INFO0("Getting result of quit message\n"); INFO0("Getting result of quit message\n");
handler->command_result_queue->dequeue(cmd, s, true); if (!handler->command_result_queue->dequeue(cmd, s, MAX_WAIT_RESULT)) {
ERROR0("Other side has probably stopped working, no result on quit message\n");
}
INFO1("got %d\n", cmd); INFO1("got %d\n", cmd);
if (cmd == RESULT_QUIT) { if (cmd == RESULT_QUIT) {
stopped = true; stopped = true;
@@ -228,16 +266,23 @@ void rkt_webview_cleanup()
} }
} }
// Cleanup shared memory
delete handler->event_queue; delete handler->event_queue;
delete handler->command_result_queue; delete handler->command_result_queue;
delete handler->command_queue; delete handler->command_queue;
delete handler->alive_queue;
delete handler->shm; delete handler->shm;
// Cleanup Memory Queue
delete handler->evt_queue; delete handler->evt_queue;
// Cleanup Handler
delete handler; delete handler;
handler = nullptr; handler = nullptr;
} }
} }
void rkt_webview_init() void rkt_webview_init()
{ {
if (handler == nullptr) { if (handler == nullptr) {
@@ -269,6 +314,8 @@ void rkt_webview_init()
handler->command_result_queue = new ShmQueue(handler->shm, COMMAND_RESULT_SLOT, true); handler->command_result_queue = new ShmQueue(handler->shm, COMMAND_RESULT_SLOT, true);
handler->event_queue = new ShmQueue(handler->shm, EVENT_SLOT, true); handler->event_queue = new ShmQueue(handler->shm, EVENT_SLOT, true);
handler->alive_queue = new ShmQueue(handler->shm, ALIVE_SLOT, true);
// Start rktwebview_prg application with the right information // Start rktwebview_prg application with the right information
#ifndef DEBUG #ifndef DEBUG
handler->rkt_webview_prg_started = runRktWebview(handler); handler->rkt_webview_prg_started = runRktWebview(handler);
@@ -278,6 +325,10 @@ void rkt_webview_init()
handler->evt_cb = nullptr; handler->evt_cb = nullptr;
handler->evt_queue = new MemQueue(); handler->evt_queue = new MemQueue();
handler->guard_go_on = false; handler->guard_go_on = false;
handler->alive_go_on = true;
handler->alive_thread = new std::thread(queue_alive_message);
} else { } else {
handler->function_calls++; handler->function_calls++;
} }
@@ -339,7 +390,7 @@ rkt_wv_context_t rkt_webview_new_context(const char *boilerplate_js, const char
int result; int result;
std::string json_result; std::string json_result;
handler->command_result_queue->dequeue(result, json_result, true); while (!handler->command_result_queue->dequeue(result, json_result, MAX_WAIT_RESULT)) {}
return result; return result;
} }
@@ -356,7 +407,7 @@ int rkt_webview_create(rkt_wv_context_t context, rktwebview_t parent)
int result; int result;
std::string json_result; std::string json_result;
handler->command_result_queue->dequeue(result, json_result, true); while (!handler->command_result_queue->dequeue(result, json_result, MAX_WAIT_RESULT)) {}
return result; return result;
} }
@@ -383,7 +434,7 @@ void rkt_webview_close(rktwebview_t wv)
handler->command_queue->enqueue(cmd, j.dump()); \ handler->command_queue->enqueue(cmd, j.dump()); \
int result; \ int result; \
std::string json_result; \ std::string json_result; \
handler->command_result_queue->dequeue(result, json_result, true); \ while (!handler->command_result_queue->dequeue(result, json_result, MAX_WAIT_RESULT)) {} \
result_t r = static_cast<result_t>(result); \ result_t r = static_cast<result_t>(result); \
return r; return r;
@@ -427,7 +478,7 @@ rkt_data_t *rkt_webview_call_js(rktwebview_t wv, const char *js)
int result; int result;
std::string json_result; std::string json_result;
handler->command_result_queue->dequeue(result, json_result, true); while (!handler->command_result_queue->dequeue(result, json_result, MAX_WAIT_RESULT)) {}
rkt_data_t *r = new rkt_data_t(); rkt_data_t *r = new rkt_data_t();
r->kind = rkt_data_kind_t::js_result; r->kind = rkt_data_kind_t::js_result;
@@ -501,8 +552,9 @@ result_t rkt_webview_show_normal(rktwebview_t w)
void rkt_webview_set_loglevel(rkt_webview_loglevel_t l) void rkt_webview_set_loglevel(rkt_webview_loglevel_t l)
{ {
setLogLevel(l); // library side
auto f = [l]() { auto f = [l]() {
CMDRES0(CMD_SET_LOGLEVEL, static_cast<int>(l)); CMDRES0(CMD_SET_LOGLEVEL, static_cast<int>(l)); // webview process side
}; };
f(); f();
} }
@@ -605,7 +657,7 @@ rkt_data_t *rkt_webview_get_event()
} else { } else {
int wv; int wv;
std::string data; std::string data;
if (handler->event_queue->dequeue(wv, data, false)) { if (handler->event_queue->dequeue(wv, data, 0)) {
//fprintf(stderr, "got event %d %s\n", wv, data.c_str()); //fprintf(stderr, "got event %d %s\n", wv, data.c_str());
rkt_data_t *d = reinterpret_cast<rkt_data_t *>(malloc(sizeof(rkt_data_t))); rkt_data_t *d = reinterpret_cast<rkt_data_t *>(malloc(sizeof(rkt_data_t)));
d->kind = rkt_data_kind_t::event; d->kind = rkt_data_kind_t::event;
@@ -657,7 +709,7 @@ rkt_data_t *rkt_webview_info()
int open_windows_result; int open_windows_result;
std::string none; std::string none;
handler->command_result_queue->dequeue(open_windows_result, none, true); while (!handler->command_result_queue->dequeue(open_windows_result, none, MAX_WAIT_RESULT)) {}
d->data.metrics.open_windows = open_windows_result; d->data.metrics.open_windows = open_windows_result;
d->data.metrics.function_calls = handler->function_calls; d->data.metrics.function_calls = handler->function_calls;

View File

@@ -473,6 +473,16 @@ int Rktwebview_qt::nextHandle()
return h; return h;
} }
void Rktwebview_qt::closeAllWindows()
{
auto wvs = _views.keys();
int i;
for(i = 0; i < wvs.size(); i++) {
int wv = wvs[i];
rktWebViewClose(wv);
}
}
rkt_wv_context_t Rktwebview_qt::newContext(const QString &boilerplate_js, bool has_pem, const QString &optional_server_cert_pem) rkt_wv_context_t Rktwebview_qt::newContext(const QString &boilerplate_js, bool has_pem, const QString &optional_server_cert_pem)
{ {
Command c(COMMAND_NEW_CONTEXT); Command c(COMMAND_NEW_CONTEXT);

View File

@@ -77,6 +77,9 @@ public:
public: public:
int nextHandle(); int nextHandle();
public:
void closeAllWindows();
public: public:
rkt_wv_context_t newContext(const QString &boilerplate_js, bool has_pem, const QString &optional_server_cert_pem); rkt_wv_context_t newContext(const QString &boilerplate_js, bool has_pem, const QString &optional_server_cert_pem);
int rktWebViewCreate(rkt_wv_context_t context, rktwebview_t parent, event_cb_t js_evt_cb); int rktWebViewCreate(rkt_wv_context_t context, rktwebview_t parent, event_cb_t js_evt_cb);

View File

@@ -36,6 +36,8 @@ int main(int argc, char *argv[])
#endif #endif
int context = rkt_webview_new_context("", nullptr); int context = rkt_webview_new_context("", nullptr);
rkt_webview_set_loglevel(rkt_webview_loglevel_t::log_debug);
rkt_webview_register_evt_callback(evt_cb); rkt_webview_register_evt_callback(evt_cb);
int wv = rkt_webview_create(context, 0); int wv = rkt_webview_create(context, 0);
@@ -50,6 +52,7 @@ int main(int argc, char *argv[])
rkt_webview_set_url(wv, "https://wikipedia.org"); rkt_webview_set_url(wv, "https://wikipedia.org");
d = rkt_webview_info(); d = rkt_webview_info();
fprintf(stderr, "%s\n", d->data.metrics.log_file);
rkt_webview_free_data(d); rkt_webview_free_data(d);
while(rkt_webview_events_waiting() > 0) { while(rkt_webview_events_waiting() > 0) {
@@ -57,7 +60,7 @@ int main(int argc, char *argv[])
rkt_webview_free_data(d); rkt_webview_free_data(d);
} }
#ifdef _WIN32 #ifdef _WIN32
Sleep(10); Sleep(30000);
#else #else
sleep(10); sleep(10);
#endif #endif

31
shm.cpp
View File

@@ -211,6 +211,11 @@ public:
return new ShmSem(name, owner); return new ShmSem(name, owner);
} }
public:
void takeOwnership() {
_owner = true;
}
public: public:
ShmApiBase(const char *name, size_t size, bool owner) { ShmApiBase(const char *name, size_t size, bool owner) {
char *buf = reinterpret_cast<char *>(malloc(strlen(name) + 50)); char *buf = reinterpret_cast<char *>(malloc(strlen(name) + 50));
@@ -520,11 +525,15 @@ public:
ReleaseSemaphore(_sem, 1, NULL); ReleaseSemaphore(_sem, 1, NULL);
} }
void wait() { bool wait(int ms) {
DWORD r = WaitForSingleObject(_sem, INFINITE); DWORD r = WaitForSingleObject(_sem, ms);
if (r != WAIT_OBJECT_0) { if (r != WAIT_OBJECT_0) {
if (r != WAIT_TIMEOUT) {
ERROR1("sem_wait error: %ld\n", r); ERROR1("sem_wait error: %ld\n", r);
} }
return false;
}
return true;
} }
bool trywait() { bool trywait() {
@@ -535,6 +544,10 @@ public:
return r == WAIT_OBJECT_0; return r == WAIT_OBJECT_0;
} }
void takeOwnership() {
_owner = true;
}
public: public:
ShmSemApi(const char *n, bool owner) { ShmSemApi(const char *n, bool owner) {
_name = _strdup(n); _name = _strdup(n);
@@ -610,6 +623,11 @@ void *Shm::ref(int place)
return _shm_api->ref(place); return _shm_api->ref(place);
} }
void Shm::takeOwnership()
{
_shm_api->takeOwnership();
}
void Shm::info(int &usage, int &free_depth, int &free_size, int &item_depth, int &item_size, double &usage_factor) void Shm::info(int &usage, int &free_depth, int &free_size, int &item_depth, int &item_size, double &usage_factor)
{ {
_shm_api->info(usage, free_depth, free_size, item_depth, item_size, usage_factor); _shm_api->info(usage, free_depth, free_size, item_depth, item_size, usage_factor);
@@ -635,14 +653,19 @@ void ShmSem::post() {
_api->post(); _api->post();
} }
void ShmSem::wait() { bool ShmSem::wait(int ms) {
_api->wait(); return _api->wait(ms);
} }
bool ShmSem::trywait() { bool ShmSem::trywait() {
return _api->trywait(); return _api->trywait();
} }
void ShmSem::takeOwnership()
{
_api->takeOwnership();
}
ShmSem::ShmSem(const char *n, bool owner) { ShmSem::ShmSem(const char *n, bool owner) {
_api = new ShmSemApi(n, owner); _api = new ShmSemApi(n, owner);
} }

6
shm.h
View File

@@ -17,8 +17,9 @@ private:
public: public:
void post(); void post();
void wait(); bool wait(int ms);
bool trywait(); bool trywait();
void takeOwnership();
public: public:
ShmSem(const char *n, bool owner); ShmSem(const char *n, bool owner);
@@ -52,6 +53,9 @@ public:
public: public:
void *ref(int place); void *ref(int place);
public:
void takeOwnership();
public: public:
void info(int &usage, int &free_depth, int &free_size, int &item_depth, int &item_size, double &usage_factor); void info(int &usage, int &free_depth, int &free_size, int &item_depth, int &item_size, double &usage_factor);

View File

@@ -23,6 +23,11 @@ ShmQueue::ShmQueue(Shm *shm, ShmSlot slot, bool owner)
_queue_sem = shm->sem(buf, owner); _queue_sem = shm->sem(buf, owner);
} }
void ShmQueue::takeOwnership()
{
_queue_sem->takeOwnership();
}
ShmQueue::~ShmQueue() ShmQueue::~ShmQueue()
{ {
if (_owner) { if (_owner) {
@@ -83,15 +88,14 @@ void ShmQueue::enqueue(int cmd)
enqueue(cmd, s); enqueue(cmd, s);
} }
bool ShmQueue::dequeue(int &cmd, std::string &json_data, bool wait)
bool ShmQueue::dequeue(int &cmd, std::string &json_data, int wait_ms)
{ {
if (wait) { bool something_came = _queue_sem->wait(wait_ms);
_queue_sem->wait(); if (!something_came) {
} else {
if (!_queue_sem->trywait()) {
return false; return false;
} }
}
_shm->lock(); _shm->lock();
ShmPlace item_place = _queue->first; ShmPlace item_place = _queue->first;

View File

@@ -37,10 +37,13 @@ public:
int depth(); int depth();
public: public:
bool dequeue(int &cmd, std::string &json_data, bool wait = false); bool dequeue(int &cmd, std::string &json_data, int wait_ms = 0);
void enqueue(int cmd, const std::string &json_data); void enqueue(int cmd, const std::string &json_data);
void enqueue(int cmd); void enqueue(int cmd);
public:
void takeOwnership();
public: public:
ShmQueue(Shm *shm, ShmSlot slot, bool owner); ShmQueue(Shm *shm, ShmSlot slot, bool owner);
~ShmQueue(); ~ShmQueue();