From 8188adf0f1a0482c269d1eb6350dd91dddc9ed29 Mon Sep 17 00:00:00 2001
From: Ben Gamari <ben@smart-cactus.org>
Date: Fri, 18 Dec 2020 10:14:09 -0500
Subject: [PATCH] eventlog: Fix various races

Previously the eventlog infrastructure had a couple of races that could
pop up when using the startEventLog/endEventLog interfaces. In
particular, stopping and then later restarting logging could result in
data preceding the eventlog header, breaking the integrity of the
stream.

To fix this we rework the invariants regarding the eventlog and
generally tighten up the concurrency control surrounding starting and
stopping of logging.

We also fix an unrelated bug, wherein log events from disabled
capabilities could end up never flushed.
---
 rts/RtsStartup.c                            |   4 +
 rts/Trace.h                                 |  14 +-
 rts/eventlog/EventLog.c                     | 134 +++++++++++++++++---
 rts/eventlog/EventLog.h                     |   3 +
 testsuite/tests/rts/InitEventLogging.stdout |   1 -
 5 files changed, 136 insertions(+), 20 deletions(-)

diff --git a/rts/RtsStartup.c b/rts/RtsStartup.c
index 818fbaa34645..bd8e5d57334c 100644
--- a/rts/RtsStartup.c
+++ b/rts/RtsStartup.c
@@ -507,6 +507,10 @@ hs_exit_(bool wait_foreign)
     // also outputs the stats (+RTS -s) info.
     exitStorage();
 
+    /* flush and clean up capabilities' eventlog buffers before cleaning up
+     * scheduler */
+    finishCapEventLogging();
+
     /* free the tasks */
     freeScheduler();
 
diff --git a/rts/Trace.h b/rts/Trace.h
index cad83363d3a5..08b42fe9bda8 100644
--- a/rts/Trace.h
+++ b/rts/Trace.h
@@ -8,7 +8,7 @@
 
 #pragma once
 
-#include "rts/EventLogFormat.h"
+#include "eventlog/EventLog.h"
 #include "sm/NonMovingCensus.h"
 #include "Capability.h"
 
@@ -616,6 +616,18 @@ INLINE_HEADER void traceCapDisable(Capability *cap STG_UNUSED)
 {
     traceCapEvent(cap, EVENT_CAP_DISABLE);
     dtraceCapDisable((EventCapNo)cap->no);
+
+    // Ensure that the eventlog buffer is flushed since otherwise its events
+    // may never make it to the output stream.
+    // See Note [Eventlog concurrency].
+#if defined(TRACING)
+    if (eventlog_enabled) {
+        flushLocalEventsBuf(cap);
+    }
+# else
+    flushLocalEventsBuf(cap);
+#endif
+
 }
 
 INLINE_HEADER void traceEventThreadWakeup(Capability *cap       STG_UNUSED,
diff --git a/rts/eventlog/EventLog.c b/rts/eventlog/EventLog.c
index ddd2cc02df13..876f1ff7b60e 100644
--- a/rts/eventlog/EventLog.c
+++ b/rts/eventlog/EventLog.c
@@ -27,7 +27,65 @@
 #include <unistd.h>
 #endif
 
-bool eventlog_enabled;
+Mutex state_change_mutex;
+bool eventlog_enabled; // protected by state_change_mutex to ensure
+                       // serialisation of calls to
+                       // startEventLogging/endEventLogging
+
+/* Note [Eventlog concurrency]
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ * The eventlog is designed to handle high rates of concurrent event posting by
+ * multiple capabilities. For this reason, each capability has its own local
+ * event buffer, which is flushed when filled.
+ *
+ * Additionally, there is a single global event buffer (`eventBuf`), which is
+ * used for various administrative things (e.g. posting the header) and in
+ * cases where we want to post events yet don't hold a capability.
+ *
+ * Whether or not events are posted is determined by the global flag
+ * eventlog_enabled.  Naturally, starting and stopping of logging are a bit
+ * subtle. In particular, we need to ensure that the header is the *first*
+ * thing to appear in the event stream. To ensure that multiple threads don't
+ * race to start/stop logging we protect eventlog_enabled with
+ * state_change_mutex. Moreover, we only set eventlog_enabled *after* we have
+ * posted the header.
+ *
+ * Event buffers uphold the invariant that they always begin with a start-block
+ * marker. This is enforced by calls to postBlockMarker in:
+ *
+ *  - initEventsBufs (during buffer initialization)
+ *  - printAndClearEventBuf (after the buffer is filled)
+ *  - moreCapEventBufs (when the number of capabilities changes)
+ *
+ * The one place where we *don't* want a block marker is when posting the
+ * eventlog header. We achieve this by first resetting the eventlog buffer
+ * before posting the header (in postHeader).
+ *
+ * Stopping eventlogging is a bit involved:
+ *
+ *  1. first take state_change_mutex, to ensure we don't race with another
+ *     thread to stop logging.
+ *  2. disable eventlog_enabled, to ensure that no capabilities post further
+ *     events
+ *  3. request that all capabilities flush their eventlog buffers. This
+ *     achieves two things: (a) ensures that all events make it to the output
+ *     stream, and (b) serves as a memory barrier, ensuring that all
+ *     capabilities see that eventlogging is now disabled
+ *  4. wait until all capabilities have flushed.
+ *  5. post the end-of-data marker
+ *  6. stop the writer
+ *  7. release state_change_mutex
+ *
+ * Note that a corrollary of this is that !eventlog_enabled implies that the
+ * eventlog buffers are all empty (modulo the block marker that all buffers
+ * always have).
+ *
+ * Changing the number of capabilities is fairly straightforward since we hold
+ * all capabilities when the capability count is changed. The one slight corner
+ * case is that we must ensure that the buffers of any disabled capabilities are
+ * flushed, lest their events are stuck in limbo. This is achieved with a call to
+ * flushLocalEventsBuf in traceCapDisable.
+ */
 
 static const EventLogWriter *event_log_writer = NULL;
 
@@ -36,6 +94,8 @@ static const EventLogWriter *event_log_writer = NULL;
 static int flushCount;
 
 // Struct for record keeping of buffer to store event types and events.
+//
+// Invariant: The event buffer will always begin with a block-start marker.
 typedef struct _EventsBuf {
   StgInt8 *begin;
   StgInt8 *pos;
@@ -512,6 +572,10 @@ init_event_types(void)
 static void
 postHeaderEvents(void)
 {
+    // The header must appear first in the output stream, without the
+    // the block start marker we previously added in printAndClearEventBuf.
+    resetEventsBuf(&eventBuf);
+
     // Write in buffer: the header begin marker.
     postInt32(&eventBuf, EVENT_HEADER_BEGIN);
 
@@ -571,6 +635,7 @@ initEventLogging()
     initEventsBuf(&eventBuf, EVENT_LOG_SIZE, (EventCapNo)(-1));
 #if defined(THREADED_RTS)
     initMutex(&eventBufMutex);
+    initMutex(&state_change_mutex);
 #endif
 }
 
@@ -589,31 +654,39 @@ startEventLogging_(void)
 {
     initEventLogWriter();
 
+    ACQUIRE_LOCK(&eventBufMutex);
     postHeaderEvents();
 
-    // Flush capEventBuf with header.
     /*
      * Flush header and data begin marker to the file, thus preparing the
      * file to have events written to it.
      */
     printAndClearEventBuf(&eventBuf);
 
-    for (uint32_t c = 0; c < get_n_capabilities(); ++c) {
-        postBlockMarker(&capEventBuf[c]);
-    }
+    RELEASE_LOCK(&eventBufMutex);
+
     return true;
 }
 
 bool
 startEventLogging(const EventLogWriter *ev_writer)
 {
+    // Fail early if we race with another thread.
+    if (TRY_ACQUIRE_LOCK(&state_change_mutex) != 0) {
+        return false;
+    }
+
+    // Check whether eventlogging has already been enabled.
     if (eventlog_enabled || event_log_writer) {
+        RELEASE_LOCK(&state_change_mutex);
         return false;
     }
 
-    eventlog_enabled = true;
     event_log_writer = ev_writer;
-    return startEventLogging_();
+    bool ret = startEventLogging_();
+    eventlog_enabled = true;
+    RELEASE_LOCK(&state_change_mutex);
+    return ret;
 }
 
 // Called during forkProcess in the child to restart the eventlog writer.
@@ -628,18 +701,42 @@ restartEventLogging(void)
     }
 }
 
+// Flush and free capability eventlog buffers in preparation for RTS shutdown.
+void
+finishCapEventLogging(void)
+{
+    if (eventlog_enabled) {
+        // Flush all events remaining in the capabilities' buffers and free them.
+        // N.B. at this point we hold all capabilities.
+        for (uint32_t c = 0; c < n_capabilities; ++c) {
+            if (capEventBuf[c].begin != NULL) {
+                printAndClearEventBuf(&capEventBuf[c]);
+                stgFree(capEventBuf[c].begin);
+            }
+        }
+    }
+}
+
 void
 endEventLogging(void)
 {
-    if (!eventlog_enabled)
+    ACQUIRE_LOCK(&state_change_mutex);
+    if (!eventlog_enabled) {
+        RELEASE_LOCK(&state_change_mutex);
         return;
+    }
+
+    eventlog_enabled = false;
 
     // Flush all events remaining in the buffers.
-    for (uint32_t c = 0; c < n_capabilities; ++c) {
-        printAndClearEventBuf(&capEventBuf[c]);
+    //
+    // N.B. Don't flush if shutting down: this was done in
+    // finishCapEventLogging and the capabilities have already been freed.
+    if (sched_state != SCHED_SHUTTING_DOWN) {
+        flushEventLog(NULL);
     }
-    printAndClearEventBuf(&eventBuf);
-    resetEventsBuf(&eventBuf); // we don't want the block marker
+
+    ACQUIRE_LOCK(&eventBufMutex);
 
     // Mark end of events (data).
     postEventTypeNum(&eventBuf, EVENT_DATA_END);
@@ -647,11 +744,15 @@ endEventLogging(void)
     // Flush the end of data marker.
     printAndClearEventBuf(&eventBuf);
 
+    RELEASE_LOCK(&eventBufMutex);
+
     stopEventLogWriter();
     event_log_writer = NULL;
-    eventlog_enabled = false;
+
+    RELEASE_LOCK(&state_change_mutex);
 }
 
+/* N.B. we hold all capabilities when this is called */
 void
 moreCapEventBufs (uint32_t from, uint32_t to)
 {
@@ -663,6 +764,7 @@ moreCapEventBufs (uint32_t from, uint32_t to)
                                      "moreCapEventBufs");
     }
 
+    // Initialize buffers for new capabilities
     for (uint32_t c = from; c < to; ++c) {
         initEventsBuf(&capEventBuf[c], EVENT_LOG_SIZE, c);
     }
@@ -680,11 +782,6 @@ moreCapEventBufs (uint32_t from, uint32_t to)
 void
 freeEventLogging(void)
 {
-    // Free events buffer.
-    for (uint32_t c = 0; c < n_capabilities; ++c) {
-        if (capEventBuf[c].begin != NULL)
-            stgFree(capEventBuf[c].begin);
-    }
     if (capEventBuf != NULL)  {
         stgFree(capEventBuf);
     }
@@ -1571,6 +1668,7 @@ void initEventsBuf(EventsBuf* eb, StgWord64 size, EventCapNo capno)
     eb->size = size;
     eb->marker = NULL;
     eb->capno = capno;
+    postBlockMarker(eb);
 }
 
 void resetEventsBuf(EventsBuf* eb)
diff --git a/rts/eventlog/EventLog.h b/rts/eventlog/EventLog.h
index e78db809c7f6..a412b491bbf5 100644
--- a/rts/eventlog/EventLog.h
+++ b/rts/eventlog/EventLog.h
@@ -26,6 +26,7 @@ extern bool eventlog_enabled;
 
 void initEventLogging(void);
 void restartEventLogging(void);
+void finishCapEventLogging(void);
 void freeEventLogging(void);
 void abortEventLogging(void); // #4512 - after fork child needs to abort
 void moreCapEventBufs (uint32_t from, uint32_t to);
@@ -182,6 +183,8 @@ void postTickyCounterSamples(StgEntCounter *p);
 
 #else /* !TRACING */
 
+INLINE_HEADER void finishCapEventLogging(void) {}
+
 INLINE_HEADER void flushLocalEventsBuf(Capability *cap STG_UNUSED)
 { /* nothing */ }
 
diff --git a/testsuite/tests/rts/InitEventLogging.stdout b/testsuite/tests/rts/InitEventLogging.stdout
index ef5ce6f2d398..2729aa1c32bd 100644
--- a/testsuite/tests/rts/InitEventLogging.stdout
+++ b/testsuite/tests/rts/InitEventLogging.stdout
@@ -3,5 +3,4 @@ init
 write
 write
 write
-write
 stop
-- 
GitLab