From 19e636e74e3fa2b2ea3c2b9add1bc9a93b12f2b5 Mon Sep 17 00:00:00 2001 From: David Florness Date: Sat, 12 Mar 2022 16:49:36 -0500 Subject: [PATCH] Fix race condition in processing logic This fixes the following nil pointer panic that happens when a message is being processed but also requested in event.go. I have no idea what I was thinking with the `goto process` code in getAndHandleEvent ... waiting for evals... panic: runtime error: invalid memory address or nil pointer dereference [signal SIGSEGV: segmentation violation code=0x1 addr=0x190 pc=0x9eb25b] goroutine 1 [running]: tallyard.xyz/election.(*ElectionsMap).SetupEventHooks.func1.1(0x0, 0xc0010589c0) /home/matrix/src/election/msg.go:153 +0x5b tallyard.xyz/election.(*EventStore).getAndHandleEvent(0xc000112340, 0xc000130030, 0x21, 0xc001a63b30, 0x2c, 0xc0b2f8, 0x12, 0x1, 0xc0014481e0, 0x0, ...) /home/matrix/src/election/event.go:222 +0x176 tallyard.xyz/election.(*EventStore).GetEvalsEvent(0xc000112340, 0xc000130030, 0x21, 0xc001a63b30, 0x2c, 0xc00025a030) /home/matrix/src/election/event.go:163 +0x98 tallyard.xyz/election.(*Election).SendSum(0xc001a80a00, 0xc000022620, 0xc000112340, 0x1, 0x1) /home/matrix/src/election/voter.go:457 +0x4e4 main.main() /home/matrix/src/cmd/tallyard/main.go:181 +0xb46 --- election/event.go | 25 +++++++++++++++------- election/msg.go | 54 +++++++++++++++++++++++++++-------------------- 2 files changed, 48 insertions(+), 31 deletions(-) diff --git a/election/event.go b/election/event.go index 8a3a1a1..b183f42 100644 --- a/election/event.go +++ b/election/event.go @@ -24,7 +24,7 @@ type EventStore struct { Client *mautrix.Client `json:"-"` EventHandlers map[event.Type]func(*event.Event)bool `json:"-"` - Processing map[id.EventID]struct{} `json:"-"` + Processing map[id.EventID]<-chan struct{} `json:"-"` } func NewEventStore(client *mautrix.Client, eventHandlers map[event.Type]func(*event.Event)bool) *EventStore { @@ -33,7 +33,7 @@ func NewEventStore(client *mautrix.Client, eventHandlers map[event.Type]func(*ev Client: client, EventHandlers: eventHandlers, - Processing: make(map[id.EventID]struct{}), + Processing: make(map[id.EventID]<-chan struct{}), } } @@ -190,11 +190,10 @@ func (store *EventStore) GetSumEvent(roomID id.RoomID, sumID id.EventID) *SumEve } func (store *EventStore) getAndHandleEvent(roomID id.RoomID, eventID id.EventID, eventType event.Type) (*event.Event, error) { - var err error // see if we've handled this event before store.RLock() evt, wasProcessed := store.Events[eventID] - _, isProcessing := store.Processing[eventID] + done, isProcessing := store.Processing[eventID] store.RUnlock() if wasProcessed { // This event was seen before. If evt != nil, it was handled @@ -202,11 +201,22 @@ func (store *EventStore) getAndHandleEvent(roomID id.RoomID, eventID id.EventID, return evt, nil } if isProcessing { - // don't fetch and wait for result from handler - goto process + // wait for result from handler and skip fetch + log.Debugf("event %s is processing; waiting for processing to finish", eventID) + <-done + store.RLock() + defer store.RUnlock() + evt, wasProcessed = store.Events[eventID] + if !wasProcessed { + return nil, fmt.Errorf("event %s isn't in events map despite seeming to be processed", eventID) + } + if evt == nil { + return nil, fmt.Errorf("event %s failed processing while we were waiting for it", eventID) + } + return evt, nil } // we've never seen this event before - evt, err = store.fetchEvent(roomID, eventID) + evt, err := store.fetchEvent(roomID, eventID) if err != nil { return nil, fmt.Errorf("couldn't fetch %s event '%s': %s", eventType.Type, eventID, err) } @@ -218,7 +228,6 @@ func (store *EventStore) getAndHandleEvent(roomID id.RoomID, eventID id.EventID, if err != nil { return nil, fmt.Errorf("couldn't parse %s event '%s' content: %s", eventType.Type, eventID, err) } -process: if !store.EventHandlers[eventType](evt) { return nil, fmt.Errorf("couldn't handle %s event '%s'", eventType.Type, eventID) } diff --git a/election/msg.go b/election/msg.go index 4412212..49fad4c 100644 --- a/election/msg.go +++ b/election/msg.go @@ -5,7 +5,6 @@ import ( "encoding/base64" "fmt" "reflect" - "time" "github.com/consensys/gnark-crypto/ecc" "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" @@ -140,51 +139,60 @@ func (elections *ElectionsMap) SetupEventHooks(client *mautrix.Client, syncer ma } else { eventStore.Client = client eventStore.EventHandlers = eventHandlers - eventStore.Processing = make(map[id.EventID]struct{}) + eventStore.Processing = make(map[id.EventID]<-chan struct{}) } - // TODO: this is a freaking mess wrapper := func(f func(*event.Event, *mautrix.Client) bool) func(*event.Event) bool { return func(evt *event.Event) bool { if evt == nil { - // TODO: this is only here to help find a bug - log.Debugf("evt is nil") + // this is only here to make potential + // bug-hunting easier + log.Error("evt is nil") + return false } if evt.Unsigned.RedactedBecause != nil { log.Debugf("event %s was redacted", evt.ID) return false } + + // don't process events that have been (or are being) + // processed already eventStore.Lock() - _, evtIsProcessing := eventStore.Processing[evt.ID] - handledEvent, eventHandled := eventStore.Events[evt.ID] - if evtIsProcessing { - eventStore.Unlock() - log.Debugf("event %s is being processed; awaiting success result", evt.ID) - for !eventHandled { - eventStore.RLock() - handledEvent, eventHandled = eventStore.Events[evt.ID] - eventStore.RUnlock() - time.Sleep(time.Millisecond * 50) - } - return handledEvent != nil - } else if eventHandled { - eventStore.Unlock() + handledEvent, wasProcessed := eventStore.Events[evt.ID] + doneRChan, isProcessing := eventStore.Processing[evt.ID] + if wasProcessed { + defer eventStore.Unlock() log.Debugf("event %s was already handled", evt.ID) return handledEvent != nil - } else { - eventStore.Processing[evt.ID] = struct{}{} + } + if isProcessing { eventStore.Unlock() + // event is already being processed, so wait for + // success result and return it + log.Debugf("event %s is being processed; awaiting success result", evt.ID) + <-doneRChan + eventStore.RLock() + defer eventStore.RUnlock() + handledEvent, wasProcessed = eventStore.Events[evt.ID] + return handledEvent != nil } + done := make(chan struct{}) + eventStore.Processing[evt.ID] = done + eventStore.Unlock() + + // process event success := f(evt, client) + + // store event and pass it to any routines waiting for it eventStore.Lock() - // see EventStore doc for success explanation + defer eventStore.Unlock() if success { eventStore.Events[evt.ID] = evt } else { eventStore.Events[evt.ID] = nil } delete(eventStore.Processing, evt.ID) - eventStore.Unlock() + close(done) return success } } -- 2.38.4