summaryrefslogtreecommitdiff
path: root/internal/mapr/funcs
diff options
context:
space:
mode:
authorPaul Buetow <pbuetow@mimecast.com>2020-07-03 14:13:13 +0100
committerPaul Buetow <pbuetow@mimecast.com>2020-08-13 11:37:24 +0100
commitc5a0ba7d29da7effa0ae18bffa10fc0be359b8e7 (patch)
treede4874740a5ddeb6eb29c887f6e121c61a1f8f3c /internal/mapr/funcs
parent8f9f9766cecec4a42ffb4d14ba9b7efc2ed204ad (diff)
bump up version to 3.0.0. can run continuous background mapreduce queries, useful for log file monitorig for example. breaking protocol change which allows to mapreduce aggreate messages containing the default field separator |. add of more unit tests. add logformat mapreduce query keyword. add set mapreduce clause support and support to evaluate built-in functions such as md5sum() and maskdigits().v3.0.0
Diffstat (limited to 'internal/mapr/funcs')
-rw-r--r--internal/mapr/funcs/function.go66
-rw-r--r--internal/mapr/funcs/function_test.go45
-rw-r--r--internal/mapr/funcs/maskdigits.go14
-rw-r--r--internal/mapr/funcs/md5sum.go12
4 files changed, 137 insertions, 0 deletions
diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go
new file mode 100644
index 0000000..52aaa98
--- /dev/null
+++ b/internal/mapr/funcs/function.go
@@ -0,0 +1,66 @@
+package funcs
+
+import (
+ "fmt"
+ "strings"
+)
+
+// CallbackFunc is a function which can be executed by the mapreduce engine
+type CallbackFunc func(text string) string
+
+// Function embeddes the function name to the callback function
+type Function struct {
+ // Name of the callback function
+ Name string
+ call CallbackFunc
+}
+
+// FunctionStack is a list of functions stacked each other
+type FunctionStack []Function
+
+// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns a corresponding function stack.
+func NewFunctionStack(in string) (FunctionStack, string, error) {
+ var fs FunctionStack
+
+ getCallback := func(name string) (CallbackFunc, error) {
+ var cb CallbackFunc
+
+ switch name {
+ case "md5sum":
+ return Md5Sum, nil
+ case "maskdigits":
+ return MaskDigits, nil
+ default:
+ return cb, fmt.Errorf("unknown function '%s'", name)
+ }
+ }
+
+ aux := in
+ for strings.HasSuffix(aux, ")") {
+ index := strings.Index(aux, "(")
+ if index <= 0 {
+ return fs, "", fmt.Errorf("unable to parse function '%s' at '%s'", in, aux)
+ }
+ name := aux[0:index]
+
+ call, err := getCallback(name)
+ if err != nil {
+ return fs, "", err
+ }
+ fs = append(fs, Function{name, call})
+ aux = aux[index+1 : len(aux)-1]
+ }
+
+ return fs, aux, nil
+}
+
+// Call the function stack.
+func (fs FunctionStack) Call(str string) string {
+ for i := len(fs) - 1; i >= 0; i-- {
+ //logger.Debug("Call", fs[i].Name, str)
+ str = fs[i].call(str)
+ //logger.Debug("Call.result", fs[i].Name, str)
+ }
+
+ return str
+}
diff --git a/internal/mapr/funcs/function_test.go b/internal/mapr/funcs/function_test.go
new file mode 100644
index 0000000..415683c
--- /dev/null
+++ b/internal/mapr/funcs/function_test.go
@@ -0,0 +1,45 @@
+package funcs
+
+import "testing"
+
+func TestFunction(t *testing.T) {
+ input := "md5sum($line)"
+ fs, arg, err := NewFunctionStack(input)
+ if err != nil {
+ t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs)
+ }
+ if arg != "$line" {
+ t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs)
+ }
+ t.Log(input, fs, arg)
+
+ result := fs.Call(input)
+ if result != "b38699013d79e50d9d122433753959c1" {
+ t.Errorf("error executing function stack '%s': expected result 'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs)
+ }
+
+ input = "maskdigits(md5sum(maskdigits($line)))"
+ fs, arg, err = NewFunctionStack(input)
+ if err != nil {
+ t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs)
+ }
+ if arg != "$line" {
+ t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs)
+ }
+ t.Log(input, fs, arg)
+
+ result = fs.Call(input)
+ if result != ".fac.bbe..bb.........d...a.c..b." {
+ t.Errorf("error executing function stack '%s': expected result '.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs)
+ }
+
+ input = "md5sum$line)"
+ if fs, _, err := NewFunctionStack(input); err == nil {
+ t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs)
+ }
+
+ input = "md5sum(makedigits$line))"
+ if fs, _, err := NewFunctionStack(input); err == nil {
+ t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs)
+ }
+}
diff --git a/internal/mapr/funcs/maskdigits.go b/internal/mapr/funcs/maskdigits.go
new file mode 100644
index 0000000..d51f3d8
--- /dev/null
+++ b/internal/mapr/funcs/maskdigits.go
@@ -0,0 +1,14 @@
+package funcs
+
+// MaskDigits masks all digits (replaces them with .)
+func MaskDigits(input string) string {
+ s := []byte(input)
+
+ for i, b := range s {
+ if '0' <= b && b <= '9' {
+ s[i] = '.'
+ }
+ }
+
+ return string(s)
+}
diff --git a/internal/mapr/funcs/md5sum.go b/internal/mapr/funcs/md5sum.go
new file mode 100644
index 0000000..e3cc7e6
--- /dev/null
+++ b/internal/mapr/funcs/md5sum.go
@@ -0,0 +1,12 @@
+package funcs
+
+import (
+ "crypto/md5"
+ "encoding/hex"
+)
+
+// Md5Sum returns the hex encoded MD5 checksum of a given input string.
+func Md5Sum(text string) string {
+ hash := md5.Sum([]byte(text))
+ return hex.EncodeToString(hash[:])
+}