~edwargix/tallyard

5b8d3fa4e98c47db727d191f3726aa356b4d857f — David Florness 3 years ago 4086f1d
Don't process an event when it's already being processed
2 files changed, 33 insertions(+), 7 deletions(-)

M election/event.go
M election/msg.go
M election/event.go => election/event.go +12 -3
@@ 24,6 24,7 @@ type EventStore struct {

	Client        *mautrix.Client                       `json:"-"`
	EventHandlers map[event.Type]func(*event.Event)bool `json:"-"`
	Processing    map[id.EventID]struct{}               `json:"-"`
}

func NewEventStore(client *mautrix.Client, eventHandlers map[event.Type]func(*event.Event)bool) *EventStore {


@@ 32,6 33,7 @@ func NewEventStore(client *mautrix.Client, eventHandlers map[event.Type]func(*ev

		Client:        client,
		EventHandlers: eventHandlers,
		Processing:    make(map[id.EventID]struct{}),
	}
}



@@ 166,17 168,23 @@ func (store *EventStore) GetSumEvent(roomID id.RoomID, sumID id.EventID) *SumEve
}

func (store *EventStore) getAndHandleEvent(roomID id.RoomID, eventID id.EventID, eventType event.Type) (*event.Event, error) {
	var err error
	// see if we've handled this event before
	store.RLock()
	evt, exists := store.Events[eventID]
	evt, wasProcessed := store.Events[eventID]
	_, isProcessing := store.Processing[eventID]
	store.RUnlock()
	if exists {
	if wasProcessed {
		// This event was seen before.  If evt != nil, it was handled
		// successfully and vice versa.
		return evt, nil
	}
	if isProcessing {
		// don't fetch and wait for result from handler
		goto process
	}
	// we've never seen this event before
	evt, err := store.fetchEvent(roomID, eventID)
	evt, err = store.fetchEvent(roomID, eventID)
	if err != nil {
		return nil, fmt.Errorf("couldn't fetch %s event '%s': %s", eventType.Type, eventID, err)
	}


@@ 188,6 196,7 @@ func (store *EventStore) getAndHandleEvent(roomID id.RoomID, eventID id.EventID,
	if err != nil {
		return nil, fmt.Errorf("couldn't parse %s event '%s' content: %s", eventType.Type, eventID, err)
	}
process:
	if !store.EventHandlers[eventType](evt) {
		return nil, fmt.Errorf("couldn't handle %s event '%s'", eventType.Type, eventID)
	}

M election/msg.go => election/msg.go +21 -4
@@ 5,6 5,7 @@ import (
	"fmt"
	"math/big"
	"reflect"
	"time"

	log "github.com/sirupsen/logrus"
	"golang.org/x/crypto/nacl/box"


@@ 109,6 110,7 @@ func (elections *ElectionsMap) SetupEventHooks(client *mautrix.Client, syncer ma
	} else {
		eventStore.Client = client
		eventStore.EventHandlers = eventHandlers
		eventStore.Processing = make(map[id.EventID]struct{})
	}

	wrapper := func(f func(*event.Event) bool) func(*event.Event) bool {


@@ 117,12 119,26 @@ func (elections *ElectionsMap) SetupEventHooks(client *mautrix.Client, syncer ma
				log.Debugf("event %s was redacted", evt.ID)
				return
			}
			eventStore.RLock()
			handledEvent, exists := eventStore.Events[evt.ID]
			eventStore.RUnlock()
			if exists {
			eventStore.Lock()
			_, evtIsProcessing := eventStore.Processing[evt.ID]
			handledEvent, eventHandled := eventStore.Events[evt.ID]
			if evtIsProcessing {
				eventStore.Unlock()
				log.Debugf("event %s is being processed; awaiting success result", evt.ID)
				for !eventHandled {
					eventStore.RLock()
					handledEvent, eventHandled = eventStore.Events[evt.ID]
					eventStore.RUnlock()
					time.Sleep(time.Millisecond * 50)
				}
				return handledEvent != nil
			} else if eventHandled {
				eventStore.Unlock()
				log.Debugf("event %s was already handled", evt.ID)
				return handledEvent != nil
			} else {
				eventStore.Processing[evt.ID] = struct{}{}
				eventStore.Unlock()
			}
			success = f(evt)
			eventStore.Lock()


@@ 132,6 148,7 @@ func (elections *ElectionsMap) SetupEventHooks(client *mautrix.Client, syncer ma
			} else {
				eventStore.Events[evt.ID] = nil
			}
			delete(eventStore.Processing, evt.ID)
			eventStore.Unlock()
			return
		}