@@ 1,15 1,19 @@
package main
import (
+ "bufio"
"context"
"fmt"
+ "io"
"sync"
"github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
+ "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
+ discovery "github.com/libp2p/go-libp2p-discovery"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/rivo/tview"
"github.com/whyrusleeping/go-logging"
@@ 18,37 22,51 @@ import (
var (
Logger = log.Logger("tallyard")
ProtocolID = protocol.ID("/tallyard/0.0.0")
+ rendezvousString = "ohea7"
+
+ electionOptions []string
+ masterID peer.ID
+ ctx context.Context
+ h host.Host
+ kdht *dht.IpfsDHT
)
-func handleStream(stream network.Stream) {
+func masterStreamHandler(stream network.Stream) {
+ Logger.Info("got a new stream!")
+ writer := bufio.NewWriter(stream)
+
+ for _, option := range electionOptions {
+ writer.WriteString(fmt.Sprintf("%s\n", option))
+ }
+
+ writer.Flush()
}
func createElection() {
- var electionOptions []string
var form *tview.Form
- i := 3
+ n := 3
app := tview.NewApplication()
form = tview.NewForm().
AddInputField("1.", "", 50, nil, nil).
AddInputField("2.", "", 50, nil, nil).
AddButton("+", func() {
- form.AddInputField(fmt.Sprintf("%d.", i), "", 50, nil, nil)
- i++
+ form.AddInputField(fmt.Sprintf("%d.", n), "", 50, nil, nil)
+ n++
}).
AddButton("-", func() {
// TODO: ensure from joiner that there are at least two
// candidates
- if i > 3 {
- form.RemoveFormItem(i-2)
- i--
+ if n > 3 {
+ form.RemoveFormItem(n-2)
+ n--
}
}).
AddButton("Done", func() {
// TODO: ensure none of the candidates are empty
app.Stop()
- for j := 0; j < i-1; j++ {
+ for i := 0; i < n-1; i++ {
electionOptions = append(electionOptions,
- form.GetFormItem(j).(*tview.InputField).GetText())
+ form.GetFormItem(i).(*tview.InputField).GetText())
}
})
if err := app.SetRoot(form, true).EnableMouse(true).Run(); err != nil {
@@ 58,33 76,48 @@ func createElection() {
func joinElection() {
app := tview.NewApplication()
- form := tview.NewForm().
+ var form *tview.Form
+ form = tview.NewForm().
AddInputField("Master peer ID:", "", 50, nil, nil).
AddButton("Continue", func() {
+ var err error
app.Stop()
- fmt.Println("Ready")
+ masterID, err = peer.Decode(form.GetFormItem(0).(*tview.InputField).GetText())
+ if err != nil {
+ panic(err)
+ }
+ Logger.Info("master ID:", masterID)
})
if err := app.SetRoot(form, true).EnableMouse(true).Run(); err != nil {
panic(err)
}
}
-func bootstrap(ctx context.Context) {
- host, err := libp2p.New(ctx)
+func bootstrap() {
+ var err error
+
+ ctx = context.Background()
+
+ h, err = libp2p.New(ctx,
+ // libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
+ // var err error
+ // kdht, err = dht.New(ctx, h)
+ // return kdht, err
+ // }),
+ )
if err != nil {
panic(err)
}
- Logger.Info("host: ", host.ID())
- Logger.Info(host.Addrs())
-
- host.SetStreamHandler(ProtocolID, handleStream)
- kademlia, err := dht.New(ctx, host)
- if err != nil {
+ kdht, err = dht.New(ctx, h)
+ Logger.Info("boostrapping the DHT")
+ if err = kdht.Bootstrap(ctx); err != nil {
panic(err)
}
- fmt.Println(kademlia)
+
+ Logger.Info("host:", h.ID())
+ Logger.Info(h.Addrs())
var wg sync.WaitGroup
for _, peerAddr := range dht.DefaultBootstrapPeers {
@@ 92,7 125,7 @@ func bootstrap(ctx context.Context) {
wg.Add(1)
go func() {
defer wg.Done()
- if err := host.Connect(ctx, *peerInfo); err != nil {
+ if err := h.Connect(ctx, *peerInfo); err != nil {
Logger.Warning(err)
} else {
Logger.Info("connection established with bootstrap node: ", *peerInfo)
@@ 100,6 133,45 @@ func bootstrap(ctx context.Context) {
}()
}
wg.Wait()
+
+ Logger.Info("announcing ourselves...")
+ routingDiscovery := discovery.NewRoutingDiscovery(kdht)
+ discovery.Advertise(ctx, routingDiscovery, rendezvousString)
+ Logger.Info("successfully announced!")
+
+ if masterID == "" { // we are the master
+ h.SetStreamHandler(ProtocolID, masterStreamHandler)
+ } else { // we are a slave
+ Logger.Info("searching for peers...")
+ peerChan, err := routingDiscovery.FindPeers(ctx, rendezvousString)
+ if err != nil {
+ panic(err)
+ }
+ for p := range peerChan {
+ Logger.Info("found peer:", p)
+ if p.ID != masterID {
+ continue
+ }
+ Logger.Info("attempting to open stream with peer...")
+ stream, err := h.NewStream(ctx, masterID, ProtocolID)
+ if err != nil {
+ panic(err)
+ }
+ Logger.Info("opened stream with master peer")
+ reader := bufio.NewReader(stream)
+ for {
+ str, err := reader.ReadString('\n')
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ panic(err)
+ }
+ fmt.Println(str)
+ }
+ stream.Close()
+ Logger.Info("stream with master peer closed")
+ }
+ }
}
func main() {
@@ 121,6 193,7 @@ func main() {
panic(err)
}
- ctx := context.Background()
- bootstrap(ctx)
+ bootstrap()
+
+ select {}
}