summaryrefslogtreecommitdiff
path: root/ioreplay/src/init
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2018-03-01 11:21:26 +0000
committerPaul Bütow <pbuetow@mimecast.com>2018-03-01 11:21:26 +0000
commit56f8cdff9aaa9bf00c5dc9441a7569374f2cbafb (patch)
treeb5b440b504b9879e241733fa38d19089fb3377b2 /ioreplay/src/init
initial commit0.1
Diffstat (limited to 'ioreplay/src/init')
-rw-r--r--ioreplay/src/init/init.c226
-rw-r--r--ioreplay/src/init/init.h64
-rw-r--r--ioreplay/src/init/itask.c66
-rw-r--r--ioreplay/src/init/itask.h72
-rw-r--r--ioreplay/src/init/ithread.c99
-rw-r--r--ioreplay/src/init/ithread.h86
6 files changed, 613 insertions, 0 deletions
diff --git a/ioreplay/src/init/init.c b/ioreplay/src/init/init.c
new file mode 100644
index 0000000..988729e
--- /dev/null
+++ b/ioreplay/src/init/init.c
@@ -0,0 +1,226 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "init.h"
+
+#include "../datas/stack.h"
+#include "itask.h"
+#include "ithread.h"
+#include "../meta/meta.h"
+#include "../mounts.h"
+#include "../utils/futils.h"
+
+
+init_s *init_new(options_s *opts)
+{
+ init_s *i = Malloc(init_s);
+
+ i->opts = opts;
+ i->mounts = mounts_new(opts);
+ i->threads_map = amap_new(i->mounts->count);
+ i->reuse_queue = rbuffer_new(4096);
+ i->replay_fd = Fopen(opts->replay_file, "r");
+
+ pthread_mutex_init(&i->reuse_queue_mutex, NULL);
+
+ return i;
+}
+
+void init_destroy(init_s *i)
+{
+ amap_destroy(i->threads_map);
+ mounts_destroy(i->mounts);
+
+ itask_s *task = NULL;
+ while (NULL != (task = rbuffer_get_next(i->reuse_queue))) {
+ itask_destroy(task);
+ }
+ rbuffer_destroy(i->reuse_queue);
+
+ fclose(i->replay_fd);
+ pthread_mutex_destroy(&i->reuse_queue_mutex);
+
+ free(i);
+}
+
+void init_extract_header(init_s *i, off_t *init_offset)
+{
+ options_s *opts = i->opts;
+ meta_s *m = meta_new(i->replay_fd);
+ meta_read_start(m);
+
+ long version = 0;
+ if (meta_read_l(m, "version", &version)) {
+ Put("Replay version is '%ld'", version);
+ if (version != REPLAY_VERSION) {
+ Error(".replay file of incompatible version, got %x, expected %x",
+ (int)version, REPLAY_VERSION);
+ }
+ }
+
+ char *user;
+ if (meta_read_s(m, "user", &user)) {
+ Put("Setting user to '%s'", user);
+ opts->user = user;
+ }
+
+ char *name;
+ if (meta_read_s(m, "name", &name)) {
+ Put("Setting name to '%s'", name);
+ opts->name = name;
+ }
+
+ if (meta_read_l(m, "init_offset", init_offset)) {
+ if (*init_offset < 0) {
+ Error("Offset overflow (init offset too large in .replay)");
+ }
+ Put("Setting init offset to '%ld'", *init_offset);
+ }
+
+ meta_destroy(m);
+}
+
+status_e init_run(options_s *opts)
+{
+ status_e ret = SUCCESS;
+ init_s *i = init_new(opts);
+
+ off_t init_offset;
+ init_extract_header(i, &init_offset);
+
+ // Ensure that all ./replay/NAME directories exist
+ mounts_init(i->mounts);
+
+ // Don't do messy stuff as super user
+ drop_root(opts->user);
+
+ // We need to clean up garbish from previous runs!
+ if (opts->purge)
+ mounts_purge(i->mounts);
+ else
+ mounts_trash(i->mounts);
+
+ Out("Creating all files and directories requried for test '%s'...",
+ opts->name);
+
+ // Seek to the INIT section
+ fseeko(i->replay_fd, init_offset, SEEK_SET);
+
+ bool is_file = false, is_dir = false;
+ long vsize = 0;
+ char *path;
+
+ // Stats
+ long dirs_created = 0;
+ long files_created = 0;
+ long files_total_size = 0;
+
+ // Helper variables for getline
+ char *line = NULL;
+ size_t len = 0, read = 0;
+ char *saveptr;
+
+ stack_s *all_threads = stack_new();
+
+ // Process the INIT section of the .replay file line by line.
+
+ while ((read = getline(&line, &len, i->replay_fd)) != -1) {
+ char *tok = strtok_r(line, "|", &saveptr);
+
+ for (int ntok = 0; tok; ntok++) {
+ switch (ntok) {
+ case 0:
+ is_dir = atoi(tok) == 1;
+ break;
+ case 1:
+ is_file = atoi(tok) == 1;
+ break;
+ case 2:
+ vsize = atol(tok);
+ if (vsize < 0) {
+ Error("Size overflow");
+ }
+ break;
+ case 3:
+ path = tok;
+ break;
+ default:
+ break;
+ }
+
+ tok = strtok_r(NULL, "|", &saveptr);
+ }
+
+ itask_s *task = rbuffer_get_next(i->reuse_queue);
+
+ if (!task) {
+ task = itask_new();
+
+ } else {
+ itask_extract_stats(task, &dirs_created, &files_created,
+ &files_total_size);
+ }
+
+ // Set new task values
+ if (is_dir) {
+ task->is_dir = true;
+
+ } else if (is_file) {
+ task->is_file = true;
+ task->vsize = vsize;
+ }
+ task->path = Clone(path);
+
+ // We run one init thread per mount point
+ int mnr = mounts_get_mountnumber(i->mounts, path);
+ ithread_s *t = amap_get(i->threads_map, mnr);
+
+ if (!t) {
+ t = ithread_new(i);
+ amap_set(i->threads_map, mnr, t);
+ stack_push(all_threads, t);
+ ithread_start(t);
+ }
+
+ //itask_print(task);
+ while (!rbuffer_insert(t->queue, task))
+ usleep(1000);
+ }
+
+ ithread_s *t = NULL;
+ while (NULL != (t = stack_pop(all_threads))) {
+ ithread_terminate(t);
+ ithread_destroy(t);
+ }
+ stack_destroy(all_threads);
+
+ itask_s *task = NULL;
+ while (NULL != (task = rbuffer_get_next(i->reuse_queue))) {
+ itask_extract_stats(task, &dirs_created, &files_created,
+ &files_total_size);
+ itask_destroy(task);
+ }
+
+ Put("Done!");
+
+ Put("Created %ld files (net total size: %.2fg) and %ld directories!",
+ files_created, files_total_size/(1024*1024*1024.0),
+ dirs_created);
+
+ init_destroy(i);
+
+ Put("You are ready to fire up the test now");
+
+ return ret;
+}
diff --git a/ioreplay/src/init/init.h b/ioreplay/src/init/init.h
new file mode 100644
index 0000000..3d9f9e9
--- /dev/null
+++ b/ioreplay/src/init/init.h
@@ -0,0 +1,64 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef INIT_H
+#define INIT_H
+
+#include "../defaults.h"
+#include "../options.h"
+#include "../datas/amap.h"
+#include "../datas/rbuffer.h"
+#include "../mounts.h"
+
+typedef struct init_s_ {
+ amap_s *threads_map;
+ rbuffer_s *reuse_queue;
+ options_s *opts;
+ mounts_s *mounts;
+ FILE *replay_fd;
+ pthread_mutex_t reuse_queue_mutex;
+} init_s;
+
+/**
+ * @brief Creates a new init object
+ *
+ * @param opts The options object
+ * @return The new mounts object
+ */
+init_s* init_new(options_s *opts);
+
+/**
+ * @brief Destroys the init object
+ *
+ * @param i The init object
+ */
+void init_destroy(init_s *i);
+
+/**
+ * @brief Initialises the test environment
+ *
+ * @param opts The options object
+ * @return SUCCESS if initialised without any issues
+ */
+status_e init_run(options_s *opts);
+
+/**
+ * @brief Extracts some useful information from the .replay meta header
+ *
+ * @param i The init object
+ * @param init_offset To store the offset of the init section
+ */
+void init_extract_header(init_s *i, off_t *init_offset);
+
+#endif // INIT_H
diff --git a/ioreplay/src/init/itask.c b/ioreplay/src/init/itask.c
new file mode 100644
index 0000000..f04ce33
--- /dev/null
+++ b/ioreplay/src/init/itask.c
@@ -0,0 +1,66 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "itask.h"
+
+itask_s* itask_new()
+{
+ itask_s *task = Malloc(itask_s);
+
+ task->path = NULL;
+ itask_reset_stats(task);
+
+ return task;
+}
+
+void itask_destroy(itask_s *task)
+{
+ if (task->path)
+ free(task->path);
+
+ free(task);
+}
+
+void itask_reset_stats(itask_s *task)
+{
+ task->is_dir = task->is_file = false;
+ task->sizes_created = task->vsize = 0;
+ task->dirs_created = task->files_created = 0;
+
+ if (task->path) {
+ free(task->path);
+ task->path = NULL;
+ }
+}
+
+void itask_extract_stats(itask_s *task, long* dirs_created, long *files_created,
+ long *files_total_size)
+{
+ *dirs_created += task->dirs_created;
+ *files_created += task->files_created;
+ *files_total_size += task->sizes_created;
+
+ if (*dirs_created < 0 || *files_created < 0 || *files_total_size < 0) {
+ Error("Size overflow");
+ }
+
+ itask_reset_stats(task);
+}
+
+void itask_print(itask_s *task)
+{
+ Put("itask(%p): is_dir:%d is_file:%d vsize:%ld path:%s",
+ (void*)task, task->is_dir, task->is_file,
+ task->vsize, task->path);
+}
diff --git a/ioreplay/src/init/itask.h b/ioreplay/src/init/itask.h
new file mode 100644
index 0000000..b10d515
--- /dev/null
+++ b/ioreplay/src/init/itask.h
@@ -0,0 +1,72 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef ITASK_H
+#define ITASK_H
+
+#include "../defaults.h"
+
+/**
+ * @brief The initialise task definition
+ */
+typedef struct itask_s_ {
+ bool is_dir;
+ bool is_file;
+ long vsize;
+ char *path;
+ long dirs_created;
+ long files_created;
+ long sizes_created;
+} itask_s;
+
+/**
+ * @brief Creates a new task object
+ *
+ * @return The new task object
+ */
+itask_s* itask_new();
+
+/**
+ * @brief Resets the task stats
+ *
+ * @param task The itask object
+ */
+void itask_reset_stats(itask_s *task);
+
+/**
+ * @brief Extract stats from a task object
+ *
+ * @param task The itask object
+ * @param dirs_created Adds count of dirs created to that variable
+ * @param files_created Adds count of files created to that variable
+ * @param files_total_size Adds size of files created to that variable
+ */
+void itask_extract_stats(itask_s *task, long* dirs_created, long *files_created,
+ long *files_total_size);
+
+/**
+ * @brief Destroys a given task object
+ *
+ * @param task The task object
+ */
+void itask_destroy(itask_s *task);
+
+/**
+ * @brief Prints a task to stdout
+ *
+ * @param task The task object
+ */
+void itask_print(itask_s *task);
+
+#endif // ITASK_H
diff --git a/ioreplay/src/init/ithread.c b/ioreplay/src/init/ithread.c
new file mode 100644
index 0000000..a580e70
--- /dev/null
+++ b/ioreplay/src/init/ithread.c
@@ -0,0 +1,99 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "ithread.h"
+
+#include "itask.h"
+#include "../utils/futils.h"
+
+
+void* ithread_pthread_start(void *data)
+{
+ ithread_s *t = data;
+ init_s *i = t->init;
+ itask_s *task = NULL;
+
+ do {
+ while (NULL != (task = rbuffer_get_next(t->queue))) {
+ ithread_run_task(t, task);
+
+ // We need to mutex lock the reuse_queue as multiple threads
+ // can insert into it
+ pthread_mutex_lock(&i->reuse_queue_mutex);
+ int ret = rbuffer_insert(i->reuse_queue, task);
+ pthread_mutex_unlock(&i->reuse_queue_mutex);
+ if (!ret)
+ itask_destroy(task);
+ }
+ usleep(100);
+ } while (!t->terminate);
+
+ while (NULL != (task = rbuffer_get_next(t->queue))) {
+ ithread_run_task(t, task);
+ if (!rbuffer_insert(i->reuse_queue, task))
+ itask_destroy(task);
+
+ pthread_mutex_lock(&i->reuse_queue_mutex);
+ int ret = rbuffer_insert(i->reuse_queue, task);
+ pthread_mutex_unlock(&i->reuse_queue_mutex);
+ if (!ret)
+ itask_destroy(task);
+ }
+
+ return NULL;
+}
+
+ithread_s* ithread_new(init_s *i)
+{
+ ithread_s *t = Malloc(ithread_s);
+
+ t->init = i;
+ t->queue = rbuffer_new(1024);
+ t->terminate = false;
+
+ return t;
+}
+
+void ithread_start(ithread_s *t)
+{
+ start_pthread(&t->pthread, ithread_pthread_start, (void*)t);
+}
+
+void ithread_destroy(ithread_s *t)
+{
+ rbuffer_destroy(t->queue);
+ free(t);
+}
+
+void ithread_terminate(ithread_s *t)
+{
+ t->terminate = true;
+ pthread_join(t->pthread, NULL);
+}
+
+void ithread_run_task(ithread_s *t, itask_s *task)
+{
+ if (task->is_dir) {
+ task->dirs_created += ensure_dir_exists(task->path);
+
+ } else if (task->is_file) {
+ if (!ensure_file_exists(task->path, &task->dirs_created)) {
+ task->files_created++;
+ if (task->vsize > 0) {
+ append_random_to_file(task->path, task->vsize);
+ task->sizes_created += task->vsize;
+ }
+ }
+ }
+}
diff --git a/ioreplay/src/init/ithread.h b/ioreplay/src/init/ithread.h
new file mode 100644
index 0000000..0884519
--- /dev/null
+++ b/ioreplay/src/init/ithread.h
@@ -0,0 +1,86 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef ITHREAD_H
+#define ITHREAD_H
+
+#include "../defaults.h"
+#include "../datas/rbuffer.h"
+
+#include "init.h"
+#include "itask.h"
+
+#include <pthread.h>
+
+/**
+ * @brief Definition of an init thread
+ *
+ */
+typedef struct ithread_s_ {
+ pthread_t pthread; /**< We run the init tasks in concurrent pthreads */
+ rbuffer_s *queue; /**< The thread's task queue */
+ init_s *init; /**< The responsible init object */
+ bool terminate; /**< Indicates that thread can terminate */
+} ithread_s;
+
+/**
+ * @brief Creates a new thread object
+ *
+ * @param i The init object
+ * @return The new thread object
+ */
+ithread_s* ithread_new(init_s *i);
+
+/**
+ * @brief Terminates the thread
+ *
+ * This function waits (via join) for the pthread to complete all its
+ * current tasks from the queue.
+ *
+ * @param t The thread object
+ */
+void ithread_terminate(ithread_s* t);
+
+/**
+ * @brief Destroys the thread object
+ *
+ * @param t The thread object
+ */
+void ithread_destroy(ithread_s* t);
+
+/**
+ * @brief Executes the init task
+ *
+ * @param t The thread object
+ * @param task The task object
+ */
+void ithread_run_task(ithread_s* t, itask_s *task);
+
+/**
+ * @brief Starts the POSIX thread
+ *
+ * @param t The responsible thread object
+ */
+void ithread_start(ithread_s *t);
+
+/**
+ * @brief Entry point of the POSIX thread
+ *
+ * @param data Data passed to the pthread
+ * @return Always NULL on success
+ */
+
+void* ithread_pthread_start(void *data);
+
+#endif // ITHREAD_H