Simplify communication loop

This commit is contained in:
45Tatami 2022-01-05 18:32:18 +01:00
parent bcd22f1ab0
commit 9218d180ab
1 changed files with 33 additions and 51 deletions

View File

@ -35,23 +35,19 @@ using std::codecvt_utf8_utf16;
std::thread comm_thread; std::thread comm_thread;
wstring remote = L"localhost:30501"; wstring remote = L"localhost:30501";
wstring config_file_path;
HWND hwnd = NULL;
// Mutex/cv protects following vars
mutex conn_mut;
std::condition_variable conn_cv;
std::atomic<bool> comm_thread_run; std::atomic<bool> comm_thread_run;
std::atomic<bool> want_connect; std::atomic<bool> want_connect;
mutex connect_mut;
std::condition_variable connect_cv;
std::deque<wstring> msg_q; std::deque<wstring> msg_q;
mutex msg_q_mut;
std::condition_variable msg_q_cv;
SOCKET _connect(); SOCKET _connect();
bool _send(SOCKET &, string const &); bool _send(SOCKET &, string const &);
wstring config_file_path;
HWND hwnd = NULL;
wstring getEditBoxText(HWND hndl, int item) { wstring getEditBoxText(HWND hndl, int item) {
if (hwnd == NULL) if (hwnd == NULL)
return L""; return L"";
@ -101,7 +97,7 @@ void write_config_val(LPCSTR key, LPCSTR val)
void toggle_want_connect() void toggle_want_connect()
{ {
unique_lock<mutex> conn_lk{connect_mut}; unique_lock<mutex> conn_lk{conn_mut};
want_connect = !want_connect; want_connect = !want_connect;
@ -118,20 +114,11 @@ void toggle_want_connect()
SendMessage(edit, EM_SETREADONLY, FALSE, NULL); SendMessage(edit, EM_SETREADONLY, FALSE, NULL);
} }
connect_cv.notify_one(); conn_cv.notify_one();
// We're not modifying the queue but the connection thread might currently
// be waiting on it so we need to notify it too
unique_lock<mutex> q_lk{msg_q_mut};
msg_q_cv.notify_one();
} }
/** /**
* Connect to remote and wait for messages in queue to send until comm_thread_run is false * Connect to remote and wait for messages in queue to send until comm_thread_run is false
* TODO The main loop uses 2 condition variables and pretty much 3 protected
* variables: the comm_thread_run, the queue, and connection wanted. The current
* approach should be thread safe but it is ugly and easy to break on changes
* in the loop. One condition variable for all three could work better.
*/ */
void comm_loop() void comm_loop()
{ {
@ -147,43 +134,43 @@ void comm_loop()
SOCKET sock = INVALID_SOCKET; SOCKET sock = INVALID_SOCKET;
// Note: The communication thread is one big if/elseif/else. We keep the
// mutex locked except when waiting for an event after which we will loop
// again to see what happened and on long operations like connection
// attempts or sending data.
// This allows protecting muliple variables without performance problems.
comm_thread_run = true; comm_thread_run = true;
unique_lock<mutex> lk{conn_mut};
while (comm_thread_run) { while (comm_thread_run) {
// If we are not connected, try to connect if should, wait if we shouldn't // If we are not connected, try to connect if wanted, wait if we don't
// If we are, but shouldn't, disconnect
unique_lock<mutex> conn_lk{connect_mut};
if (sock == INVALID_SOCKET) { if (sock == INVALID_SOCKET) {
if (want_connect) { if (want_connect) {
conn_lk.unlock(); // Don't lock for connect lk.unlock(); // Don't lock for connect
sock = _connect(); sock = _connect();
lk.lock();
if (sock == INVALID_SOCKET) { if (sock == INVALID_SOCKET) {
log("Connection failed. Retrying soon."); log("Connection failed. Retrying soon.");
conn_lk.lock(); conn_cv.wait_for(lk, 1000ms);
connect_cv.wait_for(conn_lk, 1000ms);
} else { } else {
log("Successfully connected"); log("Successfully connected");
} }
} else { } else {
connect_cv.wait(conn_lk); conn_cv.wait(lk);
} }
continue; // If we are connected, but shouldn't be, disconnect
} if (!want_connect) { } else if (!want_connect) {
log("Disconnecting"); log("Disconnecting");
closesocket(sock); closesocket(sock);
sock = INVALID_SOCKET; sock = INVALID_SOCKET;
continue; // If we are connected but there's no data available, wait
}
conn_lk.unlock();
unique_lock<mutex> lk{msg_q_mut};
if (!comm_thread_run) {
continue; // Need to check again as otherwise we could wait forever
} else if (msg_q.empty()) { } else if (msg_q.empty()) {
msg_q_cv.wait(lk); conn_cv.wait(lk);
// We are connected and there is data available
} else { } else {
// Remove first element, unlock, push back on error // Remove first element, unlock, push back on error
wstring msg = msg_q.front(); wstring msg = msg_q.front();
msg_q.pop_front(); msg_q.pop_front();
lk.unlock(); lk.unlock();
string msg_utf8 = string msg_utf8 =
@ -199,6 +186,8 @@ void comm_loop()
msg_q.push_front(msg); msg_q.push_front(msg);
lk.unlock(); lk.unlock();
} }
lk.lock();
} }
} }
@ -291,16 +280,11 @@ BOOL WINAPI DllMain(HMODULE hModule, DWORD ul_reason_for_call, LPVOID lpReserved
break; break;
case DLL_PROCESS_DETACH: case DLL_PROCESS_DETACH:
{ {
unique_lock<mutex> lk_conn{connect_mut}; unique_lock<mutex> lk{conn_mut};
unique_lock<mutex> lk_q{msg_q_mut};
comm_thread_run = false; comm_thread_run = false;
lk.unlock();
lk_conn.unlock(); conn_cv.notify_one();
lk_q.unlock();
connect_cv.notify_one();
msg_q_cv.notify_one();
if (comm_thread.joinable()) if (comm_thread.joinable())
comm_thread.join(); comm_thread.join();
@ -360,9 +344,6 @@ SOCKET _connect() {
freeaddrinfo(result); freeaddrinfo(result);
if (sock != INVALID_SOCKET)
log("Connection success");
return sock; return sock;
} }
@ -397,13 +378,14 @@ bool _send(SOCKET &sock, string const &msg) {
bool ProcessSentence(wstring & sentence, SentenceInfo sentenceInfo) bool ProcessSentence(wstring & sentence, SentenceInfo sentenceInfo)
{ {
if (sentenceInfo["current select"]) { if (sentenceInfo["current select"]) {
lock_guard<mutex> lock{ msg_q_mut }; log("Received sentence");
lock_guard<mutex> lock{conn_mut};
if (msg_q.size() >= MSG_Q_CAP) if (msg_q.size() >= MSG_Q_CAP)
msg_q.pop_front(); msg_q.pop_front();
msg_q.push_back(wstring{ sentence }); msg_q.push_back(wstring{ sentence });
msg_q_cv.notify_one(); conn_cv.notify_one();
} }
return false; return false;