diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2020-07-03 14:13:13 +0100 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2020-08-13 11:37:24 +0100 |
| commit | c5a0ba7d29da7effa0ae18bffa10fc0be359b8e7 (patch) | |
| tree | de4874740a5ddeb6eb29c887f6e121c61a1f8f3c /internal/mapr/funcs | |
| parent | 8f9f9766cecec4a42ffb4d14ba9b7efc2ed204ad (diff) | |
bump up version to 3.0.0. can run continuous background mapreduce queries, useful for log file monitorig for example. breaking protocol change which allows to mapreduce aggreate messages containing the default field separator |. add of more unit tests. add logformat mapreduce query keyword. add set mapreduce clause support and support to evaluate built-in functions such as md5sum() and maskdigits().v3.0.0
Diffstat (limited to 'internal/mapr/funcs')
| -rw-r--r-- | internal/mapr/funcs/function.go | 66 | ||||
| -rw-r--r-- | internal/mapr/funcs/function_test.go | 45 | ||||
| -rw-r--r-- | internal/mapr/funcs/maskdigits.go | 14 | ||||
| -rw-r--r-- | internal/mapr/funcs/md5sum.go | 12 |
4 files changed, 137 insertions, 0 deletions
diff --git a/internal/mapr/funcs/function.go b/internal/mapr/funcs/function.go new file mode 100644 index 0000000..52aaa98 --- /dev/null +++ b/internal/mapr/funcs/function.go @@ -0,0 +1,66 @@ +package funcs + +import ( + "fmt" + "strings" +) + +// CallbackFunc is a function which can be executed by the mapreduce engine +type CallbackFunc func(text string) string + +// Function embeddes the function name to the callback function +type Function struct { + // Name of the callback function + Name string + call CallbackFunc +} + +// FunctionStack is a list of functions stacked each other +type FunctionStack []Function + +// NewFunctionStack parses the input string, e.g. foo(bar("arg")) and returns a corresponding function stack. +func NewFunctionStack(in string) (FunctionStack, string, error) { + var fs FunctionStack + + getCallback := func(name string) (CallbackFunc, error) { + var cb CallbackFunc + + switch name { + case "md5sum": + return Md5Sum, nil + case "maskdigits": + return MaskDigits, nil + default: + return cb, fmt.Errorf("unknown function '%s'", name) + } + } + + aux := in + for strings.HasSuffix(aux, ")") { + index := strings.Index(aux, "(") + if index <= 0 { + return fs, "", fmt.Errorf("unable to parse function '%s' at '%s'", in, aux) + } + name := aux[0:index] + + call, err := getCallback(name) + if err != nil { + return fs, "", err + } + fs = append(fs, Function{name, call}) + aux = aux[index+1 : len(aux)-1] + } + + return fs, aux, nil +} + +// Call the function stack. +func (fs FunctionStack) Call(str string) string { + for i := len(fs) - 1; i >= 0; i-- { + //logger.Debug("Call", fs[i].Name, str) + str = fs[i].call(str) + //logger.Debug("Call.result", fs[i].Name, str) + } + + return str +} diff --git a/internal/mapr/funcs/function_test.go b/internal/mapr/funcs/function_test.go new file mode 100644 index 0000000..415683c --- /dev/null +++ b/internal/mapr/funcs/function_test.go @@ -0,0 +1,45 @@ +package funcs + +import "testing" + +func TestFunction(t *testing.T) { + input := "md5sum($line)" + fs, arg, err := NewFunctionStack(input) + if err != nil { + t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs) + } + if arg != "$line" { + t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs) + } + t.Log(input, fs, arg) + + result := fs.Call(input) + if result != "b38699013d79e50d9d122433753959c1" { + t.Errorf("error executing function stack '%s': expected result 'b38699013d79e50d9d122433753959c1' but got '%s' (%v)\n", input, result, fs) + } + + input = "maskdigits(md5sum(maskdigits($line)))" + fs, arg, err = NewFunctionStack(input) + if err != nil { + t.Errorf("error parsing function input '%s': %s (%v)\n", input, err.Error(), fs) + } + if arg != "$line" { + t.Errorf("error parsing function input '%s': expected argument '$line' but got '%s' (%v)\n", input, arg, fs) + } + t.Log(input, fs, arg) + + result = fs.Call(input) + if result != ".fac.bbe..bb.........d...a.c..b." { + t.Errorf("error executing function stack '%s': expected result '.fac.bbe..bb.........d...a.c..b.' but got '%s' (%v)\n", input, result, fs) + } + + input = "md5sum$line)" + if fs, _, err := NewFunctionStack(input); err == nil { + t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs) + } + + input = "md5sum(makedigits$line))" + if fs, _, err := NewFunctionStack(input); err == nil { + t.Errorf("Expected error parsing function input '%s' (%v) but got no error\n", input, fs) + } +} diff --git a/internal/mapr/funcs/maskdigits.go b/internal/mapr/funcs/maskdigits.go new file mode 100644 index 0000000..d51f3d8 --- /dev/null +++ b/internal/mapr/funcs/maskdigits.go @@ -0,0 +1,14 @@ +package funcs + +// MaskDigits masks all digits (replaces them with .) +func MaskDigits(input string) string { + s := []byte(input) + + for i, b := range s { + if '0' <= b && b <= '9' { + s[i] = '.' + } + } + + return string(s) +} diff --git a/internal/mapr/funcs/md5sum.go b/internal/mapr/funcs/md5sum.go new file mode 100644 index 0000000..e3cc7e6 --- /dev/null +++ b/internal/mapr/funcs/md5sum.go @@ -0,0 +1,12 @@ +package funcs + +import ( + "crypto/md5" + "encoding/hex" +) + +// Md5Sum returns the hex encoded MD5 checksum of a given input string. +func Md5Sum(text string) string { + hash := md5.Sum([]byte(text)) + return hex.EncodeToString(hash[:]) +} |
