summaryrefslogtreecommitdiff
path: root/integrationtests
diff options
context:
space:
mode:
authorPaul Buetow <paul@buetow.org>2026-04-10 22:41:02 +0300
committerPaul Buetow <paul@buetow.org>2026-04-10 22:41:02 +0300
commite657c7b5cb5bb9675c7817b34a7333bdf259415c (patch)
treed630c106ddca1e2500ebe47b2cf6a11f44c5bc24 /integrationtests
parent28f6319b77d35c6da6b99ad7e35d0d5602dc2ee6 (diff)
task 80: add runtime-query integration coverage
Diffstat (limited to 'integrationtests')
-rw-r--r--integrationtests/interactive_runtime_query_test.go83
-rw-r--r--integrationtests/runtime_query_compatibility_test.go188
2 files changed, 271 insertions, 0 deletions
diff --git a/integrationtests/interactive_runtime_query_test.go b/integrationtests/interactive_runtime_query_test.go
index b338065..6ca2cef 100644
--- a/integrationtests/interactive_runtime_query_test.go
+++ b/integrationtests/interactive_runtime_query_test.go
@@ -109,6 +109,89 @@ func TestDTailInteractiveReloadReusesSessionAndDropsLateOldMatches(t *testing.T)
}
}
+func TestDTailInteractiveReloadReusesSessionOnImmediateBoundaryAndDropsLateOldMatches(t *testing.T) {
+ skipIfNotIntegrationTest(t)
+
+ testLogger := NewTestLogger("TestDTailInteractiveReloadReusesSessionOnImmediateBoundaryAndDropsLateOldMatches")
+ defer testLogger.WriteLogFile()
+ cleanupTmpFiles(t)
+
+ ctx, cancel := createTestContextWithTimeout(t)
+ ctx = WithTestLogger(ctx, testLogger)
+ defer cancel()
+
+ followFile := "interactive_dtail_reload_immediate.tmp"
+ if err := os.WriteFile(followFile, []byte("ERROR initial\n"), 0600); err != nil {
+ t.Fatalf("unable to create follow file: %v", err)
+ }
+ cleanupFiles(t, followFile, "interactive_dtail_reload_immediate.stdout.tmp")
+
+ port := getUniquePortNumber()
+ serverStdout, serverStderr, _, err := startCommand(ctx, t, "", "../dserver",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "debug",
+ "--bindAddress", "localhost",
+ "--port", fmt.Sprintf("%d", port),
+ )
+ if err != nil {
+ t.Fatalf("start dserver: %v", err)
+ }
+ serverLogs := startProcessOutputCollector(ctx, serverStdout, serverStderr)
+ if err := waitForServerReady(ctx, "localhost", port); err != nil {
+ t.Fatalf("wait for dserver: %v", err)
+ }
+ serverLogs.reset()
+
+ writerDone := make(chan error, 1)
+ go func() {
+ if err := waitForCollectorSubstring(ctx, serverLogs, "Start reading|"+followFile+"|"+followFile); err != nil {
+ writerDone <- err
+ return
+ }
+ writerDone <- appendLinesOnSchedule(ctx, followFile, []interactiveStep{
+ {Delay: 1500 * time.Millisecond, Input: "ERROR late"},
+ {Delay: 1700 * time.Millisecond, Input: "WARN live"},
+ })
+ }()
+
+ clientOutput, err := runInteractivePTYCommand(ctx, []string{
+ "../dtail",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "info",
+ "--servers", fmt.Sprintf("localhost:%d", port),
+ "--files", followFile,
+ "--grep", "ERROR",
+ "--plain",
+ "--trustAllHosts",
+ "--interactive-query",
+ }, []interactiveStep{
+ {Delay: 1200 * time.Millisecond, Input: ":reload --grep WARN"},
+ {Delay: 4 * time.Second, Input: ":quit"},
+ })
+ if err != nil {
+ t.Fatalf("run interactive dtail: %v\noutput:\n%s", err, clientOutput)
+ }
+
+ if err := <-writerDone; err != nil {
+ t.Fatalf("write follow file: %v", err)
+ }
+
+ if !strings.Contains(clientOutput, "WARN live") {
+ t.Fatalf("expected WARN line after reload in output:\n%s", clientOutput)
+ }
+ if strings.Contains(clientOutput, "ERROR late") {
+ t.Fatalf("unexpected stale ERROR line after reload:\n%s", clientOutput)
+ }
+ if !strings.Contains(clientOutput, "reload applied successfully") {
+ t.Fatalf("expected reload success message in output:\n%s", clientOutput)
+ }
+ if countSubstring(serverLogs.snapshot(), "Creating new server handler") != 1 {
+ t.Fatalf("expected one SSH session on the server, logs:\n%s", strings.Join(serverLogs.snapshot(), "\n"))
+ }
+}
+
func TestDGrepInteractiveReloadReusesSessionAfterCompletedRead(t *testing.T) {
skipIfNotIntegrationTest(t)
diff --git a/integrationtests/runtime_query_compatibility_test.go b/integrationtests/runtime_query_compatibility_test.go
new file mode 100644
index 0000000..8e811ad
--- /dev/null
+++ b/integrationtests/runtime_query_compatibility_test.go
@@ -0,0 +1,188 @@
+package integrationtests
+
+import (
+ "bufio"
+ "context"
+ "encoding/base64"
+ "fmt"
+ "io"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/mimecast/dtail/internal/protocol"
+ "github.com/mimecast/dtail/internal/user"
+
+ gossh "golang.org/x/crypto/ssh"
+)
+
+func TestDServerProtocolVersionMismatchReportsCompatibilityError(t *testing.T) {
+ skipIfNotIntegrationTest(t)
+
+ testLogger := NewTestLogger("TestDServerProtocolVersionMismatchReportsCompatibilityError")
+ defer testLogger.WriteLogFile()
+ cleanupTmpFiles(t)
+
+ ctx, cancel := createTestContextWithTimeout(t)
+ ctx = WithTestLogger(ctx, testLogger)
+ defer cancel()
+
+ port := getUniquePortNumber()
+ serverStdout, serverStderr, _, err := startCommand(ctx, t, "", "../dserver",
+ "--cfg", "none",
+ "--logger", "stdout",
+ "--logLevel", "error",
+ "--bindAddress", "localhost",
+ "--port", fmt.Sprintf("%d", port),
+ )
+ if err != nil {
+ t.Fatalf("start dserver: %v", err)
+ }
+ _ = startProcessOutputCollector(ctx, serverStdout, serverStderr)
+ if err := waitForServerReady(ctx, "localhost", port); err != nil {
+ t.Fatalf("wait for dserver: %v", err)
+ }
+
+ client, session, stdin, lines, err := openSSHSession(ctx, t, fmt.Sprintf("localhost:%d", port))
+ if err != nil {
+ t.Fatalf("open ssh session: %v", err)
+ }
+ defer client.Close()
+ defer session.Close()
+
+ rawCommand := "protocol 3 base64 " + base64.StdEncoding.EncodeToString([]byte("tail: . /tmp/ignored .")) + ";"
+ if _, err := io.WriteString(stdin, rawCommand); err != nil {
+ t.Fatalf("write protocol mismatch command: %v", err)
+ }
+
+ output, ok := waitForSSHOutputContains(ctx, session, lines,
+ "the DTail server protocol version '"+protocol.ProtocolCompat+"' does not match")
+ if !ok {
+ t.Fatalf("expected protocol mismatch error in SSH output:\n%s", output)
+ }
+ if !strings.Contains(output, "please update DTail server") {
+ t.Fatalf("expected mixed-version compatibility guidance in output:\n%s", output)
+ }
+}
+
+func openSSHSession(ctx context.Context, t *testing.T, address string) (*gossh.Client, *gossh.Session, io.WriteCloser, <-chan string, error) {
+ t.Helper()
+
+ signer, err := loadTestSigner()
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ clientConfig := &gossh.ClientConfig{
+ User: user.Name(),
+ Auth: []gossh.AuthMethod{gossh.PublicKeys(signer)},
+ HostKeyCallback: gossh.InsecureIgnoreHostKey(),
+ Timeout: 5 * time.Second,
+ }
+
+ conn, err := gossh.Dial("tcp", address, clientConfig)
+ if err != nil {
+ return nil, nil, nil, nil, err
+ }
+
+ session, err := conn.NewSession()
+ if err != nil {
+ conn.Close()
+ return nil, nil, nil, nil, err
+ }
+
+ stdin, err := session.StdinPipe()
+ if err != nil {
+ session.Close()
+ conn.Close()
+ return nil, nil, nil, nil, err
+ }
+ stdout, err := session.StdoutPipe()
+ if err != nil {
+ session.Close()
+ conn.Close()
+ return nil, nil, nil, nil, err
+ }
+ stderr, err := session.StderrPipe()
+ if err != nil {
+ session.Close()
+ conn.Close()
+ return nil, nil, nil, nil, err
+ }
+ if err := session.Shell(); err != nil {
+ session.Close()
+ conn.Close()
+ return nil, nil, nil, nil, err
+ }
+
+ lines := make(chan string, 32)
+ go func() {
+ defer close(lines)
+ scanner := bufio.NewScanner(stdout)
+ for scanner.Scan() {
+ select {
+ case lines <- scanner.Text():
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ go func() {
+ scanner := bufio.NewScanner(stderr)
+ for scanner.Scan() {
+ select {
+ case lines <- scanner.Text():
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+
+ return conn, session, stdin, lines, nil
+}
+
+func loadTestSigner() (gossh.Signer, error) {
+ for _, path := range []string{"id_rsa", "../id_rsa"} {
+ keyBytes, err := os.ReadFile(path)
+ if err != nil {
+ continue
+ }
+
+ signer, err := gossh.ParsePrivateKey(keyBytes)
+ if err == nil {
+ return signer, nil
+ }
+ }
+
+ return nil, fmt.Errorf("unable to load test ssh private key from id_rsa")
+}
+
+func waitForSSHOutputContains(ctx context.Context, session *gossh.Session, lines <-chan string, needle string) (string, bool) {
+ var output strings.Builder
+ timeout := time.NewTimer(5 * time.Second)
+ defer timeout.Stop()
+
+ for {
+ select {
+ case line, ok := <-lines:
+ if !ok {
+ return output.String(), strings.Contains(output.String(), needle)
+ }
+ if output.Len() > 0 {
+ output.WriteByte('\n')
+ }
+ output.WriteString(line)
+ if strings.Contains(output.String(), needle) {
+ _ = session.Close()
+ return output.String(), true
+ }
+ case <-timeout.C:
+ _ = session.Close()
+ return output.String(), false
+ case <-ctx.Done():
+ _ = session.Close()
+ return output.String(), false
+ }
+ }
+}