@@ 24,7 24,7 @@ type EventStore struct {
Client *mautrix.Client `json:"-"`
EventHandlers map[event.Type]func(*event.Event)bool `json:"-"`
- Processing map[id.EventID]struct{} `json:"-"`
+ Processing map[id.EventID]<-chan struct{} `json:"-"`
}
func NewEventStore(client *mautrix.Client, eventHandlers map[event.Type]func(*event.Event)bool) *EventStore {
@@ 33,7 33,7 @@ func NewEventStore(client *mautrix.Client, eventHandlers map[event.Type]func(*ev
Client: client,
EventHandlers: eventHandlers,
- Processing: make(map[id.EventID]struct{}),
+ Processing: make(map[id.EventID]<-chan struct{}),
}
}
@@ 190,11 190,10 @@ func (store *EventStore) GetSumEvent(roomID id.RoomID, sumID id.EventID) *SumEve
}
func (store *EventStore) getAndHandleEvent(roomID id.RoomID, eventID id.EventID, eventType event.Type) (*event.Event, error) {
- var err error
// see if we've handled this event before
store.RLock()
evt, wasProcessed := store.Events[eventID]
- _, isProcessing := store.Processing[eventID]
+ done, isProcessing := store.Processing[eventID]
store.RUnlock()
if wasProcessed {
// This event was seen before. If evt != nil, it was handled
@@ 202,11 201,22 @@ func (store *EventStore) getAndHandleEvent(roomID id.RoomID, eventID id.EventID,
return evt, nil
}
if isProcessing {
- // don't fetch and wait for result from handler
- goto process
+ // wait for result from handler and skip fetch
+ log.Debugf("event %s is processing; waiting for processing to finish", eventID)
+ <-done
+ store.RLock()
+ defer store.RUnlock()
+ evt, wasProcessed = store.Events[eventID]
+ if !wasProcessed {
+ return nil, fmt.Errorf("event %s isn't in events map despite seeming to be processed", eventID)
+ }
+ if evt == nil {
+ return nil, fmt.Errorf("event %s failed processing while we were waiting for it", eventID)
+ }
+ return evt, nil
}
// we've never seen this event before
- evt, err = store.fetchEvent(roomID, eventID)
+ evt, err := store.fetchEvent(roomID, eventID)
if err != nil {
return nil, fmt.Errorf("couldn't fetch %s event '%s': %s", eventType.Type, eventID, err)
}
@@ 218,7 228,6 @@ func (store *EventStore) getAndHandleEvent(roomID id.RoomID, eventID id.EventID,
if err != nil {
return nil, fmt.Errorf("couldn't parse %s event '%s' content: %s", eventType.Type, eventID, err)
}
-process:
if !store.EventHandlers[eventType](evt) {
return nil, fmt.Errorf("couldn't handle %s event '%s'", eventType.Type, eventID)
}
@@ 5,7 5,6 @@ import (
"encoding/base64"
"fmt"
"reflect"
- "time"
"github.com/consensys/gnark-crypto/ecc"
"github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
@@ 140,51 139,60 @@ func (elections *ElectionsMap) SetupEventHooks(client *mautrix.Client, syncer ma
} else {
eventStore.Client = client
eventStore.EventHandlers = eventHandlers
- eventStore.Processing = make(map[id.EventID]struct{})
+ eventStore.Processing = make(map[id.EventID]<-chan struct{})
}
- // TODO: this is a freaking mess
wrapper := func(f func(*event.Event, *mautrix.Client) bool) func(*event.Event) bool {
return func(evt *event.Event) bool {
if evt == nil {
- // TODO: this is only here to help find a bug
- log.Debugf("evt is nil")
+ // this is only here to make potential
+ // bug-hunting easier
+ log.Error("evt is nil")
+ return false
}
if evt.Unsigned.RedactedBecause != nil {
log.Debugf("event %s was redacted", evt.ID)
return false
}
+
+ // don't process events that have been (or are being)
+ // processed already
eventStore.Lock()
- _, evtIsProcessing := eventStore.Processing[evt.ID]
- handledEvent, eventHandled := eventStore.Events[evt.ID]
- if evtIsProcessing {
- eventStore.Unlock()
- log.Debugf("event %s is being processed; awaiting success result", evt.ID)
- for !eventHandled {
- eventStore.RLock()
- handledEvent, eventHandled = eventStore.Events[evt.ID]
- eventStore.RUnlock()
- time.Sleep(time.Millisecond * 50)
- }
- return handledEvent != nil
- } else if eventHandled {
- eventStore.Unlock()
+ handledEvent, wasProcessed := eventStore.Events[evt.ID]
+ doneRChan, isProcessing := eventStore.Processing[evt.ID]
+ if wasProcessed {
+ defer eventStore.Unlock()
log.Debugf("event %s was already handled", evt.ID)
return handledEvent != nil
- } else {
- eventStore.Processing[evt.ID] = struct{}{}
+ }
+ if isProcessing {
eventStore.Unlock()
+ // event is already being processed, so wait for
+ // success result and return it
+ log.Debugf("event %s is being processed; awaiting success result", evt.ID)
+ <-doneRChan
+ eventStore.RLock()
+ defer eventStore.RUnlock()
+ handledEvent, wasProcessed = eventStore.Events[evt.ID]
+ return handledEvent != nil
}
+ done := make(chan struct{})
+ eventStore.Processing[evt.ID] = done
+ eventStore.Unlock()
+
+ // process event
success := f(evt, client)
+
+ // store event and pass it to any routines waiting for it
eventStore.Lock()
- // see EventStore doc for success explanation
+ defer eventStore.Unlock()
if success {
eventStore.Events[evt.ID] = evt
} else {
eventStore.Events[evt.ID] = nil
}
delete(eventStore.Processing, evt.ID)
- eventStore.Unlock()
+ close(done)
return success
}
}