diff options
Diffstat (limited to 'ioreplay/src/init')
| -rw-r--r-- | ioreplay/src/init/init.c | 226 | ||||
| -rw-r--r-- | ioreplay/src/init/init.h | 64 | ||||
| -rw-r--r-- | ioreplay/src/init/itask.c | 66 | ||||
| -rw-r--r-- | ioreplay/src/init/itask.h | 72 | ||||
| -rw-r--r-- | ioreplay/src/init/ithread.c | 99 | ||||
| -rw-r--r-- | ioreplay/src/init/ithread.h | 86 |
6 files changed, 613 insertions, 0 deletions
diff --git a/ioreplay/src/init/init.c b/ioreplay/src/init/init.c new file mode 100644 index 0000000..988729e --- /dev/null +++ b/ioreplay/src/init/init.c @@ -0,0 +1,226 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "init.h" + +#include "../datas/stack.h" +#include "itask.h" +#include "ithread.h" +#include "../meta/meta.h" +#include "../mounts.h" +#include "../utils/futils.h" + + +init_s *init_new(options_s *opts) +{ + init_s *i = Malloc(init_s); + + i->opts = opts; + i->mounts = mounts_new(opts); + i->threads_map = amap_new(i->mounts->count); + i->reuse_queue = rbuffer_new(4096); + i->replay_fd = Fopen(opts->replay_file, "r"); + + pthread_mutex_init(&i->reuse_queue_mutex, NULL); + + return i; +} + +void init_destroy(init_s *i) +{ + amap_destroy(i->threads_map); + mounts_destroy(i->mounts); + + itask_s *task = NULL; + while (NULL != (task = rbuffer_get_next(i->reuse_queue))) { + itask_destroy(task); + } + rbuffer_destroy(i->reuse_queue); + + fclose(i->replay_fd); + pthread_mutex_destroy(&i->reuse_queue_mutex); + + free(i); +} + +void init_extract_header(init_s *i, off_t *init_offset) +{ + options_s *opts = i->opts; + meta_s *m = meta_new(i->replay_fd); + meta_read_start(m); + + long version = 0; + if (meta_read_l(m, "version", &version)) { + Put("Replay version is '%ld'", version); + if (version != REPLAY_VERSION) { + Error(".replay file of incompatible version, got %x, expected %x", + (int)version, REPLAY_VERSION); + } + } + + char *user; + if (meta_read_s(m, "user", &user)) { + Put("Setting user to '%s'", user); + opts->user = user; + } + + char *name; + if (meta_read_s(m, "name", &name)) { + Put("Setting name to '%s'", name); + opts->name = name; + } + + if (meta_read_l(m, "init_offset", init_offset)) { + if (*init_offset < 0) { + Error("Offset overflow (init offset too large in .replay)"); + } + Put("Setting init offset to '%ld'", *init_offset); + } + + meta_destroy(m); +} + +status_e init_run(options_s *opts) +{ + status_e ret = SUCCESS; + init_s *i = init_new(opts); + + off_t init_offset; + init_extract_header(i, &init_offset); + + // Ensure that all ./replay/NAME directories exist + mounts_init(i->mounts); + + // Don't do messy stuff as super user + drop_root(opts->user); + + // We need to clean up garbish from previous runs! + if (opts->purge) + mounts_purge(i->mounts); + else + mounts_trash(i->mounts); + + Out("Creating all files and directories requried for test '%s'...", + opts->name); + + // Seek to the INIT section + fseeko(i->replay_fd, init_offset, SEEK_SET); + + bool is_file = false, is_dir = false; + long vsize = 0; + char *path; + + // Stats + long dirs_created = 0; + long files_created = 0; + long files_total_size = 0; + + // Helper variables for getline + char *line = NULL; + size_t len = 0, read = 0; + char *saveptr; + + stack_s *all_threads = stack_new(); + + // Process the INIT section of the .replay file line by line. + + while ((read = getline(&line, &len, i->replay_fd)) != -1) { + char *tok = strtok_r(line, "|", &saveptr); + + for (int ntok = 0; tok; ntok++) { + switch (ntok) { + case 0: + is_dir = atoi(tok) == 1; + break; + case 1: + is_file = atoi(tok) == 1; + break; + case 2: + vsize = atol(tok); + if (vsize < 0) { + Error("Size overflow"); + } + break; + case 3: + path = tok; + break; + default: + break; + } + + tok = strtok_r(NULL, "|", &saveptr); + } + + itask_s *task = rbuffer_get_next(i->reuse_queue); + + if (!task) { + task = itask_new(); + + } else { + itask_extract_stats(task, &dirs_created, &files_created, + &files_total_size); + } + + // Set new task values + if (is_dir) { + task->is_dir = true; + + } else if (is_file) { + task->is_file = true; + task->vsize = vsize; + } + task->path = Clone(path); + + // We run one init thread per mount point + int mnr = mounts_get_mountnumber(i->mounts, path); + ithread_s *t = amap_get(i->threads_map, mnr); + + if (!t) { + t = ithread_new(i); + amap_set(i->threads_map, mnr, t); + stack_push(all_threads, t); + ithread_start(t); + } + + //itask_print(task); + while (!rbuffer_insert(t->queue, task)) + usleep(1000); + } + + ithread_s *t = NULL; + while (NULL != (t = stack_pop(all_threads))) { + ithread_terminate(t); + ithread_destroy(t); + } + stack_destroy(all_threads); + + itask_s *task = NULL; + while (NULL != (task = rbuffer_get_next(i->reuse_queue))) { + itask_extract_stats(task, &dirs_created, &files_created, + &files_total_size); + itask_destroy(task); + } + + Put("Done!"); + + Put("Created %ld files (net total size: %.2fg) and %ld directories!", + files_created, files_total_size/(1024*1024*1024.0), + dirs_created); + + init_destroy(i); + + Put("You are ready to fire up the test now"); + + return ret; +} diff --git a/ioreplay/src/init/init.h b/ioreplay/src/init/init.h new file mode 100644 index 0000000..3d9f9e9 --- /dev/null +++ b/ioreplay/src/init/init.h @@ -0,0 +1,64 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef INIT_H +#define INIT_H + +#include "../defaults.h" +#include "../options.h" +#include "../datas/amap.h" +#include "../datas/rbuffer.h" +#include "../mounts.h" + +typedef struct init_s_ { + amap_s *threads_map; + rbuffer_s *reuse_queue; + options_s *opts; + mounts_s *mounts; + FILE *replay_fd; + pthread_mutex_t reuse_queue_mutex; +} init_s; + +/** + * @brief Creates a new init object + * + * @param opts The options object + * @return The new mounts object + */ +init_s* init_new(options_s *opts); + +/** + * @brief Destroys the init object + * + * @param i The init object + */ +void init_destroy(init_s *i); + +/** + * @brief Initialises the test environment + * + * @param opts The options object + * @return SUCCESS if initialised without any issues + */ +status_e init_run(options_s *opts); + +/** + * @brief Extracts some useful information from the .replay meta header + * + * @param i The init object + * @param init_offset To store the offset of the init section + */ +void init_extract_header(init_s *i, off_t *init_offset); + +#endif // INIT_H diff --git a/ioreplay/src/init/itask.c b/ioreplay/src/init/itask.c new file mode 100644 index 0000000..f04ce33 --- /dev/null +++ b/ioreplay/src/init/itask.c @@ -0,0 +1,66 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "itask.h" + +itask_s* itask_new() +{ + itask_s *task = Malloc(itask_s); + + task->path = NULL; + itask_reset_stats(task); + + return task; +} + +void itask_destroy(itask_s *task) +{ + if (task->path) + free(task->path); + + free(task); +} + +void itask_reset_stats(itask_s *task) +{ + task->is_dir = task->is_file = false; + task->sizes_created = task->vsize = 0; + task->dirs_created = task->files_created = 0; + + if (task->path) { + free(task->path); + task->path = NULL; + } +} + +void itask_extract_stats(itask_s *task, long* dirs_created, long *files_created, + long *files_total_size) +{ + *dirs_created += task->dirs_created; + *files_created += task->files_created; + *files_total_size += task->sizes_created; + + if (*dirs_created < 0 || *files_created < 0 || *files_total_size < 0) { + Error("Size overflow"); + } + + itask_reset_stats(task); +} + +void itask_print(itask_s *task) +{ + Put("itask(%p): is_dir:%d is_file:%d vsize:%ld path:%s", + (void*)task, task->is_dir, task->is_file, + task->vsize, task->path); +} diff --git a/ioreplay/src/init/itask.h b/ioreplay/src/init/itask.h new file mode 100644 index 0000000..b10d515 --- /dev/null +++ b/ioreplay/src/init/itask.h @@ -0,0 +1,72 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ITASK_H +#define ITASK_H + +#include "../defaults.h" + +/** + * @brief The initialise task definition + */ +typedef struct itask_s_ { + bool is_dir; + bool is_file; + long vsize; + char *path; + long dirs_created; + long files_created; + long sizes_created; +} itask_s; + +/** + * @brief Creates a new task object + * + * @return The new task object + */ +itask_s* itask_new(); + +/** + * @brief Resets the task stats + * + * @param task The itask object + */ +void itask_reset_stats(itask_s *task); + +/** + * @brief Extract stats from a task object + * + * @param task The itask object + * @param dirs_created Adds count of dirs created to that variable + * @param files_created Adds count of files created to that variable + * @param files_total_size Adds size of files created to that variable + */ +void itask_extract_stats(itask_s *task, long* dirs_created, long *files_created, + long *files_total_size); + +/** + * @brief Destroys a given task object + * + * @param task The task object + */ +void itask_destroy(itask_s *task); + +/** + * @brief Prints a task to stdout + * + * @param task The task object + */ +void itask_print(itask_s *task); + +#endif // ITASK_H diff --git a/ioreplay/src/init/ithread.c b/ioreplay/src/init/ithread.c new file mode 100644 index 0000000..a580e70 --- /dev/null +++ b/ioreplay/src/init/ithread.c @@ -0,0 +1,99 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ithread.h" + +#include "itask.h" +#include "../utils/futils.h" + + +void* ithread_pthread_start(void *data) +{ + ithread_s *t = data; + init_s *i = t->init; + itask_s *task = NULL; + + do { + while (NULL != (task = rbuffer_get_next(t->queue))) { + ithread_run_task(t, task); + + // We need to mutex lock the reuse_queue as multiple threads + // can insert into it + pthread_mutex_lock(&i->reuse_queue_mutex); + int ret = rbuffer_insert(i->reuse_queue, task); + pthread_mutex_unlock(&i->reuse_queue_mutex); + if (!ret) + itask_destroy(task); + } + usleep(100); + } while (!t->terminate); + + while (NULL != (task = rbuffer_get_next(t->queue))) { + ithread_run_task(t, task); + if (!rbuffer_insert(i->reuse_queue, task)) + itask_destroy(task); + + pthread_mutex_lock(&i->reuse_queue_mutex); + int ret = rbuffer_insert(i->reuse_queue, task); + pthread_mutex_unlock(&i->reuse_queue_mutex); + if (!ret) + itask_destroy(task); + } + + return NULL; +} + +ithread_s* ithread_new(init_s *i) +{ + ithread_s *t = Malloc(ithread_s); + + t->init = i; + t->queue = rbuffer_new(1024); + t->terminate = false; + + return t; +} + +void ithread_start(ithread_s *t) +{ + start_pthread(&t->pthread, ithread_pthread_start, (void*)t); +} + +void ithread_destroy(ithread_s *t) +{ + rbuffer_destroy(t->queue); + free(t); +} + +void ithread_terminate(ithread_s *t) +{ + t->terminate = true; + pthread_join(t->pthread, NULL); +} + +void ithread_run_task(ithread_s *t, itask_s *task) +{ + if (task->is_dir) { + task->dirs_created += ensure_dir_exists(task->path); + + } else if (task->is_file) { + if (!ensure_file_exists(task->path, &task->dirs_created)) { + task->files_created++; + if (task->vsize > 0) { + append_random_to_file(task->path, task->vsize); + task->sizes_created += task->vsize; + } + } + } +} diff --git a/ioreplay/src/init/ithread.h b/ioreplay/src/init/ithread.h new file mode 100644 index 0000000..0884519 --- /dev/null +++ b/ioreplay/src/init/ithread.h @@ -0,0 +1,86 @@ +// Copyright 2018 Mimecast Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ITHREAD_H +#define ITHREAD_H + +#include "../defaults.h" +#include "../datas/rbuffer.h" + +#include "init.h" +#include "itask.h" + +#include <pthread.h> + +/** + * @brief Definition of an init thread + * + */ +typedef struct ithread_s_ { + pthread_t pthread; /**< We run the init tasks in concurrent pthreads */ + rbuffer_s *queue; /**< The thread's task queue */ + init_s *init; /**< The responsible init object */ + bool terminate; /**< Indicates that thread can terminate */ +} ithread_s; + +/** + * @brief Creates a new thread object + * + * @param i The init object + * @return The new thread object + */ +ithread_s* ithread_new(init_s *i); + +/** + * @brief Terminates the thread + * + * This function waits (via join) for the pthread to complete all its + * current tasks from the queue. + * + * @param t The thread object + */ +void ithread_terminate(ithread_s* t); + +/** + * @brief Destroys the thread object + * + * @param t The thread object + */ +void ithread_destroy(ithread_s* t); + +/** + * @brief Executes the init task + * + * @param t The thread object + * @param task The task object + */ +void ithread_run_task(ithread_s* t, itask_s *task); + +/** + * @brief Starts the POSIX thread + * + * @param t The responsible thread object + */ +void ithread_start(ithread_s *t); + +/** + * @brief Entry point of the POSIX thread + * + * @param data Data passed to the pthread + * @return Always NULL on success + */ + +void* ithread_pthread_start(void *data); + +#endif // ITHREAD_H |
