From 9f8b544cd6cef7526cbf221ebe32b2f2c1295b7b Mon Sep 17 00:00:00 2001 From: David Florness Date: Wed, 5 Aug 2020 13:39:53 -0600 Subject: [PATCH] Local voter instead of "me" and election struct refactor --- election.go | 67 +++++++++++++++++++++++++++++++ main.go | 89 ++++++++++++++++++++++------------------- merkle.go | 30 ++------------ ui.go | 21 +++++----- voter.go | 113 ++++++++++++++++++++++------------------------------ 5 files changed, 176 insertions(+), 144 deletions(-) create mode 100644 election.go diff --git a/election.go b/election.go new file mode 100644 index 0000000..fc60c68 --- /dev/null +++ b/election.go @@ -0,0 +1,67 @@ +package main + +import ( + "strings" + "sync" + + "github.com/libp2p/go-libp2p-core/peer" + "github.com/mr-tron/base58/base58" +) + +type Election struct { + sync.RWMutex + + Candidates []Candidate + // for slave: signifies when election was closed by master + // + // for master: signifies when user hits ENTER to close the election + // + // the number of peers known by master is passed through it + close chan<- int + // used by handleCmd to prevent closing election more than once + closed bool + ElectionKey string + masterID peer.ID + merkleRoot []byte + remoteVoters map[peer.ID]*Voter + rendezvousNonce Nonce +} + +func NewElection() *Election { + return &Election{ + remoteVoters: make(map[peer.ID]*Voter), + } +} + +func (e *Election) MasterID() peer.ID { + if e.masterID != "" { + return e.masterID + } + zeroi := strings.IndexByte(e.ElectionKey, '0') + if zeroi == -1 { + panic("invalid election key") + } + var err error + e.masterID, err = peer.Decode(e.ElectionKey[zeroi+1:]) + if err != nil { + panic(err) + } + logger.Info("masted ID:", e.masterID) + return e.masterID +} + +func (e *Election) MerkleRoot() []byte { + if len(e.merkleRoot) > 0 { + return e.merkleRoot + } + zeroi := strings.IndexByte(e.ElectionKey, '0') + if zeroi == -1 { + panic("invalid election key") + } + logger.Info("merkle root:", e.ElectionKey[:zeroi]) + merkleRoot, err := base58.Decode(e.ElectionKey[:zeroi]) + if err != nil { + panic(err) + } + return merkleRoot +} diff --git a/main.go b/main.go index 58017c5..bf27f61 100644 --- a/main.go +++ b/main.go @@ -24,39 +24,43 @@ const protocolID = protocol.ID("/tallyard/0.0.0") var logger = log.Logger("tallyard") -func bootstrap(election *Election, me *Me, merkleRoot []byte, electionKey string, hostOpts ...libp2p.Option) { - me.ctx = context.Background() - election.remoteVoters = make(map[peer.ID]*Voter) +func NewLocalVoter(hostOpts ...libp2p.Option) *LocalVoter { + localVoter := new(LocalVoter) + localVoter.ctx = context.Background() routing := libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { var err error - me.kdht, err = dht.New(me.ctx, h) + localVoter.kdht, err = dht.New(localVoter.ctx, h) if err != nil { - return me.kdht, err + return localVoter.kdht, err } logger.Info("boostrapping the DHT") - if err = me.kdht.Bootstrap(me.ctx); err != nil { + if err = localVoter.kdht.Bootstrap(localVoter.ctx); err != nil { panic(err) } - return me.kdht, err + return localVoter.kdht, err }) var err error - me.Host, err = libp2p.New(me.ctx, append(hostOpts, routing)...) + localVoter.Host, err = libp2p.New(localVoter.ctx, append(hostOpts, routing)...) if err != nil { panic(err) } - logger.Info("host: ", me.ID()) - logger.Info("addrs: ", me.Addrs()) + logger.Info("host: ", localVoter.ID()) + logger.Info("addrs: ", localVoter.Addrs()) + return localVoter +} + +func (localVoter *LocalVoter) Bootstrap(election *Election) { var wg sync.WaitGroup for _, peerAddr := range dht.DefaultBootstrapPeers { peerInfo, _ := peer.AddrInfoFromP2pAddr(peerAddr) wg.Add(1) go func() { defer wg.Done() - if err := me.Connect(me.ctx, *peerInfo); err != nil { + if err := localVoter.Connect(localVoter.ctx, *peerInfo); err != nil { logger.Warning(err) } else { logger.Info("connection established with bootstrap node:", *peerInfo) @@ -65,15 +69,15 @@ func bootstrap(election *Election, me *Me, merkleRoot []byte, electionKey string } wg.Wait() - if election.masterID == "" { // we are the master + if election.ElectionKey == "" { // we are the master fmt.Println("share this with peers:") - electionKey = fmt.Sprintf("%s0%s", base58.Encode(merkleRoot), me.ID()) - fmt.Printf("%s\n", electionKey) + election.ElectionKey = fmt.Sprintf("%s0%s", base58.Encode(election.MerkleRoot()), localVoter.ID()) + fmt.Printf("%s\n", election.ElectionKey) err := saveElectionInfo( - electionKey, - me.Peerstore().PrivKey(me.ID()), - me.Peerstore().PubKey(me.ID()), + election.ElectionKey, + localVoter.Peerstore().PrivKey(localVoter.ID()), + localVoter.Peerstore().PubKey(localVoter.ID()), ) if err != nil { panic(err) @@ -84,11 +88,11 @@ func bootstrap(election *Election, me *Me, merkleRoot []byte, electionKey string election.Lock() ch := make(chan int, 1) election.close = ch - go findPeers(ch, election, me) + go findPeers(ch, election, localVoter) election.Unlock() - me.SetStreamHandler(protocolID, func(stream network.Stream) { - streamHandler(stream, election, me) + localVoter.SetStreamHandler(protocolID, func(stream network.Stream) { + streamHandler(stream, election, localVoter) }) fmt.Println("press ENTER to solidify group of voters and start voting") @@ -106,7 +110,7 @@ func bootstrap(election *Election, me *Me, merkleRoot []byte, electionKey string election.Unlock() election.RLock() for _, voter := range election.remoteVoters { - stream, err := me.NewStream(me.ctx, voter.addrInfo.ID, protocolID) + stream, err := localVoter.NewStream(localVoter.ctx, voter.addrInfo.ID, protocolID) if err != nil { panic(err) } @@ -118,16 +122,16 @@ func bootstrap(election *Election, me *Me, merkleRoot []byte, electionKey string election.RUnlock() } else { // we are a slave err := saveElectionInfo( - electionKey, - me.Peerstore().PrivKey(me.ID()), - me.Peerstore().PubKey(me.ID()), + election.ElectionKey, + localVoter.Peerstore().PrivKey(localVoter.ID()), + localVoter.Peerstore().PubKey(localVoter.ID()), ) if err != nil { panic(err) } logger.Info("attempting to open stream with master peer...") - stream, err := me.NewStream(me.ctx, election.masterID, protocolID) + stream, err := localVoter.NewStream(localVoter.ctx, election.MasterID(), protocolID) rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) if err != nil { panic(err) @@ -157,7 +161,7 @@ func bootstrap(election *Election, me *Me, merkleRoot []byte, electionKey string } str = stripNewline(str) if str != "" { - election.candidates = append(election.candidates, Candidate(str)) + election.Candidates = append(election.Candidates, Candidate(str)) } if err == io.EOF { break @@ -166,16 +170,16 @@ func bootstrap(election *Election, me *Me, merkleRoot []byte, electionKey string logger.Info("done fetching election info") logger.Info("checking authenticity of election info...") - verifyElectionInfo(election, merkleRoot) + verifyElectionInfo(election, election.MerkleRoot()) // channel used to signify when election is closed ch := make(chan int, 1) election.close = ch // now that we have election info, begin handling streams - me.SetStreamHandler(protocolID, func(stream network.Stream) { - streamHandler(stream, election, me) + localVoter.SetStreamHandler(protocolID, func(stream network.Stream) { + streamHandler(stream, election, localVoter) }) - findPeers(ch, election, me) + findPeers(ch, election, localVoter) } } @@ -183,7 +187,7 @@ func main() { log.SetAllLoggers(log.LevelWarn) verbose := flag.Bool("v", false, "enable verbose logging for debugging") - master := flag.Bool("m", false, "indicate that this node is the " + + master := flag.Bool("m", false, "indicate that this node is the "+ "master and the candidates are given via positional arguments") flag.Parse() if *verbose { @@ -198,35 +202,34 @@ func main() { election *Election electionKey string identityOpt libp2p.Option = libp2p.RandomIdentity - me *Me = new(Me) - merkleRoot []byte ) if *master { // we are the master and the candidates are the positional // arguments - election = new(Election) + election = NewElection() for _, candidate := range flag.Args() { - election.candidates = append( - election.candidates, + election.Candidates = append( + election.Candidates, Candidate(candidate), ) } + election.GenMerkleFromCandidates() } else if electionKey = flag.Arg(0); electionKey != "" { // we are a slave slave the and election key was given via CLI // args - election = new(Election) - merkleRoot, election.masterID = splitElectionKey(electionKey) + election = NewElection() + election.ElectionKey = electionKey } else { // create/join election via TUI - election, merkleRoot, electionKey = tui() + election = tui() if election == nil { // tui form wasn't submitted (maybe the user hit ^C) os.Exit(0) } } - if electionKey != "" { // we are a slave + if electionKey != "" { // try to recover election info if we've joined before electionInfo, err := getElectionInfo(electionKey) if err != nil { @@ -243,7 +246,9 @@ func main() { } } - bootstrap(election, me, merkleRoot, electionKey, identityOpt) + localVoter := NewLocalVoter(identityOpt) + + localVoter.Bootstrap(election) - startVoting(election, me) + startVoting(election, localVoter) } diff --git a/merkle.go b/merkle.go index 39e2d6c..f468a89 100644 --- a/merkle.go +++ b/merkle.go @@ -6,10 +6,8 @@ import ( "crypto/sha256" "fmt" "os" - "strings" "github.com/cbergoon/merkletree" - "github.com/libp2p/go-libp2p-core/peer" "github.com/mr-tron/base58/base58" ) @@ -57,7 +55,7 @@ func (n Nonce) Equals(other merkletree.Content) (bool, error) { func verifyElectionInfo(election *Election, merkleRoot []byte) { content := []merkletree.Content{election.rendezvousNonce} var err error - for _, eo := range election.candidates { + for _, eo := range election.Candidates { content = append(content, eo) } optionsMerkle, err := merkletree.NewTree(content) @@ -72,35 +70,15 @@ func verifyElectionInfo(election *Election, merkleRoot []byte) { } } -func splitElectionKey(electionKey string) (merkleRoot []byte, masterID peer.ID) { - var err error - zeroi := strings.IndexByte(electionKey, '0') - if zeroi == -1 { - panic("invalid election key") - } - logger.Info("merkle root:", electionKey[:zeroi]) - merkleRoot, err = base58.Decode(electionKey[:zeroi]) - if err != nil { - panic(err) - } - masterID, err = peer.Decode(electionKey[zeroi+1:]) - if err != nil { - panic(err) - } - logger.Info("master ID:", masterID) - return merkleRoot, masterID -} - -func (e *Election) createMerkle(candidates []Candidate) (merkleRoot []byte) { +func (e *Election) GenMerkleFromCandidates() { e.rendezvousNonce = NewNonce() content := []merkletree.Content{e.rendezvousNonce} - for _, cand := range candidates { + for _, cand := range e.Candidates { content = append(content, cand) } optionsMerkle, err := merkletree.NewTree(content) if err != nil { panic(err) } - merkleRoot = optionsMerkle.MerkleRoot() - return merkleRoot + e.merkleRoot = optionsMerkle.MerkleRoot() } diff --git a/ui.go b/ui.go index aa2f177..8fcc98a 100644 --- a/ui.go +++ b/ui.go @@ -10,27 +10,26 @@ import ( "github.com/rivo/tview" ) -func tui() (election *Election, merkleRoot []byte, electionKey string) { +func tui() (election *Election) { app := tview.NewApplication() done := func(buttonIndex int, buttonLabel string) { app.Stop() - election = new(Election) + election = NewElection() switch buttonLabel { case "Create Election": - election.candidates = createElection() + election.Candidates = getCandidatesTUI() // TODO: slaves should check that len candidates >= 2 - if election.candidates == nil || len(election.candidates) == 0 { + if election.Candidates == nil || len(election.Candidates) == 0 { fmt.Printf("no candidates entered; exiting\n") os.Exit(0) } - merkleRoot = election.createMerkle(election.candidates) + election.GenMerkleFromCandidates() case "Join Election": - electionKey := joinElection() - if electionKey == "" { + election.ElectionKey = getElectionKeyTUI() + if election.ElectionKey == "" { fmt.Printf("no election key given; exiting\n") os.Exit(0) } - merkleRoot, election.masterID = splitElectionKey(electionKey) } } modal := tview.NewModal(). @@ -40,10 +39,10 @@ func tui() (election *Election, merkleRoot []byte, electionKey string) { if err := app.SetRoot(modal, false).EnableMouse(true).Run(); err != nil { panic(err) } - return election, merkleRoot, electionKey + return election } -func createElection() (candidates []Candidate) { +func getCandidatesTUI() (candidates []Candidate) { var form *tview.Form n := 3 app := tview.NewApplication() @@ -79,7 +78,7 @@ func createElection() (candidates []Candidate) { return candidates } -func joinElection() (electionKey string) { +func getElectionKeyTUI() (electionKey string) { app := tview.NewApplication() var form *tview.Form done := func() { diff --git a/voter.go b/voter.go index 32c9104..eb2ddfb 100644 --- a/voter.go +++ b/voter.go @@ -30,13 +30,12 @@ type Voter struct { addrInfo peer.AddrInfo } -type Me struct { +type LocalVoter struct { Voter host.Host - ctx context.Context - kdht *dht.IpfsDHT - - poly *Poly + ctx context.Context + kdht *dht.IpfsDHT + poly *Poly // mutexs only used for atomicity; atomicity.Value sucks because we lose // type safety with interface{} @@ -45,22 +44,6 @@ type Me struct { inputMu sync.RWMutex // TODO remove by generating input right away } -type Election struct { - sync.RWMutex - // for slave: signifies when election was closed by master - // - // for master: signifies when user hits ENTER to close the election - // - // the number of peers know by master is passed through it - close chan<- int - closed bool // used by handleCmd to prevent closing election more than once - - candidates []Candidate - masterID peer.ID - remoteVoters map[peer.ID]*Voter - rendezvousNonce Nonce -} - func stripNewline(str string) string { if str == "" { return str @@ -71,11 +54,11 @@ func stripNewline(str string) string { return str } -func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream, election *Election, me *Me) { +func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream, election *Election, localVoter *LocalVoter) { switch cmd { case "info": rw.WriteString(fmt.Sprintf("%s\n", election.rendezvousNonce)) - for _, option := range election.candidates { + for _, option := range election.Candidates { rw.WriteString(fmt.Sprintf("%s\n", option)) } rw.Flush() @@ -122,9 +105,9 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream, election }, } case "eval": // peer is giving their input and requesting output from our poly - me.polyMu.RLock() - defer me.polyMu.RUnlock() - if me.poly == nil { + localVoter.polyMu.RLock() + defer localVoter.polyMu.RUnlock() + if localVoter.poly == nil { logger.Warning("peer attempted to eval before we had our poly:", stream.Conn().RemotePeer()) return @@ -151,25 +134,25 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream, election defer peer.inputMu.Unlock() peer.input = new(big.Int).SetBytes(inputBytes) logger.Infof("%s input: %s", peer.addrInfo.ID, peer.input) - output := me.poly.Eval(peer.input) + output := localVoter.poly.Eval(peer.input) rw.WriteString(base58.Encode(output.Bytes())) rw.Flush() case "sum": - me.sumMu.RLock() - defer me.sumMu.RUnlock() - if me.sum == nil { + localVoter.sumMu.RLock() + defer localVoter.sumMu.RUnlock() + if localVoter.sum == nil { logger.Info("peer attempted to fetch sum "+ "before we computed it:", stream.Conn().RemotePeer()) return } - rw.WriteString(base58.Encode(me.sum.Bytes())) + rw.WriteString(base58.Encode(localVoter.sum.Bytes())) rw.Flush() default: logger.Warningf("uknown command %s", cmd) } } -func streamHandler(stream network.Stream, election *Election, me *Me) { +func streamHandler(stream network.Stream, election *Election, localVoter *LocalVoter) { logger.Info("got a new stream:", stream) logger.Info("remote peer:", stream.Conn().RemotePeer()) rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) @@ -181,18 +164,18 @@ func streamHandler(stream network.Stream, election *Election, me *Me) { cmd = stripNewline(cmd) logger.Info("cmd:", cmd) - handleCmd(cmd, rw, stream, election, me) + handleCmd(cmd, rw, stream, election, localVoter) stream.Close() } -func findPeers(closeElection <-chan int, election *Election, me *Me) { - routingDiscovery := discovery.NewRoutingDiscovery(me.kdht) +func findPeers(closeElection <-chan int, election *Election, localVoter *LocalVoter) { + routingDiscovery := discovery.NewRoutingDiscovery(localVoter.kdht) logger.Info("announcing ourselves") - discovery.Advertise(me.ctx, routingDiscovery, string(election.rendezvousNonce)) + discovery.Advertise(localVoter.ctx, routingDiscovery, string(election.rendezvousNonce)) logger.Info("successfully announced!") fmt.Println("finding other voters...") - peerChan, err := routingDiscovery.FindPeers(me.ctx, string(election.rendezvousNonce)) + peerChan, err := routingDiscovery.FindPeers(localVoter.ctx, string(election.rendezvousNonce)) if err != nil { panic(err) } @@ -203,17 +186,17 @@ func findPeers(closeElection <-chan int, election *Election, me *Me) { } select { case peer := <-peerChan: - if peer.ID == me.ID() { + if peer.ID == localVoter.ID() { continue } fmt.Printf("found voter: %s\n", peer.ID) logger.Info("connecting to:", peer) - err = me.Connect(me.ctx, peer) + err = localVoter.Connect(localVoter.ctx, peer) if err != nil { logger.Warn("couldn't connect to peer: ", err) continue } - stream, err := me.NewStream(me.ctx, peer.ID, protocolID) + stream, err := localVoter.NewStream(localVoter.ctx, peer.ID, protocolID) if err != nil { logger.Warn("couldn't open stream with peer: ", err) continue @@ -234,14 +217,14 @@ func findPeers(closeElection <-chan int, election *Election, me *Me) { logger.Info("done finding peers") } -func (voter *Voter) fetchNumber(election *Election, me *Me, cmd string, args ...string) *big.Int { +func (voter *Voter) fetchNumber(election *Election, localVoter *LocalVoter, cmd string, args ...string) *big.Int { printErr := func(err error, msg string) { logger.Errorf("%s: %s while fetcing `%s'; retrying in 2 seconds", voter.addrInfo.ID, msg, cmd) time.Sleep(time.Second * 2) } retry: - stream, err := me.NewStream(me.ctx, voter.addrInfo.ID, protocolID) + stream, err := localVoter.NewStream(localVoter.ctx, voter.addrInfo.ID, protocolID) if err != nil { printErr(err, "couldn't open stream") goto retry @@ -284,31 +267,31 @@ retry: return new(big.Int).SetBytes(retBytes) } -func startVoting(election *Election, me *Me) { +func startVoting(election *Election, localVoter *LocalVoter) { var err error - me.inputMu.Lock() - me.input, err = RandomBigInt(128, false) - me.inputMu.Unlock() + localVoter.inputMu.Lock() + localVoter.input, err = RandomBigInt(128, false) + localVoter.inputMu.Unlock() if err != nil { panic(err) } - logger.Infof("our input: %s", me.input) + logger.Infof("our input: %s", localVoter.input) - ballot := vote(election.candidates) + ballot := vote(election.Candidates) logger.Infof("our ballot: %v", ballot) // no +1 since we want degree k-1 where k is total number of voters - me.polyMu.Lock() - me.poly = NewRandomPoly(uint(len(election.remoteVoters)), 1024, ballot) - me.polyMu.Unlock() - logger.Infof("our constant: %s", me.poly.constant) + localVoter.polyMu.Lock() + localVoter.poly = NewRandomPoly(uint(len(election.remoteVoters)), 1024, ballot) + localVoter.polyMu.Unlock() + logger.Infof("our constant: %s", localVoter.poly.constant) // get outputs var wg sync.WaitGroup for _, voter := range election.remoteVoters { wg.Add(1) go func(voter *Voter) { - voter.output = voter.fetchNumber(election, me, "eval", base58.Encode(me.input.Bytes())) + voter.output = voter.fetchNumber(election, localVoter, "eval", base58.Encode(localVoter.input.Bytes())) logger.Infof("voter %s output: %s", voter.addrInfo.ID, voter.output) wg.Done() }(voter) @@ -316,19 +299,19 @@ func startVoting(election *Election, me *Me) { wg.Wait() // calculate sum - me.sumMu.Lock() - me.sum = me.poly.Eval(me.input) + localVoter.sumMu.Lock() + localVoter.sum = localVoter.poly.Eval(localVoter.input) for _, voter := range election.remoteVoters { - me.sum.Add(me.sum, voter.output) + localVoter.sum.Add(localVoter.sum, voter.output) } - me.sumMu.Unlock() - logger.Infof("our sum: %s", me.sum) + localVoter.sumMu.Unlock() + logger.Infof("our sum: %s", localVoter.sum) // get sums for _, voter := range election.remoteVoters { wg.Add(1) go func(voter *Voter) { - voter.sum = voter.fetchNumber(election, me, "sum") + voter.sum = voter.fetchNumber(election, localVoter, "sum") logger.Infof("voter %s sum: %s", voter.addrInfo.ID, voter.sum) wg.Done() @@ -336,7 +319,7 @@ func startVoting(election *Election, me *Me) { } wg.Wait() - mat := constructPolyMatrix(election, me) + mat := constructPolyMatrix(election, localVoter) mat.RREF() constant := mat[0][len(mat[0])-1] @@ -347,16 +330,16 @@ func startVoting(election *Election, me *Me) { result := constant.Num().Bytes() // number of bytes we need to insert at the front since they're zero - diff := (len(election.candidates)*len(election.candidates)) - len(result) + diff := (len(election.Candidates)*len(election.Candidates)) - len(result) result = append(make([]byte, diff), result...) - printResults(result, election.candidates) + printResults(result, election.Candidates) // temporary select {} } -func constructPolyMatrix(election *Election, me *Me) Matrix { +func constructPolyMatrix(election *Election, localVoter *LocalVoter) Matrix { mat := make(Matrix, len(election.remoteVoters) + 1) // row for everyone (including ourselves) i := 0 @@ -378,9 +361,9 @@ func constructPolyMatrix(election *Election, me *Me) Matrix { row[0].SetInt64(1) var j int64 for j = 1; j <= int64(len(election.remoteVoters)); j++ { - row[j].SetInt(new(big.Int).Exp(me.input, big.NewInt(j), nil)) + row[j].SetInt(new(big.Int).Exp(localVoter.input, big.NewInt(j), nil)) } - row[j].SetInt(me.sum) + row[j].SetInt(localVoter.sum) return mat } -- 2.38.4