Adding Activity Logs (SSE Part 2)
I thought it would make the site feel more "alive" if users could see activity, such as when another user views or likes an article.
To accomplish this, I used Server-Sent Events (SSE).
The Initial Router Changes
We already have an existing SSE handler and route for our API:
```go
rootMux.HandleFunc("GET /api/streams/stats", h.HandleStreamStats)
```
It's tempting to add another route:
```go
// tempting to add
rootMux.HandleFunc("GET /api/streams/activities", h.HandleStreamActivities)
```
We don't want to be wasteful, though. Although HTTP/2 multiplexes requests to the same origin over a single connection, our local environment doesn't run over HTTPS, so the browser falls back to HTTP/1.1. Under HTTP/1.1, the browser allows only about 6 concurrent connections to one domain, and adding a second SSE endpoint would spend 2 of that budget of 6.
What's even worse is that when a user switches pages, the browser is lazy about cleaning up existing SSE connections. In our local HTTP/1.1 environment, that means visiting just 3 pages is enough to spend the connection allowance completely. The result is pages that hang indefinitely.
Because of this, to conserve resources, we reuse the connection we already have for sending user stats (uptime, memory, etc.) to also send activity. This requires a refactor:
```go
// old
rootMux.HandleFunc("GET /api/streams/stats", h.HandleStreamStats)

// new
rootMux.HandleFunc("GET /api/streams", h.HandleStream)
```
The Handler
Since we're modifying our SSE endpoint to serve two different features over one connection, we need to refactor our handler, which currently assumes one SSE connection = one feature.
```go
// old:
func (h *Handler) HandleStreamStats(w http.ResponseWriter, r *http.Request) {
	// ...
}

// new:
func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) {
	flusher, ok := setStreamHeaders(w)
	if !ok {
		return
	}

	flusher.Flush()

	h.streamData(flusher, w, r)
}
```
streamData handles sending all of our data through the same stream. Here is what it looks like:
```go
func (h *Handler) streamData(flusher http.Flusher, w http.ResponseWriter, r *http.Request) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	if err := h.sendStats(w, flusher); err != nil {
		return
	}

	id := fmt.Sprintf("%s-%d", r.RemoteAddr, time.Now().UnixNano())
	sub := make(chan string, 10)

	h.monitor.Subscribe(id, sub)
	defer h.monitor.Unsubscribe(id)

	for {
		select {
		case <-r.Context().Done():
			return
		case <-ticker.C:
			if err := h.sendStats(w, flusher); err != nil {
				return
			}
		case action := <-sub:
			data, _ := json.Marshal(AppActivity{Action: action})

			_, err := fmt.Fprintf(w, "data: %s\n\n", data)
			if err != nil {
				return
			}
			flusher.Flush()
		}
	}
}
```
It's important to note that sendStats is not new: it's what the handler already used to send telemetry data to the user every second. The new functionality is specifically in this case:
```go
case action := <-sub:
	data, _ := json.Marshal(AppActivity{Action: action})

	_, err := fmt.Fprintf(w, "data: %s\n\n", data)
	if err != nil {
		return
	}
	flusher.Flush()
```
I've annotated the infinite for loop to explain the code:
```go
for {
	// All cases are in one select. It's tempting to run cases 2 and 3 in their own goroutines.
	// However, http connections are not thread safe, so we need a single-threaded solution.
	select {
	// User canceled the request. Exit.
	case <-r.Context().Done():
		return
	// Every 1 second, stream stats to the client.
	case <-ticker.C:
		if err := h.sendStats(w, flusher); err != nil {
			return
		}
	// An activity was received from the subscription; stream it.
	case action := <-sub:
		data, _ := json.Marshal(AppActivity{Action: action})

		_, err := fmt.Fprintf(w, "data: %s\n\n", data)
		if err != nil {
			return
		}
		flusher.Flush()
	}
}
```
Notice in the same function we create a subscription channel to listen for updates:
```go
id := fmt.Sprintf("%s-%d", r.RemoteAddr, time.Now().UnixNano())
sub := make(chan string, 10)

h.monitor.Subscribe(id, sub)
defer h.monitor.Unsubscribe(id)
```
Now, I will go over the code for h.monitor, which is of type activity.Monitor.
Monitor
The code looks like this:
```go
package activity

import "sync"

type Monitor struct {
	subscribers map[string]chan<- string
	mu          *sync.RWMutex
}

func NewMonitor() *Monitor {
	subscribers := make(map[string]chan<- string)
	var mu sync.RWMutex
	return &Monitor{subscribers: subscribers, mu: &mu}
}

func (m *Monitor) PublishActivity(activity string) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, sub := range m.subscribers {
		select {
		case sub <- activity:
			continue
		default:
			continue
		}
	}
}

func (m *Monitor) Subscribe(id string, sub chan<- string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subscribers[id] = sub
}

func (m *Monitor) Unsubscribe(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.subscribers, id)
}
```
Notice the monitor has a subscribers member variable: a map of channels, one per HTTP connection that is listening for activity.
The Subscribe method is used to add a new subscriber. Unsubscribe, of course, does the opposite.
```go
func (m *Monitor) Subscribe(id string, sub chan<- string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subscribers[id] = sub
}
```
We use a sync.RWMutex to protect the subscribers map because multiple goroutines can try to subscribe at the same time (for example: 2 users connect at the same moment). Without the mutex, concurrent map writes could cause data corruption or panics. We specifically use an RWMutex instead of a plain Mutex because we anticipate reading from subscribers far more often than writing to it.
The PublishActivity method looks like this:
```go
func (m *Monitor) PublishActivity(activity string) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// notify all subscribers of this activity
	for _, sub := range m.subscribers {
		select {
		// the mechanism we use to send the activity to the subscriber
		case sub <- activity:
			continue
		default:
			// client is slow for some reason: drop this action
			continue
		}
	}
}
```
Misc Changes
We need to modify our middleware to accept the monitor object. All of our traffic flows through the middleware, so it's the perfect place to listen and publish activity:
```go
func WithLogger(next http.Handler, log *slog.Logger, monitor *activity.Monitor) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		metrics := httpsnoop.CaptureMetrics(next, w, r)
		if metrics.Code == 200 {
			activityStr := fmt.Sprintf("Anonymous user: %s %s, duration: %s", r.Method, r.URL.Path, metrics.Duration)
			go monitor.PublishActivity(activityStr) // don't block the request
		}
		log.Info("http response data",
			"method", r.Method,
			"path", r.URL.Path,
			"bytes", metrics.Written,
			"status_code", metrics.Code,
			"duration", metrics.Duration,
		)
	})
}
```
Besides that, we need to modify the handler to accept the same *activity.Monitor.
An Aside: Cleaning up Zombie Connections on the Frontend
I mentioned previously that the browser is lazy about closing SSE connections, which causes issues when developing locally. On top of that, since our app is a multi-page app, we create a new SSE connection on every page load, which is also wasteful. To fix both, we add a script to the <head> of our layout.html, which is applied to every page:
```html
<script>
	// handles the initial page load
	if (!window.globalStream) {
		window.globalStream = new EventSource('/api/streams');
	}
	// handles when the user presses the "back" button
	window.addEventListener('pageshow', (event) => {
		if (!window.globalStream) {
			window.globalStream = new EventSource('/api/streams');
		}
	});
	// force the browser to clean up our zombie connections when the user navigates to another page
	window.addEventListener('beforeunload', () => {
		if (window.globalStream) {
			window.globalStream.close();
			window.globalStream = null;
		}
	});
</script>
```
Conclusion
Now the home page displays anonymous activity to all users!