From 71a6659905c0a846593b1a0f1950d5f8d3123b02 Mon Sep 17 00:00:00 2001 From: David Florness Date: Tue, 2 Jun 2020 22:23:05 -0600 Subject: [PATCH] Streams are opening --- main.go | 121 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 97 insertions(+), 24 deletions(-) diff --git a/main.go b/main.go index e513e18..b504709 100644 --- a/main.go +++ b/main.go @@ -1,15 +1,19 @@ package main import ( + "bufio" "context" "fmt" + "io" "sync" "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" + discovery "github.com/libp2p/go-libp2p-discovery" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/rivo/tview" "github.com/whyrusleeping/go-logging" @@ -18,37 +22,51 @@ import ( var ( Logger = log.Logger("tallyard") ProtocolID = protocol.ID("/tallyard/0.0.0") + rendezvousString = "ohea7" + + electionOptions []string + masterID peer.ID + ctx context.Context + h host.Host + kdht *dht.IpfsDHT ) -func handleStream(stream network.Stream) { +func masterStreamHandler(stream network.Stream) { + Logger.Info("got a new stream!") + writer := bufio.NewWriter(stream) + + for _, option := range electionOptions { + writer.WriteString(fmt.Sprintf("%s\n", option)) + } + + writer.Flush() } func createElection() { - var electionOptions []string var form *tview.Form - i := 3 + n := 3 app := tview.NewApplication() form = tview.NewForm(). AddInputField("1.", "", 50, nil, nil). AddInputField("2.", "", 50, nil, nil). AddButton("+", func() { - form.AddInputField(fmt.Sprintf("%d.", i), "", 50, nil, nil) - i++ + form.AddInputField(fmt.Sprintf("%d.", n), "", 50, nil, nil) + n++ }). AddButton("-", func() { // TODO: ensure from joiner that there are at least two // candidates - if i > 3 { - form.RemoveFormItem(i-2) - i-- + if n > 3 { + form.RemoveFormItem(n-2) + n-- } }). AddButton("Done", func() { // TODO: ensure none of the candidates are empty app.Stop() - for j := 0; j < i-1; j++ { + for i := 0; i < n-1; i++ { electionOptions = append(electionOptions, - form.GetFormItem(j).(*tview.InputField).GetText()) + form.GetFormItem(i).(*tview.InputField).GetText()) } }) if err := app.SetRoot(form, true).EnableMouse(true).Run(); err != nil { @@ -58,33 +76,48 @@ func createElection() { func joinElection() { app := tview.NewApplication() - form := tview.NewForm(). + var form *tview.Form + form = tview.NewForm(). AddInputField("Master peer ID:", "", 50, nil, nil). AddButton("Continue", func() { + var err error app.Stop() - fmt.Println("Ready") + masterID, err = peer.Decode(form.GetFormItem(0).(*tview.InputField).GetText()) + if err != nil { + panic(err) + } + Logger.Info("master ID:", masterID) }) if err := app.SetRoot(form, true).EnableMouse(true).Run(); err != nil { panic(err) } } -func bootstrap(ctx context.Context) { - host, err := libp2p.New(ctx) +func bootstrap() { + var err error + + ctx = context.Background() + + h, err = libp2p.New(ctx, + // libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) { + // var err error + // kdht, err = dht.New(ctx, h) + // return kdht, err + // }), + ) if err != nil { panic(err) } - Logger.Info("host: ", host.ID()) - Logger.Info(host.Addrs()) - - host.SetStreamHandler(ProtocolID, handleStream) - kademlia, err := dht.New(ctx, host) - if err != nil { + kdht, err = dht.New(ctx, h) + Logger.Info("boostrapping the DHT") + if err = kdht.Bootstrap(ctx); err != nil { panic(err) } - fmt.Println(kademlia) + + Logger.Info("host:", h.ID()) + Logger.Info(h.Addrs()) var wg sync.WaitGroup for _, peerAddr := range dht.DefaultBootstrapPeers { @@ -92,7 +125,7 @@ func bootstrap(ctx context.Context) { wg.Add(1) go func() { defer wg.Done() - if err := host.Connect(ctx, *peerInfo); err != nil { + if err := h.Connect(ctx, *peerInfo); err != nil { Logger.Warning(err) } else { Logger.Info("connection established with bootstrap node: ", *peerInfo) @@ -100,6 +133,45 @@ func bootstrap(ctx context.Context) { }() } wg.Wait() + + Logger.Info("announcing ourselves...") + routingDiscovery := discovery.NewRoutingDiscovery(kdht) + discovery.Advertise(ctx, routingDiscovery, rendezvousString) + Logger.Info("successfully announced!") + + if masterID == "" { // we are the master + h.SetStreamHandler(ProtocolID, masterStreamHandler) + } else { // we are a slave + Logger.Info("searching for peers...") + peerChan, err := routingDiscovery.FindPeers(ctx, rendezvousString) + if err != nil { + panic(err) + } + for p := range peerChan { + Logger.Info("found peer:", p) + if p.ID != masterID { + continue + } + Logger.Info("attempting to open stream with peer...") + stream, err := h.NewStream(ctx, masterID, ProtocolID) + if err != nil { + panic(err) + } + Logger.Info("opened stream with master peer") + reader := bufio.NewReader(stream) + for { + str, err := reader.ReadString('\n') + if err == io.EOF { + break + } else if err != nil { + panic(err) + } + fmt.Println(str) + } + stream.Close() + Logger.Info("stream with master peer closed") + } + } } func main() { @@ -121,6 +193,7 @@ func main() { panic(err) } - ctx := context.Background() - bootstrap(ctx) + bootstrap() + + select {} } -- 2.38.4