summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2023-06-10 12:18:55 +0300
committerPaul Buetow <paul@buetow.org>2023-06-10 12:18:55 +0300
commiteaf6bae9f8b493b01be5182c6285bc20887bd5de (patch)
tree672391ba7d4fa5b16e771c2751bd2add06f0d5d3
parent1488d0a841443fe622e41148dad89298ece1603d (diff)
initial tcp client
-rw-r--r--internal/client/client.go32
-rw-r--r--internal/run.go1
-rw-r--r--internal/server/server.go12
3 files changed, 37 insertions, 8 deletions
diff --git a/internal/client/client.go b/internal/client/client.go
index b444d35..f7ddb12 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -3,16 +3,19 @@ package client
import (
"context"
"log"
+ "time"
"codeberg.org/snonux/gorum/internal/config"
)
+const packageStr = "client"
+
func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string) {
- log.Println("Starting client")
+ log.Println(packageStr, "starting")
fanOut := make([]chan []string, len(conf.Nodes))
- for i := 0; i < len(fanOut); i++ {
- fanOut[i] = make(chan []string, 1)
+ for i, node := range conf.Nodes {
+ fanOut[i] = startConnection(ctx, node)
}
go func() {
@@ -27,10 +30,12 @@ func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string)
case liveNodes := <-liveNodesCh:
log.Printf("Notifying live nodes %v to all partner nodes", liveNodes)
for _, ch := range fanOut {
+ // First, clear previous element of the channel, if any
select {
case <-ch:
default:
}
+ // Now, update channel with the new live nodes.
ch <- liveNodes
}
case <-ctx.Done():
@@ -39,3 +44,24 @@ func Start(ctx context.Context, conf config.Config, liveNodesCh <-chan []string)
}
}()
}
+
+func startConnection(ctx context.Context, node string) chan []string {
+ ch := make(chan []string, 1)
+
+ go func() {
+ for {
+ log.Println(packageStr, "starting connection", node)
+ if err := tcpClientRun(ctx, node, ch); err != nil {
+ log.Println(packageStr, "not connected to node", node, "anymore:", err)
+ }
+
+ select {
+ case <-time.After(time.Second * 10):
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return ch
+}
diff --git a/internal/run.go b/internal/run.go
index 9b9df1d..b069472 100644
--- a/internal/run.go
+++ b/internal/run.go
@@ -20,5 +20,6 @@ func Run(ctx context.Context, configFile string) error {
server.Start(ctx, conf, quo)
client.Start(ctx, conf, liveNodesCh)
+ <-ctx.Done()
return nil
}
diff --git a/internal/server/server.go b/internal/server/server.go
index 7956703..2a809fe 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -10,12 +10,14 @@ import (
"codeberg.org/snonux/gorum/internal/vote"
)
+const packageStr = "Server"
+
func Start(ctx context.Context, conf config.Config, quo quorum.Quorum) {
go func() {
for {
- log.Println("Starting server")
- if err := run(ctx, conf, quo); err != nil {
- log.Println(err)
+ log.Println(packageStr, "starting")
+ if err := runServer(ctx, conf, quo); err != nil {
+ log.Println(packageStr, err)
}
select {
@@ -27,7 +29,7 @@ func Start(ctx context.Context, conf config.Config, quo quorum.Quorum) {
}()
}
-func run(ctx context.Context, conf config.Config, quo quorum.Quorum) error {
+func runServer(ctx context.Context, conf config.Config, quo quorum.Quorum) error {
serverCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -46,6 +48,6 @@ func run(ctx context.Context, conf config.Config, quo quorum.Quorum) error {
return tcpServerRun(serverCtx, conf, func(message string) string {
ch <- vote.New(conf, message)
- return "ok"
+ return "ok - I received your message, dear client"
})
}