summaryrefslogtreecommitdiff
path: root/ioreplay/src/replay
diff options
context:
space:
mode:
Diffstat (limited to 'ioreplay/src/replay')
-rw-r--r--ioreplay/src/replay/replay.c191
-rw-r--r--ioreplay/src/replay/replay.h46
-rw-r--r--ioreplay/src/replay/rioop.c425
-rw-r--r--ioreplay/src/replay/rioop.h54
-rw-r--r--ioreplay/src/replay/rprocess.c34
-rw-r--r--ioreplay/src/replay/rprocess.h40
-rw-r--r--ioreplay/src/replay/rstats.c108
-rw-r--r--ioreplay/src/replay/rstats.h117
-rw-r--r--ioreplay/src/replay/rtask.c50
-rw-r--r--ioreplay/src/replay/rtask.h69
-rw-r--r--ioreplay/src/replay/rthread.c216
-rw-r--r--ioreplay/src/replay/rthread.h123
-rw-r--r--ioreplay/src/replay/rworker.c360
-rw-r--r--ioreplay/src/replay/rworker.h82
14 files changed, 1915 insertions, 0 deletions
diff --git a/ioreplay/src/replay/replay.c b/ioreplay/src/replay/replay.c
new file mode 100644
index 0000000..89f5fee
--- /dev/null
+++ b/ioreplay/src/replay/replay.c
@@ -0,0 +1,191 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "replay.h"
+
+#include "../datas/amap.h"
+#include "../meta/meta.h"
+#include "../mounts.h"
+#include "rworker.h"
+#include "rstats.h"
+
+/**
+ * Reads the meta header of a .replay file and hands the values back.
+ *
+ * "user" and "name", when present, are stored in opts; the strings returned
+ * by meta_read_s() are kept by opts and are not freed here. The four counters
+ * are written through the given pointers and rejected via Error() when they
+ * are negative.
+ *
+ * @param opts The options object receiving user and name
+ * @param replay_fd The open .replay file handle
+ * @param num_vsizes Out: number of vsizes
+ * @param num_pids Out: number of mapped process IDs
+ * @param num_fds Out: number of mapped file descriptors
+ * @param num_lines Out: number of .replay lines
+ */
+void replay_extract_header(options_s *opts, FILE *replay_fd, long *num_vsizes,
+                           long *num_pids, long *num_fds, long *num_lines)
+{
+    meta_s *m = meta_new(replay_fd);
+    meta_read_start(m);
+
+    long version = 0;
+    if (meta_read_l(m, "version", &version)) {
+        Put("Replay version is '%ld'", version);
+        if (version != REPLAY_VERSION) {
+            Error(".replay file of incompatible version, got %x, expected %x",
+                  (int)version, REPLAY_VERSION);
+        }
+    }
+
+    char *user = NULL;
+    if (meta_read_s(m, "user", &user)) {
+        Put("Setting user to '%s'", user);
+        opts->user = user;
+    }
+
+    char *name = NULL;
+    if (meta_read_s(m, "name", &name)) {
+        Put("Setting name to '%s'", name);
+        opts->name = name;
+    }
+
+    if (meta_read_l(m, "num_vsizes", num_vsizes)) {
+        if (*num_vsizes < 0) {
+            Error("Lamport vsize overflow");
+        }
+        Put("Setting num of vsizes to '%ld'", *num_vsizes);
+    }
+
+    if (meta_read_l(m, "num_mapped_pids", num_pids)) {
+        if (*num_pids < 0) {
+            Error("Process overflow (too many process IDs in .replay)");
+        }
+        Put("Setting num of PIDs to '%ld'", *num_pids);
+    }
+
+    if (meta_read_l(m, "num_mapped_fds", num_fds)) {
+        if (*num_fds < 0) {
+            Error("FD overflow (too many FDs in .replay)");
+        }
+        Put("Setting num of FDs to '%ld'", *num_fds);
+    }
+
+    if (meta_read_l(m, "num_lines", num_lines)) {
+        // Validate the value just read (previously this re-checked num_fds).
+        if (*num_lines < 0) {
+            Error("Overflow (too many lines in .replay)");
+        }
+        Put("Setting num of lines to '%ld'", *num_lines);
+    }
+
+    meta_destroy(m);
+}
+
+/**
+ * Replays the .replay file named in opts.
+ *
+ * Reads the meta header, creates the FD map and then either forks
+ * opts->num_workers worker processes (when more than one worker is
+ * requested) or runs a single worker in the current process. Afterwards the
+ * per-worker stats are merged into the global stats and printed.
+ *
+ * @param opts The options object
+ * @return SUCCESS if everything went fine, otherwise the status of a failing
+ *         worker
+ */
+status_e replay_run(options_s *opts)
+{
+    status_e status = SUCCESS;
+
+    if (opts->drop_caches) {
+        drop_caches();
+        //cache_file(opts->replay_file);
+    }
+
+    // Extract information from the meta header
+    FILE *replay_fd = Fopen(opts->replay_file, "r");
+    long num_vsizes = 0, num_pids = 0, num_fds = 0, num_lines = 0;
+    replay_extract_header(opts, replay_fd, &num_vsizes, &num_pids,
+                          &num_fds, &num_lines);
+    fclose(replay_fd);
+
+    // A map of all file descriptors used.
+    Out("Creating FD map...");
+    amap_s *fds_map = NULL;
+    if (opts->num_workers > 1) {
+        fds_map = amap_new_mmapped(num_fds);
+    } else {
+        fds_map = amap_new(num_fds);
+    }
+    Put("done");
+
+    // To collect all individual worker's stats into the global
+    // stats object.
+    stack_s *all_worker_stats = stack_new();
+
+    // The global stats object
+    rstats_s *stats = rstats_new(opts);
+    rstats_start(stats);
+
+    // Fork worker processes, each worker process will read the .replay file
+    // individually.
+
+    if (opts->num_workers > 1) {
+        for (int i = 0; i < opts->num_workers; ++i) {
+            rworker_stats_s *worker_stats = rworker_stats_new_mmap();
+            stack_push(all_worker_stats, worker_stats);
+
+            pid_t pid = fork();
+
+            if (pid == 0) {
+                // One worker object per fork
+                rworker_s *w = rworker_new(i, fds_map, num_vsizes, num_pids,
+                                           opts, worker_stats);
+
+                // Process the .replay journal line by line
+                status_e worker_status = rworker_process_lines(w, num_lines);
+                // fork() returned 0 here, so report the child's real PID.
+                Put("worker(%d): Exiting from %d with status %d", i,
+                    (int)getpid(), worker_status);
+                rworker_destroy(w);
+
+                // Exit sub-process
+                exit(worker_status);
+
+            } else if (pid < 0) {
+                Errno("worker(%d): Unable to create worker process! :'-(", i);
+
+            } else {
+                Put("worker(%d): Process with pid %d forked", i, pid);
+            }
+        }
+
+        drop_root(opts->user);
+
+        Put("Waiting for worker processes to finish");
+        pid_t pid;
+        int wstatus = 0;
+
+        while ((pid = wait(&wstatus)) > 0) {
+            // wait() reports an encoded status word; decode it so that the
+            // value compared against SUCCESS is the worker's exit code.
+            // A worker that did not exit normally counts as ERROR.
+            int exit_code = WIFEXITED(wstatus) ? WEXITSTATUS(wstatus) : ERROR;
+
+            if (exit_code != SUCCESS)
+                status = (status_e)exit_code;
+
+            Put("Process with pid %d exited with status %d",
+                pid, exit_code);
+        }
+
+        Put("All workers finished (%d)!", status);
+
+    } else {
+        Put("Only one worker, don't fork sub-processes");
+
+        rworker_stats_s *worker_stats = rworker_stats_new_mmap();
+        stack_push(all_worker_stats, worker_stats);
+
+        rworker_s *w = rworker_new(0, fds_map, num_vsizes, num_pids,
+                                   opts, worker_stats);
+        status = rworker_process_lines(w, num_lines);
+        rworker_destroy(w);
+
+        Put("Worker finished work!");
+    }
+
+    // Collect all statistics
+    rstats_stop(stats);
+    while (!stack_is_empty(all_worker_stats)) {
+        rworker_stats_s *worker_stats = stack_pop(all_worker_stats);
+        rstats_add_from_worker(stats, worker_stats);
+        rworker_stats_destroy(worker_stats);
+    }
+    stack_destroy(all_worker_stats);
+
+    rstats_print(stats);
+    rstats_destroy(stats);
+
+    amap_destroy(fds_map);
+    return status;
+}
diff --git a/ioreplay/src/replay/replay.h b/ioreplay/src/replay/replay.h
new file mode 100644
index 0000000..dcc3d84
--- /dev/null
+++ b/ioreplay/src/replay/replay.h
@@ -0,0 +1,46 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef REPLAY_H
+#define REPLAY_H
+
+#include "../defaults.h"
+#include "../utils/futils.h"
+#include "../opcodes.h"
+#include "../options.h"
+#include "rioop.h"
+#include "rprocess.h"
+
+/**
+ * @brief Replays the given .replay file
+ *
+ * @param opts The options object (replay file name, number of workers, ...)
+ * @return SUCCESS if everything went fine
+ */
+status_e replay_run(options_s *opts);
+
+/**
+ * @brief Extract required meta data from .replay's meta header
+ *
+ * The "user" and "name" header values, when present, are stored in opts.
+ *
+ * @param opts The options object
+ * @param replay_fd The file handle to the .replay file
+ * @param num_vsizes Out: the number of virtual sizes/paths
+ * @param num_pids Out: the number of process IDs
+ * @param num_fds Out: the number of virtual file descriptors
+ * @param num_lines Out: the number of .replay lines with I/O ops
+ */
+void replay_extract_header(options_s *opts, FILE *replay_fd, long *num_vsizes,
+ long *num_pids, long *num_fds,long *num_lines);
+
+#endif // REPLAY_H
diff --git a/ioreplay/src/replay/rioop.c b/ioreplay/src/replay/rioop.c
new file mode 100644
index 0000000..2e16c94
--- /dev/null
+++ b/ioreplay/src/replay/rioop.c
@@ -0,0 +1,425 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "rioop.h"
+
+#include "../vfd.h"
+#include "rworker.h"
+
+// Printing error messages
+#define _Error(...) \
+ fprintf(stderr, "%s:%d ERROR: ", __FILE__, __LINE__); \
+ fprintf(stderr, __VA_ARGS__); \
+ fprintf(stderr, "\nlineno:%ld path:%s\n", task->lineno, vfd->path); \
+ fflush(stdout); \
+ fflush(stderr); \
+ exit(ERROR);
+
+#define _Errno(...) \
+ fprintf(stderr, "%s:%d ERROR: %s (%d). ", __FILE__, __LINE__, \
+ strerror(errno), errno); \
+ fprintf(stderr, __VA_ARGS__); \
+ fprintf(stderr, "\nlineno:%ld path:%s\n", task->lineno, vfd->path); \
+ fflush(stdout); \
+ fflush(stderr); \
+ exit(ERROR);
+
+// Helper macros declaring a local variable initialised from token `num` of
+// the current task. They expect a variable named `task` to be in scope and
+// must be followed by a semicolon at the point of use.
+#define _Init_arg(num) int arg = atoi(task->toks[num])
+#define _Init_cmd(num) int cmd = atoi(task->toks[num])
+#define _Init_fd(num) long fd = atol(task->toks[num])
+#define _Init_flags(num) int flags = atoi(task->toks[num])
+//#define _Init_mode(num) int mode = atoi(task->toks[num])
+#define _Init_offset(num) long offset = atol(task->toks[num])
+#define _Init_op(num) int op = atoi(task->toks[num])
+#define _Init_path2(num) char *path2 = task->toks[num]
+#define _Init_path(num) char *path = task->toks[num]
+#define _Init_rc(num) int rc = atoi(task->toks[num])
+#define _Init_whence(num) long whence = atol(task->toks[num])
+
+// Declares `bytes` and returns from the enclosing function when it is not
+// positive. Uses a bare `return`, so only usable inside void functions.
+#define _Init_bytes(num) \
+ int bytes = atoi(task->toks[num]); \
+ if (bytes <= 0) return
+
+// Looks up the virtual fd object for `fd` in p->fds_map and returns from the
+// enclosing (void) function when there is none. Expects `p` and `fd` to be
+// in scope, so it has to follow _Init_fd.
+#define _Init_virtfd \
+ vfd_s *vfd = amap_get(p->fds_map, fd); \
+ if (vfd == NULL) return
+
+/**
+ * Dispatches a task to the handler for its operation code.
+ *
+ * The operation code is parsed from token 2 of the task. Operations that are
+ * recognised but not replayed are silently ignored; unknown codes are
+ * reported via Error().
+ */
+void rioop_run(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    const int op = atoi(task->toks[2]);
+
+    switch (op) {
+    // stat() syscalls
+    case FSTAT:
+        rioop_fstat(p, t, task);
+        break;
+    case FSTAT_AT:
+    case LSTAT:
+    case STAT:
+        rioop_stat(p, t, task);
+        break;
+
+    // read() and write() syscalls
+    case READ:
+    case READV:
+        rioop_read(p, t, task);
+        break;
+    case WRITE:
+    case WRITEV:
+        rioop_write(p, t, task);
+        break;
+
+    // open() and other syscalls which may create
+    case OPEN:
+    case OPEN_AT:
+        rioop_open(p, t, task, -1);
+        break;
+    case CREAT:
+        // A call to creat() is equivalent to calling open() with flags..
+        rioop_open(p, t, task, O_CREAT|O_WRONLY|O_TRUNC);
+        break;
+    case MKDIR:
+    case MKDIR_AT:
+        rioop_mkdir(p, t, task);
+        break;
+
+    // rename() syscalls
+    case RENAME:
+    case RENAME_AT:
+    case RENAME_AT2:
+        rioop_rename(p, t, task);
+        break;
+
+    // close() and unlink() syscalls
+    case CLOSE:
+        rioop_close(p, t, task);
+        break;
+    case UNLINK:
+    case UNLINK_AT:
+        rioop_unlink(p, t, task);
+        break;
+    case RMDIR:
+        rioop_rmdir(p, t, task);
+        break;
+
+    // sync() syscalls
+    case FSYNC:
+        rioop_fsync(p, t, task);
+        break;
+    case FDATASYNC:
+        rioop_fdatasync(p, t, task);
+        break;
+
+    // Other syscalls
+    case FCNTL:
+        rioop_fcntl(p, t, task);
+        break;
+    case GETDENTS:
+        rioop_getdents(p, t, task);
+        break;
+    case LSEEK:
+        rioop_lseek(p, t, task);
+        break;
+
+    // chmod() syscalls
+    case CHMOD:
+        rioop_chmod(p, t, task);
+        break;
+    case FCHMOD:
+        rioop_fchmod(p, t, task);
+        break;
+
+    // chown() syscalls
+    case CHOWN:
+        rioop_chown(p, t, task);
+        break;
+    case FCHOWN:
+    case FCHOWNAT:
+        rioop_fchown(p, t, task);
+        break;
+    case LCHOWN:
+        rioop_lchown(p, t, task);
+        break;
+
+    // Known operations which are not replayed
+    case FSTATFS:
+    case FSTATFS64:
+    case STATFS:
+    case STATFS64:
+    case READAHEAD:
+    case READLINK:
+    case READLINK_AT:
+    case SYNC:
+    case SYNCFS:
+    case SYNC_FILE_RANGE:
+    // Meta operations (I/O replay internal use only).
+    case META_EXIT_GROUP:
+    case META_TIMELINE:
+        break;
+
+    default:
+        Error("op(%d) not implemented", op);
+        break;
+    }
+}
+
+/**
+ * Replays a stat() on the path found in token 3. The result is discarded.
+ */
+void rioop_stat(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    struct stat sb;
+    char *target = task->toks[3];
+
+    (void)stat(target, &sb);
+}
+
+/**
+ * Replays an fstat() on the descriptor mapped to the virtual fd in token 3.
+ * Does nothing when the virtual fd is not in the map.
+ */
+void rioop_fstat(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_virtfd;
+
+    struct stat sb;
+    (void)fstat(vfd->fd, &sb);
+}
+
+/**
+ * Replays a rename() from the path in token 3 to the path in token 4.
+ * The result is discarded.
+ */
+void rioop_rename(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    char *from = task->toks[3];
+    char *to = task->toks[4];
+
+    (void)rename(from, to);
+}
+
+/**
+ * Replays a read() of the byte count in token 4 from the descriptor mapped
+ * to the virtual fd in token 3. The data read is thrown away.
+ */
+void rioop_read(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_bytes(4);
+    _Init_virtfd;
+
+    // Scratch buffer only; its content is never inspected.
+    char *scratch = Calloc(bytes+1, char);
+    (void)read(vfd->fd, scratch, bytes);
+    free(scratch);
+}
+
+/**
+ * Replays a write() of the byte count in token 4 to the descriptor mapped to
+ * the virtual fd in token 3.
+ *
+ * The payload starts with the .replay line number and is then passed to
+ * Fill_with_stuff(). A mapped descriptor of 0 is treated as a fatal error.
+ */
+void rioop_write(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_bytes(4);
+    _Init_virtfd;
+
+    char *buf = Calloc(bytes+1, char);
+    // Bounded: the buffer holds only bytes+1 chars, which can be shorter
+    // than the decimal line number (e.g. a 1-byte write on line 100000).
+    snprintf(buf, (size_t)bytes + 1, "%lu", task->lineno);
+    Fill_with_stuff(buf, bytes);
+    if (vfd->fd == 0) {
+        Debug("%d %d %ld", vfd->fd, vfd->debug, task->lineno);
+        _Error("ERROR");
+    }
+    write(vfd->fd, buf, bytes);
+    free(buf);
+}
+
+/**
+ * Replays an open()/creat().
+ *
+ * Token 3 is the recorded virtual fd, token 4 the path and token 6 the
+ * recorded flags. When the recorded fd is positive and the open succeeds,
+ * the new descriptor is registered in the process' fd map; otherwise the
+ * descriptor is closed again straight away.
+ *
+ * @param flags_ Flags overriding the recorded ones, or -1 to use the
+ *               recorded flags (used for creat())
+ */
+void rioop_open(rprocess_s *p, rthread_s *t, rtask_s *task, int flags_)
+{
+    _Init_fd(3);
+    _Init_path(4);
+    _Init_flags(6);
+
+    // Special case as this is creat() now
+    if (flags_ != -1)
+        flags = flags_;
+
+    bool directory = Has(flags, O_DIRECTORY);
+
+    if (fd > 0) {
+        if (directory) {
+            // We can not open a directory via open() otherwise!
+            flags &= (O_RDONLY & ~(O_RDWR|O_WRONLY|O_CREAT));
+        } else {
+            // We don't want to open the file in read only mode.
+            // SystemTap could have skipped syscalls to fcntl or open
+            // NOTE(review): where O_RDONLY is 0 this mask changes nothing.
+            flags &= ~O_RDONLY;
+        }
+        // flags |= O_DIRECT|O_SYNC;
+        flags &= ~O_EXCL;
+    }
+
+    int ret = open(path, flags, S_IRWXU|S_IRWXG|S_IRWXO);
+    if (ret <= 0)
+        return;
+
+    if (fd <= 0) {
+        // Nothing will refer to this descriptor later on, so release it.
+        // This now also covers a recorded fd of 0, which used to leak.
+        close(ret);
+#ifdef THREAD_DEBUG
+        fprintf(t->rthread_fd, "TRACE OPEN|open+close|%s|\n", path);
+        fflush(t->rthread_fd);
+#endif
+        return;
+    }
+
+    vfd_s *vfd = vfd_new(ret, fd, path);
+    amap_set(p->fds_map, fd, vfd);
+
+#ifdef THREAD_DEBUG
+    fprintf(t->rthread_fd, "TRACE OPEN|open|%s|\n", path);
+    fflush(t->rthread_fd);
+#endif
+}
+
+/**
+ * Replays a close() of the virtual fd in token 3.
+ *
+ * The entry is removed from the fd map, the underlying descriptor (or the
+ * directory stream, if one was opened for it) is closed and the virtual fd
+ * object is destroyed.
+ */
+void rioop_close(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_virtfd;
+
+    amap_unset(p->fds_map, fd);
+
+    if (!vfd->dirfd) {
+        close(vfd->fd);
+#ifdef THREAD_DEBUG
+        fprintf(t->rthread_fd, "TRACE OPEN|close|%s|\n", vfd->path);
+        fflush(t->rthread_fd);
+#endif
+    } else {
+        closedir(vfd->dirfd);
+#ifdef THREAD_DEBUG
+        fprintf(t->rthread_fd, "TRACE OPEN|closedir|%s|\n", vfd->path);
+        fflush(t->rthread_fd);
+#endif
+    }
+
+    vfd_destroy(vfd);
+}
+
+/**
+ * Replays a getdents() by reading one entry from the directory stream of the
+ * virtual fd in token 3.
+ *
+ * The stream is created with fdopendir() on first use and remembered in
+ * vfd->dirfd, so repeated getdents calls on the same descriptor reuse it
+ * rather than wrapping the descriptor again and leaking the old stream.
+ */
+void rioop_getdents(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_virtfd;
+
+    // getdents expects a dirfd
+    if (!vfd->dirfd) {
+        DIR *dirfd = fdopendir(vfd->fd);
+        if (!dirfd)
+            return;
+        vfd->dirfd = dirfd;
+#ifdef THREAD_DEBUG
+        fprintf(t->rthread_fd, "TRACE OPEN|fdopendir|%s|\n", vfd->path);
+        fflush(t->rthread_fd);
+#endif
+    }
+
+    readdir(vfd->dirfd);
+}
+
+/**
+ * Replays a mkdir() of the path in token 3 with rwx permissions for user,
+ * group and others. The result is discarded.
+ */
+void rioop_mkdir(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    char *dir = task->toks[3];
+
+    (void)mkdir(dir, S_IRWXU|S_IRWXG|S_IRWXO);
+}
+
+/**
+ * Replays an unlink() of the path in token 3. The result is discarded.
+ */
+void rioop_unlink(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    char *victim = task->toks[3];
+
+    (void)unlink(victim);
+}
+
+/**
+ * Replays an rmdir() of the path in token 3. The result is discarded.
+ */
+void rioop_rmdir(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    char *dir = task->toks[3];
+
+    (void)rmdir(dir);
+}
+
+/**
+ * Replays an lseek() on the descriptor mapped to the virtual fd in token 3,
+ * seeking to the absolute position given in token 6.
+ *
+ * NOTE(review): positions <= 0 are skipped by _Init_bytes and the seek is
+ * always SEEK_SET - confirm that this matches the recorded format.
+ */
+void rioop_lseek(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_bytes(6);
+    _Init_virtfd;
+
+    (void)lseek(vfd->fd, bytes, SEEK_SET);
+}
+
+/**
+ * Replays an fsync() on the descriptor mapped to the virtual fd in token 3.
+ * Does nothing when the virtual fd is not in the map.
+ */
+void rioop_fsync(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_virtfd;
+
+    (void)fsync(vfd->fd);
+}
+
+/**
+ * Replays an fdatasync() on the descriptor mapped to the virtual fd in
+ * token 3. Does nothing when the virtual fd is not in the map.
+ */
+void rioop_fdatasync(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_virtfd;
+
+    (void)fdatasync(vfd->fd);
+}
+
+/**
+ * Replays an fcntl() on the descriptor mapped to the virtual fd in token 3.
+ *
+ * Token 4 is the command and token 5 its argument. Only F_GETFD, F_GETFL,
+ * F_SETFD and F_SETFL are replayed; every other command is ignored.
+ */
+void rioop_fcntl(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_cmd(4);
+    _Init_arg(5);
+    _Init_virtfd;
+
+    if (cmd == F_GETFD || cmd == F_GETFL) {
+        (void)fcntl(vfd->fd, cmd);
+    } else if (cmd == F_SETFD || cmd == F_SETFL) {
+        (void)fcntl(vfd->fd, cmd, arg);
+    }
+}
+
+/**
+ * Replays a chmod() on the path in token 3, always granting rwx to user,
+ * group and others. The result is discarded.
+ */
+void rioop_chmod(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    char *target = task->toks[3];
+
+    (void)chmod(target, S_IRWXU|S_IRWXG|S_IRWXO);
+}
+
+/**
+ * Replays an fchmod() on the descriptor mapped to the virtual fd in token 3,
+ * always granting rwx to user, group and others.
+ */
+void rioop_fchmod(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_virtfd;
+
+    (void)fchmod(vfd->fd, S_IRWXU|S_IRWXG|S_IRWXO);
+}
+
+/**
+ * Replays a chown() on the path in token 3, setting the owner to the user
+ * configured in the worker's options and leaving the group unchanged.
+ *
+ * The operation is skipped when no user is configured or the user cannot be
+ * resolved by getpwnam().
+ */
+void rioop_chown(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_path(3);
+    rworker_s *w = t->worker;
+    options_s *opts = w->opts;
+
+    if (opts->user == NULL)
+        return;
+
+    // getpwnam() returns NULL for an unknown user or on error.
+    struct passwd *pwd = getpwnam(opts->user);
+    if (pwd == NULL)
+        return;
+
+    chown(path, pwd->pw_uid, -1);
+}
+
+/**
+ * Replays an fchown() on the descriptor mapped to the virtual fd in token 3,
+ * setting the owner to the user configured in the worker's options and
+ * leaving the group unchanged.
+ *
+ * The operation is skipped when the virtual fd is not in the map, no user is
+ * configured or the user cannot be resolved by getpwnam().
+ */
+void rioop_fchown(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_fd(3);
+    _Init_virtfd;
+    rworker_s *w = t->worker;
+    options_s *opts = w->opts;
+
+    if (opts->user == NULL)
+        return;
+
+    // getpwnam() returns NULL for an unknown user or on error.
+    struct passwd *pwd = getpwnam(opts->user);
+    if (pwd == NULL)
+        return;
+
+    fchown(vfd->fd, pwd->pw_uid, -1);
+}
+
+/**
+ * Replays an lchown() on the path in token 3, setting the owner to the user
+ * configured in the worker's options and leaving the group unchanged.
+ *
+ * The operation is skipped when no user is configured or the user cannot be
+ * resolved by getpwnam().
+ */
+void rioop_lchown(rprocess_s *p, rthread_s *t, rtask_s *task)
+{
+    _Init_path(3);
+    rworker_s *w = t->worker;
+    options_s *opts = w->opts;
+
+    if (opts->user == NULL)
+        return;
+
+    // getpwnam() returns NULL for an unknown user or on error.
+    struct passwd *pwd = getpwnam(opts->user);
+    if (pwd == NULL)
+        return;
+
+    lchown(path, pwd->pw_uid, -1);
+}
+
diff --git a/ioreplay/src/replay/rioop.h b/ioreplay/src/replay/rioop.h
new file mode 100644
index 0000000..4db4284
--- /dev/null
+++ b/ioreplay/src/replay/rioop.h
@@ -0,0 +1,54 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RIOOP_H
+#define RIOOP_H
+
+#include "../defaults.h"
+#include "../utils/futils.h"
+#include "../opcodes.h"
+#include "rprocess.h"
+#include "rthread.h"
+
+/**
+ * @brief Replays the responsible I/O operation of a given task
+ *
+ * The operation code is taken from the task's tokens and the matching
+ * rioop_* handler below is called.
+ *
+ * @param p The virtual replay process object
+ * @param t The thread object
+ * @param task The replay task object
+ */
+void rioop_run(rprocess_s *p, rthread_s *t, rtask_s *task);
+
+// Per-operation handlers called by rioop_run. They all take the same
+// process/thread/task triple and read their arguments from task->toks.
+// rioop_open additionally takes flags_: -1 to use the recorded flags,
+// otherwise the flags to use in place of the recorded ones.
+void rioop_close(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_fcntl(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_fdatasync(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_fstat(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_fsync(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_getdents(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_mkdir(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_open(rprocess_s *p, rthread_s *t, rtask_s *task, int flags_);
+void rioop_read(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_rename(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_stat(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_lseek(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_unlink(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_rmdir(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_write(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_chmod(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_fchmod(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_chown(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_fchown(rprocess_s *p, rthread_s *t, rtask_s *task);
+void rioop_lchown(rprocess_s *p, rthread_s *t, rtask_s *task);
+
+#endif // RIOOP_H
diff --git a/ioreplay/src/replay/rprocess.c b/ioreplay/src/replay/rprocess.c
new file mode 100644
index 0000000..4efd835
--- /dev/null
+++ b/ioreplay/src/replay/rprocess.c
@@ -0,0 +1,34 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "rprocess.h"
+
+/**
+ * Creates a new virtual replay process object.
+ *
+ * Every member is given a defined value: pid and fds_map come from the
+ * arguments, everything else (terminate, rworker_num, lineno, initm) starts
+ * out as zero/false. The fd map is stored by pointer, not copied.
+ *
+ * @param pid The virtual process ID
+ * @param fds_map The file descriptor map to be used by the process
+ * @return The new process object
+ */
+rprocess_s* rprocess_new(const int pid, amap_s *fds_map)
+{
+    rprocess_s *p = Malloc(rprocess_s);
+
+    // Members not named here are zero-initialised by the compound literal,
+    // so rworker_num and initm no longer hold indeterminate values.
+    *p = (rprocess_s) {
+        .pid = pid,
+        .fds_map = fds_map,
+    };
+
+    return p;
+}
+
+/**
+ * Frees a virtual replay process object. NULL is accepted and ignored.
+ * The fd map referenced by the object is not touched.
+ */
+void rprocess_destroy(rprocess_s *p)
+{
+    // free(NULL) is a no-op, so no explicit guard is required.
+    free(p);
+}
diff --git a/ioreplay/src/replay/rprocess.h b/ioreplay/src/replay/rprocess.h
new file mode 100644
index 0000000..739dd89
--- /dev/null
+++ b/ioreplay/src/replay/rprocess.h
@@ -0,0 +1,40 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RPROCESS_H
+#define RPROCESS_H
+
+#include "../datas/hmap.h"
+#include "../datas/amap.h"
+#include "../defaults.h"
+#include "rthread.h"
+
+/**
+ * @brief The virtual replay process object definition
+ *
+ * This defines a virtual process in replay context. The object references
+ * the file descriptor map but does not own it.
+ */
+typedef struct rprocess_s_ {
+ int terminate; /**< Indicates whether the worker is terminating or not */
+ int rworker_num; /**< The worker number of the responsible worker */
+ int pid; /**< The virtual process ID */
+ unsigned long lineno; /**< Holding the current .replay line number */
+ bool initm; /**< Indicates whether ioreplay is in init mode or not */
+ amap_s *fds_map; /**< Holding all file descriptors (not owned) */
+} rprocess_s;
+
+/**
+ * @brief Creates a new virtual replay process object
+ *
+ * @param pid The virtual process ID
+ * @param fds_map The file descriptor map (stored by pointer, not copied)
+ * @return The new process object
+ */
+rprocess_s* rprocess_new(const int pid, amap_s *fds_map);
+/**
+ * @brief Destroys the virtual replay process object
+ *
+ * @param p The process object, may be NULL
+ */
+void rprocess_destroy(rprocess_s* p);
+
+#endif // RPROCESS_H
diff --git a/ioreplay/src/replay/rstats.c b/ioreplay/src/replay/rstats.c
new file mode 100644
index 0000000..c3e6e38
--- /dev/null
+++ b/ioreplay/src/replay/rstats.c
@@ -0,0 +1,108 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "rstats.h"
+
+#include <sys/types.h>
+
+/**
+ * Creates the global stats object.
+ *
+ * Stats are written to opts->stats_file when one is set and to stdout
+ * otherwise. The options object is referenced, not copied.
+ */
+rstats_s* rstats_new(options_s *opts)
+{
+    rstats_s *stats = Malloc(rstats_s);
+
+    stats->stats_fd = opts->stats_file
+        ? Fopen(opts->stats_file, "w")
+        : stdout;
+
+    stats->opts = opts;
+    stats->time_ahead = -1;
+    stats->duration = 0;
+    stats->ioops = 0;
+    stats->loadavg_high = 0;
+
+    return stats;
+}
+
+/**
+ * Destroys the global stats object, closing its output stream unless that
+ * stream is stdout.
+ */
+void rstats_destroy(rstats_s *s)
+{
+    FILE *out = s->stats_fd;
+
+    if (out != stdout)
+        fclose(out);
+
+    free(s);
+}
+
+/**
+ * Creates a per worker stats object via Mmapshared(), so that the master
+ * process can collect the values written by a forked worker.
+ *
+ * The function takes no arguments: the options parameter of the previous
+ * definition was never used, and the callers in replay.c pass none.
+ *
+ * @return The new per worker stats object
+ */
+rworker_stats_s* rworker_stats_new_mmap(void)
+{
+    // Share this object between processes, so that the stats can be
+    // collected by the master process!
+    rworker_stats_s *s = Mmapshared(rworker_stats_s);
+
+    s->loadavg_high = 0;
+    s->ioops = 0;
+    s->time_ahead = -1;
+
+    return s;
+}
+
+/**
+ * Unmaps a per worker stats object.
+ */
+void rworker_stats_destroy(rworker_stats_s *s)
+{
+    (void)munmap(s, sizeof *s);
+}
+
+
+/**
+ * Records the current time as the start time of the test.
+ */
+void rstats_start(rstats_s* s)
+{
+    struct timeval *started = &s->start_time;
+
+    (void)gettimeofday(started, NULL);
+}
+
+/**
+ * Records the current time as the end time of the test and computes the
+ * duration in seconds.
+ *
+ * The duration is computed in floating point so that sub-second precision is
+ * kept; the previous integer arithmetic truncated it to whole seconds and
+ * reported runs shorter than a second as 0.
+ */
+void rstats_stop(rstats_s* s)
+{
+    gettimeofday(&s->end_time, NULL);
+
+    const double secs = (double)(s->end_time.tv_sec - s->start_time.tv_sec);
+    const double usecs = (double)(s->end_time.tv_usec - s->start_time.tv_usec);
+
+    s->duration = secs + usecs / 1000000.0;
+}
+
+/**
+ * Merges the stats of one worker into the global stats: the I/O operation
+ * counts are summed, the highest load average is kept, and the smallest
+ * time_ahead is kept (-1 in the global object means "not set yet").
+ */
+void rstats_add_from_worker(rstats_s* s, rworker_stats_s* w)
+{
+    s->ioops += w->ioops;
+
+    if (w->loadavg_high > s->loadavg_high)
+        s->loadavg_high = w->loadavg_high;
+
+    const int unset = (s->time_ahead == -1);
+    if (unset || w->time_ahead < s->time_ahead)
+        s->time_ahead = w->time_ahead;
+}
+
+/**
+ * Writes the collected stats to the stats stream of the given object.
+ * The average ioops/s line is only written for a positive duration.
+ */
+void rstats_print(rstats_s* s)
+{
+    options_s *opts = s->opts;
+    FILE *out = s->stats_fd;
+    const int threads_total =
+        opts->num_threads_per_worker * opts->num_workers;
+
+    if (opts->stats_file) {
+        Put("Writing stats to '%s'", opts->stats_file);
+    }
+
+    fprintf(out, "Stats of test '%s':\n", opts->name);
+    fprintf(out, "\tNum workers: %d\n", opts->num_workers);
+    fprintf(out, "\tThreads per worker: %d\n", opts->num_threads_per_worker);
+    fprintf(out, "\tThreads total: %d\n", threads_total);
+    fprintf(out, "\tHighest loadavg: %.2f\n", s->loadavg_high);
+    fprintf(out, "\tPerformed ioops: %ld\n", s->ioops);
+    if (s->duration > 0) {
+        fprintf(out, "\tAverage ioops/s: %.2f\n", s->ioops/s->duration);
+    }
+    fprintf(out, "\tTime ahead: %lds\n", s->time_ahead/1000);
+    fprintf(out, "\tTotal time: %.2fs\n", s->duration);
+}
+
diff --git a/ioreplay/src/replay/rstats.h b/ioreplay/src/replay/rstats.h
new file mode 100644
index 0000000..1ce3f27
--- /dev/null
+++ b/ioreplay/src/replay/rstats.h
@@ -0,0 +1,117 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * @file rstats.h
+ * @author Paul Buetow
+ *
+ * @brief For collecting replay stats
+ */
+
+#ifndef RSTATS_H
+#define RSTATS_H
+
+#include "../defaults.h"
+#include "../options.h"
+
+#include <pthread.h>
+
+/**
+ * @brief Definition of the rstats object
+ *
+ * Used to store global statistics.
+ */
+typedef struct rstats_s_ {
+ double loadavg_high; /**< Highest load average */
+ long ioops; /**< Total number of I/O operations */
+ double duration; /**< Duration of the test in seconds */
+ long time_ahead; /**< Time ahead of the original speed (-1: not set) */
+ struct timeval start_time; /**< Start time of the test */
+ struct timeval end_time; /**< End time of the test */
+ options_s *opts; /**< The I/O replay options object */
+ FILE *stats_fd; /**< The stream for writing the stats */
+} rstats_s;
+
+/**
+ * @brief Definition of the per worker stats object
+ *
+ * Used to store per worker process I/O stats
+ */
+typedef struct rworker_stats_s_ {
+ double loadavg_high; /**< Highest load average seen by the worker */
+ long ioops; /**< Total number of I/O operations */
+ long time_ahead; /**< Time ahead of the original speed (-1: not set) */
+} rworker_stats_s;
+
+/**
+ * @brief Creates a new stats object
+ *
+ * @param opts The options object (referenced by the stats object)
+ * @return The new stats object
+ */
+rstats_s* rstats_new(options_s *opts);
+
+/**
+ * @brief Destroys the stats object
+ *
+ * @param s The stats object
+ */
+void rstats_destroy(rstats_s* s);
+
+/**
+ * @brief Creates a new per worker stats object
+ *
+ * The memory is mapped into shared memory so it can be shared across multiple
+ * processes.
+ *
+ * @return The new stats object
+ */
+rworker_stats_s* rworker_stats_new_mmap();
+
+/**
+ * @brief Destroys the per worker stats object
+ *
+ * @param s The stats object
+ */
+void rworker_stats_destroy(rworker_stats_s* s);
+
+/**
+ * @brief Starts the stats
+ *
+ * @param s The stats object
+ */
+void rstats_start(rstats_s* s);
+
+/**
+ * @brief Finalises the stats
+ *
+ * @param s The stats object
+ */
+void rstats_stop(rstats_s* s);
+
+/**
+ * @brief Prints the stats
+ *
+ * @param s The stats object
+ */
+void rstats_print(rstats_s* s);
+
+/**
+ * @brief Adds per worker stats to the global stats object
+ *
+ * @param s The global stats object
+ * @param w The worker stats object
+ */
+void rstats_add_from_worker(rstats_s* s, rworker_stats_s* w);
+
+#endif
diff --git a/ioreplay/src/replay/rtask.c b/ioreplay/src/replay/rtask.c
new file mode 100644
index 0000000..b1afb92
--- /dev/null
+++ b/ioreplay/src/replay/rtask.c
@@ -0,0 +1,50 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "rtask.h"
+
+#include "rthread.h"
+#include "rworker.h"
+
+/**
+ * Creates a new, empty replay task object. All members start out zeroed,
+ * i.e. no worker, no process and an empty line.
+ */
+rtask_s* rtask_new()
+{
+    rtask_s *fresh = Malloc(rtask_s);
+
+    // Zero every member of the freshly allocated object.
+    *fresh = (rtask_s) {0};
+    fresh->line[0] = '\0';
+
+#ifdef THREAD_DEBUG
+    fresh->clone = NULL;
+#endif
+
+    return fresh;
+}
+
+/**
+ * Frees a replay task object. NULL is accepted and ignored.
+ */
+void rtask_destroy(rtask_s *task)
+{
+    // free(NULL) is a no-op, so no explicit guard is required.
+    free(task);
+}
+
+/**
+ * Updates a reused task object with the data of a new .replay line.
+ *
+ * The line is copied into the task's fixed-size buffer. A line that does not
+ * fit is truncated so that the copy can never write past the buffer.
+ *
+ * @param task The task object to be updated
+ * @param worker The responsible worker object
+ * @param process The responsible process object
+ * @param line The remaining line of the .replay file
+ * @param lineno The current line number of the .replay file
+ * @param vsize The vsize/path id
+ */
+void rtask_update(rtask_s *task, void *worker, void *process, char *line,
+                  const long lineno, const long vsize)
+{
+    task->worker = worker;
+    task->process = process;
+    task->lineno = lineno;
+    task->vsize = vsize;
+
+    // Bounded copy: strcpy() would overflow task->line on over-long input.
+    size_t len = strlen(line);
+    if (len >= sizeof task->line)
+        len = sizeof task->line - 1;
+    memcpy(task->line, line, len);
+    task->line[len] = '\0';
+}
diff --git a/ioreplay/src/replay/rtask.h b/ioreplay/src/replay/rtask.h
new file mode 100644
index 0000000..35c5714
--- /dev/null
+++ b/ioreplay/src/replay/rtask.h
@@ -0,0 +1,69 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RTASK_H
+#define RTASK_H
+
+#include "../defaults.h"
+
+/**
+ * @brief The replay task definition
+ *
+ * The rtask holds all possible variables required to process a particular
+ * .replay line and to replay the corresponding I/O operation.
+ */
+typedef struct rtask_s_ {
+    void *worker; /**< The responsible worker object */
+    void *process; /**< The responsible process object */
+    unsigned long lineno; /**< The current line number */
+    unsigned long vsize; /**< The vsize */
+    char *toks[MAX_TOKENS+1]; /**< The tokens parsed from the .replay line */
+    char line[MAX_LINE_LEN]; /**< The remaining part of the .replay line */
+// rtask.c and rthread.c access `clone` under THREAD_DEBUG, so the member has
+// to exist for that build flavour as well.
+#if defined(RTASK_DEBUG) || defined(THREAD_DEBUG)
+    char *clone; /**< Used for debug purposes only */
+#endif
+} rtask_s;
+
+/**
+ * @brief Creates a new thread task object
+ *
+ * This function creates a new thread task object. Such a task object is used
+ * by the worker to hand over I/O tasks to the corresponding threads. The
+ * actual I/O work is performed by the threads then.
+ *
+ * @return The new thread task object
+ */
+rtask_s* rtask_new();
+
+/**
+ * @brief Destroys the replay task object
+ *
+ * @param t The thread task object to be destroyed
+ */
+void rtask_destroy(rtask_s* t);
+
+/**
+ * @brief Updates a reused/recycle task object
+ *
+ * @param task The task object to be updated
+ * @param worker The responsible worker object
+ * @param process The responsible process object
+ * @param line The remaining line of the .replay file
+ * @param lineno The current line number of the .replay file
+ * @param vsize The vsize/path id
+ */
+void rtask_update(rtask_s *task, void *worker, void *process, char *line,
+ const long lineno, const long vsize);
+
+#endif // RTASK_H
diff --git a/ioreplay/src/replay/rthread.c b/ioreplay/src/replay/rthread.c
new file mode 100644
index 0000000..55364ec
--- /dev/null
+++ b/ioreplay/src/replay/rthread.c
@@ -0,0 +1,216 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "rthread.h"
+
+#include <sys/types.h>
+
+#include "rworker.h"
+#include "rprocess.h"
+
+#include "rioop.h"
+
+#ifdef THREAD_DEBUG
+/**
+ * @brief Opens the debug log of a thread (debug builds only)
+ *
+ * The log file is opened in append mode below /tmp/ioreplay and its name
+ * is made up of the worker number and the id of the calling pthread. The
+ * resulting stream is stored in t->rthread_fd.
+ *
+ * @param t The responsible thread object
+ */
+static void _rthread_init_log(rthread_s *t)
+{
+    rworker_s *worker = t->worker;
+    const long self = (long)pthread_self();
+
+    // Build the path of the log file
+    char *path = Calloc(1024, char);
+    snprintf(path, 1023, "/tmp/ioreplay/worker%d.thread%ld.debuglog",
+             worker->rworker_num, self);
+
+    // The log directory may not exist yet
+    ensure_dir_exists("/tmp/ioreplay");
+    t->rthread_fd = Fopen(path, "a");
+    free(path);
+
+    fprintf(t->rthread_fd, "%ld: DEBUG: Created thread log\n", t->tid);
+}
+#endif
+
+/*
+ * Processes one task: splits the remaining part of the task's .replay line
+ * into '|' separated tokens, runs the corresponding I/O operation and hands
+ * the task object back to the worker for recycling (or destroys it if the
+ * worker's task buffer is full).
+ *
+ * t          The responsible thread object
+ * task       The task object, task->line is modified by the tokenizer
+ * pthread_id Only used for the error message
+ */
+void rthread_process_task(rthread_s* t, rtask_s *task,
+                          pid_t pthread_id)
+{
+    char *next = task->line;
+    rworker_s *w = (rworker_s*) task->worker;
+
+    // Tokenize the remaining elements of the line.
+    int ntoks = 0;
+    char *saveptr;
+    char *tok = strtok_r(next, "|", &saveptr);
+
+    while (tok) {
+        // task->toks has MAX_TOKENS+1 slots and one of them is required for
+        // the NULL marker below. So never store more than MAX_TOKENS tokens,
+        // otherwise the marker would be written past the end of the array.
+        if (ntoks >= MAX_TOKENS) {
+            Error("worker(%d) pthread(%d): lineno:%lu, missing newline?",
+                  w->rworker_num, pthread_id, task->lineno);
+            // Don't overflow the array in case Error returns
+            break;
+        }
+        task->toks[ntoks++] = tok;
+        tok = strtok_r(NULL, "|", &saveptr);
+    }
+    // NULL marker (no more token from here)
+    task->toks[ntoks] = NULL;
+
+#ifdef THREAD_DEBUG
+    fprintf(t->rthread_fd, "%ld(%ld): %s",
+            t->tid, (long)pthread_self(), task->clone);
+    fflush(t->rthread_fd);
+    free(task->clone);
+    task->clone = NULL;
+#endif
+#ifndef NO_RIOOP
+    // Perform the corresponding I/O operation!
+    rioop_run(task->process, t, task);
+#endif
+
+    // Make the task object recyclable/reusable
+    pthread_mutex_lock(&w->task_buffer_mutex);
+    if (!rbuffer_insert(w->task_buffer, task))
+        // We can't recycle the task object if the buffer is full!
+        rtask_destroy(task);
+    pthread_mutex_unlock(&w->task_buffer_mutex);
+}
+
+/*
+ * Entry function of the pthread belonging to a thread object.
+ *
+ * Polls the thread's task queue, processes all queued tasks and, while
+ * idling, offers the thread object to the worker's rthread_buffer so that
+ * the worker can reuse it. Once t->terminate is set the remaining tasks are
+ * processed and the function returns NULL.
+ */
+void *rthread_pthread_start(void *data)
+{
+ rthread_s* t = (rthread_s*) data;
+ rworker_s *w = t->worker;
+ rtask_s *task = NULL;
+ // NOTE(review): pthread_self() returns a pthread_t which is stored in a
+ // pid_t here. The value is only passed on to rthread_process_task().
+ pid_t pthread_id = pthread_self();
+
+#ifdef THREAD_DEBUG
+ _rthread_init_log(t);
+#endif
+
+ do {
+ // Poll every 100us until there is a task or we shall terminate
+ // NOTE(review): t->terminate is a plain bool set by rthread_terminate()
+ // and read here without any synchronisation.
+ while (!rbuffer_has_next(t->tasks) && !t->terminate)
+ usleep(100);
+
+ // Drain the task queue
+ while ((task = rbuffer_get_next(t->tasks)) != NULL)
+ rthread_process_task(t, task, pthread_id);
+
+#ifdef THREAD_DEBUG
+ fprintf(t->rthread_fd, "%ld: DEBUG: Idling\n", t->tid);
+ fflush(t->rthread_fd);
+#endif
+
+ // Tell rworker_s that thread is not doing any work!
+ int inserted = false;
+ while (!inserted && !t->terminate) {
+ // New tasks arrived in the meantime, back to work
+ if (rbuffer_has_next(t->tasks))
+ break;
+
+ usleep(1000);
+
+ if (rbuffer_has_next(t->tasks))
+ break;
+
+ // Make the rthread reusable, it has been without any tasks
+ // for some time. The insert is retried as long as it fails.
+ pthread_mutex_lock(&w->rthread_buffer_mutex);
+ inserted = rbuffer_insert(w->rthread_buffer, t);
+ pthread_mutex_unlock(&w->rthread_buffer_mutex);
+ }
+
+#ifdef THREAD_DEBUG
+ if (inserted) {
+ fprintf(t->rthread_fd, "%ld: DEBUG: Added to thread buffer\n",
+ t->tid);
+ } else {
+ fprintf(t->rthread_fd, "%ld: DEBUG: Idling thread recovered\n",
+ t->tid);
+ }
+ fflush(t->rthread_fd);
+#endif
+
+ } while (!t->terminate);
+
+#ifdef THREAD_DEBUG
+ fprintf(t->rthread_fd, "%ld: DEBUG: Terminating\n", t->tid);
+ fflush(t->rthread_fd);
+#endif
+
+ // Process the very last tasks
+ while (NULL != (task = rbuffer_get_next(t->tasks)))
+ rthread_process_task(t, task, pthread_id);
+
+#ifdef THREAD_DEBUG
+ fprintf(t->rthread_fd, "%ld: DEBUG: Done terminating\n", t->tid);
+ fflush(t->rthread_fd);
+#endif
+
+ return NULL;
+}
+
+/*
+ * Creates a new thread object for the given worker.
+ *
+ * If the worker is configured with exactly one thread per worker the object
+ * runs in single threaded mode and no pthread is started. Otherwise a
+ * pthread running rthread_pthread_start() is started for the object.
+ */
+rthread_s* rthread_new(const long tid, void *worker)
+{
+    rworker_s *owner = worker;
+    rthread_s *thread = Malloc(rthread_s);
+
+    thread->worker = worker;
+    thread->terminate = false;
+    thread->tasks = rbuffer_new(TASK_BUFFER_PER_THREAD);
+    thread->single_threaded = owner->opts->num_threads_per_worker == 1;
+    rthread_update(thread, tid);
+
+    if (thread->single_threaded) {
+#ifdef THREAD_DEBUG
+        // There is no pthread which would open the debug log
+        _rthread_init_log(thread);
+#endif
+    } else {
+        start_pthread(&thread->pthread, rthread_pthread_start, (void*)thread);
+    }
+
+    return thread;
+}
+
+/*
+ * Assigns a new virtual thread id to the thread object and returns the id
+ * the object had before.
+ */
+long rthread_update(rthread_s *t, const long tid)
+{
+    const long previous = t->tid;
+
+    t->tid = tid;
+    return previous;
+}
+
+/*
+ * Frees the task queue and the thread object. Reports an error if there
+ * are still tasks in the queue. The pthread is not joined here.
+ */
+void rthread_destroy(rthread_s *t)
+{
+    // All tasks are expected to be processed by now
+    if (rbuffer_has_next(t->tasks)) {
+        Error("Didn't expect to have any tasks left!");
+    }
+
+    rbuffer_destroy(t->tasks);
+
+#ifdef THREAD_DEBUG
+    if (t->rthread_fd != NULL)
+        fclose(t->rthread_fd);
+#endif
+
+    free(t);
+}
+
+/*
+ * Hands a task over to the thread. With a pthread the task is queued and
+ * the result of the queue insert is returned. In single threaded mode the
+ * task is processed right away by the caller and true is returned.
+ */
+bool rthread_insert_task(rthread_s* t, rtask_s* task)
+{
+    if (!t->single_threaded)
+        return rbuffer_insert(t->tasks, task);
+
+    // No pthread available, run the task synchronously
+    rthread_process_task(t, task, pthread_self());
+    return true;
+}
+
+/*
+ * Requests the thread to terminate and waits (join) for its pthread to
+ * finish the outstanding tasks.
+ */
+void rthread_terminate(rthread_s* t)
+{
+    t->terminate = true;
+
+    // In single threaded mode rthread_new() never starts a pthread, so
+    // t->pthread is uninitialised and there is nothing to join.
+    if (!t->single_threaded)
+        pthread_join(t->pthread, NULL);
+}
diff --git a/ioreplay/src/replay/rthread.h b/ioreplay/src/replay/rthread.h
new file mode 100644
index 0000000..9971e49
--- /dev/null
+++ b/ioreplay/src/replay/rthread.h
@@ -0,0 +1,123 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * @file rthread.h
+ * @author Paul Buetow
+ *
+ * @brief The replay thread definition
+ */
+
+#ifndef RTHREAD_H
+#define RTHREAD_H
+
+#include "../defaults.h"
+#include "../datas/rbuffer.h"
+#include "../datas/amap.h"
+#include "../vfd.h"
+#include "rtask.h"
+
+#include <pthread.h>
+
+/**
+ * @brief Definition of a worker thread
+ *
+ * Every worker utilises a set of worker threads in order to parallelise the
+ * replaying of the I/O! Every thread comes with its own task queue. It is
+ * filled by the responsible worker.
+ *
+ * The user can specify the max amount of threads per worker via the -t
+ * command line switch.
+ */
+typedef struct rthread_s_ {
+ void *worker; /**< The responsible worker object */
+ long tid; /**< The virtual thread id */
+ rbuffer_s* tasks; /**< Holds all outstanding tasks */
+ /* NOTE(review): terminate is set by rthread_terminate() and polled by the
+ * pthread without synchronisation - confirm this is intended. */
+ bool terminate; /**< True if thread shall terminate */
+ bool single_threaded; /**< Worker is single threaded or not */
+ /* Only started by rthread_new() if the object is not single threaded */
+ pthread_t pthread; /**< We run the tasks in concurrent pthreads */
+ /* NOTE(review): rthread.c accesses rthread_fd under THREAD_DEBUG while
+ * this member is guarded by RTHREAD_DEBUG - confirm that the build defines
+ * both together. */
+#ifdef RTHREAD_DEBUG
+ FILE *rthread_fd; /**< Used for debugging purposes only */
+#endif
+} rthread_s;
+
+/**
+ * @brief Creates a new thread object
+ *
+ * Unless the worker is configured with exactly one thread per worker, a
+ * pthread running rthread_pthread_start() is started for the new object.
+ *
+ * @param tid The thread ID
+ * @param worker The worker object managing this thread
+ * @return The new thread object
+ */
+rthread_s* rthread_new(const long tid, void *worker);
+
+/**
+ * @brief Updates a thread object after recycling it
+ *
+ * @param t The thread object
+ * @param tid The new thread ID
+ * @return The thread ID the object had before the update
+ */
+long rthread_update(rthread_s *t, const long tid);
+
+/**
+ * @brief Terminates the thread
+ *
+ * This function sets the terminate flag of the thread object and waits (via
+ * join) for the pthread to complete all its current tasks from the queue.
+ * The thread object itself is not freed, see rthread_destroy().
+ *
+ * @param t The thread object
+ */
+void rthread_terminate(rthread_s* t);
+
+/**
+ * @brief Destroys the thread object
+ *
+ * Frees the task queue and the thread object. It is an error if there are
+ * still tasks in the queue. The pthread is not joined by this function; the
+ * worker calls rthread_terminate() before destroying a thread.
+ *
+ * @param t The thread object
+ */
+void rthread_destroy(rthread_s* t);
+
+/**
+ * @brief Inserts a task into the threads work queue
+ *
+ * Inserts a task into the threads work queue. We use an atomic ring buffer
+ * data structure for the work queue. The ring buffer does not require any
+ * mutex locks.
+ *
+ * If the thread object is single threaded the task is not queued. Instead it
+ * is processed right away by the caller and true is returned.
+ *
+ * @param t The thread object
+ * @param task The task to be inserted
+ * @return Returns true on success, returns false if the task queue is full
+ */
+bool rthread_insert_task(rthread_s* t, rtask_s* task);
+
+/**
+ * @brief Used by the pthread to process a task
+ *
+ * In this function the pthread will attempt to process a task. It extracts all
+ * required information from the task object and invokes the corresponding I/O
+ * syscalls.
+ *
+ * Afterwards the task object is offered to the task buffer of the worker for
+ * reuse. It is destroyed if that buffer is full. Either way the caller must
+ * not use the task object anymore.
+ *
+ * @param t The responsible thread object
+ * @param task The task object
+ * @param pthread_id The current pthread id (used for error messages only)
+ */
+void rthread_process_task(rthread_s* t, rtask_s *task, pid_t pthread_id);
+
+/**
+ * @brief The entry function for the pthreads
+ *
+ * Runs until the terminate flag of the thread object is set and all
+ * remaining tasks of its queue are processed.
+ *
+ * @param data The data structure passed to the pthread (the rthread_s object)
+ * @return The exit code of the pthread (always NULL).
+ */
+void *rthread_pthread_start(void *data);
+
+#endif // RTHREAD_H
diff --git a/ioreplay/src/replay/rworker.c b/ioreplay/src/replay/rworker.c
new file mode 100644
index 0000000..5a50ada
--- /dev/null
+++ b/ioreplay/src/replay/rworker.c
@@ -0,0 +1,360 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "rworker.h"
+
+#include "../datas/stack.h"
+#include "rprocess.h"
+#include "rthread.h"
+
+// Milliseconds elapsed between start_time and now. Requires a struct timeval
+// named start_time to be in scope where the macro is used. The expansion is
+// fully parenthesised so that it is safe to use within larger expressions.
+#define _Compute_current_time(now) \
+    (((now).tv_sec - start_time.tv_sec) * 1000 \
+     + ((now).tv_usec - start_time.tv_usec) / 1000)
+
+
+/**
+ * @brief Destroy callback attached to the process map of the worker
+ *
+ * Adapter which makes rprocess_destroy usable as the data_destroy hook of
+ * the map holding the virtual process objects.
+ *
+ * @param data The process object to be destroyed.
+ */
+static void _rprocess_destroy_cb(void *data)
+{
+    rprocess_s *process = data;
+
+    rprocess_destroy(process);
+}
+
+/*
+ * Creates a new worker object: stores the references handed in by the
+ * caller and creates the maps, buffers and mutexes owned by the worker.
+ */
+rworker_s* rworker_new(const int rworker_num, amap_s *fds_map,
+                       const long num_vsizes, const long num_pids,
+                       options_s *opts, rworker_stats_s *worker_stats)
+{
+    rworker_s *worker = Malloc(rworker_s);
+
+#ifdef THREAD_DEBUG
+    char *log_path = Calloc(1024, char);
+    snprintf(log_path, 1023, "/tmp/ioreplay/_worker%d.debuglog",
+             rworker_num);
+
+    worker->rworker_fd = Fopen(log_path, "a");
+    free(log_path);
+    fprintf(worker->rworker_fd, "DEBUG: Started worker\n");
+#endif
+
+    // References handed in by the caller
+    worker->rworker_num = rworker_num;
+    worker->opts = opts;
+    worker->fds_map = fds_map;
+    worker->worker_stats = worker_stats;
+
+    // Data structures owned by the worker
+    worker->rprocess_map = amap_new(num_pids);
+    worker->rthread_map = amap_new(num_vsizes);
+    worker->task_buffer = rbuffer_new(opts->num_threads_per_worker
+                                      *TASK_BUFFER_PER_THREAD);
+    worker->rthread_buffer = rbuffer_new(opts->num_threads_per_worker);
+
+    // The process objects get destroyed together with the process map.
+    worker->rprocess_map->data_destroy = _rprocess_destroy_cb;
+
+    pthread_mutex_init(&worker->rthread_buffer_mutex, NULL);
+    pthread_mutex_init(&worker->task_buffer_mutex, NULL);
+
+    // TODO: Check in the program whether the ulimit is high enough
+    // or not! (ulimit -n)
+
+    return worker;
+}
+
+/**
+ * @brief Destroys the object
+ *
+ * Releases the maps, buffers and mutexes of the worker, the task objects
+ * still waiting for reuse and finally the worker object itself. A NULL
+ * pointer is ignored.
+ *
+ * @param w The worker object
+ */
+void rworker_destroy(rworker_s *w)
+{
+    if (w == NULL)
+        return;
+
+    if (w->rprocess_map != NULL)
+        amap_destroy(w->rprocess_map);
+    if (w->rthread_map != NULL)
+        amap_destroy(w->rthread_map);
+
+    if (w->task_buffer != NULL) {
+        // Free all task objects which were kept for recycling
+        for (rtask_s *task = rbuffer_get_next(w->task_buffer);
+             task != NULL;
+             task = rbuffer_get_next(w->task_buffer))
+            rtask_destroy(task);
+
+        rbuffer_destroy(w->task_buffer);
+    }
+
+    if (w->rthread_buffer != NULL)
+        rbuffer_destroy(w->rthread_buffer);
+
+    pthread_mutex_destroy(&w->task_buffer_mutex);
+    pthread_mutex_destroy(&w->rthread_buffer_mutex);
+
+#ifdef THREAD_DEBUG
+    if (w->rworker_fd != NULL)
+        fclose(w->rworker_fd);
+#endif
+
+    free(w);
+}
+
+/*
+ * Replays all I/O operations this worker is responsible for.
+ *
+ * Reads the .replay file line by line until EOF or until the first line
+ * starting with "#I" is reached. Lines whose vsize id modulo the number of
+ * workers does not match the worker number are skipped. Every other line is
+ * turned into a task and handed over to the thread handling its vsize id.
+ * Afterwards all threads are joined and destroyed.
+ *
+ * w         The responsible worker object
+ * num_lines The total amount of lines, used for the progress output only
+ *
+ * Returns SUCCESS.
+ */
+status_e rworker_process_lines(rworker_s* w, const long num_lines)
+{
+    Out("worker(%d): Starting to process replay lines\n", w->rworker_num);
+
+    options_s *opts = w->opts;
+    FILE *replay_fd = Fopen(opts->replay_file, "r");
+
+    // Drop root privileges, otherwise we may overwrite other system
+    // files by accident in case of a bug or user error!
+    drop_root(opts->user);
+
+    // Variables required for the time based calculations
+    struct timeval now, start_time;
+    long current_time = 0, stats_time = 0;
+    gettimeofday(&start_time, NULL);
+
+    // Helper variables required for reading lines. getline returns a
+    // ssize_t (-1 on EOF or error), so keep the result in a signed type.
+    char *line = NULL;
+    char *next = NULL, *next2 = NULL;
+    size_t len = 0;
+    ssize_t nread = 0;
+
+    // Helpers required for threading
+    rthread_s *t = NULL;
+    stack_s *all_threads = stack_new();
+    rworker_stats_s *s = w->worker_stats;
+
+    // More helper variables
+    unsigned long lineno = 0, vsize_id = 0;
+    long pid = -1, time = -1;
+
+    // Process the .replay file line by line.
+    while ((nread = getline(&line, &len, replay_fd)) != -1) {
+        lineno++;
+
+        if (nread >= MAX_LINE_LEN) {
+            Error("line:%lu Exceeded max line len", lineno);
+        }
+
+        // If the line begins with #: Ignore that line, it contains
+        // debug or meta information or comments.
+
+        if (line[0] == '#') {
+            if (line[1] == 'I') {
+                // We stop replaying I/O once we reach the line '#INIT'
+                // which indicates the beginning of the INIT section.
+                break;
+            }
+            continue;
+        }
+
+#ifdef THREAD_DEBUG
+        char *clone = Clone(line);
+#endif
+
+        // Line format: time|vsize_id|pid|remaining tokens...
+        next = strchr(line, '|');
+        Error_if(!next, "lineno:%lu Could not parse time from input file",
+                 lineno);
+        next[0] = '\0';
+        next++;
+        time = atol(line);
+
+        next2 = strchr(next, '|');
+        Error_if(!next2, "Could not parse vsize_id from input file");
+        next2[0] = '\0';
+        next2++;
+        vsize_id = atol(next);
+
+        // This worker is not responsible for this line, skip it!
+        if ((vsize_id % opts->num_workers) != w->rworker_num) {
+#ifdef THREAD_DEBUG
+            free(clone);
+#endif
+            continue;
+        }
+
+        next = strchr(next2, '|');
+        Error_if(!next, "Could not parse PID from input file");
+        next[0] = '\0';
+        next++;
+        pid = atol(next2);
+
+        gettimeofday(&now, NULL);
+        current_time = _Compute_current_time(now);
+
+        // Check whether the user specified a replay speed factor. If so, we
+        // may need to throttle down a bit.
+
+        if (opts->speed_factor) {
+            s->time_ahead = time / opts->speed_factor - current_time;
+            if (s->time_ahead > 0)
+                usleep(s->time_ahead*1000);
+
+        } else {
+            s->time_ahead = time - current_time;
+        }
+
+        // Get the responsible process object. The process object holds data
+        // structures usually found in a Linux process, e.g. a table of open
+        // file descriptors.
+
+        rprocess_s *p = amap_get(w->rprocess_map, pid);
+        if (p == NULL) {
+            p = rprocess_new(pid, w->fds_map);
+            amap_set(w->rprocess_map, pid, p);
+        }
+        p->lineno = lineno;
+
+        if (opts->num_threads_per_worker == 1) {
+            // Single threaded mode: one thread object is created on the
+            // first line and reused for all following lines.
+            if (!t)
+                t = rthread_new(vsize_id, w);
+            else
+                rthread_update(t, vsize_id);
+
+        } else {
+            t = amap_get(w->rthread_map, vsize_id);
+        }
+
+        if (t == NULL) {
+
+            // First try to recycle an old (likely unused) thread
+            if (NULL != (t = rbuffer_get_next(w->rthread_buffer))) {
+                rthread_update(t, vsize_id);
+
+#ifdef THREAD_DEBUG
+                fprintf(w->rworker_fd, "DEBUG: Reused an idling thread\n");
+                fflush(w->rworker_fd);
+#endif
+
+            } else if (opts->num_threads_per_worker <= all_threads->size) {
+                // Reached max threads, waiting until one becomes available
+
+#ifdef THREAD_DEBUG
+                fprintf(w->rworker_fd, "DEBUG: Reached max threads\n");
+                fflush(w->rworker_fd);
+#endif
+                while (NULL == (t = rbuffer_get_next(w->rthread_buffer)))
+                    usleep(1000);
+
+#ifdef THREAD_DEBUG
+                fprintf(w->rworker_fd, "DEBUG: Reused an idling thread\n");
+                fflush(w->rworker_fd);
+#endif
+
+                rthread_update(t, vsize_id);
+
+            } else {
+                t = rthread_new(vsize_id, w);
+
+                // We hold a pointer to all created threads in a stack. This
+                // stack is later used to terminate/join all threads.
+                stack_push(all_threads, t);
+
+#ifdef THREAD_DEBUG
+                fprintf(w->rworker_fd, "DEBUG: Created a new thread\n");
+                fflush(w->rworker_fd);
+#endif
+            }
+
+            amap_set(w->rthread_map, vsize_id, t);
+        }
+
+        // Create a new task for the thread. The task contains all required
+        // information to run an I/O operation. However, first try to
+        // reuse/recycle a task object! If there is no such, create a new one.
+
+        rtask_s *task = rbuffer_get_next(w->task_buffer);
+        if (!task)
+            task = rtask_new();
+        rtask_update(task, w, p, next, lineno, vsize_id);
+        s->ioops++;
+
+
+#ifdef THREAD_DEBUG
+        task->clone = clone;
+        fprintf(w->rworker_fd, "DEBUG: Inserting new task\n");
+        fflush(w->rworker_fd);
+#endif
+
+        // Insert that task to a ring buffer to pass it to the pthread without
+        // much synchronisation overhead!
+
+        while (!rthread_insert_task(t, task))
+            // The ring buffer is full. This may happen if the pthread didn't
+            // manage to process tasks fast enough. re-try after a short period!
+            usleep(1000);
+
+#ifdef THREAD_DEBUG
+        fprintf(w->rworker_fd, "DEBUG: Task inserted\n");
+        fflush(w->rworker_fd);
+#endif
+
+        // The worker prints out stats every 3s
+        if (current_time - stats_time > 3000) {
+            // IDEA: Maybe refactor this block to be implemented in rstats.c
+
+            double loadavg = get_loadavg();
+
+            // Determines whether we replay the I/O faster or slower than
+            // original speed!
+            char *a_b = s->time_ahead >= 0 ? "ahead" : "behind";
+
+            Put("worker(%d): threads:%ld %s:%lds progress:%0.2f%% "
+                "loadavg:%0.2f",
+                w->rworker_num, all_threads->size, a_b,
+                Abs(s->time_ahead/1000),
+                Perc(lineno,num_lines), loadavg);
+
+            stats_time = current_time;
+
+            if (s->loadavg_high < loadavg)
+                s->loadavg_high = loadavg;
+        }
+    }
+
+    Put("worker(%d): Waiting for all threads to finish business...",
+        w->rworker_num);
+
+    // This will wait (join) all threads one after another until all threads
+    // have finished their work and have terminated.
+
+    while (!stack_is_empty(all_threads)) {
+        rthread_s *thread = stack_pop(all_threads);
+        rthread_terminate(thread);
+        rthread_destroy(thread);
+    }
+    stack_destroy(all_threads);
+
+    // In single threaded mode the thread object is never pushed onto the
+    // all_threads stack (and never runs a pthread), so it has to be
+    // destroyed separately. Its tasks were processed synchronously.
+    if (opts->num_threads_per_worker == 1 && t != NULL) {
+        rthread_destroy(t);
+        t = NULL;
+    }
+
+    // The line buffer was allocated by getline and is owned by us. No thread
+    // is running anymore at this point.
+    free(line);
+
+    // Collect some stats last time
+    double loadavg = get_loadavg();
+    if (s->loadavg_high < loadavg)
+        s->loadavg_high = loadavg;
+
+    gettimeofday(&now, NULL);
+    current_time = _Compute_current_time(now);
+    if (opts->speed_factor) {
+        s->time_ahead = time / opts->speed_factor - current_time;
+    } else {
+        s->time_ahead = time - current_time;
+    }
+
+
+    Put("worker(%d): All threads terminated!", w->rworker_num);
+    fclose(replay_fd);
+
+    return SUCCESS;
+}
diff --git a/ioreplay/src/replay/rworker.h b/ioreplay/src/replay/rworker.h
new file mode 100644
index 0000000..26a1300
--- /dev/null
+++ b/ioreplay/src/replay/rworker.h
@@ -0,0 +1,82 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef RWORKER_H
+#define RWORKER_H
+
+#include <pthread.h>
+
+#include "../datas/amap.h"
+#include "../datas/rbuffer.h"
+#include "../defaults.h"
+#include "../options.h"
+#include "rstats.h"
+
+/**
+ * @brief Represents a worker process.
+ *
+ * This represents an I/O replay worker process. The user can specify the
+ * amount of worker processes via the -p command line switch. This is not
+ * to be confused with rprocess_s, which represents an original captured
+ * process whose I/O we now want to replay!
+ */
+typedef struct {
+ int rworker_num; /**< The current worker ID */
+ amap_s* fds_map; /**< Holding all file descriptors */
+ amap_s* rprocess_map; /**< Holding all processes handled by this worker */
+ amap_s* rthread_map; /**< Holding all threads handled by this worker */
+ rbuffer_s *task_buffer; /**< Buffering thread tasks to be reused */
+ pthread_mutex_t task_buffer_mutex; /**< To sync access to task_buffer */
+ rbuffer_s *rthread_buffer; /**< Buffering idle threads to be reused */
+ pthread_mutex_t rthread_buffer_mutex; /**< Sync access to rthread_buffer */
+ options_s *opts; /**< The options object (workers, threads, speed, ...) */
+ rworker_stats_s *worker_stats; /**< Object holding per worker statistics */
+ /* NOTE(review): rworker.c accesses rworker_fd under THREAD_DEBUG while
+ * this member is guarded by RTHREAD_DEBUG - confirm that the build defines
+ * both together. */
+#ifdef RTHREAD_DEBUG
+ FILE *rworker_fd; /**< For debugging purposes only */
+#endif
+} rworker_s;
+
+/**
+ * @brief Creates a new worker object
+ *
+ * The fds_map, opts and worker_stats pointers are stored in the worker
+ * object as they are, they are not copied.
+ *
+ * @param rworker_num The worker number
+ * @param fds_map A map of all virtual file descriptor objects
+ * @param num_vsizes The amount of virtual sizes/total file paths of the test
+ * @param num_pids The total amount of virtual process IDs used in this test
+ * @param opts A pointer to the options object
+ * @param worker_stats A pointer to the worker stats object
+ *
+ * @return The new worker object
+ */
+rworker_s* rworker_new(const int rworker_num, amap_s *fds_map,
+ const long num_vsizes, const long num_pids,
+ options_s* opts, rworker_stats_s *worker_stats);
+
+/**
+ * @brief Destroys a worker object
+ *
+ * A NULL pointer is ignored. The fds_map, opts and worker_stats objects
+ * passed to rworker_new() are not touched by this function.
+ *
+ * @param w The worker object to be destroyed
+ */
+void rworker_destroy(rworker_s* w);
+
+/**
+ * @brief Makes the worker to process all .replay lines
+ *
+ * Opens the replay file given by the options object and drops root
+ * privileges (to the user given by the options object) before any line is
+ * replayed. Returns after all threads of the worker have terminated.
+ *
+ * @param w The responsible worker object
+ * @param num_lines The total amount of I/O op lines in the .replay file
+ * (used for the progress output)
+ * @return SUCCESS if everything went fine
+ */
+status_e rworker_process_lines(rworker_s* w, const long num_lines);
+
+#endif // RWORKER_H