~edwargix/tallyard

b88850f1ecdb9a39f4995e4b0a12dc93a7cc46e7 — David Florness 5 years ago c60710b
The master has the power to close elections
5 files changed, 396 insertions(+), 175 deletions(-)

M main.go
D master.go
A merkle.go
A ui.go
A voter.go
M main.go => main.go +95 -138
@@ 2,13 2,10 @@ package main

import (
	"bufio"
	"bytes"
	"context"
	"crypto/sha256"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"

	"github.com/cbergoon/merkletree"


@@ 24,145 21,52 @@ import (
	"github.com/whyrusleeping/go-logging"
)

type ElectionOption string

var (
	logger = log.Logger("tallyard")
	ProtocolID = protocol.ID("/tallyard/0.0.0")
	logger     = log.Logger("tallyard")
	protocolID = protocol.ID("/tallyard/0.0.0")

	electionOptions []ElectionOption
	candidates    []ElectionOption
	optionsMerkle *merkletree.MerkleTree

	merkleRoot []byte
	masterID peer.ID

	ctx context.Context
	h host.Host
	kdht *dht.IpfsDHT
)

func (eo ElectionOption) CalculateHash() ([]byte, error) {
	h := sha256.New()
	if _, err := h.Write([]byte(string(eo))); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}

func (eo ElectionOption) Equals(other merkletree.Content) (bool, error) {
	return string(eo) == string(other.(ElectionOption)), nil
}

func createElection() {
	var form *tview.Form
	n := 3
	app := tview.NewApplication()
	form = tview.NewForm().
		AddInputField("1.", "", 50, nil, nil).
		AddInputField("2.", "", 50, nil, nil).
		AddButton("+", func() {
			form.AddInputField(fmt.Sprintf("%d.", n), "", 50, nil, nil)
			n++
		}).
		AddButton("-", func() {
			// TODO: ensure from joiner that there are at least two
			// candidates
			if n > 3 {
				form.RemoveFormItem(n-2)
				n--
			}
		}).
		AddButton("Done", func() {
			// TODO: ensure none of the candidates are empty
			app.Stop()
			var content []merkletree.Content
			for i := 0; i < n-1; i++ {
				eo := ElectionOption(form.GetFormItem(i).(*tview.InputField).GetText())
				electionOptions = append(electionOptions, eo)
				content = append(content, eo)
			}
			var err error
			optionsMerkle, err = merkletree.NewTree(content)
			if err != nil {
				panic(err)
			}
		})
	if err := app.SetRoot(form, true).EnableMouse(true).Run(); err != nil {
		panic(err)
	}
}

func joinElection() {
	app := tview.NewApplication()
	var form *tview.Form
	form = tview.NewForm().
		AddInputField("Election key:", "", 50, nil, nil).
		AddButton("Continue", func() {
			app.Stop()
			electionKey := form.GetFormItem(0).(*tview.InputField).GetText()
	rendezvousNonce Nonce
	merkleRoot      []byte
	masterID        peer.ID

			zeroi := strings.IndexByte(electionKey, '0')
			var err error
			merkleRoot, err = base58.Decode(electionKey[:zeroi])
			if err != nil {
				panic(err)
			}
			logger.Info("merkle root:", merkleRoot)

			masterID, err = peer.Decode(electionKey[zeroi+1:])
			if err != nil {
				panic(err)
			}
			logger.Info("master ID:", masterID)
		})
	if err := app.SetRoot(form, true).EnableMouse(true).Run(); err != nil {
		panic(err)
	}
}
	me Voter

func checkMerkle() {
	var content []merkletree.Content
	var err error
	for _, eo := range electionOptions {
		content = append(content, eo)
	}
	optionsMerkle, err = merkletree.NewTree(content)
	if err != nil {
		panic(err)
	}
	if bytes.Compare(optionsMerkle.MerkleRoot(), merkleRoot) == 0 {
		fmt.Println("options merkle verification succeeded!")
	} else {
		fmt.Println("options merkle verification failed; exiting")
		os.Exit(1)
	}
}
	// slave: signifies when election was closed by master
	//
	// master: signifies when user hits ENTER to close the election
	//
	// returns the number of peers know by master
	closeElection chan<- int
)

func bootstrap() {
	var err error

	ctx = context.Background()
	me.ctx = context.Background()

	h, err = libp2p.New(ctx,
	me.h, err = libp2p.New(me.ctx,
		libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
			var err error
			kdht, err = dht.New(ctx, h)
			me.kdht, err = dht.New(me.ctx, h)
			if err != nil {
				return kdht, err
				return me.kdht, err
			}
			logger.Info("boostrapping the DHT")
			if err = kdht.Bootstrap(ctx); err != nil {
			if err = me.kdht.Bootstrap(me.ctx); err != nil {
				panic(err)
			}
			return kdht, err
			return me.kdht, err
		}),
	)
	if err != nil {
		panic(err)
	}

	logger.Info("host:", h.ID())
	logger.Info(h.Addrs())
	logger.Info("host:", me.h.ID())
	logger.Info(me.h.Addrs())

	var wg sync.WaitGroup
	for _, peerAddr := range dht.DefaultBootstrapPeers {


@@ 170,7 74,7 @@ func bootstrap() {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := h.Connect(ctx, *peerInfo); err != nil {
			if err := me.h.Connect(me.ctx, *peerInfo); err != nil {
				logger.Warning(err)
			} else {
				logger.Info("connection established with bootstrap node:", *peerInfo)


@@ 183,38 87,91 @@ func bootstrap() {
		fmt.Println("share this with peers:")
		fmt.Printf("%s0%s\n",
			base58.Encode(optionsMerkle.MerkleRoot()),
			h.ID())
		logger.Info("waiting for incoming streams...")
		h.SetStreamHandler(ProtocolID, masterStreamHandler)
			me.h.ID())

		logger.Info("waiting for incoming streams and finding voters...")
		me.h.SetStreamHandler(protocolID, streamHandler)

		ch := make(chan int, 1)
		closeElection = ch

		go func() {
			fmt.Println("press ENTER to solidify group of voters and start voting")
			stdReader := bufio.NewReader(os.Stdin)
			_, err := stdReader.ReadString('\n')
			if err != nil {
				panic(err)
			}

			logger.Info("ENTER has been pressed; closing election")
			n := len(me.otherVoters)
			closeElection <- n
			for _, voter := range me.otherVoters {
				stream, err := me.h.NewStream(me.ctx, voter.ID, protocolID)
				if err != nil {
					panic(err)
				}
				writer := bufio.NewWriter(stream)
				writer.WriteString(fmt.Sprintf("close\n%d", n))
				writer.Flush()
				stream.Close()
			}
		}()

		findPeers(ch)

		select {} // temporary
	} else { // we are a slave

		logger.Info("attempting to open stream with master peer...")
		stream, err := h.NewStream(ctx, masterID, ProtocolID)
		stream, err := me.h.NewStream(me.ctx, masterID, protocolID)
		rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
		if err != nil {
			panic(err)
		}
		logger.Info("opened stream with master peer")
		_, err = rw.WriteString("options")

		logger.Info("fetching election info from master")
		_, err = rw.WriteString("info")
		if err != nil {
			panic(err)
		}
		rw.Flush()
		stream.Close()  // only stops writing
		stream.Close() // only stops writing
		// first field is the rendezvous string, which is used for peer
		// discovery
		str, err := rw.ReadString('\n')
		if err != nil && err != io.EOF {
			panic(err)
		}
		str = stripNewline(str)
		rendezvousNonce = Nonce(str)
		// remaining fields are the candidates
		for {
			str, err := rw.ReadString('\n')
			if err == io.EOF {
				break
			} else if err != nil {
			if err != nil && err != io.EOF {
				panic(err)
			}
			if str[len(str)-1] == '\n' {
				str = str[:len(str)-1]
			str = stripNewline(str)
			if str != "" {
				candidates = append(candidates, ElectionOption(str))
				fmt.Println(str)
			}
			if err == io.EOF {
				break
			}
			electionOptions = append(electionOptions, ElectionOption(str))
			fmt.Println(str)
		}
		logger.Info("stream with master peer closed")
		checkMerkle()
		logger.Info("done fetching election info")

		logger.Info("checking authenticity of election info...")
		verifyElectionInfo()

		// now that we have election info, begin handling streams
		me.h.SetStreamHandler(protocolID, streamHandler)
		// channel used to signify when election is closed
		ch := make(chan int, 1)
		closeElection = ch
		findPeers(ch)
	}
}



@@ 229,8 186,10 @@ func main() {
		SetDoneFunc(func(buttonIndex int, buttonLabel string) {
			app.Stop()
			switch buttonLabel {
			case "Create Election": createElection()
			case "Join Election": joinElection()
			case "Create Election":
				createElection()
			case "Join Election":
				joinElection()
			}
		})
	if err := app.SetRoot(modal, false).EnableMouse(true).Run(); err != nil {


@@ 238,6 197,4 @@ func main() {
	}

	bootstrap()

	select {}
}

D master.go => master.go +0 -37
@@ 1,37 0,0 @@
package main

import (
	"bufio"
	"fmt"
	"io"

	"github.com/libp2p/go-libp2p-core/network"
)

func handleMasterCmd(cmd string, writer *bufio.Writer) {
	switch cmd {
	case "options":
		for _, option := range electionOptions {
			writer.WriteString(fmt.Sprintf("%s\n", option))
		}

		writer.Flush()
	}
}

func masterStreamHandler(stream network.Stream) {
	logger.Info("got a new stream!")
	rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

	cmd, err := rw.ReadString('\n')
	if err != nil && err != io.EOF {
		panic(err)
	}
	if cmd[len(cmd)-1] == '\n' {
		cmd = cmd[:len(cmd)-1]
	}

	fmt.Println(cmd)
	handleMasterCmd(cmd, rw.Writer)
	stream.Close()
}

A merkle.go => merkle.go +71 -0
@@ 0,0 1,71 @@
package main

import (
	"bytes"
	"crypto/rand"
	"crypto/sha256"
	"fmt"
	"os"

	"github.com/cbergoon/merkletree"
	"github.com/mr-tron/base58/base58"
)

type ElectionOption string

func (eo ElectionOption) CalculateHash() ([]byte, error) {
	h := sha256.New()
	if _, err := h.Write([]byte(eo)); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}

func (eo ElectionOption) Equals(other merkletree.Content) (bool, error) {
	return eo == other.(ElectionOption), nil
}

type Nonce string

func NewNonce() Nonce {
	randBytes := make([]byte, 128)
	_, err := rand.Read(randBytes)
	if err != nil {
		panic(err)
	}
	return Nonce(base58.Encode(randBytes))
}

func (n Nonce) CalculateHash() ([]byte, error) {
	h := sha256.New()
	b, err := base58.Decode(string(n))
	if err != nil {
		return nil, err
	}
	if _, err = h.Write(b); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}

func (n Nonce) Equals(other merkletree.Content) (bool, error) {
	return n == other.(Nonce), nil
}

func verifyElectionInfo() {
	content := []merkletree.Content{rendezvousNonce}
	var err error
	for _, eo := range candidates {
		content = append(content, eo)
	}
	optionsMerkle, err = merkletree.NewTree(content)
	if err != nil {
		panic(err)
	}
	if bytes.Compare(optionsMerkle.MerkleRoot(), merkleRoot) == 0 {
		fmt.Println("election info verification succeeded!")
	} else {
		fmt.Println("election info verification failed; exiting")
		os.Exit(1)
	}
}

A ui.go => ui.go +79 -0
@@ 0,0 1,79 @@
package main

import (
	"fmt"
	"strings"

	"github.com/cbergoon/merkletree"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/mr-tron/base58/base58"
	"github.com/rivo/tview"
)

func createElection() {
	var form *tview.Form
	n := 3
	app := tview.NewApplication()
	form = tview.NewForm().
		AddInputField("1.", "", 50, nil, nil).
		AddInputField("2.", "", 50, nil, nil).
		AddButton("+", func() {
			form.AddInputField(fmt.Sprintf("%d.", n), "", 50, nil, nil)
			n++
		}).
		AddButton("-", func() {
			// TODO: ensure from joiner that there are at least two
			// candidates
			if n > 3 {
				form.RemoveFormItem(n - 2)
				n--
			}
		}).
		AddButton("Done", func() {
			// TODO: ensure none of the candidates are empty
			app.Stop()
			rendezvousNonce = NewNonce()
			content := []merkletree.Content{rendezvousNonce}
			for i := 0; i < n-1; i++ {
				eo := ElectionOption(form.GetFormItem(i).(*tview.InputField).GetText())
				candidates = append(candidates, eo)
				content = append(content, eo)
			}
			var err error
			optionsMerkle, err = merkletree.NewTree(content)
			if err != nil {
				panic(err)
			}
		})
	if err := app.SetRoot(form, true).EnableMouse(true).Run(); err != nil {
		panic(err)
	}
}

func joinElection() {
	app := tview.NewApplication()
	var form *tview.Form
	form = tview.NewForm().
		AddInputField("Election key:", "", 100, nil, nil).
		AddButton("Continue", func() {
			app.Stop()
			electionKey := form.GetFormItem(0).(*tview.InputField).GetText()

			zeroi := strings.IndexByte(electionKey, '0')
			var err error
			merkleRoot, err = base58.Decode(electionKey[:zeroi])
			if err != nil {
				panic(err)
			}
			logger.Info("merkle root:", merkleRoot)

			masterID, err = peer.Decode(electionKey[zeroi+1:])
			if err != nil {
				panic(err)
			}
			logger.Info("master ID:", masterID)
		})
	if err := app.SetRoot(form, true).EnableMouse(true).Run(); err != nil {
		panic(err)
	}
}

A voter.go => voter.go +151 -0
@@ 0,0 1,151 @@
package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"os"
	"strconv"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	discovery "github.com/libp2p/go-libp2p-discovery"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/multiformats/go-multiaddr"
)

type Voter struct {
	ctx         context.Context
	h           host.Host
	kdht        *dht.IpfsDHT
	otherVoters []peer.AddrInfo
}

func stripNewline(str string) string {
	if str == "" {
		return str
	}
	if str[len(str)-1] == '\n' {
		return str[:len(str)-1]
	}
	return str
}

// used by handleCmd to prevent canceling election more than once
var electionClosed bool

func handleCmd(cmd string, rw *bufio.ReadWriter, stream network.Stream) {
	switch cmd {
	case "info":
		rw.WriteString(fmt.Sprintf("%s\n", rendezvousNonce))
		for _, option := range candidates {
			rw.WriteString(fmt.Sprintf("%s\n", option))
		}
		rw.Flush()
	case "close":
		if electionClosed {
			logger.Warning("election already closed")
			return
		}
		if peer := stream.Conn().RemotePeer(); peer != masterID {
			logger.Warning("received close command from non-master:", peer)
			return
		}
		str, err := rw.ReadString('\n')
		if err != nil && err != io.EOF {
			panic(err)
		}
		str = stripNewline(str)
		numPeers, err := strconv.Atoi(str)
		if err != nil {
			panic(err)
		}
		closeElection <- numPeers
		electionClosed = true
	case "shake":
		p := stream.Conn().RemotePeer()
		if electionClosed {
			logger.Warning("peer attempted to shake after "+
				"election was closed:", p)
			return
		}
		for _, voter := range me.otherVoters {
			if p == voter.ID {
				logger.Warning("peer attempted to shake after "+
					"having already done so", p)
				return
			}
		}
		me.otherVoters = append(me.otherVoters, peer.AddrInfo{
			ID:    p,
			Addrs: []multiaddr.Multiaddr{stream.Conn().RemoteMultiaddr()},
		})
	}
}

func streamHandler(stream network.Stream) {
	logger.Info("got a new stream!", stream)
	logger.Info("remote peer:", stream.Conn().RemotePeer())
	rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

	cmd, err := rw.ReadString('\n')
	if err != nil && err != io.EOF {
		panic(err)
	}
	cmd = stripNewline(cmd)

	logger.Info("cmd:", cmd)
	handleCmd(cmd, rw, stream)
	stream.Close()
}

func findPeers(closeElection <-chan int) {
	routingDiscovery := discovery.NewRoutingDiscovery(me.kdht)
	logger.Info("announcing ourselves")
	discovery.Advertise(me.ctx, routingDiscovery, string(rendezvousNonce))
	logger.Info("successfully announced!")

	logger.Info("searching for other voters...")
	peerChan, err := routingDiscovery.FindPeers(me.ctx, string(rendezvousNonce))
	if err != nil {
		panic(err)
	}
	numPeers := -1
	for {
		if numPeers != -1 && numPeers == len(me.otherVoters) {
			break
		}
		select {
		case peer := <-peerChan:
			if peer.ID == me.h.ID() {
				continue
			}
			logger.Info("found voter:", peer)
			logger.Info("connecting to:", peer)

			err = me.h.Connect(me.ctx, peer)

			stream, err := me.h.NewStream(me.ctx, peer.ID, protocolID)
			if err == nil {
				writer := bufio.NewWriter(stream)
				writer.WriteString("shake")
				writer.Flush()
				stream.Close()
				me.otherVoters = append(me.otherVoters, peer)
				if numPeers != -1 && numPeers == len(me.otherVoters) {
					break
				}
			} else {
				logger.Warning("connection failed:", err)
			}
		case numPeers = <-closeElection:
			if len(me.otherVoters) > numPeers {
				logger.Error("found more peers than master!")
				os.Exit(1)
			}
		}
	}
	logger.Info("done finding peers")
}