From 0a5b56cec0be16a6a8627ec8548b9e80a243af96 Mon Sep 17 00:00:00 2001 From: Paul Buetow Date: Sat, 10 Feb 2024 19:45:41 +0200 Subject: also read fdEvents in Go --- build.sh | 2 +- main.go | 64 ++++++++++++++++++++++++++++++++++++++++++---------------------- 2 files changed, 43 insertions(+), 23 deletions(-) diff --git a/build.sh b/build.sh index f4dd6f3..4777061 100755 --- a/build.sh +++ b/build.sh @@ -13,4 +13,4 @@ export CGO_LDFLAGS="-lelf -lzstd $LIBBPFGO/output/libbpf.a" export GOOS=linux export GOARCH=amd64 -go build -tags netgo -ldflags '-w -extldflags "-static"' -o ioriotng ./main.go +go build -race -tags netgo -ldflags '-w -extldflags "-static"' -o ioriotng ./main.go diff --git a/main.go b/main.go index 3be7aef..48e4f95 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import "C" import ( "bytes" + "context" "encoding/binary" "fmt" "log" @@ -68,7 +69,7 @@ func main() { } defer bpfModule.Close() - if err = resizeMap(bpfModule, "open_event_map", 8192); err != nil { + if err = resizeMap(bpfModule, "open_event_map", 8192*10); err != nil { log.Fatal(err) } @@ -81,13 +82,21 @@ func main() { log.Fatal(err) } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() - if err := listenToEvents[openEvent](bpfModule, "open_event_map"); err != nil { - log.Fatal(err) + for ev := range listenToEvents[openEvent](ctx, bpfModule, "open_event_map") { + log.Println(ev) + } + }() + go func() { + defer wg.Done() + for ev := range listenToEvents[fdEvent](ctx, bpfModule, "fd_event_map") { + log.Println(ev) } }() @@ -99,31 +108,42 @@ func main() { log.Println("Good bye") } -func listenToEvents[T BpfMapper](bpfModule *bpf.Module, mapName string) error { +func listenToEvents[T BpfMapper](ctx context.Context, bpfModule *bpf.Module, mapName string) <-chan T { pollSize := 300 - eventsChannel := make(chan []byte) - lostChannel := make(chan uint64) - pb, err := bpfModule.InitPerfBuf(mapName, eventsChannel, lostChannel, 1) + rawEventsCh := make(chan []byte) + rawLostCh := make(chan uint64) // TODO: Of any use this channel? + eventsCh := make(chan T) + + pb, err := bpfModule.InitPerfBuf(mapName, rawEventsCh, rawLostCh, 1) + pb.Poll(pollSize) if err != nil { - return err + log.Fatal(err) } - defer func() { - pb.Stop() - pb.Close() - }() - pb.Poll(pollSize) - for ev := range eventsChannel { - var e T - if err := binary.Read(bytes.NewReader(ev), binary.LittleEndian, &e); err != nil { - return err + go func() { + defer func() { + pb.Stop() + pb.Close() + close(eventsCh) + }() + for { + select { + case <-ctx.Done(): + return + case lost := <-rawLostCh: + log.Println("Lost", lost, mapName, "events. Consider increasing ring buffer!") + case rawEv := <-rawEventsCh: + var ev T + if err := binary.Read(bytes.NewReader(rawEv), binary.LittleEndian, &ev); err != nil { + log.Fatal(err) + } + eventsCh <- ev + pb.Poll(pollSize) + } } + }() - fmt.Println(e) - pb.Poll(pollSize) - } - - return nil + return eventsCh } func ksymArch() string { -- cgit v1.2.3