diff options
Diffstat (limited to 'ioreplay/src/replay')
| -rw-r--r-- | ioreplay/src/replay/replay.c | 191 | ||||
| -rw-r--r-- | ioreplay/src/replay/replay.h | 46 | ||||
| -rw-r--r-- | ioreplay/src/replay/rioop.c | 425 | ||||
| -rw-r--r-- | ioreplay/src/replay/rioop.h | 54 | ||||
| -rw-r--r-- | ioreplay/src/replay/rprocess.c | 34 | ||||
| -rw-r--r-- | ioreplay/src/replay/rprocess.h | 40 | ||||
| -rw-r--r-- | ioreplay/src/replay/rstats.c | 108 | ||||
| -rw-r--r-- | ioreplay/src/replay/rstats.h | 117 | ||||
| -rw-r--r-- | ioreplay/src/replay/rtask.c | 50 | ||||
| -rw-r--r-- | ioreplay/src/replay/rtask.h | 69 | ||||
| -rw-r--r-- | ioreplay/src/replay/rthread.c | 216 | ||||
| -rw-r--r-- | ioreplay/src/replay/rthread.h | 123 | ||||
| -rw-r--r-- | ioreplay/src/replay/rworker.c | 360 | ||||
| -rw-r--r-- | ioreplay/src/replay/rworker.h | 82 |
14 files changed, 1915 insertions, 0 deletions
diff --git a/ioreplay/src/replay/replay.c b/ioreplay/src/replay/replay.c new file mode 100644 index 0000000..89f5fee --- /dev/null +++ b/ioreplay/src/replay/replay.c @@ -0,0 +1,191 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "replay.h" + +#include "../datas/amap.h" +#include "../meta/meta.h" +#include "../mounts.h" +#include "rworker.h" +#include "rstats.h" + +void replay_extract_header(options_s *opts, FILE *replay_fd, long *num_vsizes, + long *num_pids, long *num_fds, long *num_lines) +{ + meta_s *m = meta_new(replay_fd); + meta_read_start(m); + + long version = 0; + if (meta_read_l(m, "version", &version)) { + Put("Replay version is '%ld'", version); + if (version != REPLAY_VERSION) { + Error(".replay file of incompatible version, got %x, expected %x", + (int)version, REPLAY_VERSION); + } + } + + char *user; + if (meta_read_s(m, "user", &user)) { + Put("Setting user to '%s'", user); + opts->user = user; + } + + char *name; + if (meta_read_s(m, "name", &name)) { + Put("Setting name to '%s'", name); + opts->name = name; + } + + if (meta_read_l(m, "num_vsizes", num_vsizes)) { + if (*num_vsizes < 0) { + Error("Lamport vsize overflow"); + } + Put("Setting num of vsizes to '%ld'", *num_vsizes); + } + + if (meta_read_l(m, "num_mapped_pids", num_pids)) { + if (*num_pids < 0) { + Error("Process overflow (too many process IDs in .replay)"); + } + Put("Setting num of PIDs to '%ld'", *num_pids); + } + + if (meta_read_l(m, "num_mapped_fds", num_fds)) { + if (*num_fds < 0) { + Error("FD overflow (too many FDs in .replay)"); + } + Put("Setting num of FDs to '%ld'", *num_fds); + } + + if (meta_read_l(m, "num_lines", num_lines)) { + if (*num_fds < 0) { + Error("Overflow (too many lines in .replay)"); + } + Put("Setting num of lines to '%ld'", *num_lines); + } + + meta_destroy(m); +} + +status_e replay_run(options_s *opts) +{ + status_e status = SUCCESS; + + if (opts->drop_caches) { + drop_caches(); + //cache_file(opts->replay_file); + } + + // Extract information from the meta header + FILE *replay_fd = Fopen(opts->replay_file, "r"); + long num_vsizes = 0, num_pids = 0, num_fds = 0, num_lines = 0; + replay_extract_header(opts, replay_fd, &num_vsizes, &num_pids, + &num_fds, &num_lines); + fclose(replay_fd); + + // A map of all file descriptors used. + Out("Creating FD map..."); + amap_s *fds_map = NULL; + if (opts->num_workers > 1) { + fds_map = amap_new_mmapped(num_fds); + } else { + fds_map = amap_new(num_fds); + } + Put("done"); + + // To collect all individual worker's stats into the global + // stats object. + stack_s *all_worker_stats = stack_new(); + + // The global stats object + rstats_s *stats = rstats_new(opts); + rstats_start(stats); + + // Fork worker processes, each worker process will read the .replay file + // individually. + + if (opts->num_workers > 1) { + for (int i = 0; i < opts->num_workers; ++i) { + rworker_stats_s *worker_stats = rworker_stats_new_mmap(); + stack_push(all_worker_stats, worker_stats); + + pid_t pid = fork(); + + if (pid == 0) { + // One worker object per fork + rworker_s *w = rworker_new(i, fds_map, num_vsizes, num_pids, opts, + worker_stats); + + // Process the .replay journal line by line + status_e status = rworker_process_lines(w, num_lines); + Put("worker(%d): Exiting from %d with status %d", i, + pid, status); + rworker_destroy(w); + + // Exit sub-process + exit(status); + + } else if (pid < 0) { + Errno("worker(%d): Unable to create worker process! :'-(", i); + + } else { + Put("worker(%d): Process with pid %d forked", i, pid); + } + } + + drop_root(opts->user); + + Put("Waiting for worker processes to finish"); + pid_t pid; + int rworker_status = SUCCESS; + + while ((pid = wait(&rworker_status)) > 0) { + if (rworker_status != SUCCESS) + status = rworker_status; + + Put("Process with pid %d exited with status %d", + pid, rworker_status); + } + + Put("All workers finished (%d)!", status); + + } else { + Put("Only one worker, don't fork sub-processes"); + + rworker_stats_s *worker_stats = rworker_stats_new_mmap(); + stack_push(all_worker_stats, worker_stats); + + rworker_s *w = rworker_new(0, fds_map, num_vsizes, num_pids, + opts, worker_stats); + status = rworker_process_lines(w, num_lines); + rworker_destroy(w); + + Put("Worker finished work!"); + } + + // Collect all statistics + rstats_stop(stats); + while (!stack_is_empty(all_worker_stats)) { + rworker_stats_s *worker_stats = stack_pop(all_worker_stats); + rstats_add_from_worker(stats, worker_stats); + rworker_stats_destroy(worker_stats); + } + stack_destroy(all_worker_stats); + + rstats_print(stats); + rstats_destroy(stats); + + amap_destroy(fds_map); + return status; +} diff --git a/ioreplay/src/replay/replay.h b/ioreplay/src/replay/replay.h new file mode 100644 index 0000000..dcc3d84 --- /dev/null +++ b/ioreplay/src/replay/replay.h @@ -0,0 +1,46 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef REPLAY_H +#define REPLAY_H + +#include "../defaults.h" +#include "../utils/futils.h" +#include "../opcodes.h" +#include "../options.h" +#include "rioop.h" +#include "rprocess.h" + +/** + * @brief Replays the given .replay file + * + * @param opts The options object + * @return SUCCESS if everything went fine + */ +status_e replay_run(options_s *opts); + +/** + * @brief Extract required meta data from .replay's meta header + * + * @param opts The options object + * @param replay_fd The file handle to the .replay file + * @param num_vsizes The amount of virtual sizes/paths + * @param num_pids The amount of process IDs + * @param num_fds The amount of virtual file descriptors + * @param num_lines The amount of .replay lines with I/O ops + */ +void replay_extract_header(options_s *opts, FILE *replay_fd, long *num_vsizes, + long *num_pids, long *num_fds,long *num_lines); + +#endif // REPLAY_H diff --git a/ioreplay/src/replay/rioop.c b/ioreplay/src/replay/rioop.c new file mode 100644 index 0000000..2e16c94 --- /dev/null +++ b/ioreplay/src/replay/rioop.c @@ -0,0 +1,425 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rioop.h" + +#include "../vfd.h" +#include "rworker.h" + +// Printing error messages +#define _Error(...) \ + fprintf(stderr, "%s:%d ERROR: ", __FILE__, __LINE__); \ + fprintf(stderr, __VA_ARGS__); \ + fprintf(stderr, "\nlineno:%ld path:%s\n", task->lineno, vfd->path); \ + fflush(stdout); \ + fflush(stderr); \ + exit(ERROR); + +#define _Errno(...) \ + fprintf(stderr, "%s:%d ERROR: %s (%d). ", __FILE__, __LINE__, \ + strerror(errno), errno); \ + fprintf(stderr, __VA_ARGS__); \ + fprintf(stderr, "\nlineno:%ld path:%s\n", task->lineno, vfd->path); \ + fflush(stdout); \ + fflush(stderr); \ + exit(ERROR); + +#define _Init_arg(num) int arg = atoi(task->toks[num]) +#define _Init_cmd(num) int cmd = atoi(task->toks[num]) +#define _Init_fd(num) long fd = atol(task->toks[num]) +#define _Init_flags(num) int flags = atoi(task->toks[num]) +//#define _Init_mode(num) int mode = atoi(task->toks[num]) +#define _Init_offset(num) long offset = atol(task->toks[num]) +#define _Init_op(num) int op = atoi(task->toks[num]) +#define _Init_path2(num) char *path2 = task->toks[num] +#define _Init_path(num) char *path = task->toks[num] +#define _Init_rc(num) int rc = atoi(task->toks[num]) +#define _Init_whence(num) long whence = atol(task->toks[num]) + +#define _Init_bytes(num) \ + int bytes = atoi(task->toks[num]); \ + if (bytes <= 0) return + +#define _Init_virtfd \ + vfd_s *vfd = amap_get(p->fds_map, fd); \ + if (vfd == NULL) return + +void rioop_run(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_op(2); + + switch (op) { + // stat() syscalls + case FSTAT: + rioop_fstat(p, t, task); + break; + case FSTATFS: + case FSTATFS64: + //Error("op(%d) not implemented", op); + break; + case FSTAT_AT: + case LSTAT: + case STAT: + rioop_stat(p, t, task); + break; + case STATFS: + case STATFS64: + //Error("op(%d) not implemented", op); + break; + + // read() syscalls + case READ: + case READV: + rioop_read(p, t, task); + break; + case READAHEAD: + //Error("op(%d) not implemented", op); + break; + case READLINK: + case READLINK_AT: + //Error("op(%d) not implemented", op); + break; + + // write() syscalls + case WRITE: + case WRITEV: + rioop_write(p, t, task); + break; + + // open() and other syscalls which may creat + case OPEN: + case OPEN_AT: + rioop_open(p, t, task, -1); + break; + case CREAT: + // A call to crat() is equivalent to calling open() with flags.. + rioop_open(p, t, task, O_CREAT|O_WRONLY|O_TRUNC); + break; + case MKDIR: + case MKDIR_AT: + rioop_mkdir(p, t, task); + break; + + // rename() syscalls + case RENAME: + case RENAME_AT: + case RENAME_AT2: + rioop_rename(p, t, task); + break; + + // close() and unlink() syscalls + case CLOSE: + rioop_close(p, t, task); + break; + case UNLINK: + case UNLINK_AT: + rioop_unlink(p, t, task); + break; + case RMDIR: + rioop_rmdir(p, t, task); + break; + + // sync() syscalls + case FSYNC: + rioop_fsync(p, t, task); + break; + case FDATASYNC: + rioop_fdatasync(p, t, task); + break; + case SYNC: + case SYNCFS: + case SYNC_FILE_RANGE: + //Error("op(%d) not implemented", op); + break; + + // Other syscalls + case FCNTL: + rioop_fcntl(p, t, task); + break; + case GETDENTS: + rioop_getdents(p, t, task); + break; + case LSEEK: + rioop_lseek(p, t, task); + break; + + // chmod() syscalls + case CHMOD: + rioop_chmod(p, t, task); + break; + case FCHMOD: + rioop_fchmod(p, t, task); + break; + + // chown() syscalls + case CHOWN: + rioop_chown(p, t, task); + break; + case FCHOWN: + case FCHOWNAT: + rioop_fchown(p, t, task); + break; + case LCHOWN: + rioop_lchown(p, t, task); + break; + + // Meta operations (I/O replay internal use only). + case META_EXIT_GROUP: + break; + case META_TIMELINE: + break; + + default: + Error("op(%d) not implemented", op); + break; + } +} + +void rioop_stat(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_path(3); + struct stat buf; + stat(path, &buf); +} + +void rioop_fstat(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_virtfd; + struct stat buf; + fstat(vfd->fd, &buf); +} + +void rioop_rename(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_path(3); + _Init_path2(4); + rename(path, path2); +} + +void rioop_read(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_bytes(4); + _Init_virtfd; + + char *buf = Calloc(bytes+1, char); + read(vfd->fd, buf, bytes); + free(buf); +} + +void rioop_write(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_bytes(4); + _Init_virtfd; + + char *buf = Calloc(bytes+1, char); + sprintf(buf, "%ld", task->lineno); + Fill_with_stuff(buf, bytes); + if (vfd->fd == 0) { + Debug("%d %d %ld", vfd->fd, vfd->debug, task->lineno); + _Error("ERROR"); + } + write(vfd->fd, buf, bytes); + free(buf); +} + +void rioop_open(rprocess_s *p, rthread_s *t, rtask_s *task, int flags_) +{ + _Init_fd(3); + _Init_path(4); + _Init_flags(6); + + // Special case as this is creat() now + if (flags_ != -1) + flags = flags_; + + bool directory = Has(flags, O_DIRECTORY); + + if (fd > 0) { + if (directory) { + // We can not open a directory via open() otherwise! + flags &= (O_RDONLY & ~(O_RDWR|O_WRONLY|O_CREAT)); + } else { + // We don't want to open the file in read only mode. + // SystemTap could have skipped syscalls to fcntl or open + flags &= ~O_RDONLY; + } + // flags |= O_DIRECT|O_SYNC; + flags &= ~O_EXCL; + } + + int ret = open(path, flags, S_IRWXU|S_IRWXG|S_IRWXO); + + if (fd < 0 && ret > 0) { + close(ret); +#ifdef THREAD_DEBUG + fprintf(t->rthread_fd, "TRACE OPEN|open+close|%s|\n", path); + fflush(t->rthread_fd); +#endif + } + + if (fd > 0 && ret > 0) { + vfd_s *vfd = vfd_new(ret, fd, path); + amap_set(p->fds_map, fd, vfd); + +#ifdef THREAD_DEBUG + fprintf(t->rthread_fd, "TRACE OPEN|open|%s|\n", path); + fflush(t->rthread_fd); +#endif + } +} + +void rioop_close(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_virtfd; + + amap_unset(p->fds_map, fd); + if (vfd->dirfd) { + closedir(vfd->dirfd); +#ifdef THREAD_DEBUG + fprintf(t->rthread_fd, "TRACE OPEN|closedir|%s|\n", vfd->path); + fflush(t->rthread_fd); +#endif + } else { + close(vfd->fd); +#ifdef THREAD_DEBUG + fprintf(t->rthread_fd, "TRACE OPEN|close|%s|\n", vfd->path); + fflush(t->rthread_fd); +#endif + } + vfd_destroy(vfd); +} + +void rioop_getdents(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_virtfd; + + // getdents expects a dirfd + DIR *dirfd = fdopendir(vfd->fd); + if (dirfd) { + vfd->dirfd = dirfd; + readdir(dirfd); +#ifdef THREAD_DEBUG + fprintf(t->rthread_fd, "TRACE OPEN|fdopendir|%s|\n", vfd->path); + fflush(t->rthread_fd); +#endif + } +} + +void rioop_mkdir(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_path(3); + mkdir(path, S_IRWXU|S_IRWXG|S_IRWXO); +} + +void rioop_unlink(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_path(3); + unlink(path); +} + +void rioop_rmdir(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_path(3); + rmdir(path); +} + +void rioop_lseek(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_bytes(6); + _Init_virtfd; + lseek(vfd->fd, bytes, SEEK_SET); +} + +void rioop_fsync(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_virtfd; + fsync(vfd->fd); +} + +void rioop_fdatasync(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_virtfd; + fdatasync(vfd->fd); +} + +void rioop_fcntl(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_cmd(4); + _Init_arg(5); + _Init_virtfd; + + switch (cmd) { + case F_GETFD: + case F_GETFL: + fcntl(vfd->fd, cmd); + break; + case F_SETFD: + case F_SETFL: + fcntl(vfd->fd, cmd, arg); + break; + default: + break; + } +} + +void rioop_chmod(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_path(3); + chmod(path, S_IRWXU|S_IRWXG|S_IRWXO); +} + +void rioop_fchmod(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_virtfd; + fchmod(vfd->fd, S_IRWXU|S_IRWXG|S_IRWXO); +} + +void rioop_chown(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_path(3); + rworker_s *w = t->worker; + options_s *opts = w->opts; + struct passwd *pwd = getpwnam(opts->user); + chown(path, pwd->pw_uid, -1); +} + +void rioop_fchown(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_fd(3); + _Init_virtfd; + rworker_s *w = t->worker; + options_s *opts = w->opts; + struct passwd *pwd = getpwnam(opts->user); + fchown(vfd->fd, pwd->pw_uid, -1); +} + +void rioop_lchown(rprocess_s *p, rthread_s *t, rtask_s *task) +{ + _Init_path(3); + rworker_s *w = t->worker; + options_s *opts = w->opts; + struct passwd *pwd = getpwnam(opts->user); + lchown(path, pwd->pw_uid, -1); +} + diff --git a/ioreplay/src/replay/rioop.h b/ioreplay/src/replay/rioop.h new file mode 100644 index 0000000..4db4284 --- /dev/null +++ b/ioreplay/src/replay/rioop.h @@ -0,0 +1,54 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RIOOP_H +#define RIOOP_H + +#include "../defaults.h" +#include "../utils/futils.h" +#include "../opcodes.h" +#include "rprocess.h" +#include "rthread.h" + +/** + * @brief Replays the responsible I/O operation of a given task + * + * @param p The virtual replay process object + * @param t The thread object + * @param task The replay task object + */ +void rioop_run(rprocess_s *p, rthread_s *t, rtask_s *task); + +void rioop_close(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_fcntl(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_fdatasync(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_fstat(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_fsync(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_getdents(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_mkdir(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_open(rprocess_s *p, rthread_s *t, rtask_s *task, int flags_); +void rioop_read(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_rename(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_stat(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_lseek(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_unlink(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_rmdir(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_write(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_chmod(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_fchmod(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_chown(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_fchown(rprocess_s *p, rthread_s *t, rtask_s *task); +void rioop_lchown(rprocess_s *p, rthread_s *t, rtask_s *task); + +#endif // RIOOP_H diff --git a/ioreplay/src/replay/rprocess.c b/ioreplay/src/replay/rprocess.c new file mode 100644 index 0000000..4efd835 --- /dev/null +++ b/ioreplay/src/replay/rprocess.c @@ -0,0 +1,34 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rprocess.h" + +rprocess_s* rprocess_new(const int pid, amap_s *fds_map) +{ + rprocess_s *p = Malloc(rprocess_s); + + p->fds_map = fds_map; + p->pid = pid; + p->terminate = 0; + p->lineno = 0; + + return p; +} + +void rprocess_destroy(rprocess_s *p) +{ + if (!p) + return; + free(p); +} diff --git a/ioreplay/src/replay/rprocess.h b/ioreplay/src/replay/rprocess.h new file mode 100644 index 0000000..739dd89 --- /dev/null +++ b/ioreplay/src/replay/rprocess.h @@ -0,0 +1,40 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RPROCESS_H +#define RPROCESS_H + +#include "../datas/hmap.h" +#include "../datas/amap.h" +#include "../defaults.h" +#include "rthread.h" + +/** + * @brief The virtual replay process object definition + * + * This defines a virtual process in replay context. + */ +typedef struct rprocess_s_ { + int terminate; /**< Indicates whether the worker is terminating or not */ + int rworker_num; /**< The worker number of the responsible worker */ + int pid; /**< The virtual process ID */ + unsigned long lineno; /**< Holding the current .replay line number */ + bool initm; /**< Indicates whether ioreplay is in init mode or not */ + amap_s *fds_map; /**< Holding all file descriptors */ +} rprocess_s; + +rprocess_s* rprocess_new(const int pid, amap_s *fds_map); +void rprocess_destroy(rprocess_s* p); + +#endif // RPROCESS_H diff --git a/ioreplay/src/replay/rstats.c b/ioreplay/src/replay/rstats.c new file mode 100644 index 0000000..c3e6e38 --- /dev/null +++ b/ioreplay/src/replay/rstats.c @@ -0,0 +1,108 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rstats.h" + +#include <sys/types.h> + +rstats_s* rstats_new(options_s *opts) +{ + rstats_s *s = Malloc(rstats_s); + + s->opts = opts; + s->loadavg_high = 0; + s->ioops = 0; + s->duration = 0; + s->time_ahead = -1; + + if (opts->stats_file) + s->stats_fd = Fopen(opts->stats_file, "w"); + else + s->stats_fd = stdout; + + return s; +} + +void rstats_destroy(rstats_s *s) +{ + if (s->stats_fd != stdout) + fclose(s->stats_fd); + + free(s); +} + +rworker_stats_s* rworker_stats_new_mmap(options_s *opts) +{ + // Share this object between processes, so that the stats cann be + // collected by the master process! + rworker_stats_s *s = Mmapshared(rworker_stats_s); + + s->loadavg_high = 0; + s->ioops = 0; + s->time_ahead = -1; + + return s; +} + +void rworker_stats_destroy(rworker_stats_s *s) +{ + munmap(s, sizeof(rworker_stats_s)); +} + + +void rstats_start(rstats_s* s) +{ + gettimeofday(&s->start_time, NULL); +} + +void rstats_stop(rstats_s* s) +{ + gettimeofday(&s->end_time, NULL); + s->duration= ((s->end_time.tv_sec - s->start_time.tv_sec) * 1000 + + (s->end_time.tv_usec - s->start_time.tv_usec) / 1000) / 1000; + +} + +void rstats_add_from_worker(rstats_s* s, rworker_stats_s* w) +{ + if (s->loadavg_high < w->loadavg_high) + s->loadavg_high = w->loadavg_high; + + if (s->time_ahead == -1 || s->time_ahead > w->time_ahead) + s->time_ahead = w->time_ahead; + + s->ioops += w->ioops; +} + +void rstats_print(rstats_s* s) +{ + options_s *opts = s->opts; + + if (opts->stats_file) { + Put("Writing stats to '%s'", opts->stats_file); + } + + fprintf(s->stats_fd, "Stats of test '%s':\n", opts->name); + fprintf(s->stats_fd, "\tNum workers: %d\n", opts->num_workers); + fprintf(s->stats_fd, "\tThreads per worker: %d\n", opts->num_threads_per_worker); + fprintf(s->stats_fd, "\tThreads total: %d\n", + opts->num_threads_per_worker * opts->num_workers); + fprintf(s->stats_fd, "\tHighest loadavg: %.2f\n", s->loadavg_high); + fprintf(s->stats_fd, "\tPerformed ioops: %ld\n", s->ioops); + if (s->duration > 0) + fprintf(s->stats_fd, "\tAverage ioops/s: %.2f\n", s->ioops/s->duration); + fprintf(s->stats_fd, "\tTime ahead: %lds\n", s->time_ahead/1000); + fprintf(s->stats_fd, "\tTotal time: %.2fs\n", s->duration); +} + diff --git a/ioreplay/src/replay/rstats.h b/ioreplay/src/replay/rstats.h new file mode 100644 index 0000000..1ce3f27 --- /dev/null +++ b/ioreplay/src/replay/rstats.h @@ -0,0 +1,117 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file rstats.h + * @author Paul Buetow + * + * @brief For collecting replay stats + */ + +#ifndef RSTATS_H +#define RSTATS_H + +#include "../defaults.h" +#include "../options.h" + +#include <pthread.h> + +/** + * @brief Definition of the rstats object + * + * Used to store global statistics. + */ +typedef struct rstats_s_ { + double loadavg_high; /**< Highest load average */ + long ioops; /**< Total amount if io operations */ + double duration; /**< Duration of the test */ + long time_ahead; /**< Time ahead of the original speed */ + struct timeval start_time; /**< Start time of the test */ + struct timeval end_time; /**< End time of the test */ + options_s *opts; /**< The I/O replay options object */ + FILE *stats_fd; /**< The file descriptor for writing the stats */ +} rstats_s; + +/** + * @brief Definition of the per worker stats object + * + * Used to store per worker process I/O stats + */ +typedef struct rworker_stats_s_ { + double loadavg_high; /**< Highest amount of io ops per second */ + long ioops; /**< Total amount if io operations */ + long time_ahead; /**< Time ahead of the original speed */ +} rworker_stats_s; + +/** + * @brief Creates a new stats object + * + * @return The new stats object + */ +rstats_s* rstats_new(options_s *opts); + +/** + * @brief Destroys the stats object + * + * @param s The stats object + */ +void rstats_destroy(rstats_s* s); + +/** + * @brief Creates a new per worker stats object + * + * The memory is mapped into shared memory so it can be shared across multiple + * processes. + * + * @return The new stats object + */ +rworker_stats_s* rworker_stats_new_mmap(); + +/** + * @brief Destroys the per worker stats object + * + * @param s The stats object + */ +void rworker_stats_destroy(rworker_stats_s* s); + +/** + * @brief Starts the stats + * + * @param s The stats object + */ +void rstats_start(rstats_s* s); + +/** + * @brief Finalises the stats + * + * @param s The stats object + */ +void rstats_stop(rstats_s* s); + +/** + * @brief Prints the stats + * + * @param s The stats object + */ +void rstats_print(rstats_s* s); + +/** + * @brief Adds per worker stats to the global stats object + * + * @param s The global stats object + * @param w The worker stats object + */ +void rstats_add_from_worker(rstats_s* s, rworker_stats_s* w); + +#endif diff --git a/ioreplay/src/replay/rtask.c b/ioreplay/src/replay/rtask.c new file mode 100644 index 0000000..b1afb92 --- /dev/null +++ b/ioreplay/src/replay/rtask.c @@ -0,0 +1,50 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rtask.h" + +#include "rthread.h" +#include "rworker.h" + +rtask_s* rtask_new() +{ + rtask_s *task = Malloc(rtask_s); + + *task = (rtask_s) { + .worker = NULL, .process = NULL + }; + task->line[0] = '\0'; + +#ifdef THREAD_DEBUG + task->clone = NULL; +#endif + + return task; +} + +void rtask_destroy(rtask_s *task) +{ + if (task) + free(task); +} + +void rtask_update(rtask_s *task, void *worker, void *process, char *line, + const long lineno, const long vsize) +{ + task->worker = worker; + task->process = process; + task->lineno = lineno; + task->vsize = vsize; + strcpy(task->line, line); +} diff --git a/ioreplay/src/replay/rtask.h b/ioreplay/src/replay/rtask.h new file mode 100644 index 0000000..35c5714 --- /dev/null +++ b/ioreplay/src/replay/rtask.h @@ -0,0 +1,69 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RTASK_H +#define RTASK_H + +#include "../defaults.h" + +/** + * @brief The replay task definition + * + * The rtask holds all possible variables required to process a particular + * .replay line and to replay the corresponding I/O operation. + */ +typedef struct rtask_s_ { + void *worker; /* The responsible worker object */ + void *process; /* The responsible process object */ + unsigned long lineno; /**< The current line number */ + unsigned long vsize; /**< The vsize */ + char *toks[MAX_TOKENS+1]; /**< The tokens parsed from the .replay line */ + char line[MAX_LINE_LEN]; /**< The remaining part of the .replay line */ +#ifdef RTASK_DEBUG + char *clone; /**< Used for debug purposes only */ +#endif +} rtask_s; + +/** + * @brief Creates a new thread task object + * + * This function creates a new thread task object. Such a task object is used + * by the worker to hand over I/O tasks to the corresponding threads. The + * actual I/O work is performed by the threads then. + * + * @return The new thread task object + */ +rtask_s* rtask_new(); + +/** + * @brief Destroys the replay task object + * + * @param t The thread task object to be destroyed + */ +void rtask_destroy(rtask_s* t); + +/** + * @brief Updates a reused/recycle task object + * + * @param task The task object to be updated + * @param worker The responsibe worker object + * @param process The responsible process object + * @param line The remaining line of the .replay file + * @param lineno The current line number of the .replay file + * @param vsize The vsize/path id + */ +void rtask_update(rtask_s *task, void *worker, void *process, char *line, + const long lineno, const long vsize); + +#endif // RTASK_H diff --git a/ioreplay/src/replay/rthread.c b/ioreplay/src/replay/rthread.c new file mode 100644 index 0000000..55364ec --- /dev/null +++ b/ioreplay/src/replay/rthread.c @@ -0,0 +1,216 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rthread.h" + +#include <sys/types.h> + +#include "rworker.h" +#include "rprocess.h" + +#include "rioop.h" + +#ifdef THREAD_DEBUG +/** + * @brief For debugging purposes only + * + * @param t The responsible thread object + */ +static void _rthread_init_log(rthread_s *t) +{ + rworker_s *w = t->worker; + char *rthread_log = Calloc(1024, char); + snprintf(rthread_log, 1023, "/tmp/ioreplay/worker%d.thread%ld.debuglog", + w->rworker_num, (long)pthread_self()); + + ensure_dir_exists("/tmp/ioreplay"); + t->rthread_fd = Fopen(rthread_log, "a"); + + free(rthread_log); + fprintf(t->rthread_fd, "%ld: DEBUG: Created thread log\n", t->tid); +} +#endif + +void rthread_process_task(rthread_s* t, rtask_s *task, + pid_t pthread_id) +{ + char *next = task->line; + rworker_s *w = (rworker_s*) task->worker; + + // Tokenize the remaining elements of the line. + int ntoks = 0; + char *saveptr; + char *tok = strtok_r(next, "|", &saveptr); + + while (tok) { + if (ntoks > MAX_TOKENS) { + Error("worker(%d) pthread(%d): lineno:%lu, missing newline?", + w->rworker_num, pthread_id, task->lineno); + } + task->toks[ntoks++] = tok; + tok = strtok_r(NULL, "|", &saveptr); + } + // NULL marker (no more token from here) + task->toks[ntoks] = NULL; + +#ifdef THREAD_DEBUG + fprintf(t->rthread_fd, "%ld(%ld): %s", + t->tid, (long)pthread_self(), task->clone); + fflush(t->rthread_fd); + free(task->clone); + task->clone = NULL; +#endif +#ifndef NO_RIOOP + // Perform the corresponding I/O operation! + rioop_run(task->process, t, task); +#endif + + // Make the task object recyclable/reusable + pthread_mutex_lock(&w->task_buffer_mutex); + if (!rbuffer_insert(w->task_buffer, task)) + // We can't recycle the task object if the buffer is full! + rtask_destroy(task); + pthread_mutex_unlock(&w->task_buffer_mutex); +} + +void *rthread_pthread_start(void *data) +{ + rthread_s* t = (rthread_s*) data; + rworker_s *w = t->worker; + rtask_s *task = NULL; + pid_t pthread_id = pthread_self(); + +#ifdef THREAD_DEBUG + _rthread_init_log(t); +#endif + + do { + while (!rbuffer_has_next(t->tasks) && !t->terminate) + usleep(100); + + while ((task = rbuffer_get_next(t->tasks)) != NULL) + rthread_process_task(t, task, pthread_id); + +#ifdef THREAD_DEBUG + fprintf(t->rthread_fd, "%ld: DEBUG: Idling\n", t->tid); + fflush(t->rthread_fd); +#endif + + // Tell rworker_s that thread is not doing any work! + int inserted = false; + while (!inserted && !t->terminate) { + if (rbuffer_has_next(t->tasks)) + break; + + usleep(1000); + + if (rbuffer_has_next(t->tasks)) + break; + + // Make the rthread reusable, he is without any tasks + // for some time. + pthread_mutex_lock(&w->rthread_buffer_mutex); + inserted = rbuffer_insert(w->rthread_buffer, t); + pthread_mutex_unlock(&w->rthread_buffer_mutex); + } + +#ifdef THREAD_DEBUG + if (inserted) { + fprintf(t->rthread_fd, "%ld: DEBUG: Added to thread buffer\n", + t->tid); + } else { + fprintf(t->rthread_fd, "%ld: DEBUG: Idling thread recovered\n", + t->tid); + } + fflush(t->rthread_fd); +#endif + + } while (!t->terminate); + +#ifdef THREAD_DEBUG + fprintf(t->rthread_fd, "%ld: DEBUG: Terminating\n", t->tid); + fflush(t->rthread_fd); +#endif + + // Process the very last tasks + while (NULL != (task = rbuffer_get_next(t->tasks))) + rthread_process_task(t, task, pthread_id); + +#ifdef THREAD_DEBUG + fprintf(t->rthread_fd, "%ld: DEBUG: Done terminating\n", t->tid); + fflush(t->rthread_fd); +#endif + + return NULL; +} + +rthread_s* rthread_new(const long tid, void *worker) +{ + rthread_s *t = Malloc(rthread_s); + rworker_s *w = worker; + + t->single_threaded = w->opts->num_threads_per_worker == 1; + t->tasks = rbuffer_new(TASK_BUFFER_PER_THREAD); + t->terminate = false; + t->worker = worker; + rthread_update(t, tid); + + if (t->single_threaded) { +#ifdef THREAD_DEBUG + _rthread_init_log(t); +#endif + return t; + } + + start_pthread(&t->pthread, rthread_pthread_start, (void*)t); + return t; +} + +long rthread_update(rthread_s *t, const long tid) +{ + long prev_tid = t->tid; + t->tid = tid; + + return prev_tid; +} + +void rthread_destroy(rthread_s *t) +{ + if (rbuffer_has_next(t->tasks)) { + Error("Didn't expect to have any tasks left!"); + } + rbuffer_destroy(t->tasks); + +#ifdef THREAD_DEBUG + if (t->rthread_fd) + fclose(t->rthread_fd); +#endif + + free(t); +} + +bool rthread_insert_task(rthread_s* t, rtask_s* task) +{ + if (t->single_threaded) { + rthread_process_task(t, task, pthread_self()); + return true; + } + return rbuffer_insert(t->tasks, task); +} + +void rthread_terminate(rthread_s* t) +{ + t->terminate = true; + pthread_join(t->pthread, NULL); +} diff --git a/ioreplay/src/replay/rthread.h b/ioreplay/src/replay/rthread.h new file mode 100644 index 0000000..9971e49 --- /dev/null +++ b/ioreplay/src/replay/rthread.h @@ -0,0 +1,123 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file rthread.h + * @author Paul Buetow + * + * @brief The replay thread definitiion + */ + +#ifndef RTHREAD_H +#define RTHREAD_H + +#include "../defaults.h" +#include "../datas/rbuffer.h" +#include "../datas/amap.h" +#include "../vfd.h" +#include "rtask.h" + +#include <pthread.h> + +/** + * @brief Definition of a worker thread + * + * Every worker utilises a set of worker threads in order to parallelise the + * replaying of the I/O! Every thread comes with its own task queue. It is + * filled by the repsonsible worker. + * + * The user can specify the max amount of threads per worker per -t command + * line switch. + */ +typedef struct rthread_s_ { + void *worker; /**< The responsible worker object */ + long tid; /**< The virtual thread id */ + rbuffer_s* tasks; /**< Holds all outstanding tasks */ + bool terminate; /**< True if thread shall terminate */ + bool single_threaded; /**< Worker is single threaded or not */ + pthread_t pthread; /**< We run the tasks in concurrent pthreads */ +#ifdef RTHREAD_DEBUG + FILE *rthread_fd; /**< Used for debugging purposes only */ +#endif +} rthread_s; + +/** + * @brief Creates a new thread object + * + * @param tid The thread ID + * @param worker The worker object managing this thread + * @return The new thread object + */ +rthread_s* rthread_new(const long tid, void *worker); + +/** + * @brief Updates a thread object after recycling it + * + * @param t The thread object + * @param tid The new thread ID + */ +long rthread_update(rthread_s *t, const long tid); + +/** + * @brief Terminates the thread + * + * This function waits (via join) for the pthread to complete all its + * current tasks from the queue. + * + * @param t The thread object + */ +void rthread_terminate(rthread_s* t); + +/** + * @brief Destroys the thread object + * + * @param t The thread object + */ +void rthread_destroy(rthread_s* t); + +/** + * @brief Inserts a task into the threads work queue + * + * Inserts a task into the threads work queue. We use an atomic ring buffer + * data structure for the work queue. The ring buffer does not require any + * mutex locks. + * + * @param t The thread object + * @param task The task to be inserted + * @return Returns true on success, returns false if the task queue is full + */ +bool rthread_insert_task(rthread_s* t, rtask_s* task); + +/** + * @brief Used by the pthread to process a task + * + * In this function the pthread will attempt to process a task. It extracts all + * required information from the task object and invokes the corresponding I/O + * syscalls. + * + * @param t The responsible thread object + * @param task The task object + * @param pthread_id The current pthread id + */ +void rthread_process_task(rthread_s* t, rtask_s *task, pid_t pthread_id); + +/** + * @brief The entry function for the pthreads + * + * @param data The data structure passed to the pthread + * @return The exit code of the pthread. + */ +void *rthread_pthread_start(void *data); + +#endif // RTHREAD_H diff --git a/ioreplay/src/replay/rworker.c b/ioreplay/src/replay/rworker.c new file mode 100644 index 0000000..5a50ada --- /dev/null +++ b/ioreplay/src/replay/rworker.c @@ -0,0 +1,360 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rworker.h" + +#include "../datas/stack.h" +#include "rprocess.h" +#include "rthread.h" + +#define _Compute_current_time(now) \ + (now.tv_sec - start_time.tv_sec) * 1000 \ + + (now.tv_usec - start_time.tv_usec) / 1000 + + +/** + * @brief A callback helper function for destroying all virtual process objects + * + * @param data The process object. + */ +static void _rprocess_destroy_cb(void *data) +{ + rprocess_destroy(data); +} + +rworker_s* rworker_new(const int rworker_num, amap_s *fds_map, + const long num_vsizes, const long num_pids, + options_s *opts, rworker_stats_s *worker_stats) +{ + rworker_s *w = Malloc(rworker_s); + +#ifdef THREAD_DEBUG + char *rworker_log = Calloc(1024, char); + snprintf(rworker_log, 1023, "/tmp/ioreplay/_worker%d.debuglog", + rworker_num); + + w->rworker_fd = Fopen(rworker_log, "a"); + free(rworker_log); + fprintf(w->rworker_fd, "DEBUG: Started worker\n"); +#endif + + w->rworker_num = rworker_num; + w->opts = opts; + w->fds_map = fds_map; + + w->rprocess_map = amap_new(num_pids); + w->rthread_map = amap_new(num_vsizes); + w->task_buffer = rbuffer_new(opts->num_threads_per_worker + *TASK_BUFFER_PER_THREAD); + w->rthread_buffer = rbuffer_new(opts->num_threads_per_worker); + w->worker_stats = worker_stats; + + // Attach a cleanup callback function to the worker map. + w->rprocess_map->data_destroy = _rprocess_destroy_cb; + + pthread_mutex_init(&w->rthread_buffer_mutex, NULL); + pthread_mutex_init(&w->task_buffer_mutex, NULL); + + // TODO: Check in the program whether the ulimit is high enough + // or not! (ulimit -n) + + return w; +} + +/** + * @brief Destroys the object + * + * Destroys the worker object (frees all memory allocated by the worker) + * + * @param w The worker object + */ +void rworker_destroy(rworker_s *w) +{ + if (!w) + return; + + if (w->rprocess_map) + amap_destroy(w->rprocess_map); + if (w->rthread_map) + amap_destroy(w->rthread_map); + + if (w->task_buffer) { + rtask_s *task = NULL; + while (NULL != (task = rbuffer_get_next(w->task_buffer))) + rtask_destroy(task); + rbuffer_destroy(w->task_buffer); + } + + if (w->rthread_buffer) + rbuffer_destroy(w->rthread_buffer); + + pthread_mutex_destroy(&w->task_buffer_mutex); + pthread_mutex_destroy(&w->rthread_buffer_mutex); + +#ifdef THREAD_DEBUG + if (w->rworker_fd) + fclose(w->rworker_fd); +#endif + + free(w); +} + +status_e rworker_process_lines(rworker_s* w, const long num_lines) +{ + Out("worker(%d): Starting to process replay lines\n", w->rworker_num); + + options_s *opts = w->opts; + FILE *replay_fd = Fopen(opts->replay_file, "r"); + + // Drop root privileges, otherwise we may overwrite other system + // files by accident in case of a bug or user error! + drop_root(opts->user); + + // Variables required for the time based caluclations + struct timeval now, start_time; + long current_time = 0, stats_time = 0; + gettimeofday(&start_time, NULL); + + // Helper variables required for reading lines + char *line = NULL; + char *next = NULL, *next2 = NULL; + size_t len = 0, read = 0; + + // Helpers required for threading + rthread_s *t = NULL; + stack_s *all_threads = stack_new(); + rworker_stats_s *s = w->worker_stats; + + // More helper variables + //unsigned long lineno = 0, stats_ioop = 0, vsize_id = 0; + unsigned long lineno = 0, vsize_id = 0; + long pid = -1, time = -1; + + // Process the .replay file line by line. + while ((read = getline(&line, &len, replay_fd)) != -1) { + lineno++; + + if (read >= MAX_LINE_LEN) { + Error("line:%lu Exceeded max line len", lineno); + } + + // If the line begins with #: Ignore that line, it contains + // debug or meta information or comments. + + if (line[0] == '#') { + if (line[1] == 'I') { + // We stop replaying I/O once we reach the line '#INIT' + // which incitates the begin of the INIT section. + break; + } + continue; + } + +#ifdef THREAD_DEBUG + char *clone = Clone(line); +#endif + + next = strchr(line, '|'); + Error_if(!next, "lineno:%ld Could not parse time from input file", + lineno); + next[0] = '\0'; + next++; + time = atol(line); + + next2 = strchr(next, '|'); + Error_if(!next2, "Could not parse vsize_id from input file"); + next2[0] = '\0'; + next2++; + vsize_id = atol(next); + + // This worker is not responsible for this line, skip it! + if ((vsize_id % opts->num_workers) != w->rworker_num) { +#ifdef THREAD_DEBUG + free(clone); +#endif + continue; + } + + next = strchr(next2, '|'); + Error_if(!next, "Could not parse PID from input file"); + next[0] = '\0'; + next++; + pid = atol(next2); + + gettimeofday(&now, NULL); + current_time = _Compute_current_time(now); + + // Check whether the user specified a replay speed factor. If so, we + // may need to throttle down a bit. + + if (opts->speed_factor) { + s->time_ahead = time / opts->speed_factor - current_time; + if (s->time_ahead > 0) + usleep(s->time_ahead*1000); + + } else { + s->time_ahead = time - current_time; + } + + // Get the responsible process object. The process object holds data + // structures usually found in a Linux process, e.g. a table of open + // file descriptors. + + rprocess_s *p = amap_get(w->rprocess_map, pid); + if (p == NULL) { + p = rprocess_new(pid, w->fds_map); + amap_set(w->rprocess_map, pid, p); + } + p->lineno = lineno; + + if (opts->num_threads_per_worker == 1) { + // Single threaded mode? + if (!t) + t = rthread_new(vsize_id, w); + else + rthread_update(t, vsize_id); + + } else { + t = amap_get(w->rthread_map, vsize_id); + } + + if (t == NULL) { + + // First try to recycle an old (likely unused) thread + if (NULL != (t = rbuffer_get_next(w->rthread_buffer))) { + rthread_update(t, vsize_id); + +#ifdef THREAD_DEBUG + fprintf(w->rworker_fd, "DEBUG: Reused an idling thread\n"); + fflush(w->rworker_fd); +#endif + + } else if (opts->num_threads_per_worker <= all_threads->size) { + // Reached max threads, waiting until one becomes available + +#ifdef THREAD_DEBUG + fprintf(w->rworker_fd, "DEBUG: Reached max threads\n"); + fflush(w->rworker_fd); +#endif + while (NULL == (t = rbuffer_get_next(w->rthread_buffer))) + usleep(1000); + +#ifdef THREAD_DEBUG + fprintf(w->rworker_fd, "DEBUG: Reused an idling thread\n"); + fflush(w->rworker_fd); +#endif + + rthread_update(t, vsize_id); + + } else { + t = rthread_new(vsize_id, w); + + // We hold a pointer to all created threads in a stack. This + // stack is later used to terminate/join all therads. + stack_push(all_threads, t); + +#ifdef THREAD_DEBUG + fprintf(w->rworker_fd, "DEBUG: Created a new thread\n"); + fflush(w->rworker_fd); +#endif + } + + amap_set(w->rthread_map, vsize_id, t); + } + + // Create a new task for the thread. The task contains all required + // information to run an I/O operation. However, first try to + // reuse/recycle a task object! If there is no such, create a new one. + + rtask_s *task = rbuffer_get_next(w->task_buffer); + if (!task) + task = rtask_new(); + rtask_update(task, w, p, next, lineno, vsize_id); + s->ioops++; + + +#ifdef THREAD_DEBUG + task->clone = clone; + fprintf(w->rworker_fd, "DEBUG: Inserting new task\n"); + fflush(w->rworker_fd); +#endif + + // Insert that task to a ring buffer to pass it to the pthread without + // much synchronisation overhead! + + while (!rthread_insert_task(t, task)) + // The ring buffer is full. This may happen if the pthread didn't + // manage to process tasks fast enough. re-try after a short period! + usleep(1000); + +#ifdef THREAD_DEBUG + fprintf(w->rworker_fd, "DEBUG: Task inserted\n"); + fflush(w->rworker_fd); +#endif + + // The worker prints out stats every 3s + if (current_time - stats_time > 3000) { + // IDEA: Maybe refactor this block to be implemented in rstats.c + + double loadavg = get_loadavg(); + + // Determines whether we replay the I/O faster or slower than + // original speed! + char *a_b = s->time_ahead >= 0 ? "ahead" : "behind"; + + Put("worker(%d): threads:%ld %s:%lds progress:%0.2f%% " + "loadavg:%0.2f", + w->rworker_num, all_threads->size, a_b, Abs(s->time_ahead/1000), + Perc(lineno,num_lines), loadavg); + + stats_time = current_time; + //stats_ioop = lineno; + + if (s->loadavg_high < loadavg) + s->loadavg_high = loadavg; + } + } + + Put("worker(%d): Waiting for all threads to finish business...", + w->rworker_num); + + // This will wait (join) all threads one after another until all threads + // have finished their work and have terminated. + + while (!stack_is_empty(all_threads)) { + rthread_s *t = stack_pop(all_threads); + rthread_terminate(t); + rthread_destroy(t); + } + stack_destroy(all_threads); + + // Collect some stats last time + double loadavg = get_loadavg(); + if (s->loadavg_high < loadavg) + s->loadavg_high = loadavg; + + gettimeofday(&now, NULL); + current_time = _Compute_current_time(now); + if (opts->speed_factor) { + s->time_ahead = time / opts->speed_factor - current_time; + } else { + s->time_ahead = time - current_time; + } + + + Put("worker(%d): All threads terminated!", w->rworker_num); + fclose(replay_fd); + + return SUCCESS; +} diff --git a/ioreplay/src/replay/rworker.h b/ioreplay/src/replay/rworker.h new file mode 100644 index 0000000..26a1300 --- /dev/null +++ b/ioreplay/src/replay/rworker.h @@ -0,0 +1,82 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RWORKER_H +#define RWORKER_H + +#include <pthread.h> + +#include "../datas/amap.h" +#include "../datas/rbuffer.h" +#include "../defaults.h" +#include "../options.h" +#include "rstats.h" + +/** + * @brief Represents a worker process. + * + * This represents an I/O replay worker process. The user can specify the + * amount of worker processes via the -p command line switch. This is not + * to confuse with rprocess_s, which represents an original captured process + * and we now want to replay the I/O for! + */ +typedef struct { + int rworker_num; /**< The current worker ID */ + amap_s* fds_map; /**< Holding all file descriptors */ + amap_s* rprocess_map; /**< Holding all processes handled by this worker */ + amap_s* rthread_map; /**< Holding all threads handled by this worker */ + rbuffer_s *task_buffer; /**< Buffering thread tasks to be reused */ + pthread_mutex_t task_buffer_mutex; /**< To sync access to task_buffer */ + rbuffer_s *rthread_buffer; /**< Buffering idle threads to be reused */ + pthread_mutex_t rthread_buffer_mutex; /**< Sync access to rthread_buffer */ + options_s *opts; /**< To synchronise access to rthread_buffer */ + rworker_stats_s *worker_stats; /**< Object holding per worker statistics */ +#ifdef RTHREAD_DEBUG + FILE *rworker_fd; /**< For debugging purposes only */ +#endif +} rworker_s; + +/** + * @brief Creates a new worker object + * + * @param rworker_num The worker number + * @param fds_map A map of all virtual file descriptor objects + * @param num_vsizes The amount of virtual sizes/total file paths of the test + * @param num_pids The total amount of virtual process IDs used in this test + * @param opts A pointer to the options object + * @param worker_stats A pointer to the worker stats object + + * @return The new worker object + */ +rworker_s* rworker_new(const int rworker_num, amap_s *fds_map, + const long num_vsizes, const long num_pids, + options_s* opts, rworker_stats_s *worker_stats); + +/** + * @brief Destroys a worker object + * + * @param w The worker object to be destroyed + */ +void rworker_destroy(rworker_s* w); + +/** + * @brief Makes the worker to process all .replay lines + * + * @param w The responsible worker object + * @param num_lines The total amount of I/O op lines in the .replay file + * @return SUCCESS if everything went fine + */ +status_e rworker_process_lines(rworker_s* w, const long num_lines); + +#endif // RWORKER_H |
