M main.go => main.go +27 -29
@@ 22,30 22,21 @@ import (
)
var (
- logger = log.Logger("tallyard")
- protocolID = protocol.ID("/tallyard/0.0.0")
-
- candidates []Candidate
- optionsMerkle *merkletree.MerkleTree
-
+ logger = log.Logger("tallyard")
+ protocolID = protocol.ID("/tallyard/0.0.0")
+ candidates []Candidate
+ optionsMerkle *merkletree.MerkleTree
rendezvousNonce Nonce
merkleRoot []byte
- masterID peer.ID
-
- me Me
-
- // slave: signifies when election was closed by master
- //
- // master: signifies when user hits ENTER to close the election
- //
- // returns the number of peers know by master
- closeElection chan<- int
+ me Me
+ election Election
)
func bootstrap() {
var err error
me.ctx = context.Background()
+ election.remoteVoters = make(map[peer.ID]*Voter)
me.Host, err = libp2p.New(me.ctx,
libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
@@ 83,18 74,21 @@ func bootstrap() {
}
wg.Wait()
- if masterID == "" { // we are the master
+ if election.masterID == "" { // we are the master
fmt.Println("share this with peers:")
fmt.Printf("%s0%s\n",
base58.Encode(optionsMerkle.MerkleRoot()),
me.ID())
logger.Info("waiting for incoming streams and finding voters...")
- me.SetStreamHandler(protocolID, streamHandler)
+ election.Lock()
ch := make(chan int, 1)
- closeElection = ch
+ election.close = ch
go findPeers(ch)
+ election.Unlock()
+
+ me.SetStreamHandler(protocolID, streamHandler)
fmt.Println("press ENTER to solidify group of voters and start voting")
stdReader := bufio.NewReader(os.Stdin)
@@ 104,10 98,13 @@ func bootstrap() {
}
logger.Info("ENTER has been pressed; closing election")
- n := len(me.otherVoters)
- closeElection <- n
- close(closeElection)
- for _, voter := range me.otherVoters {
+ election.Lock()
+ n := len(election.remoteVoters)
+ election.close <- n
+ close(election.close)
+ election.Unlock()
+ election.RLock()
+ for _, voter := range election.remoteVoters {
stream, err := me.NewStream(me.ctx, voter.addrInfo.ID, protocolID)
if err != nil {
panic(err)
@@ 117,9 114,10 @@ func bootstrap() {
writer.Flush()
stream.Close()
}
+ election.RUnlock()
} else { // we are a slave
logger.Info("attempting to open stream with master peer...")
- stream, err := me.NewStream(me.ctx, masterID, protocolID)
+ stream, err := me.NewStream(me.ctx, election.masterID, protocolID)
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
if err != nil {
panic(err)
@@ 161,15 159,13 @@ func bootstrap() {
logger.Info("checking authenticity of election info...")
verifyElectionInfo()
- // now that we have election info, begin handling streams
- me.SetStreamHandler(protocolID, streamHandler)
// channel used to signify when election is closed
ch := make(chan int, 1)
- closeElection = ch
+ election.close = ch
+ // now that we have election info, begin handling streams
+ me.SetStreamHandler(protocolID, streamHandler)
findPeers(ch)
}
-
- startVoting()
}
func main() {
@@ 194,4 190,6 @@ func main() {
}
bootstrap()
+
+ startVoting()
}
M poly.go => poly.go +3 -3
@@ 34,11 34,11 @@ func NewRandomPoly(degree uint, entropy uint, ballot []byte) *Poly {
p := &Poly{constant, make([]*big.Int, degree)}
// number of bits per coefficient
- numBits := uint(math.Ceil(float64(entropy)/float64(degree)))
+ numBits := uint(math.Ceil(float64(entropy) / float64(degree)))
// number of bytes per coefficient
numBytes := numBits / 8
- if numBits % 8 > 0 {
+ if numBits%8 > 0 {
numBytes += 1
}
@@ 58,7 58,7 @@ func (p *Poly) Eval(input *big.Int) *big.Int {
res.Set(p.constant)
for i, coef := range p.coefs {
- degree := big.NewInt(int64(i+1))
+ degree := big.NewInt(int64(i + 1))
term := &big.Int{}
term.Exp(input, degree, nil)
term.Mul(term, coef)
M ui.go => ui.go +4 -5
@@ 64,17 64,17 @@ func joinElection() {
zeroi := strings.IndexByte(electionKey, '0')
var err error
+ logger.Info("merkle root:", electionKey[:zeroi])
merkleRoot, err = base58.Decode(electionKey[:zeroi])
if err != nil {
panic(err)
}
- logger.Info("merkle root:", merkleRoot)
- masterID, err = peer.Decode(electionKey[zeroi+1:])
+ election.masterID, err = peer.Decode(electionKey[zeroi+1:])
if err != nil {
panic(err)
}
- logger.Info("master ID:", masterID)
+ logger.Info("master ID:", election.masterID)
})
if err := app.SetRoot(form, true).EnableMouse(true).Run(); err != nil {
panic(err)
@@ 98,8 98,7 @@ func vote(candidates []Candidate) []byte {
form.AddButton("Submit", func() {
app.Stop()
for i := 0; i < len(candidates); i++ {
- rank, err := strconv.Atoi(form.GetFormItem(i).
- (*tview.InputField).GetText())
+ rank, err := strconv.Atoi(form.GetFormItem(i).(*tview.InputField).GetText())
if err != nil {
panic(err)
}
M voter.go => voter.go +62 -58
@@ 21,8 21,9 @@ import (
)
type Voter struct {
+ sync.RWMutex
sum *big.Int
- sumMutex sync.RWMutex
+ input *big.Int
output *big.Int
addrInfo peer.AddrInfo
}
@@ 30,13 31,22 @@ type Voter struct {
type Me struct {
Voter
host.Host
- ctx context.Context
- kdht *dht.IpfsDHT
- otherVoters []*Voter
- poly *Poly
- polyMutex sync.RWMutex
- input *big.Int
- inputMutex sync.RWMutex
+ ctx context.Context
+ kdht *dht.IpfsDHT
+ poly *Poly
+}
+
+type Election struct {
+ sync.RWMutex
+ // for slave: signifies when election was closed by master
+ //
+ // for master: signifies when user hits ENTER to close the election
+ //
+ // the number of peers know by master is passed through it
+ close chan<- int
+ closed bool // used by handleCmd to prevent closing election more than once
+ masterID peer.ID
+ remoteVoters map[peer.ID]*Voter
}
func stripNewline(str string) string {
@@ 49,9 59,6 @@ func stripNewline(str string) string {
return str
}
-// used by handleCmd to prevent canceling election more than once
-var electionClosed bool
-
func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) {
switch cmd {
case "info":
@@ 61,12 68,14 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) {
}
rw.Flush()
case "close":
- if electionClosed {
- logger.Warning("election already closed")
+ election.Lock()
+ defer election.Unlock()
+ if peer := stream.Conn().RemotePeer(); peer != election.masterID {
+ logger.Warning("received close command from non-master:", peer)
return
}
- if peer := stream.Conn().RemotePeer(); peer != masterID {
- logger.Warning("received close command from non-master:", peer)
+ if election.closed {
+ logger.Warning("election already closed")
return
}
str, err := rw.ReadString('\n')
@@ 78,34 87,33 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) {
if err != nil {
panic(err)
}
- closeElection <- numPeers
- close(closeElection)
- electionClosed = true
+ election.close <- numPeers
+ close(election.close)
+ election.closed = true
case "shake":
- p := stream.Conn().RemotePeer()
- if electionClosed {
+ election.Lock()
+ defer election.Unlock()
+ peerID := stream.Conn().RemotePeer()
+ if election.closed {
logger.Warning("peer attempted to shake after "+
- "election was closed:", p)
+ "election was closed:", peerID)
return
}
- for _, voter := range me.otherVoters {
- if p == voter.addrInfo.ID {
- logger.Warning("peer attempted to shake after "+
- "having already done so", p)
- return
- }
+ if _, exists := election.remoteVoters[peerID]; exists {
+ logger.Warning("peer attempted to shake after having already done so", peerID)
+ return
}
- me.otherVoters = append(me.otherVoters, &Voter{
+ election.remoteVoters[peerID] = &Voter{
addrInfo: peer.AddrInfo{
- ID: p,
+ ID: peerID,
Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()},
},
- })
+ }
case "eval": // peer is giving their input and requesting output from our poly
- me.inputMutex.RLock()
- defer me.inputMutex.RUnlock()
- if me.input == nil {
- logger.Warning("peer attempted to eval before voting started:",
+ me.RLock()
+ defer me.RUnlock()
+ if me.poly == nil {
+ logger.Warning("peer attempted to eval before we had our poly:",
stream.Conn().RemotePeer())
return
}
@@ 128,13 136,8 @@ func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) {
rw.WriteString(base58.Encode(output.Bytes()))
rw.Flush()
case "sum":
- if !voting {
- logger.Warning("peer attempted to fetch sum "+
- "before voting started:", stream.Conn().RemotePeer())
- return
- }
- me.sumMutex.RLock()
- defer me.sumMutex.RUnlock()
+ me.RLock()
+ defer me.RUnlock()
if me.sum == nil {
logger.Info("peer attempted to fetch sum "+
"before we computed it:", stream.Conn().RemotePeer())
@@ 174,7 177,7 @@ func findPeers(closeElection <-chan int) {
}
numPeers := -1
for {
- if numPeers != -1 && numPeers == len(me.otherVoters) {
+ if numPeers != -1 && numPeers == len(election.remoteVoters) {
break
}
select {
@@ 193,16 196,13 @@ func findPeers(closeElection <-chan int) {
writer.WriteString("shake")
writer.Flush()
stream.Close()
- me.otherVoters = append(me.otherVoters, &Voter{addrInfo: peer})
- if numPeers != -1 && numPeers == len(me.otherVoters) {
- break
- }
+ election.remoteVoters[peer.ID] = &Voter{addrInfo: peer}
} else {
logger.Warning("connection failed:", err)
}
case numPeers = <-closeElection:
- if len(me.otherVoters) > numPeers {
- logger.Error("found more peers than master!")
+ if len(election.remoteVoters) > numPeers {
+ logger.Fatal("found more peers than master!")
os.Exit(1)
}
}
@@ 213,7 213,7 @@ func findPeers(closeElection <-chan int) {
func (voter *Voter) fetchNumber(cmd string, args ...string) *big.Int {
printErr := func(err error, msg string) {
logger.Errorf("%s: %s during `%s'; retrying in 2 seconds",
- voter.addrInfo.ID, cmd, msg)
+ voter.addrInfo.ID, msg, cmd)
time.Sleep(time.Second * 2)
}
retry:
@@ 248,6 248,10 @@ retry:
goto retry
}
retB58 = stripNewline(retB58)
+ if retB58 == "" {
+ printErr(err, "empty string")
+ goto retry
+ }
retBytes, err := base58.Decode(retB58)
if err != nil {
printErr(err, "couldn't base58-decode contents from stream")
@@ 268,11 272,11 @@ func startVoting() {
ballot := vote(candidates)
// no +1 since we want degree k-1 where k is total number of voters
- me.poly = NewRandomPoly(uint(len(me.otherVoters)), 1024, ballot)
+ me.poly = NewRandomPoly(uint(len(election.remoteVoters)), 1024, ballot)
// get outputs
var wg sync.WaitGroup
- for _, voter := range me.otherVoters {
+ for _, voter := range election.remoteVoters {
wg.Add(1)
go func(voter *Voter) {
voter.output = voter.fetchNumber("eval",
@@ 285,23 289,23 @@ func startVoting() {
wg.Wait()
// calculate sum
- me.sumMutex.Lock()
+ me.Lock()
me.sum = me.poly.Eval(me.input)
- for _, voter := range me.otherVoters {
+ for _, voter := range election.remoteVoters {
me.sum.Add(me.sum, voter.output)
}
- me.sumMutex.Unlock()
+ me.Unlock()
// get sums
- for _, voter := range me.otherVoters {
+ for _, voter := range election.remoteVoters {
wg.Add(1)
- go func (voter *Voter) {
- me.sumMutex.RLock()
+ go func(voter *Voter) {
+ me.RLock()
voter.sum = voter.fetchNumber("sum",
base58.Encode(me.sum.Bytes()))
- me.sumMutex.RUnlock()
logger.Infof("voter %s sum: %s",
voter.addrInfo.ID, voter.sum)
+ me.RUnlock()
wg.Done()
}(voter)
}