summaryrefslogtreecommitdiff
path: root/ioreplay/src/replay/rworker.c
diff options
context:
space:
mode:
authorPaul Bütow <pbuetow@mimecast.com>2018-03-01 11:21:26 +0000
committerPaul Bütow <pbuetow@mimecast.com>2018-03-01 11:21:26 +0000
commit56f8cdff9aaa9bf00c5dc9441a7569374f2cbafb (patch)
treeb5b440b504b9879e241733fa38d19089fb3377b2 /ioreplay/src/replay/rworker.c
initial commit0.1
Diffstat (limited to 'ioreplay/src/replay/rworker.c')
-rw-r--r--ioreplay/src/replay/rworker.c360
1 files changed, 360 insertions, 0 deletions
diff --git a/ioreplay/src/replay/rworker.c b/ioreplay/src/replay/rworker.c
new file mode 100644
index 0000000..5a50ada
--- /dev/null
+++ b/ioreplay/src/replay/rworker.c
@@ -0,0 +1,360 @@
+// Copyright 2018 Mimecast Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "rworker.h"
+
+#include "../datas/stack.h"
+#include "rprocess.h"
+#include "rthread.h"
+
+#define _Compute_current_time(now) \
+ (now.tv_sec - start_time.tv_sec) * 1000 \
+ + (now.tv_usec - start_time.tv_usec) / 1000
+
+
+/**
+ * @brief A callback helper function for destroying all virtual process objects
+ *
+ * @param data The process object.
+ */
+static void _rprocess_destroy_cb(void *data)
+{
+ rprocess_destroy(data);
+}
+
+rworker_s* rworker_new(const int rworker_num, amap_s *fds_map,
+ const long num_vsizes, const long num_pids,
+ options_s *opts, rworker_stats_s *worker_stats)
+{
+ rworker_s *w = Malloc(rworker_s);
+
+#ifdef THREAD_DEBUG
+ char *rworker_log = Calloc(1024, char);
+ snprintf(rworker_log, 1023, "/tmp/ioreplay/_worker%d.debuglog",
+ rworker_num);
+
+ w->rworker_fd = Fopen(rworker_log, "a");
+ free(rworker_log);
+ fprintf(w->rworker_fd, "DEBUG: Started worker\n");
+#endif
+
+ w->rworker_num = rworker_num;
+ w->opts = opts;
+ w->fds_map = fds_map;
+
+ w->rprocess_map = amap_new(num_pids);
+ w->rthread_map = amap_new(num_vsizes);
+ w->task_buffer = rbuffer_new(opts->num_threads_per_worker
+ *TASK_BUFFER_PER_THREAD);
+ w->rthread_buffer = rbuffer_new(opts->num_threads_per_worker);
+ w->worker_stats = worker_stats;
+
+ // Attach a cleanup callback function to the worker map.
+ w->rprocess_map->data_destroy = _rprocess_destroy_cb;
+
+ pthread_mutex_init(&w->rthread_buffer_mutex, NULL);
+ pthread_mutex_init(&w->task_buffer_mutex, NULL);
+
+ // TODO: Check in the program whether the ulimit is high enough
+ // or not! (ulimit -n)
+
+ return w;
+}
+
+/**
+ * @brief Destroys the object
+ *
+ * Destroys the worker object (frees all memory allocated by the worker)
+ *
+ * @param w The worker object
+ */
+void rworker_destroy(rworker_s *w)
+{
+ if (!w)
+ return;
+
+ if (w->rprocess_map)
+ amap_destroy(w->rprocess_map);
+ if (w->rthread_map)
+ amap_destroy(w->rthread_map);
+
+ if (w->task_buffer) {
+ rtask_s *task = NULL;
+ while (NULL != (task = rbuffer_get_next(w->task_buffer)))
+ rtask_destroy(task);
+ rbuffer_destroy(w->task_buffer);
+ }
+
+ if (w->rthread_buffer)
+ rbuffer_destroy(w->rthread_buffer);
+
+ pthread_mutex_destroy(&w->task_buffer_mutex);
+ pthread_mutex_destroy(&w->rthread_buffer_mutex);
+
+#ifdef THREAD_DEBUG
+ if (w->rworker_fd)
+ fclose(w->rworker_fd);
+#endif
+
+ free(w);
+}
+
+status_e rworker_process_lines(rworker_s* w, const long num_lines)
+{
+ Out("worker(%d): Starting to process replay lines\n", w->rworker_num);
+
+ options_s *opts = w->opts;
+ FILE *replay_fd = Fopen(opts->replay_file, "r");
+
+ // Drop root privileges, otherwise we may overwrite other system
+ // files by accident in case of a bug or user error!
+ drop_root(opts->user);
+
+ // Variables required for the time based caluclations
+ struct timeval now, start_time;
+ long current_time = 0, stats_time = 0;
+ gettimeofday(&start_time, NULL);
+
+ // Helper variables required for reading lines
+ char *line = NULL;
+ char *next = NULL, *next2 = NULL;
+ size_t len = 0, read = 0;
+
+ // Helpers required for threading
+ rthread_s *t = NULL;
+ stack_s *all_threads = stack_new();
+ rworker_stats_s *s = w->worker_stats;
+
+ // More helper variables
+ //unsigned long lineno = 0, stats_ioop = 0, vsize_id = 0;
+ unsigned long lineno = 0, vsize_id = 0;
+ long pid = -1, time = -1;
+
+ // Process the .replay file line by line.
+ while ((read = getline(&line, &len, replay_fd)) != -1) {
+ lineno++;
+
+ if (read >= MAX_LINE_LEN) {
+ Error("line:%lu Exceeded max line len", lineno);
+ }
+
+ // If the line begins with #: Ignore that line, it contains
+ // debug or meta information or comments.
+
+ if (line[0] == '#') {
+ if (line[1] == 'I') {
+ // We stop replaying I/O once we reach the line '#INIT'
+ // which incitates the begin of the INIT section.
+ break;
+ }
+ continue;
+ }
+
+#ifdef THREAD_DEBUG
+ char *clone = Clone(line);
+#endif
+
+ next = strchr(line, '|');
+ Error_if(!next, "lineno:%ld Could not parse time from input file",
+ lineno);
+ next[0] = '\0';
+ next++;
+ time = atol(line);
+
+ next2 = strchr(next, '|');
+ Error_if(!next2, "Could not parse vsize_id from input file");
+ next2[0] = '\0';
+ next2++;
+ vsize_id = atol(next);
+
+ // This worker is not responsible for this line, skip it!
+ if ((vsize_id % opts->num_workers) != w->rworker_num) {
+#ifdef THREAD_DEBUG
+ free(clone);
+#endif
+ continue;
+ }
+
+ next = strchr(next2, '|');
+ Error_if(!next, "Could not parse PID from input file");
+ next[0] = '\0';
+ next++;
+ pid = atol(next2);
+
+ gettimeofday(&now, NULL);
+ current_time = _Compute_current_time(now);
+
+ // Check whether the user specified a replay speed factor. If so, we
+ // may need to throttle down a bit.
+
+ if (opts->speed_factor) {
+ s->time_ahead = time / opts->speed_factor - current_time;
+ if (s->time_ahead > 0)
+ usleep(s->time_ahead*1000);
+
+ } else {
+ s->time_ahead = time - current_time;
+ }
+
+ // Get the responsible process object. The process object holds data
+ // structures usually found in a Linux process, e.g. a table of open
+ // file descriptors.
+
+ rprocess_s *p = amap_get(w->rprocess_map, pid);
+ if (p == NULL) {
+ p = rprocess_new(pid, w->fds_map);
+ amap_set(w->rprocess_map, pid, p);
+ }
+ p->lineno = lineno;
+
+ if (opts->num_threads_per_worker == 1) {
+ // Single threaded mode?
+ if (!t)
+ t = rthread_new(vsize_id, w);
+ else
+ rthread_update(t, vsize_id);
+
+ } else {
+ t = amap_get(w->rthread_map, vsize_id);
+ }
+
+ if (t == NULL) {
+
+ // First try to recycle an old (likely unused) thread
+ if (NULL != (t = rbuffer_get_next(w->rthread_buffer))) {
+ rthread_update(t, vsize_id);
+
+#ifdef THREAD_DEBUG
+ fprintf(w->rworker_fd, "DEBUG: Reused an idling thread\n");
+ fflush(w->rworker_fd);
+#endif
+
+ } else if (opts->num_threads_per_worker <= all_threads->size) {
+ // Reached max threads, waiting until one becomes available
+
+#ifdef THREAD_DEBUG
+ fprintf(w->rworker_fd, "DEBUG: Reached max threads\n");
+ fflush(w->rworker_fd);
+#endif
+ while (NULL == (t = rbuffer_get_next(w->rthread_buffer)))
+ usleep(1000);
+
+#ifdef THREAD_DEBUG
+ fprintf(w->rworker_fd, "DEBUG: Reused an idling thread\n");
+ fflush(w->rworker_fd);
+#endif
+
+ rthread_update(t, vsize_id);
+
+ } else {
+ t = rthread_new(vsize_id, w);
+
+ // We hold a pointer to all created threads in a stack. This
+ // stack is later used to terminate/join all therads.
+ stack_push(all_threads, t);
+
+#ifdef THREAD_DEBUG
+ fprintf(w->rworker_fd, "DEBUG: Created a new thread\n");
+ fflush(w->rworker_fd);
+#endif
+ }
+
+ amap_set(w->rthread_map, vsize_id, t);
+ }
+
+ // Create a new task for the thread. The task contains all required
+ // information to run an I/O operation. However, first try to
+ // reuse/recycle a task object! If there is no such, create a new one.
+
+ rtask_s *task = rbuffer_get_next(w->task_buffer);
+ if (!task)
+ task = rtask_new();
+ rtask_update(task, w, p, next, lineno, vsize_id);
+ s->ioops++;
+
+
+#ifdef THREAD_DEBUG
+ task->clone = clone;
+ fprintf(w->rworker_fd, "DEBUG: Inserting new task\n");
+ fflush(w->rworker_fd);
+#endif
+
+ // Insert that task to a ring buffer to pass it to the pthread without
+ // much synchronisation overhead!
+
+ while (!rthread_insert_task(t, task))
+ // The ring buffer is full. This may happen if the pthread didn't
+ // manage to process tasks fast enough. re-try after a short period!
+ usleep(1000);
+
+#ifdef THREAD_DEBUG
+ fprintf(w->rworker_fd, "DEBUG: Task inserted\n");
+ fflush(w->rworker_fd);
+#endif
+
+ // The worker prints out stats every 3s
+ if (current_time - stats_time > 3000) {
+ // IDEA: Maybe refactor this block to be implemented in rstats.c
+
+ double loadavg = get_loadavg();
+
+ // Determines whether we replay the I/O faster or slower than
+ // original speed!
+ char *a_b = s->time_ahead >= 0 ? "ahead" : "behind";
+
+ Put("worker(%d): threads:%ld %s:%lds progress:%0.2f%% "
+ "loadavg:%0.2f",
+ w->rworker_num, all_threads->size, a_b, Abs(s->time_ahead/1000),
+ Perc(lineno,num_lines), loadavg);
+
+ stats_time = current_time;
+ //stats_ioop = lineno;
+
+ if (s->loadavg_high < loadavg)
+ s->loadavg_high = loadavg;
+ }
+ }
+
+ Put("worker(%d): Waiting for all threads to finish business...",
+ w->rworker_num);
+
+ // This will wait (join) all threads one after another until all threads
+ // have finished their work and have terminated.
+
+ while (!stack_is_empty(all_threads)) {
+ rthread_s *t = stack_pop(all_threads);
+ rthread_terminate(t);
+ rthread_destroy(t);
+ }
+ stack_destroy(all_threads);
+
+ // Collect some stats last time
+ double loadavg = get_loadavg();
+ if (s->loadavg_high < loadavg)
+ s->loadavg_high = loadavg;
+
+ gettimeofday(&now, NULL);
+ current_time = _Compute_current_time(now);
+ if (opts->speed_factor) {
+ s->time_ahead = time / opts->speed_factor - current_time;
+ } else {
+ s->time_ahead = time - current_time;
+ }
+
+
+ Put("worker(%d): All threads terminated!", w->rworker_num);
+ fclose(replay_fd);
+
+ return SUCCESS;
+}