@@ 32,7 32,7 @@ var (
merkleRoot []byte
masterID peer.ID
- me Voter
+ me Me
// slave: signifies when election was closed by master
//
@@ 47,7 47,7 @@ func bootstrap() {
me.ctx = context.Background()
- me.h, err = libp2p.New(me.ctx,
+ me.Host, err = libp2p.New(me.ctx,
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
var err error
me.kdht, err = dht.New(me.ctx, h)
@@ 65,8 65,8 @@ func bootstrap() {
panic(err)
}
- logger.Info("host:", me.h.ID())
- logger.Info(me.h.Addrs())
+ logger.Info("host:", me.ID())
+ logger.Info(me.Addrs())
var wg sync.WaitGroup
for _, peerAddr := range dht.DefaultBootstrapPeers {
@@ 74,7 74,7 @@ func bootstrap() {
wg.Add(1)
go func() {
defer wg.Done()
- if err := me.h.Connect(me.ctx, *peerInfo); err != nil {
+ if err := me.Connect(me.ctx, *peerInfo); err != nil {
logger.Warning(err)
} else {
logger.Info("connection established with bootstrap node:", *peerInfo)
@@ 87,10 87,10 @@ func bootstrap() {
fmt.Println("share this with peers:")
fmt.Printf("%s0%s\n",
base58.Encode(optionsMerkle.MerkleRoot()),
- me.h.ID())
+ me.ID())
logger.Info("waiting for incoming streams and finding voters...")
- me.h.SetStreamHandler(protocolID, streamHandler)
+ me.SetStreamHandler(protocolID, streamHandler)
ch := make(chan int, 1)
closeElection = ch
@@ 108,7 108,7 @@ func bootstrap() {
closeElection <- n
close(closeElection)
for _, voter := range me.otherVoters {
- stream, err := me.h.NewStream(me.ctx, voter.ID, protocolID)
+ stream, err := me.NewStream(me.ctx, voter.addrInfo.ID, protocolID)
if err != nil {
panic(err)
}
@@ 119,7 119,7 @@ func bootstrap() {
}
} else { // we are a slave
logger.Info("attempting to open stream with master peer...")
- stream, err := me.h.NewStream(me.ctx, masterID, protocolID)
+ stream, err := me.NewStream(me.ctx, masterID, protocolID)
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
if err != nil {
panic(err)
@@ 162,12 162,14 @@ func bootstrap() {
verifyElectionInfo()
// now that we have election info, begin handling streams
- me.h.SetStreamHandler(protocolID, streamHandler)
+ me.SetStreamHandler(protocolID, streamHandler)
// channel used to signify when election is closed
ch := make(chan int, 1)
closeElection = ch
findPeers(ch)
}
+
+ startVoting()
}
func main() {
@@ 5,22 5,38 @@ import (
"context"
"fmt"
"io"
+ "math/big"
"os"
"strconv"
+ "sync"
+ "time"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
discovery "github.com/libp2p/go-libp2p-discovery"
dht "github.com/libp2p/go-libp2p-kad-dht"
+ "github.com/mr-tron/base58/base58"
"github.com/multiformats/go-multiaddr"
)
type Voter struct {
+ sum *big.Int
+ sumMutex sync.RWMutex
+ output *big.Int
+ addrInfo peer.AddrInfo
+}
+
+type Me struct {
+ Voter
+ host.Host
ctx context.Context
- h host.Host
kdht *dht.IpfsDHT
- otherVoters []peer.AddrInfo
+ otherVoters []*Voter
+ poly *Poly
+ polyMutex sync.RWMutex
+ input *big.Int
+ inputMutex sync.RWMutex
}
func stripNewline(str string) string {
@@ 73,16 89,59 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) {
return
}
for _, voter := range me.otherVoters {
- if p == voter.ID {
+ if p == voter.addrInfo.ID {
logger.Warning("peer attempted to shake after "+
"having already done so", p)
return
}
}
- me.otherVoters = append(me.otherVoters, peer.AddrInfo{
- ID: p,
- Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()},
+ me.otherVoters = append(me.otherVoters, &Voter{
+ addrInfo: peer.AddrInfo{
+ ID: p,
+ Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()},
+ },
})
+ case "eval": // peer is giving their input and requesting output from our poly
+ me.inputMutex.RLock()
+ defer me.inputMutex.RUnlock()
+ if me.input == nil {
+ logger.Warning("peer attempted to eval before voting started:",
+ stream.Conn().RemotePeer())
+ return
+ }
+ inputb58, err := rw.ReadString('\n')
+ if err != nil && err != io.EOF {
+ logger.Warning("unable to read input from peer during eval:",
+ stream.Conn().RemotePeer())
+ return
+ }
+ inputb58 = stripNewline(inputb58)
+ inputBytes, err := base58.Decode(inputb58)
+ if err != nil {
+ logger.Warning("unable to base58 decode input from peer during eval:",
+ stream.Conn().RemotePeer())
+ return
+ }
+ input := &big.Int{}
+ input.SetBytes(inputBytes)
+ output := me.poly.Eval(input)
+ rw.WriteString(base58.Encode(output.Bytes()))
+ rw.Flush()
+ case "sum":
+ if !voting {
+ logger.Warning("peer attempted to fetch sum "+
+ "before voting started:", stream.Conn().RemotePeer())
+ return
+ }
+ me.sumMutex.RLock()
+ defer me.sumMutex.RUnlock()
+ if me.sum == nil {
+ logger.Info("peer attempted to fetch sum "+
+ "before we computed it:", stream.Conn().RemotePeer())
+ return
+ }
+ rw.WriteString(base58.Encode(me.sum.Bytes()))
+ rw.Flush()
}
}
@@ 120,21 179,21 @@ func findPeers(closeElection <-chan int) {
}
select {
case peer := <-peerChan:
- if peer.ID == me.h.ID() {
+ if peer.ID == me.ID() {
continue
}
logger.Info("found voter:", peer)
logger.Info("connecting to:", peer)
- err = me.h.Connect(me.ctx, peer)
+ err = me.Connect(me.ctx, peer)
- stream, err := me.h.NewStream(me.ctx, peer.ID, protocolID)
+ stream, err := me.NewStream(me.ctx, peer.ID, protocolID)
if err == nil {
writer := bufio.NewWriter(stream)
writer.WriteString("shake")
writer.Flush()
stream.Close()
- me.otherVoters = append(me.otherVoters, peer)
+ me.otherVoters = append(me.otherVoters, &Voter{addrInfo: peer})
if numPeers != -1 && numPeers == len(me.otherVoters) {
break
}
@@ 150,3 209,104 @@ func findPeers(closeElection <-chan int) {
}
logger.Info("done finding peers")
}
+
+func (voter *Voter) fetchNumber(cmd string, args ...string) *big.Int {
+ printErr := func(err error, msg string) {
+ logger.Errorf("%s: %s during `%s'; retrying in 2 seconds",
+ voter.addrInfo.ID, cmd, msg)
+ time.Sleep(time.Second * 2)
+ }
+retry:
+ stream, err := me.NewStream(me.ctx, voter.addrInfo.ID, protocolID)
+ if err != nil {
+ printErr(err, "couldn't open stream")
+ goto retry
+ }
+ rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
+ out := cmd
+ for _, arg := range args {
+ out += "\n" + arg
+ }
+ _, err = rw.WriteString(out)
+ if err != nil {
+ printErr(err, "couldn't write to stream")
+ goto retry
+ }
+ err = rw.Flush()
+ if err != nil {
+ printErr(err, "couldn't flush stream")
+ goto retry
+ }
+ err = stream.Close() // only closes writing
+ if err != nil {
+ printErr(err, "couldn't close stream")
+ goto retry
+ }
+ retB58, err := rw.ReadString('\n')
+ if err != nil && err != io.EOF {
+ printErr(err, "couldn't read string from stream")
+ goto retry
+ }
+ retB58 = stripNewline(retB58)
+ retBytes, err := base58.Decode(retB58)
+ if err != nil {
+ printErr(err, "couldn't base58-decode contents from stream")
+ goto retry
+ }
+ bi := &big.Int{}
+ bi.SetBytes(retBytes)
+ return bi
+}
+
+func startVoting() {
+ var err error
+ me.input, err = RandomBigInt(128, false)
+ if err != nil {
+ panic(err)
+ }
+
+ ballot := vote(candidates)
+
+ // no +1 since we want degree k-1 where k is total number of voters
+ me.poly = NewRandomPoly(uint(len(me.otherVoters)), 1024, ballot)
+
+ // get outputs
+ var wg sync.WaitGroup
+ for _, voter := range me.otherVoters {
+ wg.Add(1)
+ go func(voter *Voter) {
+ voter.output = voter.fetchNumber("eval",
+ base58.Encode(me.input.Bytes()))
+ logger.Infof("voter %s output: %s",
+ voter.addrInfo.ID, voter.output)
+ wg.Done()
+ }(voter)
+ }
+ wg.Wait()
+
+ // calculate sum
+ me.sumMutex.Lock()
+ me.sum = me.poly.Eval(me.input)
+ for _, voter := range me.otherVoters {
+ me.sum.Add(me.sum, voter.output)
+ }
+ me.sumMutex.Unlock()
+
+ // get sums
+ for _, voter := range me.otherVoters {
+ wg.Add(1)
+ go func (voter *Voter) {
+ me.sumMutex.RLock()
+ voter.sum = voter.fetchNumber("sum",
+ base58.Encode(me.sum.Bytes()))
+ me.sumMutex.RUnlock()
+ logger.Infof("voter %s sum: %s",
+ voter.addrInfo.ID, voter.sum)
+ wg.Done()
+ }(voter)
+ }
+ wg.Wait()
+
+ // temporary
+ select {}
+}