diff options
| author | Paul Buetow <paul@buetow.org> | 2023-06-10 12:18:55 +0300 |
|---|---|---|
| committer | Paul Buetow <paul@buetow.org> | 2023-06-10 12:18:55 +0300 |
| commit | eaf6bae9f8b493b01be5182c6285bc20887bd5de (patch) | |
| tree | 672391ba7d4fa5b16e771c2751bd2add06f0d5d3 | |
| parent | 1488d0a841443fe622e41148dad89298ece1603d (diff) | |
initial tcp client
| -rw-r--r-- | internal/client/client.go | 32 | ||||
| -rw-r--r-- | internal/run.go | 1 | ||||
| -rw-r--r-- | internal/server/server.go | 12 |
3 files changed, 37 insertions, 8 deletions
diff --git a/internal/client/client.go b/internal/client/client.go index b444d35..f7ddb12 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -3,16 +3,19 @@ package client import ( "context" "log" + "time" "codeberg.org/snonux/gorum/internal/config" ) +const packageStr = "client" + func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string) { - log.Println("Starting client") + log.Println(packageStr, "starting") fanOut := make([]chan []string, len(conf.Nodes)) - for i := 0; i < len(fanOut); i++ { - fanOut[i] = make(chan []string, 1) + for i, node := range conf.Nodes { + fanOut[i] = startConnection(ctx, node) } go func() { @@ -27,10 +30,12 @@ func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string) case liveNodes := <-liveNodesCh: log.Printf("Notifying live nodes %v to all partner nodes", liveNodes) for _, ch := range fanOut { + // First, clear previous element of the channel, if any select { case <-ch: default: } + // Now, update channel with the new live nodes. ch <- liveNodes } case <-ctx.Done(): @@ -39,3 +44,24 @@ func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string) } }() } + +func startConnection(ctx context.Context, node string) chan []string { + ch := make(chan []string, 1) + + go func() { + for { + log.Println(packageStr, "starting connection", node) + if err := tcpClientRun(ctx, node, ch); err != nil { + log.Println(packageStr, "not connected to node", node, "anymore:", err) + } + + select { + case <-time.After(time.Second * 10): + case <-ctx.Done(): + return + } + } + }() + + return ch +} diff --git a/internal/run.go b/internal/run.go index 9b9df1d..b069472 100644 --- a/internal/run.go +++ b/internal/run.go @@ -20,5 +20,6 @@ func Run(ctx context.Context, configFile string) error { server.Start(ctx, conf, quo) client.Start(ctx, conf, liveNodesCh) + <-ctx.Done() return nil } diff --git a/internal/server/server.go b/internal/server/server.go index 7956703..2a809fe 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -10,12 +10,14 @@ import ( "codeberg.org/snonux/gorum/internal/vote" ) +const packageStr = "Server" + func Start(ctx context.Context, conf config.Config, quo quorum.Quorum) { go func() { for { - log.Println("Starting server") - if err := run(ctx, conf, quo); err != nil { - log.Println(err) + log.Println(packageStr, "starting") + if err := runServer(ctx, conf, quo); err != nil { + log.Println(packageStr, err) } select { @@ -27,7 +29,7 @@ func Start(ctx context.Context, conf config.Config, quo quorum.Quorum) { }() } -func run(ctx context.Context, conf config.Config, quo quorum.Quorum) error { +func runServer(ctx context.Context, conf config.Config, quo quorum.Quorum) error { serverCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -46,6 +48,6 @@ func run(ctx context.Context, conf config.Config, quo quorum.Quorum) error { return tcpServerRun(serverCtx, conf, func(message string) string { ch <- vote.New(conf, message) - return "ok" + return "ok - I received your message, dear client" }) } |
