[Gluster-devel] [RFC PATCH v0 1/1] readv_zcopy() implementation
Bharata B Rao
bharata.rao at gmail.com
Tue Mar 5 14:41:46 UTC 2013
A hack to support zero-copy readv; for now only glfs_preadv_async() is supported.
From: Bharata B Rao <bharata at linux.vnet.ibm.com>
---
api/src/glfs-fops.c | 38 +++++++
api/src/glfs.h | 2
libglusterfs/src/call-stub.c | 63 ++++++++++++
libglusterfs/src/call-stub.h | 17 +++
libglusterfs/src/globals.c | 1
libglusterfs/src/glusterfs.h | 1
libglusterfs/src/syncop.c | 37 +++++++
libglusterfs/src/syncop.h | 4 +
libglusterfs/src/xlator.h | 20 ++++
rpc/rpc-lib/src/protocol-common.h | 1
rpc/rpc-lib/src/rpc-clnt.c | 7 +
rpc/rpc-lib/src/rpc-clnt.h | 2
rpc/rpc-lib/src/rpc-transport.h | 3 +
rpc/rpc-transport/socket/src/socket.c | 103 ++++++++++++++++++-
xlators/cluster/dht/src/dht-common.h | 7 +
xlators/cluster/dht/src/dht-inode-read.c | 92 +++++++++++++++++
xlators/cluster/dht/src/dht.c | 1
xlators/debug/error-gen/src/error-gen.c | 49 +++++++++
xlators/debug/io-stats/src/io-stats.c | 52 ++++++++++
xlators/performance/md-cache/src/md-cache.c | 43 ++++++++
xlators/protocol/client/src/client-rpc-fops.c | 135 +++++++++++++++++++++++++
xlators/protocol/client/src/client.c | 41 ++++++++
xlators/protocol/client/src/client.h | 2
xlators/protocol/server/src/server-rpc-fops.c | 133 +++++++++++++++++++++++++
24 files changed, 848 insertions(+), 6 deletions(-)
diff --git a/api/src/glfs-fops.c b/api/src/glfs-fops.c
index be26dc1..b977a4b 100644
--- a/api/src/glfs-fops.c
+++ b/api/src/glfs-fops.c
@@ -344,6 +344,35 @@ glfs_preadv (struct glfs_fd *glfd, const struct
iovec *iovec, int iovcnt,
return size;
}
+/* Zero-copy preadv: reads straight into the caller's @iovec, so no
+ * intermediate iovec is allocated. Returns the syncop result (op_ret). */
+ssize_t
+glfs_preadv_zcopy (struct glfs_fd *glfd, const struct iovec *iovec, int iovcnt,
+ off_t offset, int flags)
+{
+ xlator_t *subvol = NULL;
+ int ret = -1;
+ struct iobref *iobref = NULL;
+
+ __glfs_entry_fd (glfd);
+
+ subvol = glfs_fd_subvol (glfd);
+
+ /* @flags is not forwarded: the fop is issued with flags 0. The cast is
+ * needed because syncop_readv_zcopy() takes a non-const iovec. */
+ ret = syncop_readv_zcopy (subvol, glfd->fd, offset,
+ 0, (struct iovec *) iovec, iovcnt, &iobref);
+
+ /* only a non-empty successful read moves the fd offset */
+ if (ret > 0)
+ glfd->offset = (offset + ret);
+
+ if (iobref)
+ iobref_unref (iobref);
+
+ return ret;
+}
+
ssize_t
glfs_read (struct glfs_fd *glfd, void *buf, size_t count, int flags)
@@ -425,6 +454,10 @@ glfs_io_async_task (void *data)
ret = glfs_preadv (gio->glfd, gio->iov, gio->count,
gio->offset, gio->flags);
break;
+ case GF_FOP_READ_ZCOPY:
+ ret = glfs_preadv_zcopy (gio->glfd, gio->iov, gio->count,
+ gio->offset, gio->flags);
+ break;
case GF_FOP_WRITE:
ret = glfs_pwritev (gio->glfd, gio->iov, gio->count,
gio->offset, gio->flags);
@@ -464,7 +497,10 @@ glfs_preadv_async (struct glfs_fd *glfd, const
struct iovec *iovec, int count,
return -1;
}
- gio->op = GF_FOP_READ;
+ if (flags & GLUSTERFS_READV_ZCOPY)
+ gio->op = GF_FOP_READ_ZCOPY;
+ else
+ gio->op = GF_FOP_READ;
gio->glfd = glfd;
gio->count = count;
gio->offset = offset;
diff --git a/api/src/glfs.h b/api/src/glfs.h
index e19c1cd..435fe45 100644
--- a/api/src/glfs.h
+++ b/api/src/glfs.h
@@ -48,6 +48,8 @@ __BEGIN_DECLS
struct glfs;
typedef struct glfs glfs_t;
+/* flags for readv variants */
+#define GLUSTERFS_READV_ZCOPY 0x1
/*
SYNOPSIS
diff --git a/libglusterfs/src/call-stub.c b/libglusterfs/src/call-stub.c
index 7bf8613..3f74eb6 100644
--- a/libglusterfs/src/call-stub.c
+++ b/libglusterfs/src/call-stub.c
@@ -918,6 +918,58 @@ out:
return stub;
}
+/* Wind stub for readv_zcopy; call_resume_wind() reads args.vector/count.
+ * NOTE(review): assumes iov_dup() is available here, as iov_length() is. */
+call_stub_t *
+fop_readv_zcopy_stub (call_frame_t *frame, fop_readv_zcopy_t fn,
+ fd_t *fd, struct iovec *vector, int32_t count, off_t off,
+ uint32_t flags, dict_t *xdata)
+{
+ call_stub_t *stub = NULL;
+ GF_VALIDATE_OR_GOTO ("call-stub", frame, out);
+ stub = stub_new (frame, 1, GF_FOP_READ_ZCOPY);
+ GF_VALIDATE_OR_GOTO ("call-stub", stub, out);
+ stub->fn.readv_zcopy = fn;
+ if (fd)
+ stub->args.fd = fd_ref (fd);
+ stub->args.vector = iov_dup (vector, count);
+ stub->args.count = count;
+ stub->args.size = iov_length(vector, count);
+ stub->args.offset = off;
+ stub->args.flags = flags;
+ if (xdata)
+ stub->args.xdata = dict_ref (xdata);
+out:
+ return stub;
+}
+
+
+/* Unwind stub for readv_zcopy. @fn is typed as the zcopy callback so the
+ * definition agrees with the prototype and the fn_cbk.readv_zcopy member. */
+call_stub_t *
+fop_readv_zcopy_cbk_stub (call_frame_t *frame, fop_readv_zcopy_cbk_t fn,
+ int32_t op_ret, int32_t op_errno,
+ struct iatt *stbuf,
+ struct iobref *iobref, dict_t *xdata)
+{
+ call_stub_t *stub = NULL;
+
+ GF_VALIDATE_OR_GOTO ("call-stub", frame, out);
+ stub = stub_new (frame, 0, GF_FOP_READ_ZCOPY);
+ GF_VALIDATE_OR_GOTO ("call-stub", stub, out);
+ stub->fn_cbk.readv_zcopy = fn;
+ stub->args_cbk.op_ret = op_ret;
+ stub->args_cbk.op_errno = op_errno;
+ if (op_ret >= 0) {
+ stub->args_cbk.stat = *stbuf;
+ stub->args_cbk.iobref = iobref_ref (iobref);
+ }
+ if (xdata)
+ stub->args_cbk.xdata = dict_ref (xdata);
+out:
+ return stub;
+}
+
call_stub_t *
fop_writev_stub (call_frame_t *frame, fop_writev_t fn,
@@ -2204,6 +2256,12 @@ call_resume_wind (call_stub_t *stub)
stub->args.offset, stub->args.flags,
stub->args.xdata);
break;
+ case GF_FOP_READ_ZCOPY:
+ stub->fn.readv_zcopy (stub->frame, stub->frame->this,
+ stub->args.fd, stub->args.vector,
+ stub->args.count, stub->args.offset,
+ stub->args.flags, stub->args.xdata);
+ break;
case GF_FOP_WRITE:
stub->fn.writev (stub->frame, stub->frame->this,
stub->args.fd, stub->args.vector,
@@ -2439,6 +2497,11 @@ call_resume_unwind (call_stub_t *stub)
stub->args_cbk.count, &stub->args_cbk.stat,
stub->args_cbk.iobref, stub->args_cbk.xdata);
break;
+ case GF_FOP_READ_ZCOPY:
+ STUB_UNWIND (stub, readv_zcopy, stub->args_cbk.vector,
+ stub->args_cbk.count, &stub->args_cbk.stat,
+ stub->args_cbk.iobref, stub->args_cbk.xdata);
+ break;
case GF_FOP_WRITE:
STUB_UNWIND (stub, writev, &stub->args_cbk.prestat,
&stub->args_cbk.poststat, stub->args_cbk.xdata);
diff --git a/libglusterfs/src/call-stub.h b/libglusterfs/src/call-stub.h
index 3351118..ad03f6b 100644
--- a/libglusterfs/src/call-stub.h
+++ b/libglusterfs/src/call-stub.h
@@ -69,6 +69,7 @@ typedef struct {
fop_fxattrop_t fxattrop;
fop_setattr_t setattr;
fop_fsetattr_t fsetattr;
+ fop_readv_zcopy_t readv_zcopy;
} fn;
union {
@@ -113,6 +114,7 @@ typedef struct {
fop_fxattrop_cbk_t fxattrop;
fop_setattr_cbk_t setattr;
fop_fsetattr_cbk_t fsetattr;
+ fop_readv_zcopy_cbk_t readv_zcopy;
} fn_cbk;
struct {
@@ -410,6 +412,21 @@ fop_readv_cbk_stub (call_frame_t *frame,
struct iobref *iobref, dict_t *xdata);
call_stub_t *
+fop_readv_zcopy_stub (call_frame_t *frame,
+ fop_readv_zcopy_t fn,
+ fd_t *fd, struct iovec *vector,
+ int32_t count,
+ off_t off, uint32_t flags, dict_t *xdata);
+
+call_stub_t *
+fop_readv_zcopy_cbk_stub (call_frame_t *frame,
+ fop_readv_zcopy_cbk_t fn,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct iatt *stbuf,
+ struct iobref *iobref, dict_t *xdata);
+
+call_stub_t *
fop_writev_stub (call_frame_t *frame,
fop_writev_t fn,
fd_t *fd,
diff --git a/libglusterfs/src/globals.c b/libglusterfs/src/globals.c
index 05ff52c..8bb6458 100644
--- a/libglusterfs/src/globals.c
+++ b/libglusterfs/src/globals.c
@@ -67,6 +67,7 @@ const char *gf_fop_list[GF_FOP_MAXVALUE] = {
[GF_FOP_RELEASE] = "RELEASE",
[GF_FOP_RELEASEDIR] = "RELEASEDIR",
[GF_FOP_FREMOVEXATTR]= "FREMOVEXATTR",
+ [GF_FOP_READ_ZCOPY] = "READ_ZCOPY",
};
/* THIS */
diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
index 74e6847..ef6067a 100644
--- a/libglusterfs/src/glusterfs.h
+++ b/libglusterfs/src/glusterfs.h
@@ -196,6 +196,7 @@ typedef enum {
GF_FOP_RELEASEDIR,
GF_FOP_GETSPEC,
GF_FOP_FREMOVEXATTR,
+ GF_FOP_READ_ZCOPY,
GF_FOP_MAXVALUE,
} glusterfs_fop_t;
diff --git a/libglusterfs/src/syncop.c b/libglusterfs/src/syncop.c
index c996b8f..0596188 100644
--- a/libglusterfs/src/syncop.c
+++ b/libglusterfs/src/syncop.c
@@ -1095,6 +1095,43 @@ out:
}
+/* Completion callback for syncop_readv_zcopy(): stores op_ret/op_errno in
+ * the syncargs passed as @cookie and calls __wake(). The parameter list
+ * follows fop_readv_zcopy_cbk_t, which carries @vector and @count. */
+int32_t
+syncop_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iovec *vector, int32_t count, struct iatt *stbuf,
+ struct iobref *iobref, dict_t *xdata)
+{
+ struct syncargs *args = cookie;
+
+ INIT_LIST_HEAD (&args->entries.list);
+
+ args->op_ret = op_ret;
+ args->op_errno = op_errno;
+
+ __wake (args);
+ return 0;
+}
+
+/* Synchronous readv_zcopy: winds the fop with the caller's @vector/@count
+ * and returns op_ret, with errno set to op_errno. @iobref is not written
+ * by this function. */
+int
+syncop_readv_zcopy (xlator_t *subvol, fd_t *fd, off_t off,
+ uint32_t flags, struct iovec *vector, int count,
+ struct iobref **iobref)
+{
+ struct syncargs args = {0, };
+
+ SYNCOP (subvol, (&args), syncop_readv_zcopy_cbk,
+ subvol->fops->readv_zcopy, fd, vector, count, off, flags, NULL);
+
+ errno = args.op_errno;
+ return args.op_ret;
+}
+
int
syncop_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, struct iatt *prebuf,
diff --git a/libglusterfs/src/syncop.h b/libglusterfs/src/syncop.h
index 001c68f..61c4222 100644
--- a/libglusterfs/src/syncop.h
+++ b/libglusterfs/src/syncop.h
@@ -252,6 +252,10 @@ int syncop_readv (xlator_t *subvol, fd_t *fd,
size_t size, off_t off,
uint32_t flags,
/* out */
struct iovec **vector, int *count, struct iobref **iobref);
+int syncop_readv_zcopy (xlator_t *subvol, fd_t *fd, off_t off,
+ uint32_t flags,
+ /* vector, count: buffers supplied by the caller */
+ struct iovec *vector, int count, struct iobref **iobref);
int syncop_ftruncate (xlator_t *subvol, fd_t *fd, off_t offset);
int syncop_truncate (xlator_t *subvol, loc_t *loc, off_t offset);
diff --git a/libglusterfs/src/xlator.h b/libglusterfs/src/xlator.h
index 1e21b63..30884c3 100644
--- a/libglusterfs/src/xlator.h
+++ b/libglusterfs/src/xlator.h
@@ -261,6 +261,16 @@ typedef int32_t (*fop_readv_cbk_t) (call_frame_t *frame,
struct iatt *stbuf,
struct iobref *iobref, dict_t *xdata);
+typedef int32_t (*fop_readv_zcopy_cbk_t) (call_frame_t *frame,
+ void *cookie,
+ xlator_t *this,
+ int32_t op_ret,
+ int32_t op_errno,
+ struct iovec *vector,
+ int32_t count,
+ struct iatt *stbuf,
+ struct iobref *iobref, dict_t *xdata);
+
typedef int32_t (*fop_writev_cbk_t) (call_frame_t *frame,
void *cookie,
xlator_t *this,
@@ -501,6 +511,14 @@ typedef int32_t (*fop_readv_t) (call_frame_t *frame,
off_t offset,
uint32_t flags, dict_t *xdata);
+typedef int32_t (*fop_readv_zcopy_t) (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ struct iovec *vector,
+ int32_t count,
+ off_t offset,
+ uint32_t flags, dict_t *xdata);
+
typedef int32_t (*fop_writev_t) (call_frame_t *frame,
xlator_t *this,
fd_t *fd,
@@ -678,6 +696,7 @@ struct xlator_fops {
fop_setattr_t setattr;
fop_fsetattr_t fsetattr;
fop_getspec_t getspec;
+ fop_readv_zcopy_t readv_zcopy;
/* these entries are used for a typechecking hack in
STACK_WIND _only_ */
fop_lookup_cbk_t lookup_cbk;
@@ -722,6 +741,7 @@ struct xlator_fops {
fop_setattr_cbk_t setattr_cbk;
fop_fsetattr_cbk_t fsetattr_cbk;
fop_getspec_cbk_t getspec_cbk;
+ fop_readv_zcopy_cbk_t readv_zcopy_cbk;
};
typedef int32_t (*cbk_forget_t) (xlator_t *this,
diff --git a/rpc/rpc-lib/src/protocol-common.h
b/rpc/rpc-lib/src/protocol-common.h
index 97017e5..e65d806 100644
--- a/rpc/rpc-lib/src/protocol-common.h
+++ b/rpc/rpc-lib/src/protocol-common.h
@@ -56,6 +56,7 @@ enum gf_fop_procnum {
GFS3_OP_RELEASE,
GFS3_OP_RELEASEDIR,
GFS3_OP_FREMOVEXATTR,
+ GFS3_OP_READ_ZCOPY,
GFS3_OP_MAXVALUE,
} ;
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index e6c681d..212f4c7 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -466,6 +466,8 @@ rpc_clnt_fill_request_info (struct rpc_clnt *clnt,
rpc_request_info_t *info)
info->progver = saved_frame.rpcreq->prog->progver;
info->rpc_req = saved_frame.rpcreq;
info->rsp = saved_frame.rsp;
+ info->rsp_payload = saved_frame.rpcreq->rsp_payload;
+ info->rsp_payload_count = saved_frame.rpcreq->rsp_payload_count;
ret = 0;
out:
@@ -1437,6 +1439,11 @@ rpc_clnt_submit (struct rpc_clnt *rpc,
rpc_clnt_prog_t *prog,
rpcreq->xid = callid;
rpcreq->cbkfn = cbkfn;
+ memcpy(rpcreq->rsp_payload, rsp_payload,
+ rsp_payload_count * sizeof (struct iovec));
+ rpcreq->rsp_payload_count = rsp_payload_count;
+
+
ret = -1;
if (proghdr) {
diff --git a/rpc/rpc-lib/src/rpc-clnt.h b/rpc/rpc-lib/src/rpc-clnt.h
index 0da1655..710951e 100644
--- a/rpc/rpc-lib/src/rpc-clnt.h
+++ b/rpc/rpc-lib/src/rpc-clnt.h
@@ -161,6 +161,8 @@ struct rpc_req {
int procnum;
fop_cbk_fn_t cbkfn;
void *conn_private;
+ struct iovec rsp_payload[256]; /* TODO: Allocate this */
+ int32_t rsp_payload_count;
};
typedef struct rpc_clnt {
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 272de9d..8bc2d18 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -158,6 +158,9 @@ struct rpc_request_info {
int procnum;
void *rpc_req; /* struct rpc_req */
rpc_transport_rsp_t rsp;
+ /* TODO: This should ideally reside in @rsp above */
+ struct iovec *rsp_payload;
+ int32_t rsp_payload_count;
};
typedef struct rpc_request_info rpc_request_info_t;
diff --git a/rpc/rpc-transport/socket/src/socket.c
b/rpc/rpc-transport/socket/src/socket.c
index fffc137..1a7302e 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -378,6 +378,9 @@ __socket_cached_read (rpc_transport_t *this,
struct iovec *opvector, int opcount
goto uncached;
}
+ /* TODO */
+ goto uncached;
+
if (!in->ra_max) {
/* first call after passing SP_STATE_READING_FRAGHDR */
in->ra_max = min (RPC_FRAGSIZE (in->fraghdr), GF_SOCKET_RA_MAX);
@@ -1165,6 +1168,72 @@ out:
return ret;
}
+static inline int
+__socket_read_simple_msg_zcopy (rpc_transport_t *this)
+{
+ int ret = 0;
+ uint32_t remaining_size = 0;
+ size_t bytes_read = 0;
+ socket_private_t *priv = NULL;
+ struct gf_sock_incoming *in = NULL;
+ struct gf_sock_incoming_frag *frag = NULL;
+
+ GF_VALIDATE_OR_GOTO ("socket", this, out);
+ GF_VALIDATE_OR_GOTO ("socket", this->private, out);
+
+ priv = this->private;
+
+ in = &priv->incoming;
+ frag = &in->frag;
+
+ switch (frag->simple_state) {
+
+ case SP_STATE_SIMPLE_MSG_INIT:
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+ frag->simple_state = SP_STATE_READING_SIMPLE_MSG;
+
+ /* fall through */
+
+ case SP_STATE_READING_SIMPLE_MSG:
+ ret = 0;
+
+ remaining_size = RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read;
+
+ if (remaining_size > 0) {
+ ret = __socket_readv (this,
+ in->pending_vector,
+ in->pending_count,
+ &in->pending_vector,
+ &in->pending_count,
+ &bytes_read);
+ }
+
+ if (ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "reading from socket failed. Error (%s), "
+ "peer (%s)", strerror (errno),
+ this->peerinfo.identifier);
+ break;
+ }
+
+ frag->bytes_read += bytes_read;
+ //frag->fragcurrent += bytes_read;
+
+ if (ret > 0) {
+ gf_log (this->name, GF_LOG_TRACE,
+ "partial read on non-blocking socket.");
+ break;
+ }
+
+ if (ret == 0) {
+ frag->simple_state = SP_STATE_SIMPLE_MSG_INIT;
+ }
+ }
+
+out:
+ return ret;
+}
static inline int
__socket_read_simple_request (rpc_transport_t *this)
@@ -1510,6 +1579,17 @@ __socket_read_accepted_successful_reply
(rpc_transport_t *this)
case SP_STATE_READ_PROC_OPAQUE:
read_proc_opaque:
+ /* Initialize in->pending_vector with user supplied iovec */
+ if (in->request_info &&
+ in->request_info->procnum == GFS3_OP_READ_ZCOPY) {
+ in->pending_vector = in->request_info->rsp_payload;
+ in->pending_count = in->request_info->rsp_payload_count;
+ frag->call_body.reply.accepted_success_state
+ = SP_STATE_READ_PROC_HEADER;
+
+ /* fall through */
+
+ } else {
if (in->payload_vector.iov_base == NULL) {
size = (RPC_FRAGSIZE (in->fraghdr) - frag->bytes_read);
@@ -1541,12 +1621,17 @@ __socket_read_accepted_successful_reply
(rpc_transport_t *this)
frag->call_body.reply.accepted_success_state
= SP_STATE_READ_PROC_HEADER;
-
+ }
/* fall through */
case SP_STATE_READ_PROC_HEADER:
/* now read the entire remaining msg into new iobuf */
- ret = __socket_read_simple_msg (this);
+ if (in->request_info &&
+ in->request_info->procnum == GFS3_OP_READ_ZCOPY) {
+ ret = __socket_read_simple_msg_zcopy (this);
+ } else {
+ ret = __socket_read_simple_msg (this);
+ }
if ((ret == -1)
|| ((ret == 0) && RPC_LASTFRAG (in->fraghdr))) {
frag->call_body.reply.accepted_success_state
@@ -1797,7 +1882,8 @@ __socket_read_reply (rpc_transport_t *this)
}
if ((request_info->prognum == GLUSTER_FOP_PROGRAM)
- && (request_info->procnum == GF_FOP_READ)) {
+ && (request_info->procnum == GF_FOP_READ ||
+ request_info->procnum == GFS3_OP_READ_ZCOPY)) {
if (map_xid && request_info->rsp.rsp_payload_count != 0) {
in->iobref = iobref_ref (request_info->rsp.rsp_iobref);
in->payload_vector = *request_info->rsp.rsp_payload;
@@ -1976,7 +2062,14 @@ __socket_proto_state_machine (rpc_transport_t *this,
/* fall through */
case SP_STATE_READ_FRAGHDR:
-
+ /*
+ * TODO:
+ * IIUC, Memory is allocated for entire payload here,
+ * but in case of readv, payload memory is allocated
+ * again in __socket_read_accepted_successful_reply().
+ * The latter one is actually used to return data to
+ * the caller.
+ */
in->fraghdr = ntoh32 (in->fraghdr);
in->total_bytes_read += RPC_FRAGSIZE(in->fraghdr);
iobuf = iobuf_get2 (this->ctx->iobuf_pool,
@@ -2059,6 +2152,8 @@ __socket_proto_state_machine (rpc_transport_t *this,
in->request_info = NULL;
}
in->record_state = SP_STATE_COMPLETE;
+ in->pending_vector = NULL;
+ in->pending_count = 0;
break;
case SP_STATE_COMPLETE:
diff --git a/xlators/cluster/dht/src/dht-common.h
b/xlators/cluster/dht/src/dht-common.h
index bd00089..e0271f4 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -560,6 +560,13 @@ int32_t dht_readv (call_frame_t *frame,
size_t size,
off_t offset, uint32_t flags, dict_t *xdata);
+int32_t dht_readv_zcopy (call_frame_t *frame,
+ xlator_t *this,
+ fd_t *fd,
+ struct iovec *vector,
+ int32_t count,
+ off_t offset, uint32_t flags, dict_t *xdata);
+
int32_t dht_writev (call_frame_t *frame,
xlator_t *this,
fd_t *fd,
diff --git a/xlators/cluster/dht/src/dht-inode-read.c
b/xlators/cluster/dht/src/dht-inode-read.c
index f17cb73..9438c99 100644
--- a/xlators/cluster/dht/src/dht-inode-read.c
+++ b/xlators/cluster/dht/src/dht-inode-read.c
@@ -420,6 +420,52 @@ out:
}
int
+dht_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int op_ret, int op_errno,
+ struct iovec *vector, int32_t count, struct iatt *stbuf,
+ struct iobref *iobref, dict_t *xdata)
+{
+ /* Signature follows fop_readv_zcopy_cbk_t: children unwind with
+ * (vector, count) ahead of stbuf, so both must be accepted here. */
+ dht_local_t *local = NULL;
+ int ret = 0;
+
+ local = frame->local;
+ if (!local) {
+ op_ret = -1;
+ op_errno = EINVAL;
+ goto out;
+ }
+
+ /* This is already second try, no need for re-check */
+ if (local->call_cnt != 1)
+ goto out;
+ if ((op_ret == -1) && (op_errno != ENOENT))
+ goto out;
+
+ if ((op_ret == -1) || IS_DHT_MIGRATION_PHASE2 (stbuf)) {
+ /* File would be migrated to other node */
+ ret = fd_ctx_get (local->fd, this, NULL);
+ if (ret) {
+ local->rebalance.target_op_fn = dht_readv2;
+ ret = dht_rebalance_complete_check (this, frame);
+ } else {
+ /* value is already set in fd_ctx, that means no need
+ to check for whether its complete or not. */
+ dht_readv2 (this, frame, 0);
+ }
+ if (!ret)
+ return 0;
+ }
+out:
+ DHT_STRIP_PHASE1_FLAGS (stbuf);
+ DHT_STACK_UNWIND (readv, frame, op_ret, op_errno, vector, count, stbuf,
+ iobref, xdata);
+
+ return 0;
+}
+
+int
dht_readv2 (xlator_t *this, call_frame_t *frame, int op_ret)
{
dht_local_t *local = NULL;
@@ -493,6 +539,52 @@ err:
}
int
+dht_readv_zcopy (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, struct iovec *vector, int32_t count, off_t off,
+ uint32_t flags, dict_t *xdata)
+{
+ xlator_t *subvol = NULL;
+ int op_errno = -1;
+ dht_local_t *local = NULL;
+ size_t size;
+
+ VALIDATE_OR_GOTO (frame, err);
+ VALIDATE_OR_GOTO (this, err);
+ VALIDATE_OR_GOTO (fd, err);
+
+ local = dht_local_init (frame, NULL, fd, GF_FOP_READ_ZCOPY);
+ if (!local) {
+ op_errno = ENOMEM;
+ goto err;
+ }
+
+ subvol = local->cached_subvol;
+ if (!subvol) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "no cached subvolume for fd=%p", fd);
+ op_errno = EINVAL;
+ goto err;
+ }
+
+ size = iov_length(vector, count);
+ local->rebalance.offset = off;
+ local->rebalance.size = size;
+ local->rebalance.flags = flags;
+ local->call_cnt = 1;
+
+ STACK_WIND (frame, dht_readv_zcopy_cbk,
+ subvol, subvol->fops->readv_zcopy,
+ fd, vector, count, off, flags, xdata);
+
+ return 0;
+
+err:
+ op_errno = (op_errno == -1) ? errno : op_errno;
+ DHT_STACK_UNWIND (readv, frame, -1, op_errno, NULL, 0, NULL,
NULL, NULL);
+
+ return 0;
+}
+int
dht_access_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int op_ret, int op_errno, dict_t *xdata)
{
diff --git a/xlators/cluster/dht/src/dht.c b/xlators/cluster/dht/src/dht.c
index 784ed92..1289b0f 100644
--- a/xlators/cluster/dht/src/dht.c
+++ b/xlators/cluster/dht/src/dht.c
@@ -599,6 +599,7 @@ struct xlator_fops fops = {
.fxattrop = dht_fxattrop,
.setattr = dht_setattr,
.fsetattr = dht_fsetattr,
+ .readv_zcopy = dht_readv_zcopy,
};
struct xlator_dumpops dumpops = {
diff --git a/xlators/debug/error-gen/src/error-gen.c
b/xlators/debug/error-gen/src/error-gen.c
index 6bdb041..f62784d 100644
--- a/xlators/debug/error-gen/src/error-gen.c
+++ b/xlators/debug/error-gen/src/error-gen.c
@@ -174,7 +174,10 @@ sys_error_t error_no_list[] = {
EROFS,EBADF,EIO}},
[GF_FOP_GETSPEC] = { .error_no_count = 4,
.error_no = {EACCES,EBADF,ENAMETOOLONG,
- EINTR}}
+ EINTR}},
+ [GF_FOP_READ_ZCOPY] = { .error_no_count = 5,
+ .error_no = {EINVAL,EBADF,EFAULT,EISDIR,
+ ENAMETOOLONG}}
};
int
@@ -275,6 +278,8 @@ get_fop_int (char **op_no_str)
return GF_FOP_OPEN;
else if (!strcmp ((*op_no_str), "readv"))
return GF_FOP_READ;
+ else if (!strcmp ((*op_no_str), "readv_zcopy"))
+ return GF_FOP_READ_ZCOPY;
else if (!strcmp ((*op_no_str), "writev"))
return GF_FOP_WRITE;
else if (!strcmp ((*op_no_str), "statfs"))
@@ -1082,6 +1087,47 @@ error_gen_readv (call_frame_t *frame, xlator_t *this,
return 0;
}
+/* Pass-through callback; parameters follow fop_readv_zcopy_cbk_t. */
+int
+error_gen_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, struct iovec *vector, int32_t count,
+ struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
+{
+ STACK_UNWIND_STRICT (readv_zcopy, frame, op_ret, op_errno,
+ vector, count, stbuf, iobref, xdata);
+ return 0;
+}
+
+int
+error_gen_readv_zcopy (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, struct iovec *vector, int32_t count, off_t offset,
+ uint32_t flags, dict_t *xdata)
+{
+ int op_errno = 0;
+ eg_t *egp = NULL;
+ int enable = 1;
+
+ egp = this->private;
+ enable = egp->enable[GF_FOP_READ_ZCOPY];
+
+ if (enable)
+ op_errno = error_gen (this, GF_FOP_READ_ZCOPY);
+
+ if (op_errno) {
+ GF_ERROR(this, "unwind(-1, %s)", strerror (op_errno));
+ STACK_UNWIND_STRICT (readv_zcopy, frame, -1, op_errno, NULL, 0,
+ NULL, NULL, xdata);
+ return 0;
+ }
+
+
+ STACK_WIND (frame, error_gen_readv_zcopy_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readv_zcopy,
+ fd, vector, count, offset, flags, xdata);
+ return 0;
+}
+
int
error_gen_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -2148,6 +2194,7 @@ struct xlator_fops fops = {
.create = error_gen_create,
.open = error_gen_open,
.readv = error_gen_readv,
+ .readv_zcopy = error_gen_readv_zcopy,
.writev = error_gen_writev,
.statfs = error_gen_statfs,
.flush = error_gen_flush,
diff --git a/xlators/debug/io-stats/src/io-stats.c
b/xlators/debug/io-stats/src/io-stats.c
index 63bb8fa..f3d6660 100644
--- a/xlators/debug/io-stats/src/io-stats.c
+++ b/xlators/debug/io-stats/src/io-stats.c
@@ -1346,6 +1346,40 @@ io_stats_readv_cbk (call_frame_t *frame, void
*cookie, xlator_t *this,
}
+int
+io_stats_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iovec *vector, int32_t count,
+ struct iatt *buf, struct iobref *iobref, dict_t *xdata)
+{
+ /* Parameters follow fop_readv_zcopy_cbk_t (vector/count precede buf). */
+ //int len = 0;
+ fd_t *fd = NULL;
+ struct ios_stat *iosstat = NULL;
+
+ fd = frame->local;
+ frame->local = NULL;
+
+#if 0
+ if (op_ret > 0) {
+ len = iov_length (vector, count);
+ BUMP_READ (fd, len);
+ }
+#endif
+
+ UPDATE_PROFILE_STATS (frame, READ);
+ ios_inode_ctx_get (fd->inode, this, &iosstat);
+ if (iosstat) {
+ BUMP_STATS (iosstat, IOS_STATS_TYPE_READ);
+ BUMP_THROUGHPUT (iosstat, IOS_STATS_THRU_READ);
+ iosstat = NULL;
+ }
+
+ STACK_UNWIND_STRICT (readv, frame, op_ret, op_errno,
+ vector, count, buf, iobref, xdata);
+ return 0;
+}
+
int
io_stats_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -2074,6 +2108,23 @@ io_stats_readv (call_frame_t *frame, xlator_t *this,
int
+io_stats_readv_zcopy (call_frame_t *frame, xlator_t *this,
+ fd_t *fd, struct iovec *vector, int32_t count, off_t offset,
+ uint32_t flags, dict_t *xdata)
+{
+ frame->local = fd;
+
+ START_FOP_LATENCY (frame);
+
+ STACK_WIND (frame, io_stats_readv_zcopy_cbk,
+ FIRST_CHILD(this),
+ FIRST_CHILD(this)->fops->readv_zcopy,
+ fd, vector, count, offset, flags, xdata);
+ return 0;
+}
+
+
+int
io_stats_writev (call_frame_t *frame, xlator_t *this,
fd_t *fd, struct iovec *vector,
int32_t count, off_t offset,
@@ -2790,6 +2841,7 @@ struct xlator_fops fops = {
.truncate = io_stats_truncate,
.open = io_stats_open,
.readv = io_stats_readv,
+ .readv_zcopy = io_stats_readv_zcopy,
.writev = io_stats_writev,
.statfs = io_stats_statfs,
.flush = io_stats_flush,
diff --git a/xlators/performance/md-cache/src/md-cache.c
b/xlators/performance/md-cache/src/md-cache.c
index 0c5ca87..0c3d6b2 100644
--- a/xlators/performance/md-cache/src/md-cache.c
+++ b/xlators/performance/md-cache/src/md-cache.c
@@ -1385,6 +1385,48 @@ mdc_readv (call_frame_t *frame, xlator_t *this,
fd_t *fd, size_t size,
return 0;
}
+int
+mdc_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno,
+ struct iovec *vector, int32_t count,
+ struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
+{
+ /* Parameters follow fop_readv_zcopy_cbk_t (vector/count precede stbuf). */
+ mdc_local_t *local = NULL;
+
+ local = frame->local;
+ /* op_ret is a byte count on success; only a negative value is failure */
+ if (op_ret < 0)
+ goto out;
+ if (!local)
+ goto out;
+
+ mdc_inode_iatt_set (this, local->fd->inode, stbuf);
+out:
+ MDC_STACK_UNWIND (readv, frame, op_ret, op_errno, vector, count,
+ stbuf, iobref, xdata);
+
+ return 0;
+}
+
+
+int
+mdc_readv_zcopy (call_frame_t *frame, xlator_t *this, fd_t *fd,
+ struct iovec *vector, int32_t count, off_t offset, uint32_t flags,
+ dict_t *xdata)
+{
+ mdc_local_t *local = mdc_local_get (frame);
+
+ /* mdc_readv_zcopy_cbk() checks for a NULL local, so tolerate one here */
+ if (local)
+ local->fd = fd_ref (fd);
+
+ STACK_WIND (frame, mdc_readv_zcopy_cbk,
+ FIRST_CHILD(this), FIRST_CHILD(this)->fops->readv_zcopy,
+ fd, vector, count, offset, flags, xdata);
+ return 0;
+}
+
int
mdc_writev_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
@@ -1947,6 +1989,7 @@ struct xlator_fops fops = {
.link = mdc_link,
.create = mdc_create,
.readv = mdc_readv,
+ .readv_zcopy = mdc_readv_zcopy,
.writev = mdc_writev,
.setattr = mdc_setattr,
.fsetattr = mdc_fsetattr,
diff --git a/xlators/protocol/client/src/client-rpc-fops.c
b/xlators/protocol/client/src/client-rpc-fops.c
index f524c1a..b05c003 100644
--- a/xlators/protocol/client/src/client-rpc-fops.c
+++ b/xlators/protocol/client/src/client-rpc-fops.c
@@ -2712,6 +2712,79 @@ out:
}
int
+client3_3_readv_zcopy_cbk (struct rpc_req *req, struct iovec *iov, int count,
+ void *myframe)
+{
+ call_frame_t *frame = NULL;
+ struct iobref *iobref = NULL;
+ struct iovec vector[MAX_IOVEC] = {{0}, };
+ struct iatt stat = {0,};
+ gfs3_read_rsp rsp = {0,};
+ int ret = 0, rspcount = 0;
+ clnt_local_t *local = NULL;
+ xlator_t *this = NULL;
+ dict_t *xdata = NULL;
+
+ this = THIS;
+
+ memset (vector, 0, sizeof (vector));
+
+ frame = myframe;
+ local = frame->local;
+
+ if (-1 == req->rpc_status) {
+ rsp.op_ret = -1;
+ rsp.op_errno = ENOTCONN;
+ goto out;
+ }
+
+ ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gfs3_read_rsp);
+ if (ret < 0) {
+ gf_log (this->name, GF_LOG_ERROR, "XDR decoding failed");
+ rsp.op_ret = -1;
+ rsp.op_errno = EINVAL;
+ goto out;
+ }
+
+ if (rsp.op_ret != -1) {
+ iobref = req->rsp_iobref;
+ gf_stat_to_iatt (&rsp.stat, &stat);
+
+ vector[0].iov_len = rsp.op_ret;
+ if (rsp.op_ret > 0)
+ vector[0].iov_base = req->rsp[1].iov_base;
+ rspcount = 1;
+ }
+ GF_PROTOCOL_DICT_UNSERIALIZE (this, xdata, (rsp.xdata.xdata_val),
+ (rsp.xdata.xdata_len), ret,
+ rsp.op_errno, out);
+
+#ifdef GF_TESTING_IO_XDATA
+ dict_dump (xdata);
+#endif
+
+out:
+ if (rsp.op_ret == -1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "remote operation failed: %s",
+ strerror (gf_error_to_errno (rsp.op_errno)));
+ } else if (rsp.op_ret >= 0) {
+ if (local->attempt_reopen)
+ client_attempt_reopen (local->fd, this);
+ }
+ CLIENT_STACK_UNWIND (readv_zcopy, frame, rsp.op_ret,
+ gf_error_to_errno (rsp.op_errno),
vector, rspcount,
+ &stat, iobref, xdata);
+
+ free (rsp.xdata.xdata_val);
+
+ if (xdata)
+ dict_unref (xdata);
+
+ return 0;
+}
+
+int
client3_3_release_cbk (struct rpc_req *req, struct iovec *iov, int count,
void *myframe)
{
@@ -3955,6 +4028,7 @@ client3_3_readv (call_frame_t *frame, xlator_t *this,
}
local->iobref = rsp_iobref;
+ local->iobref = NULL;
rsp_iobref = NULL;
GF_PROTOCOL_DICT_SERIALIZE (this, args->xdata, (&req.xdata.xdata_val),
@@ -3986,6 +4060,65 @@ unwind:
return 0;
}
+int32_t
+client3_3_readv_zcopy (call_frame_t *frame, xlator_t *this,
+ void *data)
+{
+ clnt_args_t *args = NULL;
+ int64_t remote_fd = -1;
+ clnt_conf_t *conf = NULL;
+ clnt_local_t *local = NULL;
+ int op_errno = ESTALE;
+ gfs3_read_req req = {{0,},};
+ int ret = 0;
+
+ if (!frame || !this || !data)
+ goto unwind;
+
+ args = data;
+ conf = this->private;
+
+ CLIENT_GET_REMOTE_FD (this, args->fd, FALLBACK_TO_ANON_FD,
+ remote_fd, op_errno, unwind);
+ ret = client_fd_fop_prepare_local (frame, args->fd, remote_fd);
+ if (ret) {
+ op_errno = -ret;
+ goto unwind;
+ }
+ local = frame->local;
+
+ req.size = args->size;
+ req.offset = args->offset;
+ req.fd = remote_fd;
+ req.flag = args->flags;
+
+ memcpy (req.gfid, args->fd->inode->gfid, 16);
+
+ GF_PROTOCOL_DICT_SERIALIZE (this, args->xdata, (&req.xdata.xdata_val),
+ req.xdata.xdata_len, op_errno, unwind);
+
+ ret = client_submit_request (this, &req, frame, conf->fops,
+ GFS3_OP_READ_ZCOPY,
client3_3_readv_zcopy_cbk, NULL,
+ NULL, 0, args->payload_vector,
+ args->payload_count,
+ local->iobref,
+ (xdrproc_t)xdr_gfs3_read_req);
+ if (ret) {
+ //unwind is done in the cbk
+ gf_log (this->name, GF_LOG_WARNING, "failed to send the fop");
+ }
+
+ GF_FREE (req.xdata.xdata_val);
+
+ return 0;
+unwind:
+
+ CLIENT_STACK_UNWIND (readv_zcopy, frame, -1, op_errno, NULL,
0, NULL, NULL, NULL);
+ GF_FREE (req.xdata.xdata_val);
+
+ return 0;
+}
+
int32_t
client3_3_writev (call_frame_t *frame, xlator_t *this, void *data)
@@ -5845,6 +5978,7 @@ rpc_clnt_procedure_t
clnt3_3_fop_actors[GF_FOP_MAXVALUE] = {
[GF_FOP_RELEASEDIR] = { "RELEASEDIR", client3_3_releasedir },
[GF_FOP_GETSPEC] = { "GETSPEC", client3_getspec },
[GF_FOP_FREMOVEXATTR] = { "FREMOVEXATTR", client3_3_fremovexattr },
+ [GF_FOP_READ_ZCOPY] = { "READ_ZCOPY", client3_3_readv_zcopy },
};
/* Used From RPC-CLNT library to log proper name of procedure based
on number */
@@ -5893,6 +6027,7 @@ char *clnt3_3_fop_names[GFS3_OP_MAXVALUE] = {
[GFS3_OP_RELEASE] = "RELEASE",
[GFS3_OP_RELEASEDIR] = "RELEASEDIR",
[GFS3_OP_FREMOVEXATTR] = "FREMOVEXATTR",
+ [GFS3_OP_READ_ZCOPY] = "READ_ZCOPY",
};
rpc_clnt_prog_t clnt3_3_fop_prog = {
diff --git a/xlators/protocol/client/src/client.c
b/xlators/protocol/client/src/client.c
index 931c671..5d4e9d1 100644
--- a/xlators/protocol/client/src/client.c
+++ b/xlators/protocol/client/src/client.c
@@ -923,6 +923,46 @@ out:
}
+/* Client entry point for readv_zcopy: packs the arguments, including the
+ * caller's payload iovec, and hands them to the READ_ZCOPY procedure. */
+int32_t
+client_readv_zcopy (call_frame_t *frame, xlator_t *this, fd_t *fd,
+ struct iovec *vector, int32_t count,
+ off_t offset, uint32_t flags, dict_t *xdata)
+{
+ int ret = -1;
+ clnt_conf_t *conf = NULL;
+ rpc_clnt_procedure_t *proc = NULL;
+ clnt_args_t args = {0,};
+
+ conf = this->private;
+ if (!conf || !conf->fops)
+ goto out;
+
+ args.fd = fd;
+ args.size = iov_length(vector, count);
+ args.offset = offset;
+ args.flags = flags;
+ args.xdata = xdata;
+ args.payload_vector = vector;
+ args.payload_count = count;
+
+ proc = &conf->fops->proctable[GF_FOP_READ_ZCOPY];
+ if (!proc) {
+ gf_log (this->name, GF_LOG_ERROR,
+ "rpc procedure not found for %s",
+ gf_fop_list[GF_FOP_READ_ZCOPY]);
+ goto out;
+ }
+ if (proc->fn)
+ ret = proc->fn (frame, this, &args);
+out:
+ if (ret)
+ STACK_UNWIND_STRICT (readv_zcopy, frame, -1, ENOTCONN,
+ NULL, 0, NULL, NULL, NULL);
+ return 0;
+}
+
int32_t
@@ -2636,6 +2676,7 @@ struct xlator_fops fops = {
.truncate = client_truncate,
.open = client_open,
.readv = client_readv,
+ .readv_zcopy = client_readv_zcopy,
.writev = client_writev,
.statfs = client_statfs,
.flush = client_flush,
diff --git a/xlators/protocol/client/src/client.h
b/xlators/protocol/client/src/client.h
index 0a27c09..39c3bf8 100644
--- a/xlators/protocol/client/src/client.h
+++ b/xlators/protocol/client/src/client.h
@@ -195,6 +195,8 @@ typedef struct client_args {
mode_t umask;
dict_t *xdata;
+ struct iovec *payload_vector;
+ int32_t payload_count;
} clnt_args_t;
typedef ssize_t (*gfs_serialize_t) (struct iovec outmsg, void *args);
diff --git a/xlators/protocol/server/src/server-rpc-fops.c b/xlators/protocol/server/src/server-rpc-fops.c
index f44ced4..f16187d 100644
--- a/xlators/protocol/server/src/server-rpc-fops.c
+++ b/xlators/protocol/server/src/server-rpc-fops.c
@@ -1515,6 +1515,55 @@ out:
}
int
+server_readv_zcopy_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                        int32_t op_ret, int32_t op_errno,
+                        struct iovec *vector, int32_t count,
+                        struct iatt *stbuf, struct iobref *iobref, dict_t *xdata)
+{
+        /*
+         * Completion callback for a READV_ZCOPY wind: builds a
+         * gfs3_read_rsp from the fop result and submits it, together with
+         * the data vector and iobref, as the reply to the request stored
+         * in frame->local.  Always returns 0.
+         */
+        rpcsvc_request_t *request   = NULL;
+        server_state_t   *srv_state = NULL;
+        gfs3_read_rsp     reply     = {0,};
+
+        srv_state = CALL_STATE(frame);
+        request   = frame->local;
+
+#ifdef GF_TESTING_IO_XDATA
+        {
+                int ret = 0;
+
+                if (!xdata) {
+                        xdata = dict_new ();
+                }
+
+                ret = dict_set_str (xdata, "testing-the-xdata-key",
+                                    "testing-xdata-value");
+        }
+#endif
+        /* Jumps to 'out' (after setting op_errno) if serialization fails. */
+        GF_PROTOCOL_DICT_SERIALIZE (this, xdata, (&reply.xdata.xdata_val),
+                                    reply.xdata.xdata_len, op_errno, out);
+
+        if (op_ret >= 0) {
+                /* Success: report file attributes and bytes read. */
+                gf_stat_from_iatt (&reply.stat, stbuf);
+                reply.size = op_ret;
+        } else {
+                gf_log (this->name, GF_LOG_INFO,
+                        "%"PRId64": READV_ZCOPY %"PRId64" (%s) ==> (%s)",
+                        frame->root->unique, srv_state->resolve.fd_no,
+                        uuid_utoa (srv_state->resolve.gfid),
+                        strerror (op_errno));
+        }
+
+out:
+        reply.op_ret   = op_ret;
+        reply.op_errno = gf_errno_to_error (op_errno);
+
+        server_submit_reply (frame, request, &reply, vector, count, iobref,
+                             (xdrproc_t)xdr_gfs3_read_rsp);
+
+        GF_FREE (reply.xdata.xdata_val);
+
+        return 0;
+}
+
+int
server_rchecksum_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
uint32_t weak_checksum, uint8_t *strong_checksum,
@@ -2749,6 +2798,27 @@ err:
return 0;
}
+/*
+ * server_readv_zcopy_resume - continue a READ_ZCOPY request once fd
+ * resolution has finished.
+ *
+ * On successful resolution the request is wound to bound_xl's readv fop
+ * with server_readv_cbk as the callback, using the fd, size, offset, flags
+ * and xdata stored in the call state.  If resolution failed,
+ * server_readv_cbk is invoked directly with the resolver's op_ret and
+ * op_errno.  Always returns 0.
+ */
+int
+server_readv_zcopy_resume (call_frame_t *frame, xlator_t *bound_xl)
+{
+        server_state_t *state = NULL;
+
+        state = CALL_STATE (frame);
+
+        if (state->resolve.op_ret != 0)
+                goto err;
+
+        STACK_WIND (frame, server_readv_cbk,
+                    bound_xl, bound_xl->fops->readv,
+                    state->fd, state->size, state->offset,
+                    state->flags, state->xdata);
+
+        return 0;
+err:
+        server_readv_cbk (frame, NULL, frame->this, state->resolve.op_ret,
+                          state->resolve.op_errno, NULL, 0, NULL, NULL, NULL);
+        return 0;
+}
+
int
server_create_resume (call_frame_t *frame, xlator_t *bound_xl)
@@ -3347,6 +3417,68 @@ out:
return ret;
}
+/*
+ * server3_3_readv_zcopy - RPC actor for GFS3_OP_READ_ZCOPY.
+ *
+ * Decodes a gfs3_read_req from the first message vector of @req, obtains
+ * a call frame for it, records fd number, gfid, size, offset and flags in
+ * the call state, unserializes the request xdata and then hands the frame
+ * to resolve_and_resume() with server_readv_resume as the continuation.
+ *
+ * Returns the value of ret: -1 when @req is NULL, the negative result of
+ * xdr_to_generic() on a decode failure, and 0 once the request has been
+ * passed to resolve_and_resume().  req->rpc_err is set to GARBAGE_ARGS on
+ * the failure paths below and whenever op_errno ends up non-zero.
+ */
+int
+server3_3_readv_zcopy (rpcsvc_request_t *req)
+{
+        server_state_t *state    = NULL;
+        call_frame_t   *frame    = NULL;
+        gfs3_read_req   args     = {{0,},};
+        int             ret      = -1;
+        int             op_errno = 0;
+
+        if (!req)
+                goto out;
+
+        ret = xdr_to_generic (req->msg[0], &args,
+                              (xdrproc_t)xdr_gfs3_read_req);
+        if (ret < 0) {
+                /* failed to decode msg */
+                req->rpc_err = GARBAGE_ARGS;
+                goto out;
+        }
+
+        frame = get_frame_from_request (req);
+        if (!frame) {
+                /* something wrong, mostly insufficient memory */
+                req->rpc_err = GARBAGE_ARGS; /* TODO */
+                goto out;
+        }
+        /*
+         * TODO: ZCOPY client requests are treated as normal READ requests
+         * in server
+         */
+        frame->root->op = GF_FOP_READ;
+
+        state = CALL_STATE (frame);
+        if (!state->conn->bound_xl) {
+                /* auth failure, request on subvolume without setvolume */
+                req->rpc_err = GARBAGE_ARGS;
+                goto out;
+        }
+
+        state->resolve.type  = RESOLVE_MUST;
+        state->resolve.fd_no = args.fd;
+        state->size          = args.size;
+        state->offset        = args.offset;
+        state->flags         = args.flag;
+
+        memcpy (state->resolve.gfid, args.gfid, 16);
+
+        GF_PROTOCOL_DICT_UNSERIALIZE (state->conn->bound_xl, state->xdata,
+                                      (args.xdata.xdata_val),
+                                      (args.xdata.xdata_len), ret,
+                                      op_errno, out);
+
+        ret = 0;
+        resolve_and_resume (frame, server_readv_resume);
+out:
+        /* memory allocated by libc, don't use GF_FREE */
+        free (args.xdata.xdata_val);
+
+        if (op_errno)
+                req->rpc_err = GARBAGE_ARGS;
+
+        return ret;
+}
int
server3_3_writev (rpcsvc_request_t *req)
@@ -5760,6 +5892,7 @@ rpcsvc_actor_t glusterfs3_3_fop_actors[] = {
[GFS3_OP_RELEASE] = { "RELEASE", GFS3_OP_RELEASE,
server3_3_release, NULL, 0},
[GFS3_OP_RELEASEDIR] = { "RELEASEDIR", GFS3_OP_RELEASEDIR,
server3_3_releasedir, NULL, 0},
[GFS3_OP_FREMOVEXATTR] = { "FREMOVEXATTR",
GFS3_OP_FREMOVEXATTR, server3_3_fremovexattr, NULL, 0},
+ [GFS3_OP_READ_ZCOPY] = { "READ_ZCOPY", GFS3_OP_READ_ZCOPY,
server3_3_readv_zcopy, NULL, 0},
};
--
http://raobharata.wordpress.com/
More information about the Gluster-devel
mailing list