SEQUENCE // Building The Blog

Adding Activity Logs (SSE Part 2)

Author Thorn Hall

I thought it would make the site feel more "alive" if users could see activity, such as when another user views or likes an article.

To accomplish this, I used Server-Sent Events (SSE).

The Initial Router Changes

We already have an existing SSE handler and route for our API:

rootMux.HandleFunc("GET /api/streams/stats", h.HandleStreamStats)

It's tempting to add another route:

// tempting to add
rootMux.HandleFunc("GET /api/streams/activities", h.HandleStreamActivities)

We don't want to be wasteful. HTTP/2 multiplexes many streams over a single connection, so a second SSE endpoint would cost little in production. Browsers only speak HTTP/2 over HTTPS, though, and our local environment runs over plain HTTP, which means HTTP/1.1. Under HTTP/1.1 the browser typically allows only 6 concurrent connections to one domain, and a second endpoint would mean every page uses 2 of that budget of 6.

What's even worse is that when a user switches pages, the browser is lazy about cleaning up existing SSE connections. That means in our local environment, which uses HTTP/1.1, two streams per page would spend the entire connection allowance after visiting just 3 pages. The result is pages that hang indefinitely.

To conserve connections, we reuse the one we already have for sending stats (uptime, memory, etc.) to also send activity. This requires a refactor:

// old
rootMux.HandleFunc("GET /api/streams/stats", h.HandleStreamStats)

// new
rootMux.HandleFunc("GET /api/streams", h.HandleStream)

The Handler

Since one SSE connection now serves two features, we need to refactor our Handler, which currently assumes one connection per feature.

// old:
func (h *Handler) HandleStreamStats(w http.ResponseWriter, r *http.Request) {
	// ...
}

// new:
func (h *Handler) HandleStream(w http.ResponseWriter, r *http.Request) {
	flusher, ok := setStreamHeaders(w)
	if !ok {
		return
	}

	flusher.Flush()

	h.streamData(flusher, w, r)
}

streamData handles sending all of our data through the same stream. Here is what it looks like:

func (h *Handler) streamData(flusher http.Flusher, w http.ResponseWriter, r *http.Request) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	if err := h.sendStats(w, flusher); err != nil {
		return
	}

	id := fmt.Sprintf("%s-%d", r.RemoteAddr, time.Now().UnixNano())
	sub := make(chan string, 10)

	h.monitor.Subscribe(id, sub)
	defer h.monitor.Unsubscribe(id)

	for {
		select {
		case <-r.Context().Done():
			return
		case <-ticker.C:
			if err := h.sendStats(w, flusher); err != nil {
				return
			}
		case action := <-sub:
			data, _ := json.Marshal(AppActivity{Action: action})

			_, err := fmt.Fprintf(w, "data: %s\n\n", data)
			if err != nil {
				return
			}
			flusher.Flush()
		}
	}
}

It's important to note that sendStats is not new. It's what the handler already used to send telemetry data to the user every second. The new functionality is specifically in this case:

case action := <-sub:
	data, _ := json.Marshal(AppActivity{Action: action})

	_, err := fmt.Fprintf(w, "data: %s\n\n", data)
	if err != nil {
		return
	}
	flusher.Flush()

I've annotated the infinite for loop to explain the code:

for {
	// All cases are in one select. It's tempting to run cases 2 and 3 in their own goroutines.
	// However, an http.ResponseWriter is not safe for concurrent writes, so a single
	// goroutine does all of the writing.
	select {
	// User canceled the request (or the connection closed). Exit.
	case <-r.Context().Done():
		return
	// Every 1 second, stream stats to the client.
	case <-ticker.C:
		if err := h.sendStats(w, flusher); err != nil {
			return
		}
	// An activity was received from the subscription, stream it.
	case action := <-sub:
		data, _ := json.Marshal(AppActivity{Action: action})

		_, err := fmt.Fprintf(w, "data: %s\n\n", data)
		if err != nil {
			return
		}
		flusher.Flush()
	}
}

Notice in the same function we create a subscription channel to listen for updates:

id := fmt.Sprintf("%s-%d", r.RemoteAddr, time.Now().UnixNano())
sub := make(chan string, 10)

h.monitor.Subscribe(id, sub)
defer h.monitor.Unsubscribe(id)

Now, I will go over the code for h.monitor, which is of type activity.Monitor.

Monitor

The code looks like this:

package activity

import "sync"

type Monitor struct {
	subscribers map[string]chan<- string
	mu          *sync.RWMutex
}

func NewMonitor() *Monitor {
	subscribers := make(map[string]chan<- string)
	var mu sync.RWMutex
	return &Monitor{subscribers: subscribers, mu: &mu}
}

func (m *Monitor) PublishActivity(activity string) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, sub := range m.subscribers {
		select {
		case sub <- activity:
			continue
		default:
			continue
		}
	}
}

func (m *Monitor) Subscribe(id string, sub chan<- string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subscribers[id] = sub
}

func (m *Monitor) Unsubscribe(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.subscribers, id)
}

Notice the monitor has a subscribers member variable: a map from subscriber ID to a send-only channel, with one entry for each open SSE connection that is listening for activity.

The Subscribe method is used to add a new subscriber. Unsubscribe, of course, does the opposite.

func (m *Monitor) Subscribe(id string, sub chan<- string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subscribers[id] = sub
}

We use a sync.RWMutex to protect the subscribers map. This is because multiple goroutines can try to subscribe at the same time (for example, 2 users connect at the same moment). Go maps are not safe for concurrent use, so without the mutex we could cause data corruption or crash the program. We specifically use a RWMutex instead of a plain Mutex because we anticipate reading from subscribers far more often than writing to it: every published activity takes the read lock, while only connects and disconnects take the write lock.

The PublishActivity method looks like this:

func (m *Monitor) PublishActivity(activity string) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// notify all subscribers of this activity
	for _, sub := range m.subscribers {
		select {
		// the mechanism we use to send the activity to the subscriber
		case sub <- activity:
			continue
		default:
			// client is slow for some reason: drop this action
			continue
		}
	}
}
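
To make the drop-on-slow-client behavior concrete, here is a small standalone sketch. It repeats a condensed copy of the Monitor in `package main` so it compiles on its own; the subscriber names and buffer sizes are chosen only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Monitor is a condensed copy of the type from the post.
type Monitor struct {
	subscribers map[string]chan<- string
	mu          *sync.RWMutex
}

func NewMonitor() *Monitor {
	return &Monitor{subscribers: make(map[string]chan<- string), mu: &sync.RWMutex{}}
}

func (m *Monitor) PublishActivity(activity string) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, sub := range m.subscribers {
		select {
		case sub <- activity:
		default: // buffer full: drop the activity for this subscriber only
		}
	}
}

func (m *Monitor) Subscribe(id string, sub chan<- string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.subscribers[id] = sub
}

func (m *Monitor) Unsubscribe(id string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.subscribers, id)
}

func main() {
	m := NewMonitor()
	fast := make(chan string, 2)
	slow := make(chan string, 1) // deliberately tiny buffer to force a drop
	m.Subscribe("fast", fast)
	m.Subscribe("slow", slow)

	m.PublishActivity("first")
	m.PublishActivity("second") // slow's buffer is already full, so it misses this one

	fmt.Println(len(fast), len(slow)) // prints "2 1"
}
```

The non-blocking send is what keeps one stalled browser tab from holding the read lock and delaying every other subscriber. The cost is that a slow client silently misses events, which seems acceptable for a cosmetic activity feed.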

Misc Changes

We need to modify our middleware to accept the monitor object. All of our traffic flows through the middleware, so it's the perfect place to observe requests and publish activity:

func WithLogger(next http.Handler, log *slog.Logger, monitor *activity.Monitor) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		metrics := httpsnoop.CaptureMetrics(next, w, r)
		if metrics.Code == 200 {
			activityStr := fmt.Sprintf("Anonymous user: %s %s, duration: %s", r.Method, r.URL.Path, metrics.Duration)
			go monitor.PublishActivity(activityStr) // don't block request
		}
		log.Info("http response data",
			"method", r.Method,
			"path", r.URL.Path,
			"bytes", metrics.Written,
			"status_code", metrics.Code,
			"duration", metrics.Duration,
		)
	})
}

Besides that, we need to modify the handler to also accept the same *activity.Monitor.

An Aside: Cleaning up Zombie Connections on the Frontend

I mentioned previously how the browser is lazy about closing SSE connections, which causes issues when developing locally. On top of that, since our app is a multi-page app, every page load creates a new SSE connection, and the stale ones waste resources. To fix it, we add a script to the <head> of our layout.html, which is applied to every page:

<script>
    // handles the initial page load
    if (!window.globalStream) {
        window.globalStream = new EventSource('/api/streams');
    }
    // handles when the user presses the "back" button
    window.addEventListener('pageshow', (event) => {
        if (!window.globalStream) {
            window.globalStream = new EventSource('/api/streams');
        }
    });
    // force the browser to clean up our zombie connections when the user navigates to another page
    window.addEventListener('beforeunload', () => {
        if (window.globalStream) {
            window.globalStream.close();
            window.globalStream = null;
        }
    });
</script>

Conclusion

Now the home page displays anonymous activity to all users!
