[Gluster-devel] libgfapi zero copy write - application in samba, nfs-ganesha

Saravanakumar Arumugam sarumuga at redhat.com
Mon Sep 26 13:48:26 UTC 2016


Hi,

I have carried out "basic" performance measurement with zero copy write 
APIs.
Throughput of zero copy write  is 57 MB/sec  vs default write 43 MB/sec.*
*( I have modified Ben England's gfapi_perf_test.c for this. Attached 
the same for reference )

We would like to hear how samba/ nfs-ganesha who are libgfapi users can 
make use of this.
Please provide your comments. Refer attached results.

Zero copy in write patch: http://review.gluster.org/#/c/14784/

Thanks,
Saravana


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://www.gluster.org/pipermail/gluster-devel/attachments/20160926/8122ac44/attachment-0001.html>
-------------- next part --------------
ZERO COPY write:

[root at gfvm3 parallel-libgfapi]# sync; echo 3 > /proc/sys/vm/drop_caches [root at gfvm3 parallel-libgfapi]# 
[root at gfvm3 parallel-libgfapi]# 
root at gfvm3 parallel-libgfapi]# DEBUG=0  GFAPI_VOLNAME=tv1 GFAPI_FSZ=1048576  GFAPI_FILES=3   GFAPI_HOSTNAME=gfvm3 GFAPI_BASEDIR=gluster_tmp ./gfapi_perf_test        
prm.debug: 0
GLUSTER: 
  volume=tv1
  transport=tcp
  host=gfvm3
  port=24007
  fuse?No
  trace level=0
  start timeout=60
WORKLOAD:
  type = seq-wr 
  threads/proc = 1
  base directory = gluster_tmp
  prefix=f
  file size = 1048576 KB
  file count = 3
  record size = 64 KB
  files/dir=1000
  fsync-at-close? No 
zero copy writezero copy writezero copy writethread   0:   files written = 3
  files done = 3
  I/O (record) transfers = 49152
  total bytes = 3221225472
  elapsed time    = 53.74     sec
  throughput      = 57.16     MB/sec
  IOPS            = 914.58    (sequential write)
aggregate:   files written = 3
  files done = 3
  I/O (record) transfers = 49152
  total bytes = 3221225472
  elapsed time    = 53.74     sec
  throughput      = 57.16     MB/sec
  IOPS            = 914.58    (sequential write)
[root at gfvm3 parallel-libgfapi]# 

Default write: 

[root at gfvm3 parallel-libgfapi]# sync; echo 3 > /proc/sys/vm/drop_caches 

[root at gfvm3 parallel-libgfapi]# DEBUG=0  GFAPI_VOLNAME=tv1 GFAPI_FSZ=1048576  GFAPI_FILES=3   GFAPI_HOSTNAME=gfvm3 GFAPI_BASEDIR=gluster_tmp ./gfapi_perf_test        
prm.debug: 0
GLUSTER: 
  volume=tv1
  transport=tcp
  host=gfvm3
  port=24007
  fuse?No
  trace level=0
  start timeout=60
WORKLOAD:
  type = seq-wr 
  threads/proc = 1
  base directory = gluster_tmp
  prefix=f
  file size = 1048576 KB
  file count = 3
  record size = 64 KB
  files/dir=1000
  fsync-at-close? No 
thread   0:   files written = 3
  files done = 3
  I/O (record) transfers = 49152
  total bytes = 3221225472
  elapsed time    = 70.00     sec
  throughput      = 43.89     MB/sec
  IOPS            = 702.19    (sequential write)
aggregate:   files written = 3
  files done = 3
  I/O (record) transfers = 49152
  total bytes = 3221225472
  elapsed time    = 70.00     sec
  throughput      = 43.89     MB/sec
  IOPS            = 702.19    (sequential write)
[root at gfvm3 parallel-libgfapi]# 


-------------- next part --------------
/*
 * gfapi_perf_test.c - single-thread test of Gluster libgfapi file perf, enhanced to do small files also
 *
 * install the glusterfs-api RPM before trying to compile and link
 *
 * to compile: gcc -pthread -g -O0  -Wall --pedantic -o gfapi_perf_test -I /usr/include/glusterfs/api gfapi_perf_test.c  -lgfapi -lrt
 *
 * environment variables used as inputs, see usage() below
 *
 * NOTE: we allow random workloads to process a fraction of the entire file
 * this allows us to generate a file that will not fit in cache 
 * we then can do random I/O on a fraction of the data in that file, unlike iozone
 */

#define _GNU_SOURCE
#include <libgen.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <ctype.h>
#include <memory.h>
#include <malloc.h>
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <time.h>
#include <math.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <pthread.h>
#include <fcntl.h>
#include "glfs.h"

#define NOTOK 1 /* process exit status indicates error of some sort */
#define OK 0    /* system call or process exit status indicating success */
#define KB_PER_MB 1024
#define BYTES_PER_KB 1024
#define BYTES_PER_MB (1024*1024)
#define KB_PER_MB 1024
#define NSEC_PER_SEC 1000000000.0
#define UINT64DFMT "%ld"

/* power of 2 corresponding to 4096-byte page boundary, used in memalign() call */
#define PAGE_BOUNDARY 12 

#define FOREACH(_index, _count) for(_index=0; _index < (_count); _index++)

/* last array element of workload_types must be NULL */
static const char * workload_types[] = 
   { "seq-wr", "seq-rd", "rnd-wr", "rnd-rd", "unlink", "seq-rdwrmix", NULL };
static const char * workload_description[] = 
   { "sequential write", "sequential read", "random write", "random read", "delete", "sequential read-write mix", NULL };
/* define numeric workload types as indexes into preceding array */
#define WL_SEQWR 0
#define WL_SEQRD 1
#define WL_RNDWR 2
#define WL_RNDRD 3
#define WL_DELETE 4
#define WL_SEQRDWRMIX 5

static glfs_t * glfs_p = NULL;

/* shared parameter values common to all threads */

struct gfapi_prm {
  int threads_per_proc;            /* threads spawned within each process */
  char * workload_str;             /* name of workload to run */
  int workload_type;               /* post-parse numeric code for workload - contains WL_something */
  unsigned usec_delay_per_file;    /* microseconds of delay between each file operation */
  int recsz;                       /* I/O transfer size (KB) */
  uint64_t filesz_kb;              /* file size (KB) */
  int filecount;                   /* how many files per thread */
  uint64_t io_requests;            /* if random I/O, how many I/O requests to issue per thread */
  int files_per_dir;               /* max files placed in each subdirectory beneath thread directory */
  float rdpct;                     /* read percentage for mixed workload */
  char * prefix;                   /* filename prefix (lets you run multiple creates in same directory) */
  char * thrd_basedir;             /* per-thread base directory */
  char * starting_gun_file;        /* name of file that tells threads to start running */
  int fsync_at_close;              /* on write tests, whether or not to call fsync() before close() */
  int use_fuse;                    /* if TRUE, use POSIX filesystem calls, otherwise use libgfapi. default libgfapi */
  int o_direct;                    /* use O_DIRECT flag? */
  int o_append;                    /* use O_APPEND flag? */
  int o_overwrite;                 /* overwrite the file instead of creating it? */
  unsigned bytes_to_xfer;          /* io size in bytes instead of KB */
  int trclvl;                      /* libgfapi tracing level */
  char * glfs_volname;             /* Gluster volume name */
  char * glfs_hostname;            /* Gluster server participating in that volume */
  char * glfs_transport;           /* transport protocol (RDMA or TCP, defaults to TCP) */
  int glfs_portnum;                /* port number (DO WE NEED?) */
  int open_flags;                  /* calculate flags to use with open or glfs_open */
  int starting_gun_timeout;        /* how long should threads wait for starting gun to fire */
  int debug;                       /* debugging messages */
};
static struct gfapi_prm prm = {0};  /* initializer ensures everything is zero (static probably is anyway) */

/* per-thread data structure */

struct gfapi_result {
  pthread_t thr;
  int thread_num;
  uint64_t elapsed_time, end_time, start_time;
  uint64_t total_bytes_xferred, total_io_count;
  uint64_t files_read, files_written, files_deleted;
};
typedef struct gfapi_result gfapi_result_t;



/*** code begins here ****/

char * now_str(void) {
        time_t now = time((time_t * )NULL);
        char * timebuf = (char * )malloc(100);
        if (!timebuf) return timebuf;
        strcpy(timebuf, ctime(&now));
        timebuf[strlen(timebuf)-1] = 0;
        return timebuf; /* MEMORY LEAK, doesn't matter unless you do it a lot */
}

/* if system call error occurs, call this to print errno and then exit */

void scallerr(char * msg)
{
        printf("%s : %s : errno (%d)%s\n", now_str(), msg, errno, strerror(errno));
        if (glfs_p) glfs_fini(glfs_p);
        exit(NOTOK); 
}

/* if user inputs are wrong, print this and exit */

void usage2(const char * msg, const char * param)
{
        if (param) { printf(msg, param); puts(""); }
        else puts(msg);
        puts("usage: ./gfapi_perf_test");
        puts("environment variables may be inserted at front of command or exported");
        puts("defaults are in parentheses");
        puts("DEBUG (0 means off)     - print everything the program does");
        puts("GFAPI_VOLNAME           - Gluster volume to use");
        puts("GFAPI_HOSTNAME          - Gluster server participating in the volume");
        puts("GFAPI_TRANSPORT (tcp)   - transport protocol to use, can be tcp or rdma");
        puts("GFAPI_PORT (24007)      - port number to connect to");
        puts("GFAPI_RECSZ (64)        - I/O transfer size (i.e. record size) to use");
        puts("GFAPI_FSZ (1M)          - file size ");
        puts("GFAPI_BASEDIR(/tmp)     - directory for this thread to use");
        puts("GFAPI_LOAD (seq-wr)     - workload to apply, can be one of seq-wr, seq-rd, rnd-wr, rnd-rd, unlink, seq-rdwrmix");
        puts("GFAPI_IOREQ (0 = all)   - for random workloads , how many requests to issue");
        puts("GFAPI_DIRECT (0 = off)  - force use of O_DIRECT even for sequential reads/writes");
        puts("GFAPI_FUSE (0 = false)  - if true, use POSIX (through FUSE) instead of libgfapi");
        puts("GFAPI_TRCLVL (0 = none) - trace level specified in glfs_set_logging");
        puts("GFAPI_FILES (100)       - number of files to access");
        puts("GFAPI_STARTING_GUN (none) - touch this file to begin test after all processes are started");
        puts("GFAPI_STARTING_GUN_TIMEOUT (60) - each thread waits this many seconds for starting gun file before timing out");
        puts("GFAPI_FILES_PER_DIR (1000) - maximum files placed in a leaf directory");
        puts("GFAPI_APPEND (0)        - if 1, then append to existing file, instead of creating it");
        puts("GFAPI_OVERWRITE (0)     - if 1, then overwrite existing file, instead of creating it");
        puts("GFAPI_PREFIX (none)     - insert string in front of filename");
        puts("GFAPI_USEC_DELAY_PER_FILE (0) - if non-zero, then sleep this many microseconds after each file is accessed");
        puts("GFAPI_FSYNC_AT_CLOSE (0) - if 1, then issue fsync() call on file before closing");
        /* puts("GFAPI_DIRS_PER_DIR (1000) - maximum subdirs placed in a directory"); */
        exit(NOTOK);
}

void usage(const char * msg) { usage2(msg, NULL); }

/* get an integer environment variable, returning default value if undefined */

int getenv_int( const char * env_var, const int default_value)
{
        char * str_val = getenv(env_var);
        int val = default_value;

        if (str_val)
                val = atoi(str_val);

        /* printf("getenv_int: returning value %d for variable %s\n", val, env_var); */
        return val;
}

/* get a floating-point environment variable, returning default value if undefined */

float getenv_float( const char * env_var, const float default_value)
{
        char * str_val = getenv(env_var);
        float val = default_value;
        if (str_val) val = atof(str_val);
        /* printf("getenv_float: returning value %f for variable %s\n", val, env_var); */
        return val;
}

/* get an integer file size environment variable, returning default value if undefined */

uint64_t getenv_size64_kb( const char * env_var, const uint64_t default_value)
{
        char * str_val = getenv(env_var);
        uint64_t val = default_value;
        int slen;
        if (str_val) {
          slen = strlen(str_val);
          if (slen > 0) {
            char lastch = str_val[slen-1];
            val = atoi(str_val);
            if (isalpha(lastch)) {
              str_val[slen-1] = 0; /* drop the unit */
              switch (toupper(lastch)) {
                case 'M':
                  val *= KB_PER_MB;
                  break;
                case 'K':
                  break;
                case 'G':
                  val *= (KB_PER_MB * KB_PER_MB);
                  break;
                case 'T':
                  val *= (KB_PER_MB * KB_PER_MB * KB_PER_MB);
                  break;
                default:
                  usage("use lower- or upper-case suffixes K, M, G, or T for file size");
              }
            }
          }
        }

        return val;
}

/* get a string environment variable, returning default value if undefined */

char * getenv_str( const char * env_var, const char * default_value)
{
        char * str_val = getenv(env_var);
        const char * val = default_value;
        if (str_val) val = str_val;
        else if (!default_value) 
                usage2("getenv_str: you must define environment variable %s", env_var);
        /* printf("getenv_str: returning value %s for variable %s\n", val, env_var); */
        return (char * )val;
}

/* get current time in nanosec */

uint64_t gettime_ns(void)
{
        uint64_t ns;
        struct timespec t;

        clock_gettime(CLOCK_REALTIME, &t);
        ns = t.tv_nsec + 1000000000*t.tv_sec;
        return ns;
}

void sleep_for_usec( unsigned usec_delay_per_file )
{
     int rc;
     struct timeval tval = {0};
     tval.tv_usec = usec_delay_per_file;
     rc = select( 0, NULL, NULL, NULL, &tval );
     if (rc < OK) scallerr("select");
}

/* used to generate random offsets into a file for random I/O workloads */

off_t * random_offset_sequence( uint64_t file_size_bytes, size_t record_size_bytes )
{
        unsigned j;
        uint64_t io_requests = file_size_bytes / record_size_bytes;
        off_t * offset_sequence = (off_t * )calloc(io_requests, sizeof(off_t));

        for (j=0; j<io_requests; j++) offset_sequence[j] = j*record_size_bytes;
        for (j=0; j<io_requests; j++) {
                off_t next_offset = offset_sequence[j];
                unsigned random_index = random() % io_requests;
                off_t random_offset = offset_sequence[random_index];
                offset_sequence[j] = random_offset;
                offset_sequence[random_index] = next_offset;
        }
        return offset_sequence;
}

/* compute next pathname for thread to use */

void get_next_path( const int filenum, const int files_per_dir, const int thread_num, const char * base_dir, const char * prefix, char *next_fname  )
{
  int subdir = filenum / files_per_dir;
  sprintf(next_fname, "%s/thrd%03d-d%04d/%s.%07d", base_dir, thread_num, subdir, prefix, filenum);
}

// comment out below for normal write.
#define ZERO_CPY

/* each thread runs code in this routine */

void * gfapi_thread_run( void * void_result_p )
{
  gfapi_result_t * result_p = (gfapi_result_t * )void_result_p;
  off_t * random_offsets;
  char ready_path[1024] = {0}, hostnamebuf[1024] = {0}, pidstr[100] = {0}, threadstr[100] = {0};
  int ready_fd;
  glfs_fd_t * ready_fd_p;
  glfs_fd_t * glfs_fd_p = NULL;
  int fd = -1;
  int rc, k;
  int sec;
  struct stat st = {0};
  char next_fname[1024] = {0};
  int create_flags = O_WRONLY|O_EXCL|O_CREAT;
  off_t offset;
  unsigned io_count;
  int bytes_xferred;
  char * buf;
  char *zero_buf = NULL;
  void  *write_ref = NULL;



  /* use same random offset sequence for all files */

  if (prm.workload_type == WL_RNDWR || prm.workload_type == WL_RNDRD) {
    random_offsets = random_offset_sequence( 
                          (uint64_t )prm.filesz_kb*BYTES_PER_KB, prm.recsz*BYTES_PER_KB );
  }

  /* wait for the starting gun file, which should be in parent directory */
  /* it is invoker's responsibility to unlink the starting gun file before starting this program */

  if (strlen(prm.starting_gun_file) > 0) {
    static const int sg_create_flags = O_CREAT|O_EXCL|O_WRONLY;
    char ready_buf[1024] = {0};

    /* signal that we are ready */

    gethostname(hostnamebuf, sizeof(hostnamebuf)-4);
    sprintf(pidstr, "%d", getpid());
    sprintf(threadstr, "%d", result_p->thread_num);

    strcpy(ready_buf, prm.starting_gun_file);
    dirname(ready_buf);
    strcpy(ready_path, ready_buf);
    strcat(ready_path, "/");
    strcat(ready_path, strtok(hostnamebuf,"."));
    strcat(ready_path, ".");
    strcat(ready_path, pidstr);
    strcat(ready_path, ".");
    strcat(ready_path, threadstr);
    strcat(ready_path, ".ready");
    printf("%s : ", now_str());
    printf("signaling ready with file %s\n", ready_path);
    if (prm.use_fuse) {
      ready_fd = open(ready_path, sg_create_flags, 0666);
      if (ready_fd < 0) scallerr(ready_path);
      else {
        rc = close(ready_fd);
        if (rc < OK) scallerr("ready path close");
      }
    } else {
      ready_fd_p = glfs_creat(glfs_p, ready_path, sg_create_flags, 0644);
      if (!ready_fd_p) scallerr(ready_path);
      else {
        rc = glfs_close(ready_fd_p);
        if (rc < OK) scallerr("ready path close");
      }
    }

    /* wait until we are told to start the test, to give other threads time to get ready */

    printf("%s : ", now_str());
    printf("awaiting starting gun file %s\n", prm.starting_gun_file);
    FOREACH(sec, prm.starting_gun_timeout) {
      rc = prm.use_fuse ? stat(prm.starting_gun_file, &st) : glfs_stat(glfs_p, prm.starting_gun_file, &st);
      if (prm.debug) printf("rc=%d errno=%d\n", rc, errno);
      if (rc != OK) {
        if (errno != ENOENT) scallerr(prm.use_fuse ? "stat" : "glfs_stat");
      } else {
        break; /* we heard the starting gun */
      }
      sleep(1);
    }
    if (sec == prm.starting_gun_timeout) {
      printf(now_str());
      printf("ERROR: timed out after %d sec waiting for starting gun file %s\n", 
             prm.starting_gun_timeout, prm.starting_gun_file);
      exit(NOTOK);
    }
    sleep(3); /* give everyone a chance to see it */
  }

  /* we can use page-aligned buffer regardless of whether O_DIRECT is used or not */
  buf = memalign(PAGE_BOUNDARY, prm.bytes_to_xfer);
  if (!buf) scallerr("posix_memalign");

  /* open the file */

  result_p->start_time = gettime_ns();
  create_flags |= prm.o_direct;
  if (prm.o_append|prm.o_overwrite) create_flags &= ~(O_EXCL|O_CREAT);
  FOREACH(k, prm.filecount) {
   int workload = prm.workload_type;
   if (workload == WL_SEQRDWRMIX) {
     float rndsample = (float )(random() % 100);
     workload = (rndsample > prm.rdpct) ? WL_SEQWR : WL_SEQRD;
     if (prm.debug) printf("workload %s\n", workload_description[workload]);
   }
   get_next_path( k, prm.files_per_dir, result_p->thread_num, prm.thrd_basedir, prm.prefix, next_fname );
   if (prm.debug) printf("starting file %s\n", next_fname);
   fd = -2;
   glfs_fd_p = NULL;
   if (prm.use_fuse) {
     switch (workload) {
      case WL_DELETE:
        rc = unlink(next_fname);
        if (rc < OK && errno != ENOENT) scallerr(next_fname);
        break;

      case WL_SEQWR: 
        fd = open(next_fname, create_flags, 0666);
        if ((fd < OK) && (errno == ENOENT)) {
          char subdir[1024];
          strcpy(subdir, dirname(next_fname));
          rc = mkdir(subdir, 0755);
          if (rc < OK) scallerr(subdir);
          /* we have to reconstruct filename because dirname() function sticks null into it */
          get_next_path( k, prm.files_per_dir, result_p->thread_num, prm.thrd_basedir, prm.prefix, next_fname );
          fd = open(next_fname, create_flags, 0666);
        }
        if ((prm.workload_type == WL_SEQRDWRMIX) && (rc < OK) && (errno == EEXIST)) {
          rc = unlink(next_fname);
          if (rc < OK) scallerr(next_fname);
          fd = open(next_fname, create_flags, 0666);
        }
        if (fd < OK) scallerr(next_fname);
        if (prm.o_append) {
          rc = lseek( fd, 0, SEEK_END);
          if (rc < OK) scallerr(next_fname);
        }
        break;

      case WL_SEQRD:
        fd = open(next_fname, O_RDONLY|prm.o_direct);
        if (fd < OK) scallerr(next_fname);
        break;

      case WL_RNDWR:
        fd = open(next_fname, O_WRONLY|prm.o_direct);
        if (fd < OK) scallerr(next_fname);
        break;

      case WL_RNDRD:
        fd = open(next_fname, O_RDONLY|prm.o_direct);
        if (fd < OK) scallerr(next_fname);
        break;

      default: exit(NOTOK);
     }
   } else {
     switch (workload) {
      case WL_DELETE:
        rc = glfs_unlink(glfs_p, next_fname);
        if (rc < OK && errno != ENOENT) scallerr(next_fname);
        break;

      case WL_SEQWR: 
        if (prm.o_append|prm.o_overwrite) {
          glfs_fd_p = glfs_open(glfs_p, next_fname, create_flags );
          if (!glfs_fd_p) scallerr(next_fname);
          if (prm.o_append) {
            rc = glfs_lseek( glfs_fd_p, 0, SEEK_END);
            if (rc < OK) scallerr(next_fname);
          }
        } else {
          glfs_fd_p = glfs_creat(glfs_p, next_fname, create_flags, 0666 );
          if ((!glfs_fd_p) && (errno == ENOENT)) {
            char subdir[1024];
            strcpy(subdir, dirname(next_fname));
            rc = glfs_mkdir(glfs_p, subdir, 0755);
            if (rc < OK) scallerr(subdir);
            /* we have to reconstruct filename because dirname() function sticks null into it */
            get_next_path( k, prm.files_per_dir, result_p->thread_num, prm.thrd_basedir, prm.prefix, next_fname );
            glfs_fd_p = glfs_creat(glfs_p, next_fname, create_flags, 0666);
          }
          if ((prm.workload_type == WL_SEQRDWRMIX) && (rc < OK) && (errno == EEXIST)) {
            rc = glfs_unlink(glfs_p, next_fname);
            if (rc < OK && errno != ENOENT) scallerr(next_fname);
            glfs_fd_p = glfs_creat(glfs_p, next_fname, create_flags, 0666);
          }
          if (!glfs_fd_p) scallerr(next_fname);
        }
        break;

      case WL_SEQRD:
        glfs_fd_p = glfs_open(glfs_p, next_fname, O_RDONLY|prm.o_direct);
        if (!glfs_fd_p) scallerr(next_fname);
        break;

      case WL_RNDWR:
        glfs_fd_p = glfs_open(glfs_p, next_fname, O_WRONLY|prm.o_direct);
        if (!glfs_fd_p) scallerr(next_fname);
        break;

      case WL_RNDRD:
        glfs_fd_p = glfs_open(glfs_p, next_fname, O_RDONLY|prm.o_direct);
        if (!glfs_fd_p) scallerr(next_fname);
        break;

      default: exit(NOTOK);
     }
   }
   if (workload == WL_DELETE) {
     if (prm.usec_delay_per_file) sleep_for_usec(prm.usec_delay_per_file);
     result_p->files_deleted++;
     continue;
   }

   /* perform the requested I/O operations */

   offset = 0;
   if (prm.debug) printf("io_requests = %ld\n", prm.io_requests);

#ifdef ZERO_CPY
   printf ("zero copy write");
   zero_buf = glfs_get_buffer (glfs_p, &write_ref, prm.bytes_to_xfer);
   if (NULL == zero_buf) {
           scallerr("\nglfs_get_buffer failed");
   }
#endif

   FOREACH( io_count, prm.io_requests ) {
    if (workload == WL_SEQWR) {
      offset += prm.bytes_to_xfer;
      #ifndef  ZERO_CPY
      bytes_xferred = prm.use_fuse ?
        write(fd, buf, prm.bytes_to_xfer) :
        glfs_write(glfs_fd_p, buf, prm.bytes_to_xfer, 0);
      #else
      bytes_xferred = glfs_zero_write (glfs_fd_p, zero_buf, prm.bytes_to_xfer, 0, write_ref);
      if (bytes_xferred < 0) {
              scallerr("\nglfs_write failed\n");
      }
      #endif
      if (bytes_xferred < prm.bytes_to_xfer) 
                scallerr(prm.use_fuse?"write":"glfs_write");

    } else if (workload == WL_SEQRD) {
      offset += prm.bytes_to_xfer;
      bytes_xferred = prm.use_fuse ? 
        read(fd, buf, prm.bytes_to_xfer) :
        glfs_read(glfs_fd_p, buf, prm.bytes_to_xfer, 0);
      if (bytes_xferred < prm.bytes_to_xfer) 
                scallerr(prm.use_fuse?"read":"glfs_read");

    } else if (workload == WL_RNDWR) {
      offset = random_offsets[io_count];
      bytes_xferred = prm.use_fuse ?
        pwrite(fd, buf, prm.bytes_to_xfer, offset) :
        glfs_pwrite(glfs_fd_p, buf, prm.bytes_to_xfer, offset, 0);
      if (bytes_xferred < prm.bytes_to_xfer) 
                scallerr(prm.use_fuse?"pwrite":"glfs_pwrite");

    } else if (workload == WL_RNDRD) {
      offset = random_offsets[io_count];
      bytes_xferred = prm.use_fuse ? 
        pread(fd, buf, prm.bytes_to_xfer, offset) :
        glfs_pread(glfs_fd_p, buf, prm.bytes_to_xfer, offset, 0);
      if (bytes_xferred < prm.bytes_to_xfer) 
                scallerr(prm.use_fuse?"pwrite":"glfs_pwrite");
    }
    result_p->total_bytes_xferred += bytes_xferred;
    if (prm.debug) printf("offset %-20ld, io_count %-10u total_bytes_xferred %-20ld\n", 
                      offset, io_count, result_p->total_bytes_xferred);
   }
#ifdef ZERO_CPY
   glfs_free_buffer (write_ref);
#endif

   result_p->total_io_count += io_count;

   /* shut down file access */

   if ((workload == WL_SEQWR || workload == WL_RNDWR) && prm.fsync_at_close) {
     rc = prm.use_fuse ? fsync(fd) : glfs_fsync(glfs_fd_p);
     if (rc) scallerr(prm.use_fuse ? "fsync" : "glfs_fsync");
   }
   rc = prm.use_fuse ? close(fd) : glfs_close(glfs_fd_p);
   if (rc) scallerr(prm.use_fuse ? "close" : "glfs_close");
   if (prm.usec_delay_per_file) sleep_for_usec(prm.usec_delay_per_file);
   if ((workload == WL_SEQWR) || (workload == WL_RNDWR))
     result_p->files_written++;
   if ((workload == WL_SEQRD) || (workload == WL_RNDRD))
     result_p->files_read++;
  }
  result_p->end_time = gettime_ns();
  return NULL;
}

void print_result( gfapi_result_t * result_p )
{
  float thru, files_thru, mb_transferred, pct_actual_reads;
  uint64_t files_done;

  /* calculate and print stats */

  if (result_p->thread_num < 0) printf("aggregate: "); else printf("thread %3d: ", result_p->thread_num);
  result_p->elapsed_time = result_p->end_time - result_p->start_time;
  if (prm.debug) printf("start %ld end %ld elapsed %ld\n", result_p->start_time, result_p->end_time, result_p->elapsed_time);
  if (prm.debug) printf("  total byte count = "UINT64DFMT" total io count = "UINT64DFMT"\n", 
                     result_p->total_bytes_xferred, result_p->total_io_count );
  mb_transferred = (float )result_p->total_io_count * prm.recsz / KB_PER_MB;
  thru = mb_transferred * NSEC_PER_SEC / result_p->elapsed_time ;
  files_done = result_p->files_written + result_p->files_read;
  files_thru = files_done * NSEC_PER_SEC / result_p->elapsed_time;
  if (files_done < 10) {
    files_thru = 0.0;
  }
  if (result_p->files_written) printf("  files written = "UINT64DFMT"\n", result_p->files_written);
  if (result_p->files_read) printf("  files read = "UINT64DFMT"\n", result_p->files_read);
  printf("  files done = "UINT64DFMT"\n", files_done);
  if (prm.workload_type == WL_SEQRDWRMIX) {
    pct_actual_reads = 100.0 * result_p->files_read / files_done;
    printf("  fraction of reads = %6.2f%%\n", pct_actual_reads );
  }
  if (result_p->total_io_count > 0) printf("  I/O (record) transfers = "UINT64DFMT"\n", result_p->total_io_count);
  if (result_p->total_bytes_xferred > 0) printf("  total bytes = "UINT64DFMT"\n", result_p->total_bytes_xferred);
  printf("  elapsed time    = %-9.2f sec\n", result_p->elapsed_time/NSEC_PER_SEC);
  if (thru > 0.0) printf("  throughput      = %-9.2f MB/sec\n", thru);
  if (files_thru > 0.0) printf("  file rate       = %-9.2f files/sec\n", files_thru);
  if (thru > 0.0) printf("  IOPS            = %-9.2f (%s)\n", thru * 1024 / prm.recsz, workload_description[prm.workload_type]);
}

void aggregate_result( gfapi_result_t * r_in_p, gfapi_result_t * r_out_p )
{
  if (r_out_p->start_time == 0) r_out_p->start_time = (uint64_t )-1; /* positive infinity */
  if (r_out_p->start_time > r_in_p->start_time) r_out_p->start_time = r_in_p->start_time;
  if (r_out_p->end_time < r_in_p->end_time) r_out_p->end_time = r_in_p->end_time;
  r_out_p->total_bytes_xferred += r_in_p->total_bytes_xferred;
  r_out_p->total_io_count += r_in_p->total_io_count;
  r_out_p->files_read += r_in_p->files_read;
  r_out_p->files_written += r_in_p->files_written;
  r_out_p->files_deleted += r_in_p->files_deleted;
}

int main(int argc, char * argv[])
{
  int rc, j, t;
  uint64_t max_io_requests;

  gfapi_result_t * result_array;
  gfapi_result_t aggregate = {0};

  /* define environment variable inputs */

  prm.debug = getenv_int("DEBUG", 0);
  printf("prm.debug: %d\n", prm.debug);

  prm.rdpct = getenv_float("GFAPI_RDPCT", 0.0);
  prm.threads_per_proc = getenv_int("GFAPI_THREADS_PER_PROC", 1);
  prm.trclvl = getenv_int("GFAPI_TRCLVL", 0);
  prm.glfs_volname = getenv_str("GFAPI_VOLNAME", NULL);
  prm.glfs_hostname = getenv_str("GFAPI_HOSTNAME", NULL);
  prm.glfs_transport = getenv_str("GFAPI_TRANSPORT", "tcp");
  prm.glfs_portnum = getenv_int("GFAPI_PORT", 24007);
  prm.recsz = getenv_int("GFAPI_RECSZ", 64);
  prm.filesz_kb = getenv_size64_kb("GFAPI_FSZ", 1024);
  prm.prefix = getenv_str("GFAPI_PREFIX", "f");
  prm.thrd_basedir = getenv_str("GFAPI_BASEDIR", "/tmp" );
  prm.starting_gun_file = getenv_str("GFAPI_STARTING_GUN", "");
  prm.workload_str = getenv_str("GFAPI_LOAD", "seq-wr");  
  prm.io_requests = (uint64_t )getenv_int("GFAPI_IOREQ", 0);
  prm.starting_gun_timeout = getenv_int("GFAPI_STARTING_GUN_TIMEOUT", 60);
  prm.fsync_at_close = getenv_int("GFAPI_FSYNC_AT_CLOSE", 0);
  prm.use_fuse = getenv_int("GFAPI_FUSE", 0);
  prm.o_direct = getenv_int("GFAPI_DIRECT", 0) ? O_DIRECT : 0;
  prm.o_append = getenv_int("GFAPI_APPEND", 0);
  prm.o_overwrite = getenv_int("GFAPI_OVERWRITE", 0);
  prm.filecount = getenv_int("GFAPI_FILES", 100);
  prm.usec_delay_per_file = getenv_int("GFAPI_USEC_DELAY_PER_FILE", 0);
  /* int dirs_per_dir = getenv_int("GFAPI_DIRS_PER_DIR", 1000); */
  prm.files_per_dir = getenv_int("GFAPI_FILES_PER_DIR", 1000);

  printf("GLUSTER: \n  volume=%s\n  transport=%s\n  host=%s\n  port=%d\n  fuse?%s\n  trace level=%d\n  start timeout=%d\n", 
          prm.glfs_volname, prm.glfs_transport, prm.glfs_hostname, prm.glfs_portnum, prm.use_fuse ? "Yes" : "No", prm.trclvl, prm.starting_gun_timeout );

  printf("WORKLOAD:\n  type = %s \n  threads/proc = %d\n  base directory = %s\n  prefix=%s\n"
         "  file size = "UINT64DFMT" KB\n  file count = %d\n  record size = %u KB"
         "\n  files/dir=%d\n  fsync-at-close? %s \n", 
                prm.workload_str, prm.threads_per_proc, prm.thrd_basedir, prm.prefix, 
                prm.filesz_kb, prm.filecount, prm.recsz, 
                prm.files_per_dir, prm.fsync_at_close?"Yes":"No");

  if (prm.o_direct) printf("  forcing use of direct I/O with O_DIRECT flag in open call\n");
  if (prm.usec_delay_per_file) printf("  sleeping %d microsec after each file access\n", prm.usec_delay_per_file);
  if (argc > 1) usage("glfs_io_test doesn't take command line parameters");
  if (prm.o_append && prm.o_overwrite) usage("GFAPI_APPEND and GFAPI_OVERWRITE cannot be used in the same test");

  /* validate inputs */

  for (j=0; workload_types[j]; j++) {
    if (strcmp(workload_types[j], prm.workload_str) == 0)
        break;
  }

  if (!workload_types[j])
        usage2("invalid workload type %s", prm.workload_str);
  prm.workload_type = j; /* one of WL_* codes */

  if (prm.workload_type == WL_SEQRDWRMIX) {
    printf( "  percent reads = %6.2f\n", prm.rdpct );
    if ((prm.o_append == 0) && (prm.o_overwrite == 0)) prm.o_append = 1;
  }

  if (prm.o_append) printf("  using O_APPEND flag to append to existing files\n");

  if (prm.o_overwrite) printf("  overwriting existing files\n");

  if (prm.filesz_kb < prm.recsz) {
    printf("  truncating record size %u KB to file size %lu KB\n", prm.recsz, prm.filesz_kb );
    prm.recsz = prm.filesz_kb;
  }

  max_io_requests = prm.filesz_kb / prm.recsz;
  if (prm.workload_type == WL_RNDRD || prm.workload_type == WL_RNDWR) {
    if (prm.io_requests == 0) prm.io_requests = max_io_requests;
    printf("  random read/write requests = "UINT64DFMT"\n", prm.io_requests);
    if (prm.io_requests > max_io_requests) {
        usage("GFAPI_IOREQ too large for file size and record size");
    }
  } else { /* if sequential workload, do entire file  */
    prm.io_requests = max_io_requests;
  }
  if (prm.debug) printf("max_io_requests = %ld\n", (long )max_io_requests);

  srandom(time(NULL));
  prm.bytes_to_xfer = prm.recsz * BYTES_PER_KB;

  /* initialize libgfapi instance */

  if (!prm.use_fuse) {
    char logfilename[100];
    /* mount volume */
    glfs_p = glfs_new(prm.glfs_volname);
    if (!glfs_p) scallerr("ERROR: could not initialize Gluster volume mount with volname");

    sprintf(logfilename, "/tmp/glfs-%d.log", getpid());
    if (glfs_set_logging(glfs_p, logfilename, prm.trclvl)) scallerr("set_logging");
  
    if (glfs_set_volfile_server( glfs_p, prm.glfs_transport, prm.glfs_hostname, prm.glfs_portnum ))
        scallerr("ERROR: could not initialize gfapi mount");

    rc = glfs_init(glfs_p);
    if (rc) scallerr("glfs_init");
  }

  /* allocate and initialize per-thread structure and start each thread */

  result_array = (gfapi_result_t * )calloc(prm.threads_per_proc, sizeof(gfapi_result_t));
  FOREACH(t, prm.threads_per_proc) {
    gfapi_result_t * next_result_p = &result_array[t];
    next_result_p->thread_num = t;
    rc = pthread_create(&next_result_p->thr, NULL, gfapi_thread_run, next_result_p);
    if (rc != OK) scallerr("pthread_create");
  }
   
  /* wait for each thread to finish */

  FOREACH(t, prm.threads_per_proc) {
    void * retval;
    gfapi_result_t * next_result_p = &result_array[t];
    rc = pthread_join( next_result_p->thr, &retval );
    if (rc != OK) {
      printf("thread %d return code %d\n", t, rc);
    }
    if (retval == PTHREAD_CANCELED) {
      printf("thread %d cancelled\n", t);
    }
    if (retval) {
      printf("thread %d failed with rc %p\n", t, retval);
    }
  }
  if (!prm.use_fuse) {
    rc = glfs_fini(glfs_p);
    if (rc < OK) scallerr("glfs_fini");
  }
  FOREACH(t, prm.threads_per_proc) {
    print_result(&result_array[t]);
    aggregate_result(&result_array[t], &aggregate);
  }
  aggregate.thread_num = -1;
  print_result(&aggregate);
  return OK;
}


More information about the Gluster-devel mailing list