[Gluster-devel] libgfapi zero copy write - application in samba, nfs-ganesha
Saravanakumar Arumugam
sarumuga at redhat.com
Mon Sep 26 13:48:26 UTC 2016
Hi,
I have carried out "basic" performance measurement with zero copy write
APIs.
Throughput of zero copy write is 57 MB/sec vs. default write at 43 MB/sec.
(I have modified Ben England's gfapi_perf_test.c for this measurement; the
modified version is attached for reference.)
We would like to hear how Samba and NFS-Ganesha, which are libgfapi users,
can make use of this.
Please provide your comments. Refer attached results.
Zero copy in write patch: http://review.gluster.org/#/c/14784/
Thanks,
Saravana
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://www.gluster.org/pipermail/gluster-devel/attachments/20160926/8122ac44/attachment-0001.html>
-------------- next part --------------
ZERO COPY write:
[root at gfvm3 parallel-libgfapi]# sync; echo 3 > /proc/sys/vm/drop_caches [root at gfvm3 parallel-libgfapi]#
[root at gfvm3 parallel-libgfapi]#
[root at gfvm3 parallel-libgfapi]# DEBUG=0 GFAPI_VOLNAME=tv1 GFAPI_FSZ=1048576 GFAPI_FILES=3 GFAPI_HOSTNAME=gfvm3 GFAPI_BASEDIR=gluster_tmp ./gfapi_perf_test
prm.debug: 0
GLUSTER:
volume=tv1
transport=tcp
host=gfvm3
port=24007
fuse?No
trace level=0
start timeout=60
WORKLOAD:
type = seq-wr
threads/proc = 1
base directory = gluster_tmp
prefix=f
file size = 1048576 KB
file count = 3
record size = 64 KB
files/dir=1000
fsync-at-close? No
zero copy writezero copy writezero copy writethread 0: files written = 3
files done = 3
I/O (record) transfers = 49152
total bytes = 3221225472
elapsed time = 53.74 sec
throughput = 57.16 MB/sec
IOPS = 914.58 (sequential write)
aggregate: files written = 3
files done = 3
I/O (record) transfers = 49152
total bytes = 3221225472
elapsed time = 53.74 sec
throughput = 57.16 MB/sec
IOPS = 914.58 (sequential write)
[root at gfvm3 parallel-libgfapi]#
Default write:
[root at gfvm3 parallel-libgfapi]# sync; echo 3 > /proc/sys/vm/drop_caches
[root at gfvm3 parallel-libgfapi]# DEBUG=0 GFAPI_VOLNAME=tv1 GFAPI_FSZ=1048576 GFAPI_FILES=3 GFAPI_HOSTNAME=gfvm3 GFAPI_BASEDIR=gluster_tmp ./gfapi_perf_test
prm.debug: 0
GLUSTER:
volume=tv1
transport=tcp
host=gfvm3
port=24007
fuse?No
trace level=0
start timeout=60
WORKLOAD:
type = seq-wr
threads/proc = 1
base directory = gluster_tmp
prefix=f
file size = 1048576 KB
file count = 3
record size = 64 KB
files/dir=1000
fsync-at-close? No
thread 0: files written = 3
files done = 3
I/O (record) transfers = 49152
total bytes = 3221225472
elapsed time = 70.00 sec
throughput = 43.89 MB/sec
IOPS = 702.19 (sequential write)
aggregate: files written = 3
files done = 3
I/O (record) transfers = 49152
total bytes = 3221225472
elapsed time = 70.00 sec
throughput = 43.89 MB/sec
IOPS = 702.19 (sequential write)
[root at gfvm3 parallel-libgfapi]#
-------------- next part --------------
/*
* gfapi_perf_test.c - single-thread test of Gluster libgfapi file perf, enhanced to do small files also
*
* install the glusterfs-api RPM before trying to compile and link
*
* to compile: gcc -pthread -g -O0 -Wall --pedantic -o gfapi_perf_test -I /usr/include/glusterfs/api gfapi_perf_test.c -lgfapi -lrt
*
* environment variables used as inputs, see usage() below
*
* NOTE: we allow random workloads to process a fraction of the entire file
* this allows us to generate a file that will not fit in cache
* we then can do random I/O on a fraction of the data in that file, unlike iozone
*/
#define _GNU_SOURCE
#include <libgen.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <ctype.h>
#include <memory.h>
#include <malloc.h>
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <time.h>
#include <math.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <pthread.h>
#include <fcntl.h>
#include "glfs.h"
/* exit statuses and unit-conversion constants */
#define NOTOK 1 /* process exit status indicates error of some sort */
#define OK 0 /* system call or process exit status indicating success */
#define KB_PER_MB 1024
#define BYTES_PER_KB 1024
#define BYTES_PER_MB (1024*1024)
#define NSEC_PER_SEC 1000000000.0
/* NOTE(review): "%ld" assumes 64-bit long; "%" PRId64 from <inttypes.h>
 * would be fully portable for the uint64_t fields printed with this */
#define UINT64DFMT "%ld"
/* power of 2 corresponding to 4096-byte page boundary, used in memalign() call */
#define PAGE_BOUNDARY 12
#define FOREACH(_index, _count) for(_index=0; _index < (_count); _index++)
/* last array element of workload_types must be NULL */
/* last array element of workload_types must be NULL */
static const char * workload_types[] =
{ "seq-wr", "seq-rd", "rnd-wr", "rnd-rd", "unlink", "seq-rdwrmix", NULL };
/* human-readable labels; must stay parallel (same order, same length)
 * with workload_types above, also NULL-terminated */
static const char * workload_description[] =
{ "sequential write", "sequential read", "random write", "random read", "delete", "sequential read-write mix", NULL };
/* define numeric workload types as indexes into preceding array */
#define WL_SEQWR 0
#define WL_SEQRD 1
#define WL_RNDWR 2
#define WL_RNDRD 3
#define WL_DELETE 4
#define WL_SEQRDWRMIX 5
/* handle for the mounted gluster volume; stays NULL when GFAPI_FUSE=1 */
static glfs_t * glfs_p = NULL;
/* shared parameter values common to all threads */
struct gfapi_prm {
    int threads_per_proc; /* threads spawned within each process */
    char * workload_str; /* name of workload to run */
    int workload_type; /* post-parse numeric code for workload - contains WL_something */
    unsigned usec_delay_per_file; /* microseconds of delay between each file operation */
    int recsz; /* I/O transfer size (KB) */
    uint64_t filesz_kb; /* file size (KB) */
    int filecount; /* how many files per thread */
    uint64_t io_requests; /* if random I/O, how many I/O requests to issue per thread */
    int files_per_dir; /* max files placed in each subdirectory beneath thread directory */
    float rdpct; /* read percentage for mixed workload */
    char * prefix; /* filename prefix (lets you run multiple creates in same directory) */
    char * thrd_basedir; /* per-thread base directory */
    char * starting_gun_file; /* name of file that tells threads to start running */
    int fsync_at_close; /* on write tests, whether or not to call fsync() before close() */
    int use_fuse; /* if TRUE, use POSIX filesystem calls, otherwise use libgfapi. default libgfapi */
    int o_direct; /* use O_DIRECT flag? */
    int o_append; /* use O_APPEND flag? */
    int o_overwrite; /* overwrite the file instead of creating it? */
    unsigned bytes_to_xfer; /* io size in bytes instead of KB */
    int trclvl; /* libgfapi tracing level */
    char * glfs_volname; /* Gluster volume name */
    char * glfs_hostname; /* Gluster server participating in that volume */
    char * glfs_transport; /* transport protocol (RDMA or TCP, defaults to TCP) */
    int glfs_portnum; /* port number (DO WE NEED?) */
    int open_flags; /* calculate flags to use with open or glfs_open */
    int starting_gun_timeout; /* how long should threads wait for starting gun to fire */
    int debug; /* debugging messages */
};
static struct gfapi_prm prm = {0}; /* initializer ensures everything is zero (static probably is anyway) */
/* per-thread data structure; filled in by gfapi_thread_run, consumed by
 * print_result / aggregate_result after the join */
struct gfapi_result {
    pthread_t thr; /* thread handle, used by main() for pthread_join */
    int thread_num; /* thread index; -1 marks the aggregate record */
    uint64_t elapsed_time, end_time, start_time; /* nanoseconds, CLOCK_REALTIME */
    uint64_t total_bytes_xferred, total_io_count; /* I/O volume counters */
    uint64_t files_read, files_written, files_deleted; /* per-category file counts */
};
typedef struct gfapi_result gfapi_result_t;
/*** code begins here ****/
/* Return the current wall-clock time as a malloc'd string with the
 * trailing newline stripped.  Returns NULL only on malloc failure.
 * Callers never free the result; the leak is accepted because the
 * function runs only a handful of times per process (error/progress
 * messages).
 * Fixes vs original: ctime() can return NULL for out-of-range time
 * values, and the original indexed timebuf[strlen()-1] even when the
 * string could be empty. */
char * now_str(void) {
    time_t now = time((time_t * )NULL);
    char * timebuf = (char * )malloc(100);
    if (!timebuf) return timebuf;
    char * ct = ctime(&now);
    if (!ct) {
        strcpy(timebuf, "unknown-time");
        return timebuf;
    }
    strcpy(timebuf, ct);
    size_t len = strlen(timebuf);
    if (len > 0 && timebuf[len-1] == '\n')
        timebuf[len-1] = 0; /* strip ctime's trailing newline */
    return timebuf; /* MEMORY LEAK, doesn't matter unless you do it a lot */
}
/* if system call error occurs, call this to print errno and then exit */
/* Report a failed system/library call: timestamp, caller's message, and
 * errno with its text.  Tears down the gluster mount (if mounted) and
 * terminates the whole process with NOTOK. */
void scallerr(char * msg)
{
    printf("%s : %s : errno (%d)%s\n",
           now_str(), msg, errno, strerror(errno));
    if (glfs_p)
        glfs_fini(glfs_p);
    exit(NOTOK);
}
/* if user inputs are wrong, print this and exit */
/* Print an input-error message (optionally formatted with one string
 * parameter) followed by the full environment-variable help text, then
 * exit with NOTOK.  msg is a trusted, programmer-supplied format. */
void usage2(const char * msg, const char * param)
{
    static const char * help_lines[] = {
        "usage: ./gfapi_perf_test",
        "environment variables may be inserted at front of command or exported",
        "defaults are in parentheses",
        "DEBUG (0 means off) - print everything the program does",
        "GFAPI_VOLNAME - Gluster volume to use",
        "GFAPI_HOSTNAME - Gluster server participating in the volume",
        "GFAPI_TRANSPORT (tcp) - transport protocol to use, can be tcp or rdma",
        "GFAPI_PORT (24007) - port number to connect to",
        "GFAPI_RECSZ (64) - I/O transfer size (i.e. record size) to use",
        "GFAPI_FSZ (1M) - file size ",
        "GFAPI_BASEDIR(/tmp) - directory for this thread to use",
        "GFAPI_LOAD (seq-wr) - workload to apply, can be one of seq-wr, seq-rd, rnd-wr, rnd-rd, unlink, seq-rdwrmix",
        "GFAPI_IOREQ (0 = all) - for random workloads , how many requests to issue",
        "GFAPI_DIRECT (0 = off) - force use of O_DIRECT even for sequential reads/writes",
        "GFAPI_FUSE (0 = false) - if true, use POSIX (through FUSE) instead of libgfapi",
        "GFAPI_TRCLVL (0 = none) - trace level specified in glfs_set_logging",
        "GFAPI_FILES (100) - number of files to access",
        "GFAPI_STARTING_GUN (none) - touch this file to begin test after all processes are started",
        "GFAPI_STARTING_GUN_TIMEOUT (60) - each thread waits this many seconds for starting gun file before timing out",
        "GFAPI_FILES_PER_DIR (1000) - maximum files placed in a leaf directory",
        "GFAPI_APPEND (0) - if 1, then append to existing file, instead of creating it",
        "GFAPI_OVERWRITE (0) - if 1, then overwrite existing file, instead of creating it",
        "GFAPI_PREFIX (none) - insert string in front of filename",
        "GFAPI_USEC_DELAY_PER_FILE (0) - if non-zero, then sleep this many microseconds after each file is accessed",
        "GFAPI_FSYNC_AT_CLOSE (0) - if 1, then issue fsync() call on file before closing",
        /* "GFAPI_DIRS_PER_DIR (1000) - maximum subdirs placed in a directory", */
        NULL
    };
    int j;
    if (param) {
        printf(msg, param);
        puts("");
    } else {
        puts(msg);
    }
    for (j = 0; help_lines[j]; j++)
        puts(help_lines[j]);
    /* NOTE(review): GFAPI_RDPCT and GFAPI_THREADS_PER_PROC are read by
     * main() but are not listed in this help text */
    exit(NOTOK);
}
void usage(const char * msg) { usage2(msg, NULL); }
/* get an integer environment variable, returning default value if undefined */
/* Read environment variable env_var as an integer (atoi semantics);
 * when the variable is not set, return default_value instead. */
int getenv_int( const char * env_var, const int default_value)
{
    const char * text = getenv(env_var);
    return text ? atoi(text) : default_value;
}
/* get a floating-point environment variable, returning default value if undefined */
/* Read environment variable env_var as a float (atof semantics);
 * when the variable is not set, return default_value instead. */
float getenv_float( const char * env_var, const float default_value)
{
    const char * text = getenv(env_var);
    return text ? (float )atof(text) : default_value;
}
/* get an integer file size environment variable, returning default value if undefined */
/* Read a size (in KB) from environment variable env_var, with an
 * optional K/M/G/T suffix (case-insensitive) scaling the value; return
 * default_value when the variable is not set.  Any other alphabetic
 * suffix aborts via usage().
 * Fixes vs original: parse with strtoull() instead of atoi() so sizes
 * above INT_MAX KB are honored, widen the suffix multiplications to
 * 64-bit, and do not write into the string returned by getenv() (the
 * original truncated the suffix in place, mutating the process
 * environment). */
uint64_t getenv_size64_kb( const char * env_var, const uint64_t default_value)
{
    char * str_val = getenv(env_var);
    uint64_t val = default_value;
    if (str_val) {
        size_t slen = strlen(str_val);
        if (slen > 0) {
            char lastch = str_val[slen-1];
            /* strtoull stops at the suffix character on its own */
            val = strtoull(str_val, NULL, 10);
            if (isalpha((unsigned char )lastch)) {
                switch (toupper((unsigned char )lastch)) {
                case 'K':
                    break;
                case 'M':
                    val *= KB_PER_MB;
                    break;
                case 'G':
                    val *= (uint64_t )KB_PER_MB * KB_PER_MB;
                    break;
                case 'T':
                    val *= (uint64_t )KB_PER_MB * KB_PER_MB * KB_PER_MB;
                    break;
                default:
                    usage("use lower- or upper-case suffixes K, M, G, or T for file size");
                }
            }
        }
    }
    return val;
}
/* get a string environment variable, returning default value if undefined */
/* Read environment variable env_var as a string.  When unset, return
 * default_value; when unset AND default_value is NULL, the variable is
 * mandatory and we abort with a usage message. */
char * getenv_str( const char * env_var, const char * default_value)
{
    char * str_val = getenv(env_var);
    if (str_val)
        return str_val;
    if (!default_value)
        usage2("getenv_str: you must define environment variable %s", env_var);
    return (char * )default_value;
}
/* get current time in nanosec */
/* Return the current CLOCK_REALTIME time in nanoseconds since the epoch.
 * Fix vs original: the product 1000000000*t.tv_sec was computed in the
 * type of time_t before the widening assignment, which can overflow on
 * platforms with 32-bit time_t; force 64-bit arithmetic up front. */
uint64_t gettime_ns(void)
{
    struct timespec t;
    clock_gettime(CLOCK_REALTIME, &t);
    return (uint64_t )t.tv_sec * 1000000000ULL + (uint64_t )t.tv_nsec;
}
/* Sleep for approximately usec_delay_per_file microseconds by calling
 * select() with no fd sets.  Aborts via scallerr() on failure.
 * Fix vs original: POSIX allows select() to fail with EINVAL when
 * tv_usec >= 1000000, so split the delay into whole seconds plus the
 * sub-second remainder. */
void sleep_for_usec( unsigned usec_delay_per_file )
{
    int rc;
    struct timeval tval = {0};
    tval.tv_sec = usec_delay_per_file / 1000000;
    tval.tv_usec = usec_delay_per_file % 1000000;
    rc = select( 0, NULL, NULL, NULL, &tval );
    if (rc < OK) scallerr("select");
}
/* used to generate random offsets into a file for random I/O workloads */
/* Build a randomly permuted array of record-aligned offsets covering
 * the whole file: initialize offsets 0, rec, 2*rec, ... then do one
 * swap pass pairing each slot with a uniformly chosen slot.  The
 * caller owns (and should free) the returned array.
 * Fixes vs original: the calloc() result was dereferenced without a
 * NULL check, and the loop index was a 32-bit unsigned compared against
 * a uint64_t count (potential infinite loop for huge files). */
off_t * random_offset_sequence( uint64_t file_size_bytes, size_t record_size_bytes )
{
    uint64_t j;
    uint64_t io_requests = file_size_bytes / record_size_bytes;
    off_t * offset_sequence = calloc(io_requests, sizeof(off_t));
    if (!offset_sequence) return NULL; /* caller must treat as allocation failure */
    for (j = 0; j < io_requests; j++)
        offset_sequence[j] = (off_t )(j * record_size_bytes);
    for (j = 0; j < io_requests; j++) {
        uint64_t random_index = (uint64_t )random() % io_requests;
        off_t tmp = offset_sequence[j];
        offset_sequence[j] = offset_sequence[random_index];
        offset_sequence[random_index] = tmp;
    }
    return offset_sequence;
}
/* compute next pathname for thread to use */
/* Compose the pathname for file number filenum of thread thread_num:
 *   <base_dir>/thrd<NNN>-d<DDDD>/<prefix>.<FFFFFFF>
 * Files are spread across leaf subdirectories, files_per_dir apiece.
 * NOTE(review): next_fname is assumed large enough for the result
 * (callers pass 1024-byte buffers); sprintf() here is unbounded. */
void get_next_path( const int filenum, const int files_per_dir, const int thread_num, const char * base_dir, const char * prefix, char *next_fname )
{
    const int dir_index = filenum / files_per_dir; /* which leaf directory this file lands in */
    sprintf(next_fname, "%s/thrd%03d-d%04d/%s.%07d",
            base_dir, thread_num, dir_index, prefix, filenum);
}
/* Build-time switch: when ZERO_CPY is defined, sequential writes use the
 * libgfapi zero-copy path (glfs_get_buffer / glfs_zero_write /
 * glfs_free_buffer) instead of glfs_write.  Comment the define out to
 * get normal writes. */
#define ZERO_CPY
/* each thread runs code in this routine */
/* pthread entry point: runs the configured workload for one thread.
 * void_result_p points at this thread's gfapi_result_t slot; timings and
 * counters are recorded there for main() to print/aggregate after join.
 * Flow: (optional) starting-gun rendezvous, then for each of
 * prm.filecount files: open/create (FUSE path or libgfapi path), run the
 * per-record I/O loop, optional fsync, close.
 * Globals read: prm (test parameters), glfs_p (mounted volume). */
void * gfapi_thread_run( void * void_result_p )
{
    gfapi_result_t * result_p = (gfapi_result_t * )void_result_p;
    off_t * random_offsets;     /* only initialized for rnd-wr / rnd-rd workloads */
    char ready_path[1024] = {0}, hostnamebuf[1024] = {0}, pidstr[100] = {0}, threadstr[100] = {0};
    int ready_fd;
    glfs_fd_t * ready_fd_p;
    glfs_fd_t * glfs_fd_p = NULL;   /* libgfapi file handle */
    int fd = -1;                    /* FUSE-path file descriptor */
    int rc, k;
    int sec;
    struct stat st = {0};
    char next_fname[1024] = {0};
    int create_flags = O_WRONLY|O_EXCL|O_CREAT;
    off_t offset;
    unsigned io_count;
    int bytes_xferred;
    char * buf;                 /* page-aligned I/O buffer (normal path) */
    char *zero_buf = NULL;      /* gluster-owned buffer (zero-copy path) */
    void *write_ref = NULL;     /* opaque reference returned by glfs_get_buffer */
    /* use same random offset sequence for all files */
    if (prm.workload_type == WL_RNDWR || prm.workload_type == WL_RNDRD) {
        random_offsets = random_offset_sequence(
            (uint64_t )prm.filesz_kb*BYTES_PER_KB, prm.recsz*BYTES_PER_KB );
    }
    /* wait for the starting gun file, which should be in parent directory */
    /* it is invoker's responsibility to unlink the starting gun file before starting this program */
    if (strlen(prm.starting_gun_file) > 0) {
        static const int sg_create_flags = O_CREAT|O_EXCL|O_WRONLY;
        char ready_buf[1024] = {0};
        /* signal that we are ready: create <dir>/<host>.<pid>.<thread>.ready
         * next to the starting-gun file */
        gethostname(hostnamebuf, sizeof(hostnamebuf)-4);
        sprintf(pidstr, "%d", getpid());
        sprintf(threadstr, "%d", result_p->thread_num);
        strcpy(ready_buf, prm.starting_gun_file);
        dirname(ready_buf);
        strcpy(ready_path, ready_buf);
        strcat(ready_path, "/");
        strcat(ready_path, strtok(hostnamebuf,"."));
        strcat(ready_path, ".");
        strcat(ready_path, pidstr);
        strcat(ready_path, ".");
        strcat(ready_path, threadstr);
        strcat(ready_path, ".ready");
        printf("%s : ", now_str());
        printf("signaling ready with file %s\n", ready_path);
        if (prm.use_fuse) {
            ready_fd = open(ready_path, sg_create_flags, 0666);
            if (ready_fd < 0) scallerr(ready_path);
            else {
                rc = close(ready_fd);
                if (rc < OK) scallerr("ready path close");
            }
        } else {
            ready_fd_p = glfs_creat(glfs_p, ready_path, sg_create_flags, 0644);
            if (!ready_fd_p) scallerr(ready_path);
            else {
                rc = glfs_close(ready_fd_p);
                if (rc < OK) scallerr("ready path close");
            }
        }
        /* wait until we are told to start the test, to give other threads time to get ready */
        printf("%s : ", now_str());
        printf("awaiting starting gun file %s\n", prm.starting_gun_file);
        /* poll once per second until the starting-gun file appears or we time out */
        FOREACH(sec, prm.starting_gun_timeout) {
            rc = prm.use_fuse ? stat(prm.starting_gun_file, &st) : glfs_stat(glfs_p, prm.starting_gun_file, &st);
            if (prm.debug) printf("rc=%d errno=%d\n", rc, errno);
            if (rc != OK) {
                if (errno != ENOENT) scallerr(prm.use_fuse ? "stat" : "glfs_stat");
            } else {
                break; /* we heard the starting gun */
            }
            sleep(1);
        }
        if (sec == prm.starting_gun_timeout) {
            /* NOTE(review): non-literal format string passed to printf;
             * printf("%s", now_str()) would be safer */
            printf(now_str());
            printf("ERROR: timed out after %d sec waiting for starting gun file %s\n",
                prm.starting_gun_timeout, prm.starting_gun_file);
            exit(NOTOK);
        }
        sleep(3); /* give everyone a chance to see it */
    }
    /* we can use page-aligned buffer regardless of whether O_DIRECT is used or not */
    /* NOTE(review): error message names posix_memalign but memalign is called;
     * also buf (and random_offsets) are never freed before the thread exits */
    buf = memalign(PAGE_BOUNDARY, prm.bytes_to_xfer);
    if (!buf) scallerr("posix_memalign");
    /* open the file */
    result_p->start_time = gettime_ns();
    create_flags |= prm.o_direct;
    if (prm.o_append|prm.o_overwrite) create_flags &= ~(O_EXCL|O_CREAT);
    FOREACH(k, prm.filecount) {
        int workload = prm.workload_type;
        /* for the mixed workload, decide per file whether this one is a read or a write */
        if (workload == WL_SEQRDWRMIX) {
            float rndsample = (float )(random() % 100);
            workload = (rndsample > prm.rdpct) ? WL_SEQWR : WL_SEQRD;
            if (prm.debug) printf("workload %s\n", workload_description[workload]);
        }
        get_next_path( k, prm.files_per_dir, result_p->thread_num, prm.thrd_basedir, prm.prefix, next_fname );
        if (prm.debug) printf("starting file %s\n", next_fname);
        fd = -2;
        glfs_fd_p = NULL;
        if (prm.use_fuse) {
            /* POSIX/FUSE code path */
            switch (workload) {
            case WL_DELETE:
                rc = unlink(next_fname);
                if (rc < OK && errno != ENOENT) scallerr(next_fname);
                break;
            case WL_SEQWR:
                fd = open(next_fname, create_flags, 0666);
                /* ENOENT here means the leaf directory doesn't exist yet; create it and retry */
                if ((fd < OK) && (errno == ENOENT)) {
                    char subdir[1024];
                    strcpy(subdir, dirname(next_fname));
                    rc = mkdir(subdir, 0755);
                    if (rc < OK) scallerr(subdir);
                    /* we have to reconstruct filename because dirname() function sticks null into it */
                    get_next_path( k, prm.files_per_dir, result_p->thread_num, prm.thrd_basedir, prm.prefix, next_fname );
                    fd = open(next_fname, create_flags, 0666);
                }
                /* NOTE(review): rc may be read uninitialized here -- it is only
                 * assigned in the mkdir branch above; the EEXIST retry check
                 * should probably test fd/errno instead of rc */
                if ((prm.workload_type == WL_SEQRDWRMIX) && (rc < OK) && (errno == EEXIST)) {
                    rc = unlink(next_fname);
                    if (rc < OK) scallerr(next_fname);
                    fd = open(next_fname, create_flags, 0666);
                }
                if (fd < OK) scallerr(next_fname);
                if (prm.o_append) {
                    rc = lseek( fd, 0, SEEK_END);
                    if (rc < OK) scallerr(next_fname);
                }
                break;
            case WL_SEQRD:
                fd = open(next_fname, O_RDONLY|prm.o_direct);
                if (fd < OK) scallerr(next_fname);
                break;
            case WL_RNDWR:
                fd = open(next_fname, O_WRONLY|prm.o_direct);
                if (fd < OK) scallerr(next_fname);
                break;
            case WL_RNDRD:
                fd = open(next_fname, O_RDONLY|prm.o_direct);
                if (fd < OK) scallerr(next_fname);
                break;
            default: exit(NOTOK);
            }
        } else {
            /* libgfapi code path -- mirrors the FUSE cases above */
            switch (workload) {
            case WL_DELETE:
                rc = glfs_unlink(glfs_p, next_fname);
                if (rc < OK && errno != ENOENT) scallerr(next_fname);
                break;
            case WL_SEQWR:
                if (prm.o_append|prm.o_overwrite) {
                    glfs_fd_p = glfs_open(glfs_p, next_fname, create_flags );
                    if (!glfs_fd_p) scallerr(next_fname);
                    if (prm.o_append) {
                        rc = glfs_lseek( glfs_fd_p, 0, SEEK_END);
                        if (rc < OK) scallerr(next_fname);
                    }
                } else {
                    glfs_fd_p = glfs_creat(glfs_p, next_fname, create_flags, 0666 );
                    /* ENOENT: leaf directory missing, create it and retry */
                    if ((!glfs_fd_p) && (errno == ENOENT)) {
                        char subdir[1024];
                        strcpy(subdir, dirname(next_fname));
                        rc = glfs_mkdir(glfs_p, subdir, 0755);
                        if (rc < OK) scallerr(subdir);
                        /* we have to reconstruct filename because dirname() function sticks null into it */
                        get_next_path( k, prm.files_per_dir, result_p->thread_num, prm.thrd_basedir, prm.prefix, next_fname );
                        glfs_fd_p = glfs_creat(glfs_p, next_fname, create_flags, 0666);
                    }
                    /* NOTE(review): rc may be read uninitialized here too (same
                     * issue as the FUSE branch) */
                    if ((prm.workload_type == WL_SEQRDWRMIX) && (rc < OK) && (errno == EEXIST)) {
                        rc = glfs_unlink(glfs_p, next_fname);
                        if (rc < OK && errno != ENOENT) scallerr(next_fname);
                        glfs_fd_p = glfs_creat(glfs_p, next_fname, create_flags, 0666);
                    }
                    if (!glfs_fd_p) scallerr(next_fname);
                }
                break;
            case WL_SEQRD:
                glfs_fd_p = glfs_open(glfs_p, next_fname, O_RDONLY|prm.o_direct);
                if (!glfs_fd_p) scallerr(next_fname);
                break;
            case WL_RNDWR:
                glfs_fd_p = glfs_open(glfs_p, next_fname, O_WRONLY|prm.o_direct);
                if (!glfs_fd_p) scallerr(next_fname);
                break;
            case WL_RNDRD:
                glfs_fd_p = glfs_open(glfs_p, next_fname, O_RDONLY|prm.o_direct);
                if (!glfs_fd_p) scallerr(next_fname);
                break;
            default: exit(NOTOK);
            }
        }
        /* deletes have no I/O loop; count the file and move on */
        if (workload == WL_DELETE) {
            if (prm.usec_delay_per_file) sleep_for_usec(prm.usec_delay_per_file);
            result_p->files_deleted++;
            continue;
        }
        /* perform the requested I/O operations */
        offset = 0;
        if (prm.debug) printf("io_requests = %ld\n", prm.io_requests);
#ifdef ZERO_CPY
        /* zero-copy path: write from a buffer allocated by gluster itself,
         * so the data is not copied into libgfapi on each write */
        printf ("zero copy write");
        zero_buf = glfs_get_buffer (glfs_p, &write_ref, prm.bytes_to_xfer);
        if (NULL == zero_buf) {
            scallerr("\nglfs_get_buffer failed");
        }
#endif
        FOREACH( io_count, prm.io_requests ) {
            if (workload == WL_SEQWR) {
                offset += prm.bytes_to_xfer;
#ifndef ZERO_CPY
                bytes_xferred = prm.use_fuse ?
                    write(fd, buf, prm.bytes_to_xfer) :
                    glfs_write(glfs_fd_p, buf, prm.bytes_to_xfer, 0);
#else
                bytes_xferred = glfs_zero_write (glfs_fd_p, zero_buf, prm.bytes_to_xfer, 0, write_ref);
                if (bytes_xferred < 0) {
                    scallerr("\nglfs_write failed\n");
                }
#endif
                /* NOTE(review): int compared with unsigned -- a -1 error return
                 * converts to a huge unsigned value, so this check misses
                 * failures; same pattern in the three branches below */
                if (bytes_xferred < prm.bytes_to_xfer)
                    scallerr(prm.use_fuse?"write":"glfs_write");
            } else if (workload == WL_SEQRD) {
                offset += prm.bytes_to_xfer;
                bytes_xferred = prm.use_fuse ?
                    read(fd, buf, prm.bytes_to_xfer) :
                    glfs_read(glfs_fd_p, buf, prm.bytes_to_xfer, 0);
                if (bytes_xferred < prm.bytes_to_xfer)
                    scallerr(prm.use_fuse?"read":"glfs_read");
            } else if (workload == WL_RNDWR) {
                offset = random_offsets[io_count];
                bytes_xferred = prm.use_fuse ?
                    pwrite(fd, buf, prm.bytes_to_xfer, offset) :
                    glfs_pwrite(glfs_fd_p, buf, prm.bytes_to_xfer, offset, 0);
                if (bytes_xferred < prm.bytes_to_xfer)
                    scallerr(prm.use_fuse?"pwrite":"glfs_pwrite");
            } else if (workload == WL_RNDRD) {
                offset = random_offsets[io_count];
                bytes_xferred = prm.use_fuse ?
                    pread(fd, buf, prm.bytes_to_xfer, offset) :
                    glfs_pread(glfs_fd_p, buf, prm.bytes_to_xfer, offset, 0);
                /* NOTE(review): error message says pwrite/glfs_pwrite but this
                 * is the pread path (copy-paste) */
                if (bytes_xferred < prm.bytes_to_xfer)
                    scallerr(prm.use_fuse?"pwrite":"glfs_pwrite");
            }
            result_p->total_bytes_xferred += bytes_xferred;
            if (prm.debug) printf("offset %-20ld, io_count %-10u total_bytes_xferred %-20ld\n",
                offset, io_count, result_p->total_bytes_xferred);
        }
#ifdef ZERO_CPY
        glfs_free_buffer (write_ref);
#endif
        result_p->total_io_count += io_count;
        /* shut down file access */
        if ((workload == WL_SEQWR || workload == WL_RNDWR) && prm.fsync_at_close) {
            rc = prm.use_fuse ? fsync(fd) : glfs_fsync(glfs_fd_p);
            if (rc) scallerr(prm.use_fuse ? "fsync" : "glfs_fsync");
        }
        rc = prm.use_fuse ? close(fd) : glfs_close(glfs_fd_p);
        if (rc) scallerr(prm.use_fuse ? "close" : "glfs_close");
        if (prm.usec_delay_per_file) sleep_for_usec(prm.usec_delay_per_file);
        if ((workload == WL_SEQWR) || (workload == WL_RNDWR))
            result_p->files_written++;
        if ((workload == WL_SEQRD) || (workload == WL_RNDRD))
            result_p->files_read++;
    }
    result_p->end_time = gettime_ns();
    return NULL;
}
/* Print one thread's statistics (or the aggregate's, when thread_num is
 * negative): file counts, transfer totals, elapsed time, throughput and
 * IOPS.  Also computes result_p->elapsed_time as a side effect.
 * Fixes vs original: guard the read-fraction computation against
 * files_done == 0 (0/0 produced NaN) and the throughput computations
 * against elapsed_time == 0 (division by zero). */
void print_result( gfapi_result_t * result_p )
{
    float thru, files_thru, mb_transferred, pct_actual_reads;
    uint64_t files_done;
    /* calculate and print stats */
    if (result_p->thread_num < 0) printf("aggregate: "); else printf("thread %3d: ", result_p->thread_num);
    result_p->elapsed_time = result_p->end_time - result_p->start_time;
    if (prm.debug) printf("start %ld end %ld elapsed %ld\n", result_p->start_time, result_p->end_time, result_p->elapsed_time);
    if (prm.debug) printf(" total byte count = "UINT64DFMT" total io count = "UINT64DFMT"\n",
        result_p->total_bytes_xferred, result_p->total_io_count );
    mb_transferred = (float )result_p->total_io_count * prm.recsz / KB_PER_MB;
    files_done = result_p->files_written + result_p->files_read;
    if (result_p->elapsed_time > 0) {
        thru = mb_transferred * NSEC_PER_SEC / result_p->elapsed_time ;
        files_thru = files_done * NSEC_PER_SEC / result_p->elapsed_time;
    } else {
        thru = 0.0;       /* thread never ran (or clock stepped); avoid div-by-zero */
        files_thru = 0.0;
    }
    if (files_done < 10) {
        files_thru = 0.0; /* rate over so few files is statistically meaningless */
    }
    if (result_p->files_written) printf(" files written = "UINT64DFMT"\n", result_p->files_written);
    if (result_p->files_read) printf(" files read = "UINT64DFMT"\n", result_p->files_read);
    printf(" files done = "UINT64DFMT"\n", files_done);
    if (prm.workload_type == WL_SEQRDWRMIX && files_done > 0) {
        pct_actual_reads = 100.0 * result_p->files_read / files_done;
        printf(" fraction of reads = %6.2f%%\n", pct_actual_reads );
    }
    if (result_p->total_io_count > 0) printf(" I/O (record) transfers = "UINT64DFMT"\n", result_p->total_io_count);
    if (result_p->total_bytes_xferred > 0) printf(" total bytes = "UINT64DFMT"\n", result_p->total_bytes_xferred);
    printf(" elapsed time = %-9.2f sec\n", result_p->elapsed_time/NSEC_PER_SEC);
    if (thru > 0.0) printf(" throughput = %-9.2f MB/sec\n", thru);
    if (files_thru > 0.0) printf(" file rate = %-9.2f files/sec\n", files_thru);
    if (thru > 0.0) printf(" IOPS = %-9.2f (%s)\n", thru * 1024 / prm.recsz, workload_description[prm.workload_type]);
}
/* Fold one thread's results (r_in_p) into the running aggregate
 * (r_out_p): keep the earliest start time and latest end time, and sum
 * all the counters. */
void aggregate_result( gfapi_result_t * r_in_p, gfapi_result_t * r_out_p )
{
    /* a zero start time means the aggregate is fresh; seed with "+infinity"
     * so the first real start time always wins the min comparison */
    if (r_out_p->start_time == 0)
        r_out_p->start_time = (uint64_t )-1;
    if (r_in_p->start_time < r_out_p->start_time)
        r_out_p->start_time = r_in_p->start_time;
    if (r_in_p->end_time > r_out_p->end_time)
        r_out_p->end_time = r_in_p->end_time;
    r_out_p->total_bytes_xferred += r_in_p->total_bytes_xferred;
    r_out_p->total_io_count      += r_in_p->total_io_count;
    r_out_p->files_read          += r_in_p->files_read;
    r_out_p->files_written       += r_in_p->files_written;
    r_out_p->files_deleted       += r_in_p->files_deleted;
}
/* Entry point: read all parameters from environment variables, validate
 * them, mount the gluster volume (unless GFAPI_FUSE=1), spawn
 * threads_per_proc worker threads running gfapi_thread_run, join them,
 * then print per-thread and aggregate results. */
int main(int argc, char * argv[])
{
    int rc, j, t;
    uint64_t max_io_requests;      /* records needed to cover the whole file */
    gfapi_result_t * result_array; /* one slot per worker thread */
    gfapi_result_t aggregate = {0};
    /* define environment variable inputs */
    prm.debug = getenv_int("DEBUG", 0);
    printf("prm.debug: %d\n", prm.debug);
    prm.rdpct = getenv_float("GFAPI_RDPCT", 0.0);
    prm.threads_per_proc = getenv_int("GFAPI_THREADS_PER_PROC", 1);
    prm.trclvl = getenv_int("GFAPI_TRCLVL", 0);
    prm.glfs_volname = getenv_str("GFAPI_VOLNAME", NULL);     /* mandatory */
    prm.glfs_hostname = getenv_str("GFAPI_HOSTNAME", NULL);   /* mandatory */
    prm.glfs_transport = getenv_str("GFAPI_TRANSPORT", "tcp");
    prm.glfs_portnum = getenv_int("GFAPI_PORT", 24007);
    prm.recsz = getenv_int("GFAPI_RECSZ", 64);
    prm.filesz_kb = getenv_size64_kb("GFAPI_FSZ", 1024);
    prm.prefix = getenv_str("GFAPI_PREFIX", "f");
    prm.thrd_basedir = getenv_str("GFAPI_BASEDIR", "/tmp" );
    prm.starting_gun_file = getenv_str("GFAPI_STARTING_GUN", "");
    prm.workload_str = getenv_str("GFAPI_LOAD", "seq-wr");
    prm.io_requests = (uint64_t )getenv_int("GFAPI_IOREQ", 0);
    prm.starting_gun_timeout = getenv_int("GFAPI_STARTING_GUN_TIMEOUT", 60);
    prm.fsync_at_close = getenv_int("GFAPI_FSYNC_AT_CLOSE", 0);
    prm.use_fuse = getenv_int("GFAPI_FUSE", 0);
    prm.o_direct = getenv_int("GFAPI_DIRECT", 0) ? O_DIRECT : 0;
    prm.o_append = getenv_int("GFAPI_APPEND", 0);
    prm.o_overwrite = getenv_int("GFAPI_OVERWRITE", 0);
    prm.filecount = getenv_int("GFAPI_FILES", 100);
    prm.usec_delay_per_file = getenv_int("GFAPI_USEC_DELAY_PER_FILE", 0);
    /* int dirs_per_dir = getenv_int("GFAPI_DIRS_PER_DIR", 1000); */
    prm.files_per_dir = getenv_int("GFAPI_FILES_PER_DIR", 1000);
    printf("GLUSTER: \n volume=%s\n transport=%s\n host=%s\n port=%d\n fuse?%s\n trace level=%d\n start timeout=%d\n",
        prm.glfs_volname, prm.glfs_transport, prm.glfs_hostname, prm.glfs_portnum, prm.use_fuse ? "Yes" : "No", prm.trclvl, prm.starting_gun_timeout );
    printf("WORKLOAD:\n type = %s \n threads/proc = %d\n base directory = %s\n prefix=%s\n"
        " file size = "UINT64DFMT" KB\n file count = %d\n record size = %u KB"
        "\n files/dir=%d\n fsync-at-close? %s \n",
        prm.workload_str, prm.threads_per_proc, prm.thrd_basedir, prm.prefix,
        prm.filesz_kb, prm.filecount, prm.recsz,
        prm.files_per_dir, prm.fsync_at_close?"Yes":"No");
    if (prm.o_direct) printf(" forcing use of direct I/O with O_DIRECT flag in open call\n");
    if (prm.usec_delay_per_file) printf(" sleeping %d microsec after each file access\n", prm.usec_delay_per_file);
    if (argc > 1) usage("glfs_io_test doesn't take command line parameters");
    if (prm.o_append && prm.o_overwrite) usage("GFAPI_APPEND and GFAPI_OVERWRITE cannot be used in the same test");
    /* validate inputs: map the workload string to its WL_* code */
    for (j=0; workload_types[j]; j++) {
        if (strcmp(workload_types[j], prm.workload_str) == 0)
            break;
    }
    if (!workload_types[j])
        usage2("invalid workload type %s", prm.workload_str);
    prm.workload_type = j; /* one of WL_* codes */
    if (prm.workload_type == WL_SEQRDWRMIX) {
        printf( " percent reads = %6.2f\n", prm.rdpct );
        /* the mixed workload must not fail on pre-existing files, so default to append */
        if ((prm.o_append == 0) && (prm.o_overwrite == 0)) prm.o_append = 1;
    }
    if (prm.o_append) printf(" using O_APPEND flag to append to existing files\n");
    if (prm.o_overwrite) printf(" overwriting existing files\n");
    /* a record can never be bigger than the file */
    if (prm.filesz_kb < prm.recsz) {
        printf(" truncating record size %u KB to file size %lu KB\n", prm.recsz, prm.filesz_kb );
        prm.recsz = prm.filesz_kb;
    }
    max_io_requests = prm.filesz_kb / prm.recsz;
    if (prm.workload_type == WL_RNDRD || prm.workload_type == WL_RNDWR) {
        if (prm.io_requests == 0) prm.io_requests = max_io_requests;
        printf(" random read/write requests = "UINT64DFMT"\n", prm.io_requests);
        if (prm.io_requests > max_io_requests) {
            usage("GFAPI_IOREQ too large for file size and record size");
        }
    } else { /* if sequential workload, do entire file */
        prm.io_requests = max_io_requests;
    }
    if (prm.debug) printf("max_io_requests = %ld\n", (long )max_io_requests);
    srandom(time(NULL));
    prm.bytes_to_xfer = prm.recsz * BYTES_PER_KB;
    /* initialize libgfapi instance */
    if (!prm.use_fuse) {
        char logfilename[100];
        /* mount volume */
        glfs_p = glfs_new(prm.glfs_volname);
        if (!glfs_p) scallerr("ERROR: could not initialize Gluster volume mount with volname");
        sprintf(logfilename, "/tmp/glfs-%d.log", getpid());
        if (glfs_set_logging(glfs_p, logfilename, prm.trclvl)) scallerr("set_logging");
        if (glfs_set_volfile_server( glfs_p, prm.glfs_transport, prm.glfs_hostname, prm.glfs_portnum ))
            scallerr("ERROR: could not initialize gfapi mount");
        rc = glfs_init(glfs_p);
        if (rc) scallerr("glfs_init");
    }
    /* allocate and initialize per-thread structure and start each thread */
    /* NOTE(review): calloc result is not checked; a NULL here would crash
     * in the loop below */
    result_array = (gfapi_result_t * )calloc(prm.threads_per_proc, sizeof(gfapi_result_t));
    FOREACH(t, prm.threads_per_proc) {
        gfapi_result_t * next_result_p = &result_array[t];
        next_result_p->thread_num = t;
        rc = pthread_create(&next_result_p->thr, NULL, gfapi_thread_run, next_result_p);
        if (rc != OK) scallerr("pthread_create");
    }
    /* wait for each thread to finish */
    FOREACH(t, prm.threads_per_proc) {
        void * retval;
        gfapi_result_t * next_result_p = &result_array[t];
        rc = pthread_join( next_result_p->thr, &retval );
        if (rc != OK) {
            printf("thread %d return code %d\n", t, rc);
        }
        if (retval == PTHREAD_CANCELED) {
            printf("thread %d cancelled\n", t);
        }
        if (retval) {
            printf("thread %d failed with rc %p\n", t, retval);
        }
    }
    /* unmount before reporting so all buffered I/O is accounted for */
    if (!prm.use_fuse) {
        rc = glfs_fini(glfs_p);
        if (rc < OK) scallerr("glfs_fini");
    }
    FOREACH(t, prm.threads_per_proc) {
        print_result(&result_array[t]);
        aggregate_result(&result_array[t], &aggregate);
    }
    aggregate.thread_num = -1; /* mark as aggregate so print_result labels it */
    print_result(&aggregate);
    return OK;
}
More information about the Gluster-devel
mailing list