diff options
| author | Paul Buetow <pbuetow@mimecast.com> | 2018-03-06 17:38:59 +0000 |
|---|---|---|
| committer | Paul Buetow <pbuetow@mimecast.com> | 2018-03-06 17:38:59 +0000 |
| commit | 26b3b3e368a79ce29df732ea04e72a4c002ae2ce (patch) | |
| tree | e3fc8d7461ab371279f7bf9c692096cd39cc92f6 /ioreplay/src/replay | |
| parent | ae2221660f9b411fa78cdf8034f0803e9a870cde (diff) | |
rename into ioriot
Diffstat (limited to 'ioreplay/src/replay')
| -rw-r--r-- | ioreplay/src/replay/replay.c | 191 | ||||
| -rw-r--r-- | ioreplay/src/replay/replay.h | 46 | ||||
| -rw-r--r-- | ioreplay/src/replay/rioop.c | 425 | ||||
| -rw-r--r-- | ioreplay/src/replay/rioop.h | 54 | ||||
| -rw-r--r-- | ioreplay/src/replay/rprocess.c | 34 | ||||
| -rw-r--r-- | ioreplay/src/replay/rprocess.h | 40 | ||||
| -rw-r--r-- | ioreplay/src/replay/rstats.c | 108 | ||||
| -rw-r--r-- | ioreplay/src/replay/rstats.h | 117 | ||||
| -rw-r--r-- | ioreplay/src/replay/rtask.c | 50 | ||||
| -rw-r--r-- | ioreplay/src/replay/rtask.h | 69 | ||||
| -rw-r--r-- | ioreplay/src/replay/rthread.c | 216 | ||||
| -rw-r--r-- | ioreplay/src/replay/rthread.h | 123 | ||||
| -rw-r--r-- | ioreplay/src/replay/rworker.c | 360 | ||||
| -rw-r--r-- | ioreplay/src/replay/rworker.h | 82 |
14 files changed, 0 insertions, 1915 deletions
diff --git a/ioreplay/src/replay/replay.c b/ioreplay/src/replay/replay.c deleted file mode 100644 index e4606d1..0000000 --- a/ioreplay/src/replay/replay.c +++ /dev/null @@ -1,191 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "replay.h" - -#include "../datas/amap.h" -#include "../meta/meta.h" -#include "../mounts.h" -#include "rworker.h" -#include "rstats.h" - -void replay_extract_header(options_s *opts, FILE *replay_fd, long *num_vsizes, - long *num_pids, long *num_fds, long *num_lines) -{ - meta_s *m = meta_new(replay_fd); - meta_read_start(m); - - long version = 0; - if (meta_read_l(m, "version", &version)) { - Put("Replay version is '%ld'", version); - if (version != REPLAY_VERSION) { - Error(".replay file of incompatible version, got %x, expected %x", - (int)version, REPLAY_VERSION); - } - } - - char *user; - if (meta_read_s(m, "user", &user)) { - Put("Setting user to '%s'", user); - opts->user = user; - } - - char *name; - if (meta_read_s(m, "name", &name)) { - Put("Setting name to '%s'", name); - opts->name = name; - } - - if (meta_read_l(m, "num_vsizes", num_vsizes)) { - if (*num_vsizes < 0) { - Error("Lamport vsize overflow"); - } - Put("Setting num of vsizes to '%ld'", *num_vsizes); - } - - if (meta_read_l(m, "num_mapped_pids", num_pids)) { - if (*num_pids < 0) { - Error("Process overflow (too many process IDs in .replay)"); - } - Put("Setting num of PIDs to '%ld'", *num_pids); - } - - if (meta_read_l(m, "num_mapped_fds", num_fds)) { - if (*num_fds < 0) { - Error("FD overflow (too many FDs in .replay)"); - } - Put("Setting num of FDs to '%ld'", *num_fds); - } - - if (meta_read_l(m, "num_lines", num_lines)) { - if (*num_fds < 0) { - Error("Overflow (too many lines in .replay)"); - } - Put("Setting num of lines to '%ld'", *num_lines); - } - - meta_destroy(m); -} - -status_e replay_run(options_s *opts) -{ - status_e status = SUCCESS; - - if (opts->drop_caches) { - drop_caches(); - //cache_file(opts->replay_file); - } - - // Extract information from the meta header - FILE *replay_fd = Fopen(opts->replay_file, "r"); - long num_vsizes = 0, num_pids = 0, num_fds = 0, num_lines = 0; - replay_extract_header(opts, replay_fd, &num_vsizes, &num_pids, - &num_fds, &num_lines); - fclose(replay_fd); - - // A map of all file descriptors used. - Out("Creating FD map..."); - amap_s *fds_map = NULL; - if (opts->num_workers > 1) { - fds_map = amap_new_mmapped(num_fds); - } else { - fds_map = amap_new(num_fds); - } - Put("done"); - - // To collect all individual worker's stats into the global - // stats object. - stack_s *all_worker_stats = stack_new(); - - // The global stats object - rstats_s *stats = rstats_new(opts); - rstats_start(stats); - - // Fork worker processes, each worker process will read the .replay file - // individually. - - if (opts->num_workers > 1) { - for (int i = 0; i < opts->num_workers; ++i) { - rworker_stats_s *worker_stats = rworker_stats_new_mmap(); - stack_push(all_worker_stats, worker_stats); - - pid_t pid = fork(); - - if (pid == 0) { - // One worker object per fork - rworker_s *w = rworker_new(i, fds_map, num_vsizes, num_pids, opts, - worker_stats); - - // Process the .replay journal line by line - status_e status = rworker_process_lines(w, num_lines); - Put("worker(%d): Exiting from %d with status %d", i, - pid, status); - rworker_destroy(w); - - // Exit sub-process - exit(status); - - } else if (pid < 0) { - Errno("worker(%d): Unable to create worker process! :'-(", i); - - } else { - Put("worker(%d): Process with pid %d forked", i, pid); - } - } - - set_limits_drop_root(opts->user); - - Put("Waiting for worker processes to finish"); - pid_t pid; - int rworker_status = SUCCESS; - - while ((pid = wait(&rworker_status)) > 0) { - if (rworker_status != SUCCESS) - status = rworker_status; - - Put("Process with pid %d exited with status %d", - pid, rworker_status); - } - - Put("All workers finished (%d)!", status); - - } else { - Put("Only one worker, don't fork sub-processes"); - - rworker_stats_s *worker_stats = rworker_stats_new_mmap(); - stack_push(all_worker_stats, worker_stats); - - rworker_s *w = rworker_new(0, fds_map, num_vsizes, num_pids, - opts, worker_stats); - status = rworker_process_lines(w, num_lines); - rworker_destroy(w); - - Put("Worker finished work!"); - } - - // Collect all statistics - rstats_stop(stats); - while (!stack_is_empty(all_worker_stats)) { - rworker_stats_s *worker_stats = stack_pop(all_worker_stats); - rstats_add_from_worker(stats, worker_stats); - rworker_stats_destroy(worker_stats); - } - stack_destroy(all_worker_stats); - - rstats_print(stats); - rstats_destroy(stats); - - amap_destroy(fds_map); - return status; -} diff --git a/ioreplay/src/replay/replay.h b/ioreplay/src/replay/replay.h deleted file mode 100644 index dcc3d84..0000000 --- a/ioreplay/src/replay/replay.h +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef REPLAY_H -#define REPLAY_H - -#include "../defaults.h" -#include "../utils/futils.h" -#include "../opcodes.h" -#include "../options.h" -#include "rioop.h" -#include "rprocess.h" - -/** - * @brief Replays the given .replay file - * - * @param opts The options object - * @return SUCCESS if everything went fine - */ -status_e replay_run(options_s *opts); - -/** - * @brief Extract required meta data from .replay's meta header - * - * @param opts The options object - * @param replay_fd The file handle to the .replay file - * @param num_vsizes The amount of virtual sizes/paths - * @param num_pids The amount of process IDs - * @param num_fds The amount of virtual file descriptors - * @param num_lines The amount of .replay lines with I/O ops - */ -void replay_extract_header(options_s *opts, FILE *replay_fd, long *num_vsizes, - long *num_pids, long *num_fds,long *num_lines); - -#endif // REPLAY_H diff --git a/ioreplay/src/replay/rioop.c b/ioreplay/src/replay/rioop.c deleted file mode 100644 index 2e16c94..0000000 --- a/ioreplay/src/replay/rioop.c +++ /dev/null @@ -1,425 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rioop.h" - -#include "../vfd.h" -#include "rworker.h" - -// Printing error messages -#define _Error(...) \ - fprintf(stderr, "%s:%d ERROR: ", __FILE__, __LINE__); \ - fprintf(stderr, __VA_ARGS__); \ - fprintf(stderr, "\nlineno:%ld path:%s\n", task->lineno, vfd->path); \ - fflush(stdout); \ - fflush(stderr); \ - exit(ERROR); - -#define _Errno(...) \ - fprintf(stderr, "%s:%d ERROR: %s (%d). ", __FILE__, __LINE__, \ - strerror(errno), errno); \ - fprintf(stderr, __VA_ARGS__); \ - fprintf(stderr, "\nlineno:%ld path:%s\n", task->lineno, vfd->path); \ - fflush(stdout); \ - fflush(stderr); \ - exit(ERROR); - -#define _Init_arg(num) int arg = atoi(task->toks[num]) -#define _Init_cmd(num) int cmd = atoi(task->toks[num]) -#define _Init_fd(num) long fd = atol(task->toks[num]) -#define _Init_flags(num) int flags = atoi(task->toks[num]) -//#define _Init_mode(num) int mode = atoi(task->toks[num]) -#define _Init_offset(num) long offset = atol(task->toks[num]) -#define _Init_op(num) int op = atoi(task->toks[num]) -#define _Init_path2(num) char *path2 = task->toks[num] -#define _Init_path(num) char *path = task->toks[num] -#define _Init_rc(num) int rc = atoi(task->toks[num]) -#define _Init_whence(num) long whence = atol(task->toks[num]) - -#define _Init_bytes(num) \ - int bytes = atoi(task->toks[num]); \ - if (bytes <= 0) return - -#define _Init_virtfd \ - vfd_s *vfd = amap_get(p->fds_map, fd); \ - if (vfd == NULL) return - -void rioop_run(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_op(2); - - switch (op) { - // stat() syscalls - case FSTAT: - rioop_fstat(p, t, task); - break; - case FSTATFS: - case FSTATFS64: - //Error("op(%d) not implemented", op); - break; - case FSTAT_AT: - case LSTAT: - case STAT: - rioop_stat(p, t, task); - break; - case STATFS: - case STATFS64: - //Error("op(%d) not implemented", op); - break; - - // read() syscalls - case READ: - case READV: - rioop_read(p, t, task); - break; - case READAHEAD: - //Error("op(%d) not implemented", op); - break; - case READLINK: - case READLINK_AT: - //Error("op(%d) not implemented", op); - break; - - // write() syscalls - case WRITE: - case WRITEV: - rioop_write(p, t, task); - break; - - // open() and other syscalls which may creat - case OPEN: - case OPEN_AT: - rioop_open(p, t, task, -1); - break; - case CREAT: - // A call to crat() is equivalent to calling open() with flags.. - rioop_open(p, t, task, O_CREAT|O_WRONLY|O_TRUNC); - break; - case MKDIR: - case MKDIR_AT: - rioop_mkdir(p, t, task); - break; - - // rename() syscalls - case RENAME: - case RENAME_AT: - case RENAME_AT2: - rioop_rename(p, t, task); - break; - - // close() and unlink() syscalls - case CLOSE: - rioop_close(p, t, task); - break; - case UNLINK: - case UNLINK_AT: - rioop_unlink(p, t, task); - break; - case RMDIR: - rioop_rmdir(p, t, task); - break; - - // sync() syscalls - case FSYNC: - rioop_fsync(p, t, task); - break; - case FDATASYNC: - rioop_fdatasync(p, t, task); - break; - case SYNC: - case SYNCFS: - case SYNC_FILE_RANGE: - //Error("op(%d) not implemented", op); - break; - - // Other syscalls - case FCNTL: - rioop_fcntl(p, t, task); - break; - case GETDENTS: - rioop_getdents(p, t, task); - break; - case LSEEK: - rioop_lseek(p, t, task); - break; - - // chmod() syscalls - case CHMOD: - rioop_chmod(p, t, task); - break; - case FCHMOD: - rioop_fchmod(p, t, task); - break; - - // chown() syscalls - case CHOWN: - rioop_chown(p, t, task); - break; - case FCHOWN: - case FCHOWNAT: - rioop_fchown(p, t, task); - break; - case LCHOWN: - rioop_lchown(p, t, task); - break; - - // Meta operations (I/O replay internal use only). - case META_EXIT_GROUP: - break; - case META_TIMELINE: - break; - - default: - Error("op(%d) not implemented", op); - break; - } -} - -void rioop_stat(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_path(3); - struct stat buf; - stat(path, &buf); -} - -void rioop_fstat(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_virtfd; - struct stat buf; - fstat(vfd->fd, &buf); -} - -void rioop_rename(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_path(3); - _Init_path2(4); - rename(path, path2); -} - -void rioop_read(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_bytes(4); - _Init_virtfd; - - char *buf = Calloc(bytes+1, char); - read(vfd->fd, buf, bytes); - free(buf); -} - -void rioop_write(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_bytes(4); - _Init_virtfd; - - char *buf = Calloc(bytes+1, char); - sprintf(buf, "%ld", task->lineno); - Fill_with_stuff(buf, bytes); - if (vfd->fd == 0) { - Debug("%d %d %ld", vfd->fd, vfd->debug, task->lineno); - _Error("ERROR"); - } - write(vfd->fd, buf, bytes); - free(buf); -} - -void rioop_open(rprocess_s *p, rthread_s *t, rtask_s *task, int flags_) -{ - _Init_fd(3); - _Init_path(4); - _Init_flags(6); - - // Special case as this is creat() now - if (flags_ != -1) - flags = flags_; - - bool directory = Has(flags, O_DIRECTORY); - - if (fd > 0) { - if (directory) { - // We can not open a directory via open() otherwise! - flags &= (O_RDONLY & ~(O_RDWR|O_WRONLY|O_CREAT)); - } else { - // We don't want to open the file in read only mode. - // SystemTap could have skipped syscalls to fcntl or open - flags &= ~O_RDONLY; - } - // flags |= O_DIRECT|O_SYNC; - flags &= ~O_EXCL; - } - - int ret = open(path, flags, S_IRWXU|S_IRWXG|S_IRWXO); - - if (fd < 0 && ret > 0) { - close(ret); -#ifdef THREAD_DEBUG - fprintf(t->rthread_fd, "TRACE OPEN|open+close|%s|\n", path); - fflush(t->rthread_fd); -#endif - } - - if (fd > 0 && ret > 0) { - vfd_s *vfd = vfd_new(ret, fd, path); - amap_set(p->fds_map, fd, vfd); - -#ifdef THREAD_DEBUG - fprintf(t->rthread_fd, "TRACE OPEN|open|%s|\n", path); - fflush(t->rthread_fd); -#endif - } -} - -void rioop_close(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_virtfd; - - amap_unset(p->fds_map, fd); - if (vfd->dirfd) { - closedir(vfd->dirfd); -#ifdef THREAD_DEBUG - fprintf(t->rthread_fd, "TRACE OPEN|closedir|%s|\n", vfd->path); - fflush(t->rthread_fd); -#endif - } else { - close(vfd->fd); -#ifdef THREAD_DEBUG - fprintf(t->rthread_fd, "TRACE OPEN|close|%s|\n", vfd->path); - fflush(t->rthread_fd); -#endif - } - vfd_destroy(vfd); -} - -void rioop_getdents(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_virtfd; - - // getdents expects a dirfd - DIR *dirfd = fdopendir(vfd->fd); - if (dirfd) { - vfd->dirfd = dirfd; - readdir(dirfd); -#ifdef THREAD_DEBUG - fprintf(t->rthread_fd, "TRACE OPEN|fdopendir|%s|\n", vfd->path); - fflush(t->rthread_fd); -#endif - } -} - -void rioop_mkdir(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_path(3); - mkdir(path, S_IRWXU|S_IRWXG|S_IRWXO); -} - -void rioop_unlink(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_path(3); - unlink(path); -} - -void rioop_rmdir(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_path(3); - rmdir(path); -} - -void rioop_lseek(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_bytes(6); - _Init_virtfd; - lseek(vfd->fd, bytes, SEEK_SET); -} - -void rioop_fsync(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_virtfd; - fsync(vfd->fd); -} - -void rioop_fdatasync(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_virtfd; - fdatasync(vfd->fd); -} - -void rioop_fcntl(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_cmd(4); - _Init_arg(5); - _Init_virtfd; - - switch (cmd) { - case F_GETFD: - case F_GETFL: - fcntl(vfd->fd, cmd); - break; - case F_SETFD: - case F_SETFL: - fcntl(vfd->fd, cmd, arg); - break; - default: - break; - } -} - -void rioop_chmod(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_path(3); - chmod(path, S_IRWXU|S_IRWXG|S_IRWXO); -} - -void rioop_fchmod(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_virtfd; - fchmod(vfd->fd, S_IRWXU|S_IRWXG|S_IRWXO); -} - -void rioop_chown(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_path(3); - rworker_s *w = t->worker; - options_s *opts = w->opts; - struct passwd *pwd = getpwnam(opts->user); - chown(path, pwd->pw_uid, -1); -} - -void rioop_fchown(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_fd(3); - _Init_virtfd; - rworker_s *w = t->worker; - options_s *opts = w->opts; - struct passwd *pwd = getpwnam(opts->user); - fchown(vfd->fd, pwd->pw_uid, -1); -} - -void rioop_lchown(rprocess_s *p, rthread_s *t, rtask_s *task) -{ - _Init_path(3); - rworker_s *w = t->worker; - options_s *opts = w->opts; - struct passwd *pwd = getpwnam(opts->user); - lchown(path, pwd->pw_uid, -1); -} - diff --git a/ioreplay/src/replay/rioop.h b/ioreplay/src/replay/rioop.h deleted file mode 100644 index 4db4284..0000000 --- a/ioreplay/src/replay/rioop.h +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RIOOP_H -#define RIOOP_H - -#include "../defaults.h" -#include "../utils/futils.h" -#include "../opcodes.h" -#include "rprocess.h" -#include "rthread.h" - -/** - * @brief Replays the responsible I/O operation of a given task - * - * @param p The virtual replay process object - * @param t The thread object - * @param task The replay task object - */ -void rioop_run(rprocess_s *p, rthread_s *t, rtask_s *task); - -void rioop_close(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_fcntl(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_fdatasync(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_fstat(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_fsync(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_getdents(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_mkdir(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_open(rprocess_s *p, rthread_s *t, rtask_s *task, int flags_); -void rioop_read(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_rename(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_stat(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_lseek(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_unlink(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_rmdir(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_write(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_chmod(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_fchmod(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_chown(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_fchown(rprocess_s *p, rthread_s *t, rtask_s *task); -void rioop_lchown(rprocess_s *p, rthread_s *t, rtask_s *task); - -#endif // RIOOP_H diff --git a/ioreplay/src/replay/rprocess.c b/ioreplay/src/replay/rprocess.c deleted file mode 100644 index 4efd835..0000000 --- a/ioreplay/src/replay/rprocess.c +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rprocess.h" - -rprocess_s* rprocess_new(const int pid, amap_s *fds_map) -{ - rprocess_s *p = Malloc(rprocess_s); - - p->fds_map = fds_map; - p->pid = pid; - p->terminate = 0; - p->lineno = 0; - - return p; -} - -void rprocess_destroy(rprocess_s *p) -{ - if (!p) - return; - free(p); -} diff --git a/ioreplay/src/replay/rprocess.h b/ioreplay/src/replay/rprocess.h deleted file mode 100644 index 739dd89..0000000 --- a/ioreplay/src/replay/rprocess.h +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RPROCESS_H -#define RPROCESS_H - -#include "../datas/hmap.h" -#include "../datas/amap.h" -#include "../defaults.h" -#include "rthread.h" - -/** - * @brief The virtual replay process object definition - * - * This defines a virtual process in replay context. - */ -typedef struct rprocess_s_ { - int terminate; /**< Indicates whether the worker is terminating or not */ - int rworker_num; /**< The worker number of the responsible worker */ - int pid; /**< The virtual process ID */ - unsigned long lineno; /**< Holding the current .replay line number */ - bool initm; /**< Indicates whether ioreplay is in init mode or not */ - amap_s *fds_map; /**< Holding all file descriptors */ -} rprocess_s; - -rprocess_s* rprocess_new(const int pid, amap_s *fds_map); -void rprocess_destroy(rprocess_s* p); - -#endif // RPROCESS_H diff --git a/ioreplay/src/replay/rstats.c b/ioreplay/src/replay/rstats.c deleted file mode 100644 index c3e6e38..0000000 --- a/ioreplay/src/replay/rstats.c +++ /dev/null @@ -1,108 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rstats.h" - -#include <sys/types.h> - -rstats_s* rstats_new(options_s *opts) -{ - rstats_s *s = Malloc(rstats_s); - - s->opts = opts; - s->loadavg_high = 0; - s->ioops = 0; - s->duration = 0; - s->time_ahead = -1; - - if (opts->stats_file) - s->stats_fd = Fopen(opts->stats_file, "w"); - else - s->stats_fd = stdout; - - return s; -} - -void rstats_destroy(rstats_s *s) -{ - if (s->stats_fd != stdout) - fclose(s->stats_fd); - - free(s); -} - -rworker_stats_s* rworker_stats_new_mmap(options_s *opts) -{ - // Share this object between processes, so that the stats cann be - // collected by the master process! - rworker_stats_s *s = Mmapshared(rworker_stats_s); - - s->loadavg_high = 0; - s->ioops = 0; - s->time_ahead = -1; - - return s; -} - -void rworker_stats_destroy(rworker_stats_s *s) -{ - munmap(s, sizeof(rworker_stats_s)); -} - - -void rstats_start(rstats_s* s) -{ - gettimeofday(&s->start_time, NULL); -} - -void rstats_stop(rstats_s* s) -{ - gettimeofday(&s->end_time, NULL); - s->duration= ((s->end_time.tv_sec - s->start_time.tv_sec) * 1000 - + (s->end_time.tv_usec - s->start_time.tv_usec) / 1000) / 1000; - -} - -void rstats_add_from_worker(rstats_s* s, rworker_stats_s* w) -{ - if (s->loadavg_high < w->loadavg_high) - s->loadavg_high = w->loadavg_high; - - if (s->time_ahead == -1 || s->time_ahead > w->time_ahead) - s->time_ahead = w->time_ahead; - - s->ioops += w->ioops; -} - -void rstats_print(rstats_s* s) -{ - options_s *opts = s->opts; - - if (opts->stats_file) { - Put("Writing stats to '%s'", opts->stats_file); - } - - fprintf(s->stats_fd, "Stats of test '%s':\n", opts->name); - fprintf(s->stats_fd, "\tNum workers: %d\n", opts->num_workers); - fprintf(s->stats_fd, "\tThreads per worker: %d\n", opts->num_threads_per_worker); - fprintf(s->stats_fd, "\tThreads total: %d\n", - opts->num_threads_per_worker * opts->num_workers); - fprintf(s->stats_fd, "\tHighest loadavg: %.2f\n", s->loadavg_high); - fprintf(s->stats_fd, "\tPerformed ioops: %ld\n", s->ioops); - if (s->duration > 0) - fprintf(s->stats_fd, "\tAverage ioops/s: %.2f\n", s->ioops/s->duration); - fprintf(s->stats_fd, "\tTime ahead: %lds\n", s->time_ahead/1000); - fprintf(s->stats_fd, "\tTotal time: %.2fs\n", s->duration); -} - diff --git a/ioreplay/src/replay/rstats.h b/ioreplay/src/replay/rstats.h deleted file mode 100644 index 1ce3f27..0000000 --- a/ioreplay/src/replay/rstats.h +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file rstats.h - * @author Paul Buetow - * - * @brief For collecting replay stats - */ - -#ifndef RSTATS_H -#define RSTATS_H - -#include "../defaults.h" -#include "../options.h" - -#include <pthread.h> - -/** - * @brief Definition of the rstats object - * - * Used to store global statistics. - */ -typedef struct rstats_s_ { - double loadavg_high; /**< Highest load average */ - long ioops; /**< Total amount if io operations */ - double duration; /**< Duration of the test */ - long time_ahead; /**< Time ahead of the original speed */ - struct timeval start_time; /**< Start time of the test */ - struct timeval end_time; /**< End time of the test */ - options_s *opts; /**< The I/O replay options object */ - FILE *stats_fd; /**< The file descriptor for writing the stats */ -} rstats_s; - -/** - * @brief Definition of the per worker stats object - * - * Used to store per worker process I/O stats - */ -typedef struct rworker_stats_s_ { - double loadavg_high; /**< Highest amount of io ops per second */ - long ioops; /**< Total amount if io operations */ - long time_ahead; /**< Time ahead of the original speed */ -} rworker_stats_s; - -/** - * @brief Creates a new stats object - * - * @return The new stats object - */ -rstats_s* rstats_new(options_s *opts); - -/** - * @brief Destroys the stats object - * - * @param s The stats object - */ -void rstats_destroy(rstats_s* s); - -/** - * @brief Creates a new per worker stats object - * - * The memory is mapped into shared memory so it can be shared across multiple - * processes. - * - * @return The new stats object - */ -rworker_stats_s* rworker_stats_new_mmap(); - -/** - * @brief Destroys the per worker stats object - * - * @param s The stats object - */ -void rworker_stats_destroy(rworker_stats_s* s); - -/** - * @brief Starts the stats - * - * @param s The stats object - */ -void rstats_start(rstats_s* s); - -/** - * @brief Finalises the stats - * - * @param s The stats object - */ -void rstats_stop(rstats_s* s); - -/** - * @brief Prints the stats - * - * @param s The stats object - */ -void rstats_print(rstats_s* s); - -/** - * @brief Adds per worker stats to the global stats object - * - * @param s The global stats object - * @param w The worker stats object - */ -void rstats_add_from_worker(rstats_s* s, rworker_stats_s* w); - -#endif diff --git a/ioreplay/src/replay/rtask.c b/ioreplay/src/replay/rtask.c deleted file mode 100644 index b1afb92..0000000 --- a/ioreplay/src/replay/rtask.c +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rtask.h" - -#include "rthread.h" -#include "rworker.h" - -rtask_s* rtask_new() -{ - rtask_s *task = Malloc(rtask_s); - - *task = (rtask_s) { - .worker = NULL, .process = NULL - }; - task->line[0] = '\0'; - -#ifdef THREAD_DEBUG - task->clone = NULL; -#endif - - return task; -} - -void rtask_destroy(rtask_s *task) -{ - if (task) - free(task); -} - -void rtask_update(rtask_s *task, void *worker, void *process, char *line, - const long lineno, const long vsize) -{ - task->worker = worker; - task->process = process; - task->lineno = lineno; - task->vsize = vsize; - strcpy(task->line, line); -} diff --git a/ioreplay/src/replay/rtask.h b/ioreplay/src/replay/rtask.h deleted file mode 100644 index 35c5714..0000000 --- a/ioreplay/src/replay/rtask.h +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RTASK_H -#define RTASK_H - -#include "../defaults.h" - -/** - * @brief The replay task definition - * - * The rtask holds all possible variables required to process a particular - * .replay line and to replay the corresponding I/O operation. - */ -typedef struct rtask_s_ { - void *worker; /* The responsible worker object */ - void *process; /* The responsible process object */ - unsigned long lineno; /**< The current line number */ - unsigned long vsize; /**< The vsize */ - char *toks[MAX_TOKENS+1]; /**< The tokens parsed from the .replay line */ - char line[MAX_LINE_LEN]; /**< The remaining part of the .replay line */ -#ifdef RTASK_DEBUG - char *clone; /**< Used for debug purposes only */ -#endif -} rtask_s; - -/** - * @brief Creates a new thread task object - * - * This function creates a new thread task object. Such a task object is used - * by the worker to hand over I/O tasks to the corresponding threads. The - * actual I/O work is performed by the threads then. - * - * @return The new thread task object - */ -rtask_s* rtask_new(); - -/** - * @brief Destroys the replay task object - * - * @param t The thread task object to be destroyed - */ -void rtask_destroy(rtask_s* t); - -/** - * @brief Updates a reused/recycle task object - * - * @param task The task object to be updated - * @param worker The responsibe worker object - * @param process The responsible process object - * @param line The remaining line of the .replay file - * @param lineno The current line number of the .replay file - * @param vsize The vsize/path id - */ -void rtask_update(rtask_s *task, void *worker, void *process, char *line, - const long lineno, const long vsize); - -#endif // RTASK_H diff --git a/ioreplay/src/replay/rthread.c b/ioreplay/src/replay/rthread.c deleted file mode 100644 index 55364ec..0000000 --- a/ioreplay/src/replay/rthread.c +++ /dev/null @@ -1,216 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rthread.h" - -#include <sys/types.h> - -#include "rworker.h" -#include "rprocess.h" - -#include "rioop.h" - -#ifdef THREAD_DEBUG -/** - * @brief For debugging purposes only - * - * @param t The responsible thread object - */ -static void _rthread_init_log(rthread_s *t) -{ - rworker_s *w = t->worker; - char *rthread_log = Calloc(1024, char); - snprintf(rthread_log, 1023, "/tmp/ioreplay/worker%d.thread%ld.debuglog", - w->rworker_num, (long)pthread_self()); - - ensure_dir_exists("/tmp/ioreplay"); - t->rthread_fd = Fopen(rthread_log, "a"); - - free(rthread_log); - fprintf(t->rthread_fd, "%ld: DEBUG: Created thread log\n", t->tid); -} -#endif - -void rthread_process_task(rthread_s* t, rtask_s *task, - pid_t pthread_id) -{ - char *next = task->line; - rworker_s *w = (rworker_s*) task->worker; - - // Tokenize the remaining elements of the line. - int ntoks = 0; - char *saveptr; - char *tok = strtok_r(next, "|", &saveptr); - - while (tok) { - if (ntoks > MAX_TOKENS) { - Error("worker(%d) pthread(%d): lineno:%lu, missing newline?", - w->rworker_num, pthread_id, task->lineno); - } - task->toks[ntoks++] = tok; - tok = strtok_r(NULL, "|", &saveptr); - } - // NULL marker (no more token from here) - task->toks[ntoks] = NULL; - -#ifdef THREAD_DEBUG - fprintf(t->rthread_fd, "%ld(%ld): %s", - t->tid, (long)pthread_self(), task->clone); - fflush(t->rthread_fd); - free(task->clone); - task->clone = NULL; -#endif -#ifndef NO_RIOOP - // Perform the corresponding I/O operation! - rioop_run(task->process, t, task); -#endif - - // Make the task object recyclable/reusable - pthread_mutex_lock(&w->task_buffer_mutex); - if (!rbuffer_insert(w->task_buffer, task)) - // We can't recycle the task object if the buffer is full! - rtask_destroy(task); - pthread_mutex_unlock(&w->task_buffer_mutex); -} - -void *rthread_pthread_start(void *data) -{ - rthread_s* t = (rthread_s*) data; - rworker_s *w = t->worker; - rtask_s *task = NULL; - pid_t pthread_id = pthread_self(); - -#ifdef THREAD_DEBUG - _rthread_init_log(t); -#endif - - do { - while (!rbuffer_has_next(t->tasks) && !t->terminate) - usleep(100); - - while ((task = rbuffer_get_next(t->tasks)) != NULL) - rthread_process_task(t, task, pthread_id); - -#ifdef THREAD_DEBUG - fprintf(t->rthread_fd, "%ld: DEBUG: Idling\n", t->tid); - fflush(t->rthread_fd); -#endif - - // Tell rworker_s that thread is not doing any work! - int inserted = false; - while (!inserted && !t->terminate) { - if (rbuffer_has_next(t->tasks)) - break; - - usleep(1000); - - if (rbuffer_has_next(t->tasks)) - break; - - // Make the rthread reusable, he is without any tasks - // for some time. - pthread_mutex_lock(&w->rthread_buffer_mutex); - inserted = rbuffer_insert(w->rthread_buffer, t); - pthread_mutex_unlock(&w->rthread_buffer_mutex); - } - -#ifdef THREAD_DEBUG - if (inserted) { - fprintf(t->rthread_fd, "%ld: DEBUG: Added to thread buffer\n", - t->tid); - } else { - fprintf(t->rthread_fd, "%ld: DEBUG: Idling thread recovered\n", - t->tid); - } - fflush(t->rthread_fd); -#endif - - } while (!t->terminate); - -#ifdef THREAD_DEBUG - fprintf(t->rthread_fd, "%ld: DEBUG: Terminating\n", t->tid); - fflush(t->rthread_fd); -#endif - - // Process the very last tasks - while (NULL != (task = rbuffer_get_next(t->tasks))) - rthread_process_task(t, task, pthread_id); - -#ifdef THREAD_DEBUG - fprintf(t->rthread_fd, "%ld: DEBUG: Done terminating\n", t->tid); - fflush(t->rthread_fd); -#endif - - return NULL; -} - -rthread_s* rthread_new(const long tid, void *worker) -{ - rthread_s *t = Malloc(rthread_s); - rworker_s *w = worker; - - t->single_threaded = w->opts->num_threads_per_worker == 1; - t->tasks = rbuffer_new(TASK_BUFFER_PER_THREAD); - t->terminate = false; - t->worker = worker; - rthread_update(t, tid); - - if (t->single_threaded) { -#ifdef THREAD_DEBUG - _rthread_init_log(t); -#endif - return t; - } - - start_pthread(&t->pthread, rthread_pthread_start, (void*)t); - return t; -} - -long rthread_update(rthread_s *t, const long tid) -{ - long prev_tid = t->tid; - t->tid = tid; - - return prev_tid; -} - -void rthread_destroy(rthread_s *t) -{ - if (rbuffer_has_next(t->tasks)) { - Error("Didn't expect to have any tasks left!"); - } - rbuffer_destroy(t->tasks); - -#ifdef THREAD_DEBUG - if (t->rthread_fd) - fclose(t->rthread_fd); -#endif - - free(t); -} - -bool rthread_insert_task(rthread_s* t, rtask_s* task) -{ - if (t->single_threaded) { - rthread_process_task(t, task, pthread_self()); - return true; - } - return rbuffer_insert(t->tasks, task); -} - -void rthread_terminate(rthread_s* t) -{ - t->terminate = true; - pthread_join(t->pthread, NULL); -} diff --git a/ioreplay/src/replay/rthread.h b/ioreplay/src/replay/rthread.h deleted file mode 100644 index 9971e49..0000000 --- a/ioreplay/src/replay/rthread.h +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file rthread.h - * @author Paul Buetow - * - * @brief The replay thread definitiion - */ - -#ifndef RTHREAD_H -#define RTHREAD_H - -#include "../defaults.h" -#include "../datas/rbuffer.h" -#include "../datas/amap.h" -#include "../vfd.h" -#include "rtask.h" - -#include <pthread.h> - -/** - * @brief Definition of a worker thread - * - * Every worker utilises a set of worker threads in order to parallelise the - * replaying of the I/O! Every thread comes with its own task queue. It is - * filled by the repsonsible worker. - * - * The user can specify the max amount of threads per worker per -t command - * line switch. - */ -typedef struct rthread_s_ { - void *worker; /**< The responsible worker object */ - long tid; /**< The virtual thread id */ - rbuffer_s* tasks; /**< Holds all outstanding tasks */ - bool terminate; /**< True if thread shall terminate */ - bool single_threaded; /**< Worker is single threaded or not */ - pthread_t pthread; /**< We run the tasks in concurrent pthreads */ -#ifdef RTHREAD_DEBUG - FILE *rthread_fd; /**< Used for debugging purposes only */ -#endif -} rthread_s; - -/** - * @brief Creates a new thread object - * - * @param tid The thread ID - * @param worker The worker object managing this thread - * @return The new thread object - */ -rthread_s* rthread_new(const long tid, void *worker); - -/** - * @brief Updates a thread object after recycling it - * - * @param t The thread object - * @param tid The new thread ID - */ -long rthread_update(rthread_s *t, const long tid); - -/** - * @brief Terminates the thread - * - * This function waits (via join) for the pthread to complete all its - * current tasks from the queue. - * - * @param t The thread object - */ -void rthread_terminate(rthread_s* t); - -/** - * @brief Destroys the thread object - * - * @param t The thread object - */ -void rthread_destroy(rthread_s* t); - -/** - * @brief Inserts a task into the threads work queue - * - * Inserts a task into the threads work queue. We use an atomic ring buffer - * data structure for the work queue. The ring buffer does not require any - * mutex locks. - * - * @param t The thread object - * @param task The task to be inserted - * @return Returns true on success, returns false if the task queue is full - */ -bool rthread_insert_task(rthread_s* t, rtask_s* task); - -/** - * @brief Used by the pthread to process a task - * - * In this function the pthread will attempt to process a task. It extracts all - * required information from the task object and invokes the corresponding I/O - * syscalls. - * - * @param t The responsible thread object - * @param task The task object - * @param pthread_id The current pthread id - */ -void rthread_process_task(rthread_s* t, rtask_s *task, pid_t pthread_id); - -/** - * @brief The entry function for the pthreads - * - * @param data The data structure passed to the pthread - * @return The exit code of the pthread. - */ -void *rthread_pthread_start(void *data); - -#endif // RTHREAD_H diff --git a/ioreplay/src/replay/rworker.c b/ioreplay/src/replay/rworker.c deleted file mode 100644 index 0e3fbe9..0000000 --- a/ioreplay/src/replay/rworker.c +++ /dev/null @@ -1,360 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rworker.h" - -#include "../datas/stack.h" -#include "rprocess.h" -#include "rthread.h" - -#define _Compute_current_time(now) \ - (now.tv_sec - start_time.tv_sec) * 1000 \ - + (now.tv_usec - start_time.tv_usec) / 1000 - - -/** - * @brief A callback helper function for destroying all virtual process objects - * - * @param data The process object. - */ -static void _rprocess_destroy_cb(void *data) -{ - rprocess_destroy(data); -} - -rworker_s* rworker_new(const int rworker_num, amap_s *fds_map, - const long num_vsizes, const long num_pids, - options_s *opts, rworker_stats_s *worker_stats) -{ - rworker_s *w = Malloc(rworker_s); - -#ifdef THREAD_DEBUG - char *rworker_log = Calloc(1024, char); - snprintf(rworker_log, 1023, "/tmp/ioreplay/_worker%d.debuglog", - rworker_num); - - w->rworker_fd = Fopen(rworker_log, "a"); - free(rworker_log); - fprintf(w->rworker_fd, "DEBUG: Started worker\n"); -#endif - - w->rworker_num = rworker_num; - w->opts = opts; - w->fds_map = fds_map; - - w->rprocess_map = amap_new(num_pids); - w->rthread_map = amap_new(num_vsizes); - w->task_buffer = rbuffer_new(opts->num_threads_per_worker - *TASK_BUFFER_PER_THREAD); - w->rthread_buffer = rbuffer_new(opts->num_threads_per_worker); - w->worker_stats = worker_stats; - - // Attach a cleanup callback function to the worker map. - w->rprocess_map->data_destroy = _rprocess_destroy_cb; - - pthread_mutex_init(&w->rthread_buffer_mutex, NULL); - pthread_mutex_init(&w->task_buffer_mutex, NULL); - - // TODO: Check in the program whether the ulimit is high enough - // or not! (ulimit -n) - - return w; -} - -/** - * @brief Destroys the object - * - * Destroys the worker object (frees all memory allocated by the worker) - * - * @param w The worker object - */ -void rworker_destroy(rworker_s *w) -{ - if (!w) - return; - - if (w->rprocess_map) - amap_destroy(w->rprocess_map); - if (w->rthread_map) - amap_destroy(w->rthread_map); - - if (w->task_buffer) { - rtask_s *task = NULL; - while (NULL != (task = rbuffer_get_next(w->task_buffer))) - rtask_destroy(task); - rbuffer_destroy(w->task_buffer); - } - - if (w->rthread_buffer) - rbuffer_destroy(w->rthread_buffer); - - pthread_mutex_destroy(&w->task_buffer_mutex); - pthread_mutex_destroy(&w->rthread_buffer_mutex); - -#ifdef THREAD_DEBUG - if (w->rworker_fd) - fclose(w->rworker_fd); -#endif - - free(w); -} - -status_e rworker_process_lines(rworker_s* w, const long num_lines) -{ - Out("worker(%d): Starting to process replay lines\n", w->rworker_num); - - options_s *opts = w->opts; - FILE *replay_fd = Fopen(opts->replay_file, "r"); - - // Drop root privileges, otherwise we may overwrite other system - // files by accident in case of a bug or user error! - set_limits_drop_root(opts->user); - - // Variables required for the time based caluclations - struct timeval now, start_time; - long current_time = 0, stats_time = 0; - gettimeofday(&start_time, NULL); - - // Helper variables required for reading lines - char *line = NULL; - char *next = NULL, *next2 = NULL; - size_t len = 0, read = 0; - - // Helpers required for threading - rthread_s *t = NULL; - stack_s *all_threads = stack_new(); - rworker_stats_s *s = w->worker_stats; - - // More helper variables - //unsigned long lineno = 0, stats_ioop = 0, vsize_id = 0; - unsigned long lineno = 0, vsize_id = 0; - long pid = -1, time = -1; - - // Process the .replay file line by line. - while ((read = getline(&line, &len, replay_fd)) != -1) { - lineno++; - - if (read >= MAX_LINE_LEN) { - Error("line:%lu Exceeded max line len", lineno); - } - - // If the line begins with #: Ignore that line, it contains - // debug or meta information or comments. - - if (line[0] == '#') { - if (line[1] == 'I') { - // We stop replaying I/O once we reach the line '#INIT' - // which incitates the begin of the INIT section. - break; - } - continue; - } - -#ifdef THREAD_DEBUG - char *clone = Clone(line); -#endif - - next = strchr(line, '|'); - Error_if(!next, "lineno:%ld Could not parse time from input file", - lineno); - next[0] = '\0'; - next++; - time = atol(line); - - next2 = strchr(next, '|'); - Error_if(!next2, "Could not parse vsize_id from input file"); - next2[0] = '\0'; - next2++; - vsize_id = atol(next); - - // This worker is not responsible for this line, skip it! - if ((vsize_id % opts->num_workers) != w->rworker_num) { -#ifdef THREAD_DEBUG - free(clone); -#endif - continue; - } - - next = strchr(next2, '|'); - Error_if(!next, "Could not parse PID from input file"); - next[0] = '\0'; - next++; - pid = atol(next2); - - gettimeofday(&now, NULL); - current_time = _Compute_current_time(now); - - // Check whether the user specified a replay speed factor. If so, we - // may need to throttle down a bit. - - if (opts->speed_factor) { - s->time_ahead = time / opts->speed_factor - current_time; - if (s->time_ahead > 0) - usleep(s->time_ahead*1000); - - } else { - s->time_ahead = time - current_time; - } - - // Get the responsible process object. The process object holds data - // structures usually found in a Linux process, e.g. a table of open - // file descriptors. - - rprocess_s *p = amap_get(w->rprocess_map, pid); - if (p == NULL) { - p = rprocess_new(pid, w->fds_map); - amap_set(w->rprocess_map, pid, p); - } - p->lineno = lineno; - - if (opts->num_threads_per_worker == 1) { - // Single threaded mode? - if (!t) - t = rthread_new(vsize_id, w); - else - rthread_update(t, vsize_id); - - } else { - t = amap_get(w->rthread_map, vsize_id); - } - - if (t == NULL) { - - // First try to recycle an old (likely unused) thread - if (NULL != (t = rbuffer_get_next(w->rthread_buffer))) { - rthread_update(t, vsize_id); - -#ifdef THREAD_DEBUG - fprintf(w->rworker_fd, "DEBUG: Reused an idling thread\n"); - fflush(w->rworker_fd); -#endif - - } else if (opts->num_threads_per_worker <= all_threads->size) { - // Reached max threads, waiting until one becomes available - -#ifdef THREAD_DEBUG - fprintf(w->rworker_fd, "DEBUG: Reached max threads\n"); - fflush(w->rworker_fd); -#endif - while (NULL == (t = rbuffer_get_next(w->rthread_buffer))) - usleep(1000); - -#ifdef THREAD_DEBUG - fprintf(w->rworker_fd, "DEBUG: Reused an idling thread\n"); - fflush(w->rworker_fd); -#endif - - rthread_update(t, vsize_id); - - } else { - t = rthread_new(vsize_id, w); - - // We hold a pointer to all created threads in a stack. This - // stack is later used to terminate/join all therads. - stack_push(all_threads, t); - -#ifdef THREAD_DEBUG - fprintf(w->rworker_fd, "DEBUG: Created a new thread\n"); - fflush(w->rworker_fd); -#endif - } - - amap_set(w->rthread_map, vsize_id, t); - } - - // Create a new task for the thread. The task contains all required - // information to run an I/O operation. However, first try to - // reuse/recycle a task object! If there is no such, create a new one. - - rtask_s *task = rbuffer_get_next(w->task_buffer); - if (!task) - task = rtask_new(); - rtask_update(task, w, p, next, lineno, vsize_id); - s->ioops++; - - -#ifdef THREAD_DEBUG - task->clone = clone; - fprintf(w->rworker_fd, "DEBUG: Inserting new task\n"); - fflush(w->rworker_fd); -#endif - - // Insert that task to a ring buffer to pass it to the pthread without - // much synchronisation overhead! - - while (!rthread_insert_task(t, task)) - // The ring buffer is full. This may happen if the pthread didn't - // manage to process tasks fast enough. re-try after a short period! - usleep(1000); - -#ifdef THREAD_DEBUG - fprintf(w->rworker_fd, "DEBUG: Task inserted\n"); - fflush(w->rworker_fd); -#endif - - // The worker prints out stats every 3s - if (current_time - stats_time > 3000) { - // IDEA: Maybe refactor this block to be implemented in rstats.c - - double loadavg = get_loadavg(); - - // Determines whether we replay the I/O faster or slower than - // original speed! - char *a_b = s->time_ahead >= 0 ? "ahead" : "behind"; - - Put("worker(%d): threads:%ld %s:%lds progress:%0.2f%% " - "loadavg:%0.2f", - w->rworker_num, all_threads->size, a_b, Abs(s->time_ahead/1000), - Perc(lineno,num_lines), loadavg); - - stats_time = current_time; - //stats_ioop = lineno; - - if (s->loadavg_high < loadavg) - s->loadavg_high = loadavg; - } - } - - Put("worker(%d): Waiting for all threads to finish business...", - w->rworker_num); - - // This will wait (join) all threads one after another until all threads - // have finished their work and have terminated. - - while (!stack_is_empty(all_threads)) { - rthread_s *t = stack_pop(all_threads); - rthread_terminate(t); - rthread_destroy(t); - } - stack_destroy(all_threads); - - // Collect some stats last time - double loadavg = get_loadavg(); - if (s->loadavg_high < loadavg) - s->loadavg_high = loadavg; - - gettimeofday(&now, NULL); - current_time = _Compute_current_time(now); - if (opts->speed_factor) { - s->time_ahead = time / opts->speed_factor - current_time; - } else { - s->time_ahead = time - current_time; - } - - - Put("worker(%d): All threads terminated!", w->rworker_num); - fclose(replay_fd); - - return SUCCESS; -} diff --git a/ioreplay/src/replay/rworker.h b/ioreplay/src/replay/rworker.h deleted file mode 100644 index 26a1300..0000000 --- a/ioreplay/src/replay/rworker.h +++ /dev/null @@ -1,82 +0,0 @@ -// Copyright 2018 Mimecast Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#ifndef RWORKER_H -#define RWORKER_H - -#include <pthread.h> - -#include "../datas/amap.h" -#include "../datas/rbuffer.h" -#include "../defaults.h" -#include "../options.h" -#include "rstats.h" - -/** - * @brief Represents a worker process. - * - * This represents an I/O replay worker process. The user can specify the - * amount of worker processes via the -p command line switch. This is not - * to confuse with rprocess_s, which represents an original captured process - * and we now want to replay the I/O for! - */ -typedef struct { - int rworker_num; /**< The current worker ID */ - amap_s* fds_map; /**< Holding all file descriptors */ - amap_s* rprocess_map; /**< Holding all processes handled by this worker */ - amap_s* rthread_map; /**< Holding all threads handled by this worker */ - rbuffer_s *task_buffer; /**< Buffering thread tasks to be reused */ - pthread_mutex_t task_buffer_mutex; /**< To sync access to task_buffer */ - rbuffer_s *rthread_buffer; /**< Buffering idle threads to be reused */ - pthread_mutex_t rthread_buffer_mutex; /**< Sync access to rthread_buffer */ - options_s *opts; /**< To synchronise access to rthread_buffer */ - rworker_stats_s *worker_stats; /**< Object holding per worker statistics */ -#ifdef RTHREAD_DEBUG - FILE *rworker_fd; /**< For debugging purposes only */ -#endif -} rworker_s; - -/** - * @brief Creates a new worker object - * - * @param rworker_num The worker number - * @param fds_map A map of all virtual file descriptor objects - * @param num_vsizes The amount of virtual sizes/total file paths of the test - * @param num_pids The total amount of virtual process IDs used in this test - * @param opts A pointer to the options object - * @param worker_stats A pointer to the worker stats object - - * @return The new worker object - */ -rworker_s* rworker_new(const int rworker_num, amap_s *fds_map, - const long num_vsizes, const long num_pids, - options_s* opts, rworker_stats_s *worker_stats); - -/** - * @brief Destroys a worker object - * - * @param w The worker object to be destroyed - */ -void rworker_destroy(rworker_s* w); - -/** - * @brief Makes the worker to process all .replay lines - * - * @param w The responsible worker object - * @param num_lines The total amount of I/O op lines in the .replay file - * @return SUCCESS if everything went fine - */ -status_e rworker_process_lines(rworker_s* w, const long num_lines); - -#endif // RWORKER_H |
