From e18fa4ed898295b1e74229d5f287c6967b6091e4 Mon Sep 17 00:00:00 2001 From: David Florness Date: Mon, 8 Jun 2020 16:23:02 -0600 Subject: [PATCH] Cleanup datastructures and lint --- main.go | 56 +++++++++++++------------- poly.go | 6 +-- ui.go | 9 ++--- voter.go | 120 ++++++++++++++++++++++++++++--------------------------- 4 files changed, 96 insertions(+), 95 deletions(-) diff --git a/main.go b/main.go index c1b4153..263686b 100644 --- a/main.go +++ b/main.go @@ -22,30 +22,21 @@ import ( ) var ( - logger = log.Logger("tallyard") - protocolID = protocol.ID("/tallyard/0.0.0") - - candidates []Candidate - optionsMerkle *merkletree.MerkleTree - + logger = log.Logger("tallyard") + protocolID = protocol.ID("/tallyard/0.0.0") + candidates []Candidate + optionsMerkle *merkletree.MerkleTree rendezvousNonce Nonce merkleRoot []byte - masterID peer.ID - - me Me - - // slave: signifies when election was closed by master - // - // master: signifies when user hits ENTER to close the election - // - // returns the number of peers know by master - closeElection chan<- int + me Me + election Election ) func bootstrap() { var err error me.ctx = context.Background() + election.remoteVoters = make(map[peer.ID]*Voter) me.Host, err = libp2p.New(me.ctx, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { @@ -83,18 +74,21 @@ func bootstrap() { } wg.Wait() - if masterID == "" { // we are the master + if election.masterID == "" { // we are the master fmt.Println("share this with peers:") fmt.Printf("%s0%s\n", base58.Encode(optionsMerkle.MerkleRoot()), me.ID()) logger.Info("waiting for incoming streams and finding voters...") - me.SetStreamHandler(protocolID, streamHandler) + election.Lock() ch := make(chan int, 1) - closeElection = ch + election.close = ch go findPeers(ch) + election.Unlock() + + me.SetStreamHandler(protocolID, streamHandler) fmt.Println("press ENTER to solidify group of voters and start voting") stdReader := bufio.NewReader(os.Stdin) @@ -104,10 +98,13 @@ func bootstrap() { } logger.Info("ENTER has been pressed; closing election") - n := len(me.otherVoters) - closeElection <- n - close(closeElection) - for _, voter := range me.otherVoters { + election.Lock() + n := len(election.remoteVoters) + election.close <- n + close(election.close) + election.Unlock() + election.RLock() + for _, voter := range election.remoteVoters { stream, err := me.NewStream(me.ctx, voter.addrInfo.ID, protocolID) if err != nil { panic(err) @@ -117,9 +114,10 @@ func bootstrap() { writer.Flush() stream.Close() } + election.RUnlock() } else { // we are a slave logger.Info("attempting to open stream with master peer...") - stream, err := me.NewStream(me.ctx, masterID, protocolID) + stream, err := me.NewStream(me.ctx, election.masterID, protocolID) rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) if err != nil { panic(err) @@ -161,15 +159,13 @@ func bootstrap() { logger.Info("checking authenticity of election info...") verifyElectionInfo() - // now that we have election info, begin handling streams - me.SetStreamHandler(protocolID, streamHandler) // channel used to signify when election is closed ch := make(chan int, 1) - closeElection = ch + election.close = ch + // now that we have election info, begin handling streams + me.SetStreamHandler(protocolID, streamHandler) findPeers(ch) } - - startVoting() } func main() { @@ -194,4 +190,6 @@ func main() { } bootstrap() + + startVoting() } diff --git a/poly.go b/poly.go index 8ebf89c..559a1e3 100644 --- a/poly.go +++ b/poly.go @@ -34,11 +34,11 @@ func NewRandomPoly(degree uint, entropy uint, ballot []byte) *Poly { p := &Poly{constant, make([]*big.Int, degree)} // number of bits per coefficient - numBits := uint(math.Ceil(float64(entropy)/float64(degree))) + numBits := uint(math.Ceil(float64(entropy) / float64(degree))) // number of bytes per coefficient numBytes := numBits / 8 - if numBits % 8 > 0 { + if numBits%8 > 0 { numBytes += 1 } @@ -58,7 +58,7 @@ func (p *Poly) Eval(input *big.Int) *big.Int { res.Set(p.constant) for i, coef := range p.coefs { - degree := big.NewInt(int64(i+1)) + degree := big.NewInt(int64(i + 1)) term := &big.Int{} term.Exp(input, degree, nil) term.Mul(term, coef) diff --git a/ui.go b/ui.go index 4c93346..011bc8a 100644 --- a/ui.go +++ b/ui.go @@ -64,17 +64,17 @@ func joinElection() { zeroi := strings.IndexByte(electionKey, '0') var err error + logger.Info("merkle root:", electionKey[:zeroi]) merkleRoot, err = base58.Decode(electionKey[:zeroi]) if err != nil { panic(err) } - logger.Info("merkle root:", merkleRoot) - masterID, err = peer.Decode(electionKey[zeroi+1:]) + election.masterID, err = peer.Decode(electionKey[zeroi+1:]) if err != nil { panic(err) } - logger.Info("master ID:", masterID) + logger.Info("master ID:", election.masterID) }) if err := app.SetRoot(form, true).EnableMouse(true).Run(); err != nil { panic(err) @@ -98,8 +98,7 @@ func vote(candidates []Candidate) []byte { form.AddButton("Submit", func() { app.Stop() for i := 0; i < len(candidates); i++ { - rank, err := strconv.Atoi(form.GetFormItem(i). - (*tview.InputField).GetText()) + rank, err := strconv.Atoi(form.GetFormItem(i).(*tview.InputField).GetText()) if err != nil { panic(err) } diff --git a/voter.go b/voter.go index 832dbd7..aedf05e 100644 --- a/voter.go +++ b/voter.go @@ -21,8 +21,9 @@ import ( ) type Voter struct { + sync.RWMutex sum *big.Int - sumMutex sync.RWMutex + input *big.Int output *big.Int addrInfo peer.AddrInfo } @@ -30,13 +31,22 @@ type Voter struct { type Me struct { Voter host.Host - ctx context.Context - kdht *dht.IpfsDHT - otherVoters []*Voter - poly *Poly - polyMutex sync.RWMutex - input *big.Int - inputMutex sync.RWMutex + ctx context.Context + kdht *dht.IpfsDHT + poly *Poly +} + +type Election struct { + sync.RWMutex + // for slave: signifies when election was closed by master + // + // for master: signifies when user hits ENTER to close the election + // + // the number of peers know by master is passed through it + close chan<- int + closed bool // used by handleCmd to prevent closing election more than once + masterID peer.ID + remoteVoters map[peer.ID]*Voter } func stripNewline(str string) string { @@ -49,9 +59,6 @@ func stripNewline(str string) string { return str } -// used by handleCmd to prevent canceling election more than once -var electionClosed bool - func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) { switch cmd { case "info": @@ -61,12 +68,14 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) { } rw.Flush() case "close": - if electionClosed { - logger.Warning("election already closed") + election.Lock() + defer election.Unlock() + if peer := stream.Conn().RemotePeer(); peer != election.masterID { + logger.Warning("received close command from non-master:", peer) return } - if peer := stream.Conn().RemotePeer(); peer != masterID { - logger.Warning("received close command from non-master:", peer) + if election.closed { + logger.Warning("election already closed") return } str, err := rw.ReadString('\n') @@ -78,34 +87,33 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) { if err != nil { panic(err) } - closeElection <- numPeers - close(closeElection) - electionClosed = true + election.close <- numPeers + close(election.close) + election.closed = true case "shake": - p := stream.Conn().RemotePeer() - if electionClosed { + election.Lock() + defer election.Unlock() + peerID := stream.Conn().RemotePeer() + if election.closed { logger.Warning("peer attempted to shake after "+ - "election was closed:", p) + "election was closed:", peerID) return } - for _, voter := range me.otherVoters { - if p == voter.addrInfo.ID { - logger.Warning("peer attempted to shake after "+ - "having already done so", p) - return - } + if _, exists := election.remoteVoters[peerID]; exists { + logger.Warning("peer attempted to shake after having already done so", peerID) + return } - me.otherVoters = append(me.otherVoters, &Voter{ + election.remoteVoters[peerID] = &Voter{ addrInfo: peer.AddrInfo{ - ID: p, + ID: peerID, Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()}, }, - }) + } case "eval": // peer is giving their input and requesting output from our poly - me.inputMutex.RLock() - defer me.inputMutex.RUnlock() - if me.input == nil { - logger.Warning("peer attempted to eval before voting started:", + me.RLock() + defer me.RUnlock() + if me.poly == nil { + logger.Warning("peer attempted to eval before we had our poly:", stream.Conn().RemotePeer()) return } @@ -128,13 +136,8 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) { rw.WriteString(base58.Encode(output.Bytes())) rw.Flush() case "sum": - if !voting { - logger.Warning("peer attempted to fetch sum "+ - "before voting started:", stream.Conn().RemotePeer()) - return - } - me.sumMutex.RLock() - defer me.sumMutex.RUnlock() + me.RLock() + defer me.RUnlock() if me.sum == nil { logger.Info("peer attempted to fetch sum "+ "before we computed it:", stream.Conn().RemotePeer()) @@ -174,7 +177,7 @@ func findPeers(closeElection <-chan int) { } numPeers := -1 for { - if numPeers != -1 && numPeers == len(me.otherVoters) { + if numPeers != -1 && numPeers == len(election.remoteVoters) { break } select { @@ -193,16 +196,13 @@ func findPeers(closeElection <-chan int) { writer.WriteString("shake") writer.Flush() stream.Close() - me.otherVoters = append(me.otherVoters, &Voter{addrInfo: peer}) - if numPeers != -1 && numPeers == len(me.otherVoters) { - break - } + election.remoteVoters[peer.ID] = &Voter{addrInfo: peer} } else { logger.Warning("connection failed:", err) } case numPeers = <-closeElection: - if len(me.otherVoters) > numPeers { - logger.Error("found more peers than master!") + if len(election.remoteVoters) > numPeers { + logger.Fatal("found more peers than master!") os.Exit(1) } } @@ -213,7 +213,7 @@ func findPeers(closeElection <-chan int) { func (voter *Voter) fetchNumber(cmd string, args ...string) *big.Int { printErr := func(err error, msg string) { logger.Errorf("%s: %s during `%s'; retrying in 2 seconds", - voter.addrInfo.ID, cmd, msg) + voter.addrInfo.ID, msg, cmd) time.Sleep(time.Second * 2) } retry: @@ -248,6 +248,10 @@ retry: goto retry } retB58 = stripNewline(retB58) + if retB58 == "" { + printErr(err, "empty string") + goto retry + } retBytes, err := base58.Decode(retB58) if err != nil { printErr(err, "couldn't base58-decode contents from stream") @@ -268,11 +272,11 @@ func startVoting() { ballot := vote(candidates) // no +1 since we want degree k-1 where k is total number of voters - me.poly = NewRandomPoly(uint(len(me.otherVoters)), 1024, ballot) + me.poly = NewRandomPoly(uint(len(election.remoteVoters)), 1024, ballot) // get outputs var wg sync.WaitGroup - for _, voter := range me.otherVoters { + for _, voter := range election.remoteVoters { wg.Add(1) go func(voter *Voter) { voter.output = voter.fetchNumber("eval", @@ -285,23 +289,23 @@ func startVoting() { wg.Wait() // calculate sum - me.sumMutex.Lock() + me.Lock() me.sum = me.poly.Eval(me.input) - for _, voter := range me.otherVoters { + for _, voter := range election.remoteVoters { me.sum.Add(me.sum, voter.output) } - me.sumMutex.Unlock() + me.Unlock() // get sums - for _, voter := range me.otherVoters { + for _, voter := range election.remoteVoters { wg.Add(1) - go func (voter *Voter) { - me.sumMutex.RLock() + go func(voter *Voter) { + me.RLock() voter.sum = voter.fetchNumber("sum", base58.Encode(me.sum.Bytes())) - me.sumMutex.RUnlock() logger.Infof("voter %s sum: %s", voter.addrInfo.ID, voter.sum) + me.RUnlock() wg.Done() }(voter) } -- 2.38.4