From 5b8d3fa4e98c47db727d191f3726aa356b4d857f Mon Sep 17 00:00:00 2001 From: David Florness Date: Mon, 15 Feb 2021 14:22:47 -0500 Subject: [PATCH] Don't process an event when it's already being processed --- election/event.go | 15 ++++++++++++--- election/msg.go | 25 +++++++++++++++++++++---- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/election/event.go b/election/event.go index e387d1d..81af48d 100644 --- a/election/event.go +++ b/election/event.go @@ -24,6 +24,7 @@ type EventStore struct { Client *mautrix.Client `json:"-"` EventHandlers map[event.Type]func(*event.Event)bool `json:"-"` + Processing map[id.EventID]struct{} `json:"-"` } func NewEventStore(client *mautrix.Client, eventHandlers map[event.Type]func(*event.Event)bool) *EventStore { @@ -32,6 +33,7 @@ func NewEventStore(client *mautrix.Client, eventHandlers map[event.Type]func(*ev Client: client, EventHandlers: eventHandlers, + Processing: make(map[id.EventID]struct{}), } } @@ -166,17 +168,23 @@ func (store *EventStore) GetSumEvent(roomID id.RoomID, sumID id.EventID) *SumEve } func (store *EventStore) getAndHandleEvent(roomID id.RoomID, eventID id.EventID, eventType event.Type) (*event.Event, error) { + var err error // see if we've handled this event before store.RLock() - evt, exists := store.Events[eventID] + evt, wasProcessed := store.Events[eventID] + _, isProcessing := store.Processing[eventID] store.RUnlock() - if exists { + if wasProcessed { // This event was seen before. If evt != nil, it was handled // successfully and vice versa. return evt, nil } + if isProcessing { + // don't fetch and wait for result from handler + goto process + } // we've never seen this event before - evt, err := store.fetchEvent(roomID, eventID) + evt, err = store.fetchEvent(roomID, eventID) if err != nil { return nil, fmt.Errorf("couldn't fetch %s event '%s': %s", eventType.Type, eventID, err) } @@ -188,6 +196,7 @@ func (store *EventStore) getAndHandleEvent(roomID id.RoomID, eventID id.EventID, if err != nil { return nil, fmt.Errorf("couldn't parse %s event '%s' content: %s", eventType.Type, eventID, err) } +process: if !store.EventHandlers[eventType](evt) { return nil, fmt.Errorf("couldn't handle %s event '%s'", eventType.Type, eventID) } diff --git a/election/msg.go b/election/msg.go index c134c48..be99bc6 100644 --- a/election/msg.go +++ b/election/msg.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "reflect" + "time" log "github.com/sirupsen/logrus" "golang.org/x/crypto/nacl/box" @@ -109,6 +110,7 @@ func (elections *ElectionsMap) SetupEventHooks(client *mautrix.Client, syncer ma } else { eventStore.Client = client eventStore.EventHandlers = eventHandlers + eventStore.Processing = make(map[id.EventID]struct{}) } wrapper := func(f func(*event.Event) bool) func(*event.Event) bool { @@ -117,12 +119,26 @@ func (elections *ElectionsMap) SetupEventHooks(client *mautrix.Client, syncer ma log.Debugf("event %s was redacted", evt.ID) return } - eventStore.RLock() - handledEvent, exists := eventStore.Events[evt.ID] - eventStore.RUnlock() - if exists { + eventStore.Lock() + _, evtIsProcessing := eventStore.Processing[evt.ID] + handledEvent, eventHandled := eventStore.Events[evt.ID] + if evtIsProcessing { + eventStore.Unlock() + log.Debugf("event %s is being processed; awaiting success result", evt.ID) + for !eventHandled { + eventStore.RLock() + handledEvent, eventHandled = eventStore.Events[evt.ID] + eventStore.RUnlock() + time.Sleep(time.Millisecond * 50) + } + return handledEvent != nil + } else if eventHandled { + eventStore.Unlock() log.Debugf("event %s was already handled", evt.ID) return handledEvent != nil + } else { + eventStore.Processing[evt.ID] = struct{}{} + eventStore.Unlock() } success = f(evt) eventStore.Lock() @@ -132,6 +148,7 @@ func (elections *ElectionsMap) SetupEventHooks(client *mautrix.Client, syncer ma } else { eventStore.Events[evt.ID] = nil } + delete(eventStore.Processing, evt.ID) eventStore.Unlock() return } -- 2.38.4