~edwargix/tallyard

8c939acb699f227cd277c682f30221b705425f2f — David Florness 5 years ago 88de616
Ensure we have everyones' inputs after getting everyones' sums
1 files changed, 54 insertions(+), 6 deletions(-)

M voter.go
M voter.go => voter.go +54 -6
@@ 137,6 137,7 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) {
		}
		peer.Lock()
		peer.input = new(big.Int).SetBytes(inputBytes)
		logger.Infof("%s input: %s", peer.addrInfo.ID, peer.input)
		peer.Unlock()
		output := me.poly.Eval(peer.input)
		rw.WriteString(base58.Encode(output.Bytes()))


@@ 151,6 152,18 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) {
		}
		rw.WriteString(base58.Encode(me.sum.Bytes()))
		rw.Flush()
	case "input":
		me.RLock()
		defer me.RUnlock()
		if me.input == nil {
			logger.Info("peer attempted to fetch input "+
				"before we had one:", stream.Conn().RemotePeer())
			return
		}
		rw.WriteString(base58.Encode(me.input.Bytes()))
		rw.Flush()
	default:
		logger.Warningf("uknown command %s", cmd)
	}
}



@@ 216,7 229,7 @@ func findPeers(closeElection <-chan int) {
	logger.Info("done finding peers")
}

func (voter *Voter) fetchNumber(cmd string, args ...string) *big.Int {
func (voter *Voter) fetchNumber(done chan<- *big.Int, cmd string, args ...string) {
	printErr := func(err error, msg string) {
		logger.Errorf("%s: %s fetcing `%s'; retrying in 2 seconds",
			voter.addrInfo.ID, msg, cmd)


@@ 263,7 276,7 @@ retry:
		printErr(err, "couldn't base58-decode contents from stream")
		goto retry
	}
	return new(big.Int).SetBytes(retBytes)
	done <- new(big.Int).SetBytes(retBytes)
}

func startVoting() {


@@ 272,6 285,7 @@ func startVoting() {
	if err != nil {
		panic(err)
	}
	logger.Infof("our input: %s", me.input)

	ballot := vote(candidates)



@@ 283,10 297,19 @@ func startVoting() {
	for _, voter := range election.remoteVoters {
		wg.Add(1)
		go func(voter *Voter) {
			voter.output = voter.fetchNumber("eval",
			done := make(chan *big.Int, 1)
			me.RLock()
			go voter.fetchNumber(done, "eval",
				base58.Encode(me.input.Bytes()))
			me.RUnlock()
			output := <- done
			voter.Lock()
			voter.output = output
			voter.Unlock()
			voter.RLock()
			logger.Infof("voter %s output: %s",
				voter.addrInfo.ID, voter.output)
			voter.RUnlock()
			wg.Done()
		}(voter)
	}


@@ 304,17 327,42 @@ func startVoting() {
	for _, voter := range election.remoteVoters {
		wg.Add(1)
		go func(voter *Voter) {
			done := make(chan *big.Int, 1)
			me.RLock()
			voter.sum = voter.fetchNumber("sum",
				base58.Encode(me.sum.Bytes()))
			go voter.fetchNumber(done, "sum", base58.Encode(me.sum.Bytes()))
			me.RUnlock()
			sum := <- done
			voter.Lock()
			voter.sum = sum
			voter.Unlock()
			voter.RLock()
			logger.Infof("voter %s sum: %s",
				voter.addrInfo.ID, voter.sum)
			me.RUnlock()
			voter.RUnlock()
			wg.Done()
		}(voter)
	}
	wg.Wait()

	// ensure we have everyone's inputs
	for _, voter := range election.remoteVoters {
		voter.RLock()
		haveInput := voter.input != nil
		voter.RUnlock()
		if !haveInput {
			wg.Add(1)
			go func(voter *Voter) {
				done := make(chan *big.Int, 1)
				go voter.fetchNumber(done, "input")
				input := <- done
				voter.Lock()
				voter.input = input
				voter.Unlock()
			}(voter)
		}
	}
	wg.Wait()

	// temporary
	select {}
}