~edwargix/tallyard

680a588bc07e80ed688d47b36b425b61cbc4e85c — David Florness 5 years ago 22f4c12
Separate main master and slave logic
1 files changed, 89 insertions(+), 81 deletions(-)

M main.go
M main.go => main.go +89 -81
@@ 52,9 52,7 @@ func NewLocalVoter(hostOpts ...libp2p.Option) *LocalVoter {
	return localVoter
}

func bootstrap(election *Election) {
	localVoter := election.localVoter

func (localVoter *LocalVoter) Bootstrap() {
	var wg sync.WaitGroup
	for _, peerAddr := range dht.DefaultBootstrapPeers {
		peerInfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)


@@ 69,100 67,104 @@ func bootstrap(election *Election) {
		}()
	}
	wg.Wait()
}

	if election.masterID == localVoter.ID() { // we are the master
		fmt.Println("share this with peers:")
		fmt.Printf("%s\n", election.electionKey)
func (election *Election) setupMaster() {
	fmt.Println("share this with peers:")
	fmt.Printf("%s\n", election.electionKey)

		logger.Info("waiting for incoming streams and finding voters...")
	logger.Info("waiting for incoming streams and finding voters...")

		election.Lock()
		ch := make(chan int, 1)
		election.close = ch
		go findPeers(ch, election)
		election.Unlock()
	election.Lock()
	ch := make(chan int, 1)
	election.close = ch
	go findPeers(ch, election)
	election.Unlock()

		localVoter.SetStreamHandler(protocolID, func(stream network.Stream) {
			streamHandler(stream, election)
		})
	localVoter := election.localVoter
	localVoter.SetStreamHandler(protocolID, func(stream network.Stream) {
		streamHandler(stream, election)
	})

		fmt.Println("press ENTER to solidify group of voters and start voting")
		stdReader := bufio.NewReader(os.Stdin)
		_, err := stdReader.ReadString('\n')
		if err != nil {
			panic(err)
		}
	fmt.Println("press ENTER to solidify group of voters and start voting")
	stdReader := bufio.NewReader(os.Stdin)
	_, err := stdReader.ReadString('\n')
	if err != nil {
		panic(err)
	}

		logger.Info("ENTER has been pressed; closing election")
		election.Lock()
		n := len(election.remoteVoters)
		election.close <- n
		close(election.close)
		election.Unlock()
		election.RLock()
		for _, voter := range election.remoteVoters {
			stream, err := localVoter.NewStream(localVoter.ctx, voter.addrInfo.ID, protocolID)
			if err != nil {
				panic(err)
			}
			writer := bufio.NewWriter(stream)
			writer.WriteString(fmt.Sprintf("close\n%d", n))
			writer.Flush()
			stream.Close()
		}
		election.RUnlock()
	} else { // we are a slave
		logger.Info("attempting to open stream with master peer...")
		stream, err := localVoter.NewStream(localVoter.ctx, election.masterID, protocolID)
		rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
	logger.Info("ENTER has been pressed; closing election")
	election.Lock()
	n := len(election.remoteVoters)
	election.close <- n
	close(election.close)
	election.Unlock()
	election.RLock()
	for _, voter := range election.remoteVoters {
		stream, err := localVoter.NewStream(localVoter.ctx, voter.addrInfo.ID, protocolID)
		if err != nil {
			panic(err)
		}
		logger.Info("opened stream with master peer")
		writer := bufio.NewWriter(stream)
		writer.WriteString(fmt.Sprintf("close\n%d", n))
		writer.Flush()
		stream.Close()
	}
	election.RUnlock()
}

		logger.Info("fetching election info from master")
		_, err = rw.WriteString("info")
		if err != nil {
			panic(err)
		}
		rw.Flush()
		stream.Close() // only stops writing
		// first field is the rendezvous string, which is used for peer
		// discovery
func (election *Election) setupSlave() {
	logger.Info("attempting to open stream with master peer...")
	localVoter := election.localVoter
	stream, err := localVoter.NewStream(localVoter.ctx, election.masterID, protocolID)
	rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
	if err != nil {
		panic(err)
	}
	logger.Info("opened stream with master peer")

	logger.Info("fetching election info from master")
	_, err = rw.WriteString("info")
	if err != nil {
		panic(err)
	}
	rw.Flush()
	stream.Close() // only stops writing
	// first field is the rendezvous string, which is used for peer
	// discovery
	str, err := rw.ReadString('\n')
	if err != nil && err != io.EOF {
		panic(err)
	}
	str = stripNewline(str)
	election.rendezvousNonce = Nonce(str)
	// remaining fields are the candidates
	for {
		str, err := rw.ReadString('\n')
		if err != nil && err != io.EOF {
			panic(err)
		}
		str = stripNewline(str)
		election.rendezvousNonce = Nonce(str)
		// remaining fields are the candidates
		for {
			str, err := rw.ReadString('\n')
			if err != nil && err != io.EOF {
				panic(err)
			}
			str = stripNewline(str)
			if str != "" {
				election.Candidates = append(election.Candidates, Candidate(str))
			}
			if err == io.EOF {
				break
			}
		if str != "" {
			election.Candidates = append(election.Candidates, Candidate(str))
		}
		if err == io.EOF {
			break
		}
		logger.Info("done fetching election info")

		logger.Info("checking authenticity of election info...")
		verifyElectionInfo(election, election.merkleRoot)

		// channel used to signify when election is closed
		ch := make(chan int, 1)
		election.close = ch
		// now that we have election info, begin handling streams
		localVoter.SetStreamHandler(protocolID, func(stream network.Stream) {
			streamHandler(stream, election)
		})
		findPeers(ch, election)
	}
	logger.Info("done fetching election info")

	logger.Info("checking authenticity of election info...")
	verifyElectionInfo(election, election.merkleRoot)

	// channel used to signify when election is closed
	ch := make(chan int, 1)
	election.close = ch
	// now that we have election info, begin handling streams
	localVoter.SetStreamHandler(protocolID, func(stream network.Stream) {
		streamHandler(stream, election)
	})
	findPeers(ch, election)
}

func main() {


@@ 206,7 208,13 @@ func main() {
		}
	}

	bootstrap(election)
	election.localVoter.Bootstrap()

	if election.masterID == election.localVoter.ID() {
		election.setupMaster()
	} else {
		election.setupSlave()
	}

	if err := election.saveInfo(); err != nil {
		panic(err)