From 8c939acb699f227cd277c682f30221b705425f2f Mon Sep 17 00:00:00 2001 From: David Florness Date: Mon, 8 Jun 2020 19:24:53 -0600 Subject: [PATCH] Ensure we have everyones' inputs after getting everyones' sums --- voter.go | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 6 deletions(-) diff --git a/voter.go b/voter.go index cfaeb0f..edd1b10 100644 --- a/voter.go +++ b/voter.go @@ -137,6 +137,7 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) { } peer.Lock() peer.input = new(big.Int).SetBytes(inputBytes) + logger.Infof("%s input: %s", peer.addrInfo.ID, peer.input) peer.Unlock() output := me.poly.Eval(peer.input) rw.WriteString(base58.Encode(output.Bytes())) @@ -151,6 +152,18 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) { } rw.WriteString(base58.Encode(me.sum.Bytes())) rw.Flush() + case "input": + me.RLock() + defer me.RUnlock() + if me.input == nil { + logger.Info("peer attempted to fetch input "+ + "before we had one:", stream.Conn().RemotePeer()) + return + } + rw.WriteString(base58.Encode(me.input.Bytes())) + rw.Flush() + default: + logger.Warningf("uknown command %s", cmd) } } @@ -216,7 +229,7 @@ func findPeers(closeElection <-chan int) { logger.Info("done finding peers") } -func (voter *Voter) fetchNumber(cmd string, args ...string) *big.Int { +func (voter *Voter) fetchNumber(done chan<- *big.Int, cmd string, args ...string) { printErr := func(err error, msg string) { logger.Errorf("%s: %s fetcing `%s'; retrying in 2 seconds", voter.addrInfo.ID, msg, cmd) @@ -263,7 +276,7 @@ retry: printErr(err, "couldn't base58-decode contents from stream") goto retry } - return new(big.Int).SetBytes(retBytes) + done <- new(big.Int).SetBytes(retBytes) } func startVoting() { @@ -272,6 +285,7 @@ func startVoting() { if err != nil { panic(err) } + logger.Infof("our input: %s", me.input) ballot := vote(candidates) @@ -283,10 +297,19 @@ func startVoting() { for _, voter := range election.remoteVoters { wg.Add(1) go func(voter *Voter) { - voter.output = voter.fetchNumber("eval", + done := make(chan *big.Int, 1) + me.RLock() + go voter.fetchNumber(done, "eval", base58.Encode(me.input.Bytes())) + me.RUnlock() + output := <- done + voter.Lock() + voter.output = output + voter.Unlock() + voter.RLock() logger.Infof("voter %s output: %s", voter.addrInfo.ID, voter.output) + voter.RUnlock() wg.Done() }(voter) } @@ -304,17 +327,42 @@ func startVoting() { for _, voter := range election.remoteVoters { wg.Add(1) go func(voter *Voter) { + done := make(chan *big.Int, 1) me.RLock() - voter.sum = voter.fetchNumber("sum", - base58.Encode(me.sum.Bytes())) + go voter.fetchNumber(done, "sum", base58.Encode(me.sum.Bytes())) + me.RUnlock() + sum := <- done + voter.Lock() + voter.sum = sum + voter.Unlock() + voter.RLock() logger.Infof("voter %s sum: %s", voter.addrInfo.ID, voter.sum) - me.RUnlock() + voter.RUnlock() wg.Done() }(voter) } wg.Wait() + // ensure we have everyone's inputs + for _, voter := range election.remoteVoters { + voter.RLock() + haveInput := voter.input != nil + voter.RUnlock() + if !haveInput { + wg.Add(1) + go func(voter *Voter) { + done := make(chan *big.Int, 1) + go voter.fetchNumber(done, "input") + input := <- done + voter.Lock() + voter.input = input + voter.Unlock() + }(voter) + } + } + wg.Wait() + // temporary select {} } -- 2.38.4