From 377fc6da4792cc38e36706528ab6ceb2d0089c2c Mon Sep 17 00:00:00 2001 From: David Florness Date: Sun, 7 Jun 2020 20:13:18 -0600 Subject: [PATCH] Begin voting loop --- main.go | 22 +++---- voter.go | 180 +++++++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 182 insertions(+), 20 deletions(-) diff --git a/main.go b/main.go index 9df67b9..c1b4153 100644 --- a/main.go +++ b/main.go @@ -32,7 +32,7 @@ var ( merkleRoot []byte masterID peer.ID - me Voter + me Me // slave: signifies when election was closed by master // @@ -47,7 +47,7 @@ func bootstrap() { me.ctx = context.Background() - me.h, err = libp2p.New(me.ctx, + me.Host, err = libp2p.New(me.ctx, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { var err error me.kdht, err = dht.New(me.ctx, h) @@ -65,8 +65,8 @@ func bootstrap() { panic(err) } - logger.Info("host:", me.h.ID()) - logger.Info(me.h.Addrs()) + logger.Info("host:", me.ID()) + logger.Info(me.Addrs()) var wg sync.WaitGroup for _, peerAddr := range dht.DefaultBootstrapPeers { @@ -74,7 +74,7 @@ func bootstrap() { wg.Add(1) go func() { defer wg.Done() - if err := me.h.Connect(me.ctx, *peerInfo); err != nil { + if err := me.Connect(me.ctx, *peerInfo); err != nil { logger.Warning(err) } else { logger.Info("connection established with bootstrap node:", *peerInfo) @@ -87,10 +87,10 @@ func bootstrap() { fmt.Println("share this with peers:") fmt.Printf("%s0%s\n", base58.Encode(optionsMerkle.MerkleRoot()), - me.h.ID()) + me.ID()) logger.Info("waiting for incoming streams and finding voters...") - me.h.SetStreamHandler(protocolID, streamHandler) + me.SetStreamHandler(protocolID, streamHandler) ch := make(chan int, 1) closeElection = ch @@ -108,7 +108,7 @@ func bootstrap() { closeElection <- n close(closeElection) for _, voter := range me.otherVoters { - stream, err := me.h.NewStream(me.ctx, voter.ID, protocolID) + stream, err := me.NewStream(me.ctx, voter.addrInfo.ID, protocolID) if err != nil { panic(err) } @@ -119,7 +119,7 @@ func bootstrap() { } } else { // we are a slave logger.Info("attempting to open stream with master peer...") - stream, err := me.h.NewStream(me.ctx, masterID, protocolID) + stream, err := me.NewStream(me.ctx, masterID, protocolID) rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) if err != nil { panic(err) @@ -162,12 +162,14 @@ func bootstrap() { verifyElectionInfo() // now that we have election info, begin handling streams - me.h.SetStreamHandler(protocolID, streamHandler) + me.SetStreamHandler(protocolID, streamHandler) // channel used to signify when election is closed ch := make(chan int, 1) closeElection = ch findPeers(ch) } + + startVoting() } func main() { diff --git a/voter.go b/voter.go index 85d29d1..832dbd7 100644 --- a/voter.go +++ b/voter.go @@ -5,22 +5,38 @@ import ( "context" "fmt" "io" + "math/big" "os" "strconv" + "sync" + "time" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" discovery "github.com/libp2p/go-libp2p-discovery" dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/mr-tron/base58/base58" "github.com/multiformats/go-multiaddr" ) type Voter struct { + sum *big.Int + sumMutex sync.RWMutex + output *big.Int + addrInfo peer.AddrInfo +} + +type Me struct { + Voter + host.Host ctx context.Context - h host.Host kdht *dht.IpfsDHT - otherVoters []peer.AddrInfo + otherVoters []*Voter + poly *Poly + polyMutex sync.RWMutex + input *big.Int + inputMutex sync.RWMutex } func stripNewline(str string) string { @@ -73,16 +89,59 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) { return } for _, voter := range me.otherVoters { - if p == voter.ID { + if p == voter.addrInfo.ID { logger.Warning("peer attempted to shake after "+ "having already done so", p) return } } - me.otherVoters = append(me.otherVoters, peer.AddrInfo{ - ID: p, - Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()}, + me.otherVoters = append(me.otherVoters, &Voter{ + addrInfo: peer.AddrInfo{ + ID: p, + Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()}, + }, }) + case "eval": // peer is giving their input and requesting output from our poly + me.inputMutex.RLock() + defer me.inputMutex.RUnlock() + if me.input == nil { + logger.Warning("peer attempted to eval before voting started:", + stream.Conn().RemotePeer()) + return + } + inputb58, err := rw.ReadString('\n') + if err != nil && err != io.EOF { + logger.Warning("unable to read input from peer during eval:", + stream.Conn().RemotePeer()) + return + } + inputb58 = stripNewline(inputb58) + inputBytes, err := base58.Decode(inputb58) + if err != nil { + logger.Warning("unable to base58 decode input from peer during eval:", + stream.Conn().RemotePeer()) + return + } + input := &big.Int{} + input.SetBytes(inputBytes) + output := me.poly.Eval(input) + rw.WriteString(base58.Encode(output.Bytes())) + rw.Flush() + case "sum": + if !voting { + logger.Warning("peer attempted to fetch sum "+ + "before voting started:", stream.Conn().RemotePeer()) + return + } + me.sumMutex.RLock() + defer me.sumMutex.RUnlock() + if me.sum == nil { + logger.Info("peer attempted to fetch sum "+ + "before we computed it:", stream.Conn().RemotePeer()) + return + } + rw.WriteString(base58.Encode(me.sum.Bytes())) + rw.Flush() } } @@ -120,21 +179,21 @@ func findPeers(closeElection <-chan int) { } select { case peer := <-peerChan: - if peer.ID == me.h.ID() { + if peer.ID == me.ID() { continue } logger.Info("found voter:", peer) logger.Info("connecting to:", peer) - err = me.h.Connect(me.ctx, peer) + err = me.Connect(me.ctx, peer) - stream, err := me.h.NewStream(me.ctx, peer.ID, protocolID) + stream, err := me.NewStream(me.ctx, peer.ID, protocolID) if err == nil { writer := bufio.NewWriter(stream) writer.WriteString("shake") writer.Flush() stream.Close() - me.otherVoters = append(me.otherVoters, peer) + me.otherVoters = append(me.otherVoters, &Voter{addrInfo: peer}) if numPeers != -1 && numPeers == len(me.otherVoters) { break } @@ -150,3 +209,104 @@ func findPeers(closeElection <-chan int) { } logger.Info("done finding peers") } + +func (voter *Voter) fetchNumber(cmd string, args ...string) *big.Int { + printErr := func(err error, msg string) { + logger.Errorf("%s: %s during `%s'; retrying in 2 seconds", + voter.addrInfo.ID, cmd, msg) + time.Sleep(time.Second * 2) + } +retry: + stream, err := me.NewStream(me.ctx, voter.addrInfo.ID, protocolID) + if err != nil { + printErr(err, "couldn't open stream") + goto retry + } + rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) + out := cmd + for _, arg := range args { + out += "\n" + arg + } + _, err = rw.WriteString(out) + if err != nil { + printErr(err, "couldn't write to stream") + goto retry + } + err = rw.Flush() + if err != nil { + printErr(err, "couldn't flush stream") + goto retry + } + err = stream.Close() // only closes writing + if err != nil { + printErr(err, "couldn't close stream") + goto retry + } + retB58, err := rw.ReadString('\n') + if err != nil && err != io.EOF { + printErr(err, "couldn't read string from stream") + goto retry + } + retB58 = stripNewline(retB58) + retBytes, err := base58.Decode(retB58) + if err != nil { + printErr(err, "couldn't base58-decode contents from stream") + goto retry + } + bi := &big.Int{} + bi.SetBytes(retBytes) + return bi +} + +func startVoting() { + var err error + me.input, err = RandomBigInt(128, false) + if err != nil { + panic(err) + } + + ballot := vote(candidates) + + // no +1 since we want degree k-1 where k is total number of voters + me.poly = NewRandomPoly(uint(len(me.otherVoters)), 1024, ballot) + + // get outputs + var wg sync.WaitGroup + for _, voter := range me.otherVoters { + wg.Add(1) + go func(voter *Voter) { + voter.output = voter.fetchNumber("eval", + base58.Encode(me.input.Bytes())) + logger.Infof("voter %s output: %s", + voter.addrInfo.ID, voter.output) + wg.Done() + }(voter) + } + wg.Wait() + + // calculate sum + me.sumMutex.Lock() + me.sum = me.poly.Eval(me.input) + for _, voter := range me.otherVoters { + me.sum.Add(me.sum, voter.output) + } + me.sumMutex.Unlock() + + // get sums + for _, voter := range me.otherVoters { + wg.Add(1) + go func (voter *Voter) { + me.sumMutex.RLock() + voter.sum = voter.fetchNumber("sum", + base58.Encode(me.sum.Bytes())) + me.sumMutex.RUnlock() + logger.Infof("voter %s sum: %s", + voter.addrInfo.ID, voter.sum) + wg.Done() + }(voter) + } + wg.Wait() + + // temporary + select {} +} -- 2.38.4