[Gluster-users] GlusterFS and inotify (more info)
Harshavardhana
harsha at gluster.com
Tue Jan 26 13:18:31 UTC 2010
Hi Larry,
May i know the glusterfs version under use?. Can you use stat-prefetch
on
the client side above write-behind?.
--
Harshavardhana
Gluster - http://www.gluster.com
On Tue, Jan 26, 2010 at 6:41 PM, Larry Bates <larry.bates at vitalesafe.com>wrote:
> Sure if you want to take a look.
>
>
>
> -Larry
>
>
>
> Two servers (gfs01 and gfs02) configured with following scripts (they
> differ only by server names):
>
>
>
> volume vol1
>
> type storage/posix # POSIX FS translator
>
> option directory /mnt/glusterfs/vol1 # Export this directory
>
> option background-unlink yes # unlink in background
>
> end-volume
>
>
>
> volume vol2
>
> type storage/posix
>
> option directory /mnt/glusterfs/vol2
>
> option background-unlink yes
>
> end-volume
>
>
>
> volume vol3
>
> type storage/posix
>
> option directory /mnt/glusterfs/vol3
>
> option background-unlink yes
>
> end-volume
>
>
>
> volume vol4
>
> type storage/posix
>
> option directory /mnt/glusterfs/vol4
>
> option background-unlink yes
>
> end-volume
>
>
>
> volume vol5
>
> type storage/posix
>
> option directory /mnt/glusterfs/vol5
>
> option background-unlink yes
>
> end-volume
>
>
>
> volume vol6
>
> type storage/posix
>
> option directory /mnt/glusterfs/vol6
>
> option background-unlink yes
>
> end-volume
>
>
>
> volume vol7
>
> type storage/posix
>
> option directory /mnt/glusterfs/vol7
>
> option background-unlink yes
>
> end-volume
>
>
>
> volume vol8
>
> type storage/posix
>
> option directory /mnt/glusterfs/vol8
>
> option background-unlink yes
>
> end-volume
>
>
>
> volume iot1
>
> type performance/io-threads
>
> option thread-count 4
>
> subvolumes vol1
>
> end-volume
>
>
>
> volume iot2
>
> type performance/io-threads
>
> option thread-count 4
>
> subvolumes vol2
>
> end-volume
>
>
>
> volume iot3
>
> type performance/io-threads
>
> option thread-count 4
>
> subvolumes vol3
>
> end-volume
>
>
>
> volume iot4
>
> type performance/io-threads
>
> option thread-count 4
>
> subvolumes vol4
>
> end-volume
>
>
>
> volume iot5
>
> type performance/io-threads
>
> option thread-count 8
>
> subvolumes vol5
>
> end-volume
>
>
>
> volume iot6
>
> type performance/io-threads
>
> option thread-count 4
>
> subvolumes vol6
>
> end-volume
>
>
>
> volume iot7
>
> type performance/io-threads
>
> option thread-count 4
>
> subvolumes vol7
>
> end-volume
>
>
>
> volume iot8
>
> type performance/io-threads
>
> option thread-count 4
>
> subvolumes vol8
>
> end-volume
>
>
>
> volume gfs01brick1
>
> type features/locks
>
> subvolumes iot1
>
> end-volume
>
>
>
> volume gfs01brick2
>
> type features/locks
>
> subvolumes iot2
>
> end-volume
>
>
>
> volume gfs01brick3
>
> type features/locks
>
> subvolumes iot3
>
> end-volume
>
>
>
> volume gfs01brick4
>
> type features/locks
>
> subvolumes iot4
>
> end-volume
>
>
>
> volume gfs01brick5
>
> type features/locks
>
> subvolumes iot5
>
> end-volume
>
>
>
> volume gfs01brick6
>
> type features/locks
>
> subvolumes iot6
>
> end-volume
>
>
>
> volume gfs01brick7
>
> type features/locks
>
> subvolumes iot7
>
> end-volume
>
>
>
> volume gfs01brick8
>
> type features/locks
>
> subvolumes iot8
>
> end-volume
>
>
>
> ## Add network serving capability to volumes
>
> volume server
>
> type protocol/server
>
> option transport-type tcp # For TCP/IP transport
>
> #
>
> # Expose all the bricks to the clients
>
> #
>
> subvolumes gfs01brick1 gfs01brick2 gfs01brick3 gfs01brick4 gfs01brick5
> gfs01brick6 gfs01brick7 gfs01brick8
>
> option auth.addr.gfs01brick1.allow 10.0.0.*
>
> option auth.addr.gfs01brick2.allow 10.0.0.*
>
> option auth.addr.gfs01brick3.allow 10.0.0.*
>
> option auth.addr.gfs01brick4.allow 10.0.0.*
>
> option auth.addr.gfs01brick5.allow 10.0.0.*
>
> option auth.addr.gfs01brick6.allow 10.0.0.*
>
> option auth.addr.gfs01brick7.allow 10.0.0.*
>
> option auth.addr.gfs01brick8.allow 10.0.0.*
>
> end-volume
>
>
>
> Client config:
>
>
>
> #
>
> # Add client feature and attach to remote subvolumes of gfs01
>
> #
>
> volume gfs01vol1
>
> type protocol/client
>
> option transport-type tcp # for TCP/IP transport
>
> option remote-host gfs01 # IP/DNS address of the remote
> volume
>
> option remote-subvolume gfs01brick1 # name of the remote volume
>
> option transport.socket.nodelay on # undocumented option for speed
>
> end-volume
>
>
>
> volume gfs01vol2
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs01
>
> option remote-subvolume gfs01brick2
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs01vol3
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs01
>
> option remote-subvolume gfs01brick3
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs01vol4
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs01
>
> option remote-subvolume gfs01brick4
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs01vol5
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs01
>
> option remote-subvolume gfs01brick5
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs01vol6
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs01
>
> option remote-subvolume gfs01brick6
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs01vol7
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs01
>
> option remote-subvolume gfs01brick7
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs01vol8
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs01
>
> option remote-subvolume gfs01brick8
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> #
>
> # Add client feature and attach to remote subvolumes of gfs02
>
> #
>
> volume gfs02vol1
>
> type protocol/client
>
> option transport-type tcp # for TCP/IP transport
>
> option remote-host gfs02 # IP/DNS address of the remote
> volume
>
> option remote-subvolume gfs02brick1 # name of the remote volume
>
> option transport.socket.nodelay on # undocumented option for speed
>
> end-volume
>
>
>
> volume gfs02vol2
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs02
>
> option remote-subvolume gfs02brick2
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs02vol3
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs02
>
> option remote-subvolume gfs02brick3
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs02vol4
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs02
>
> option remote-subvolume gfs02brick4
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs02vol5
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs02
>
> option remote-subvolume gfs02brick5
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs02vol6
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs02
>
> option remote-subvolume gfs02brick6
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs02vol7
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs02
>
> option remote-subvolume gfs02brick7
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> volume gfs02vol8
>
> type protocol/client
>
> option transport-type tcp
>
> option remote-host gfs02
>
> option remote-subvolume gfs02brick8
>
> option transport.socket.nodelay on
>
> end-volume
>
>
>
> #
>
> # Replicate volumes
>
> #
>
> volume afr-vol1
>
> type cluster/replicate
>
> subvolumes gfs01vol1 gfs02vol1
>
> end-volume
>
>
>
> volume afr-vol2
>
> type cluster/replicate
>
> subvolumes gfs01vol2 gfs02vol2
>
> end-volume
>
>
>
> volume afr-vol3
>
> type cluster/replicate
>
> subvolumes gfs01vol3 gfs02vol3
>
> end-volume
>
>
>
> volume afr-vol4
>
> type cluster/replicate
>
> subvolumes gfs01vol4 gfs02vol4
>
> end-volume
>
>
>
> volume afr-vol5
>
> type cluster/replicate
>
> subvolumes gfs01vol5 gfs02vol5
>
> end-volume
>
>
>
> volume afr-vol6
>
> type cluster/replicate
>
> subvolumes gfs01vol6 gfs02vol6
>
> end-volume
>
>
>
> volume afr-vol7
>
> type cluster/replicate
>
> subvolumes gfs01vol7 gfs02vol7
>
> end-volume
>
>
>
> volume afr-vol8
>
> type cluster/replicate
>
> subvolumes gfs01vol8 gfs02vol8
>
> end-volume
>
>
>
> #
>
> # Distribute files across bricks
>
> #
>
> volume dht-vol
>
> type cluster/distribute
>
> subvolumes afr-vol1 afr-vol2 afr-vol3 afr-vol4 afr-vol5 afr-vol6 afr-vol7
> afr-vol8
>
> option min-free-disk 2% # 2% of 1.8Tb volumes is 36Gb
>
> end-volume
>
>
>
> #
>
> # Writebehind performance addition
>
> #
>
> volume writebehind
>
> type performance/write-behind
>
> subvolumes dht-vol
>
> option flush-behind on # default value is 'off'
>
> option cache-size 3MB
>
> end-volume
>
>
>
> The script is here, but I would need to include a bunch of things that get
> imported for it to run for you.
>
>
>
> import os
>
> import sys
>
> import time
>
> import getopt
>
> import signal
>
> import shutil
>
> import ConfigParser
>
> from ConfigParser import NoOptionError
>
> #
>
> # Get helpers
>
> #
>
> from loggerClass import loggerClass, loggerClassMixin
>
> import VESutils
>
> from VESutils import signon, getFromINI, blobpathfromblobid,
> blobidfromblobpath
>
> from VESutils import getAccountIdFromBlobId, getMemberInfo
>
> from VESutils import epochtime2S3time
>
> from VESutils import elapsedTimeToString
>
> from fmt_wcommas import FMC
>
> from singleinstance import singleinstance
>
> from globalconfig import globalinifilename
>
> #
>
> # Get S3 connection class
>
> #
>
> from boto.s3.connection import S3Connection, S3ResponseError
>
> from boto.exception import S3ResponseError, S3CreateError, S3DataError
>
>
>
> import pyinotify
>
> #from pyinotify import WatchManager, Notifier, ProcessEvent
>
> from xattr import xattr
>
> #
>
> # Get postgreSQL DBI interface
>
> #
>
> import psycopg2
>
>
>
> class Watcher(pyinotify.ProcessEvent, loggerClassMixin):
>
> '''
>
> default maximum for max_user_watches is 8192 (FC5)
>
> set by logging in as root and entering following command:
>
>
>
> sysctl -w fs.inotify.max_user_watches=65536
>
> '''
>
>
>
> def __init__(self, conn, bucket,
>
> logf = None, _trace = False, _debug = 0, _dryrun = False):
>
>
>
> '''
>
> MemberInfo - dictionary-like object holding cached websafe.members
> info
>
> bucket = S3 bucket instance where new files are to be uploaded
>
> '''
>
> self.conn = conn
>
> self.bucket = bucket
>
> self._trace = _trace
>
> self._debug = _debug
>
> self._dryrun = _dryrun
>
> #self.BACKUPS = '/mnt/BACKUPS/blobdata'
>
> self.BACKUPS = None
>
> #
>
> # Cache members_info as I go to save DB accesses
>
> #
>
> self.members_info = dict()
>
> #
>
> # If user passed loggerClass instance use it, otherwise logging
> will
>
> # go to screen (as provided for by loggerClassMixin.LOGF).
>
> #
>
> if logf is not None:
>
> self.logf = logf
>
>
>
> pyinotify.ProcessEvent.__init__(self)
>
> self.len_eventq = 0
>
> self.progress_count = 0
>
>
>
> def process_PROGRESS(self, notifier):
>
> len_eventq = len(notifier._eventq)
>
> LOGF = self.LOGF
>
> LM = "PROGRESS"
>
> if self._trace:
>
> LOGF("T", LM, "Entering")
>
> LOGF("I", LM, "len_eventq=%s" % FMC(len_eventq))
>
>
>
> self.progress_count += 1
>
> #
>
> # If eventq is shorter than last time, notifier didn't call
>
> # .read_events() so I might need to do it here. This code needs a
>
> # second look because there are LONG pauses currently.
>
> #
>
> if len_eventq < self.len_eventq:
>
> #
>
> # It is too expensive to update the eventq on every file that
>
> # is processed, so I will do it on every 1000th file as a
>
> # compromise.
>
> #
>
> if (self.progress_count % 1000) == 0:
>
> notifier.read_events()
>
>
>
> self.len_eventq = len_eventq
>
>
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
>
>
> def process_IN_Q_OVERFLOW(self, event):
>
> '''
>
> process_IN_Q_OVERFLOW - this is fired when events queue overflows.
>
> '''
>
> LM = 'process_IN_Q_OVERFLOW'
>
> LOGF = self.LOGF
>
> if self._trace:
>
> LOGF("T", LM, "Entering")
>
>
>
> LOGF("E", LM, "Queue overflow, set max_queued_events higher")
>
> LOGF("E", LM, "sudo /sbin/sysctl -w
> fs.inotify.max_queued_events=#####")
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
> raise OverflowError('len_eventq= %s' % FMC(self.self.len_eventq))
>
>
>
>
>
> def process_IN_MOVED_TO(self, event):
>
> '''
>
> process_IN_MOVED_TO - this is fired when upload .tmp file is moved
> to
>
> its final resting place.
>
> '''
>
> LM = 'process_IN_MOVED_TO'
>
> LOGF = self.LOGF
>
> if self._trace:
>
> LOGF("T", LM, "Entering")
>
>
>
> self._backup('M', event)
>
> self._upload('M', event)
>
>
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
>
>
> def process_IN_DELETE(self, event):
>
> LM = 'process_IN_DELETE'
>
> LOGF = self.LOGF
>
> if self._trace:
>
> LOGF("T", LM, "Entering")
>
>
>
> self._delete(event)
>
>
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
>
>
> def _delete(self, event):
>
> LM = '_delete'
>
> LOGF = self.LOGF
>
> if self._trace:
>
> LOGF("T", LM, "Entering")
>
>
>
> src = os.path.join(event.path, event.name)
>
> blob_id = blobidfromblobpath(src)
>
> if self._debug:
>
> LOGF("I", LM, "[%s] file=%s" % (event.name, src))
>
>
>
> #
>
> # Make sure a .tmp file didn't trigger this event
>
> #
>
> if event.name.endswith('.tmp') or \
>
> event.name.startswith('.'):
>
>
>
> if self._debug:
>
> LOGF("I", LM, ".tmp file, skipped")
>
>
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
> return
>
>
>
> #
>
> # Also make sure that the file is a blob. blobs have 28 character
>
> # hex filenames.
>
> #
>
> if len(os.path.basename(event.name)) != 28:
>
> if self._debug:
>
> LOGF("I", LM, "%s non-blob file deletion skiped")
>
> return
>
>
>
> #
>
> # Make sure file hasn't "disappeared"
>
> #
>
> if not os.path.exists(src):
>
> LOGF("W", LM, "src=%s, disappeared, skipped" % src)
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
> return
>
>
>
> #
>
> # Get S3path and delete the blob from S3
>
> #
>
> S3path = blobpathfromblobid('/', blob_id)
>
> result = self.bucket.delete_key(S3path)
>
> #
>
> # See if I've got the information in file_data
>
> #
>
> email = None
>
> account_id = getAccountIdFromBlobId(self.conn, blob_id)
>
> if account_id is not None:
>
> member_info = self.members_info.get(account_id,
>
> getMemberInfo(self.conn, account_id =
> account_id)
>
> )
>
>
>
> if member_info is not None:
>
> email = member_info['email']
>
>
>
> else:
>
> LOGF("W", LM, "blob_id=%s not found in file_data" % \
>
> blob_id)
>
>
>
> LOGF("I", LM, "[D]blob_id=%s email=%s" % (blob_id, email))
>
> #
>
> # If we are keeping local backup, delete it also
>
> #
>
> if self.BACKUPS is not None:
>
> dst = blobpathfromblobid(self.BACKUPS, blob_id)
>
> try:
>
> os.unlink(dst)
>
>
>
> except: # Ugly but it works!
>
> pass
>
>
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
> def process_IN_ATTRIB(self, event):
>
> '''
>
> process_IN_ATTRIB - this is fired when an blob file has an
> attribute
>
> changed. Normally attributes won't change, but
>
> this can be used to trigger me to do an upload
>
> on a blob file after-the-fact and provides for
> a
>
> 'self-healing' filesystem.
>
> '''
>
> LM = 'process_IN_ATTRIB'
>
> LOGF = self.LOGF
>
> if self._trace:
>
> LOGF("T", LM, "Entering")
>
>
>
> self._backup('A', event)
>
> self._upload('A',event)
>
>
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
> def _backup(self, parent, event):
>
> LM = '_backup'
>
> LOGF = self.LOGF
>
> if self._trace:
>
> LOGF("T", LM, "Entering")
>
>
>
> if self._debug > 1:
>
> LOGF("D", LM, "event.path=%s, event.name=%s" % \
>
> (event.path, event.name))
>
>
>
> if self.BACKUPS is not None:
>
> src = os.path.join(event.path, event.name)
>
> blob_id = blobidfromblobpath(src)
>
> if self._debug:
>
> LOGF("D", "[%s]" % parent, "src=%s" % src)
>
>
>
> #
>
> # Make sure a non-blob (.tmp/hidden) file didn't trigger this
> event
>
> #
>
> if event.name.endswith('.tmp') or \
>
> event.name.startswith('.') or \
>
> len(os.path.basename(event.name)) != 28:
>
>
>
> if self._debug:
>
> LOGF("I", LM, "non-blob file, skipped")
>
>
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
> return
>
>
>
> #
>
> # Copy the file to backup folder (iff it doesn't exist)
>
> #
>
> dst = blobpathfromblobid(self.BACKUPS, blob_id)
>
> if not os.path.exists(dst):
>
> try:
>
> shutil.copy2(src, dst)
>
>
>
> except:
>
> LOGF("E", LM, "%s->%s backup failed" % (src, dst))
>
> ##raise
>
>
>
> else:
>
> if self._debug:
>
> LOGF("E", LM, "BACKUPS currently disabled, skipped")
>
>
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
>
>
> def _upload(self, parent, event):
>
> LM = '_upload'
>
> LOGF = self.LOGF
>
> if self._trace:
>
> LOGF("T", LM, "Entering")
>
>
>
> if self._debug > 1:
>
> LOGF("D", LM, "event.path=%s, event.name=%s" % \
>
> (event.path, event.name))
>
>
>
> src = os.path.join(event.path, event.name)
>
> if self._debug:
>
> LOGF("D", "[%s]" % parent, "src=%s" % src)
>
>
>
> #
>
> # Make sure a .tmp file didn't trigger this event
>
> #
>
> if event.name.endswith('.tmp') or \
>
> event.name.startswith('.'):
>
>
>
> if self._debug:
>
> LOGF("I", LM, ".tmp file, skipped")
>
>
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
> return
>
>
>
> #
>
> # Also make sure that the file is a blob. blobs have 28 character
>
> # hex filenames.
>
> #
>
> if len(os.path.basename(event.name)) != 28:
>
> if self._debug:
>
> LOGF("I", LM, "%s non-blob file skiped")
>
> return
>
>
>
> #
>
> # Make sure file hasn't "disappeared"
>
> #
>
> if not os.path.exists(src):
>
> LOGF("W", LM, "src=%s, not found, skipped" % src)
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
> return
>
>
>
> #
>
> # See if I've got the information in file_data
>
> #
>
> blob_id = blobidfromblobpath(src)
>
> email = None
>
> account_id = getAccountIdFromBlobId(self.conn, blob_id)
>
> if account_id is not None:
>
> member_info = self.members_info.get(account_id,
>
> getMemberInfo(self.conn, account_id =
> account_id)
>
> )
>
>
>
> if member_info is not None:
>
> email = member_info['email']
>
>
>
>
>
> else:
>
> LOGF("W", LM, "blob_id=%s not found in file_data" % \
>
> blob_id)
>
>
>
> S3path = blobpathfromblobid('/', blob_id)
>
> size = os.path.getsize(src)
>
> #
>
> # Create a new key instance for S3 and pass in the
>
> # meta-information
>
> #
>
> k = self.bucket.new_key(key_name = S3path)
>
> #
>
> # Meta-data needed to restore a file from S3 to local filesystem
>
> # (e.g. to set ctime, mtime properly to work with rsync)
>
> #
>
> ctime = os.path.getctime(src)
>
> mtime = os.path.getmtime(src)
>
> VESctime = epochtime2S3time(ctime)
>
> VESmtime = epochtime2S3time(mtime)
>
> if self._debug > 1:
>
> LOGF("D", LM, "setting VESctime=%s" % VESctime)
>
> LOGF("D", LM, "setting VESmtime=%s" % VESmtime)
>
>
>
> k.set_metadata('ctime', VESctime)
>
> k.set_metadata('mtime', VESmtime)
>
> age = time.time() - ctime
>
> LOGF("I", LM, "[%s]%-28s %s(%11s)[%s]" % \
>
> (parent,
>
> email[:28],
>
> blob_id,
>
> FMC(size),
>
> elapsedTimeToString(age)
>
> )
>
> )
>
>
>
> if not self._dryrun:
>
> #
>
> # Upload the file to S3. Use replace=False to short circuit
>
> # the upload if the file already exists. That way I'm not
> using
>
> # upload bandwidth unnecessarily.
>
> #
>
> k.set_contents_from_filename(src, replace = False)
>
> if self._trace:
>
> LOGF("I", LM, "done")
>
>
>
> if self._trace:
>
> LOGF("T", LM, "Leaving")
>
>
>
>
>
> def sigTERM(signal, frame):
>
> global stopnow
>
> stopnow = True
>
>
>
> def notifyCallback(notifier):
>
> global _trace, LOGF
>
> global stopnow
>
> LM = 'notifyCallback'
>
> if stopnow:
>
> notifier.stop()
>
>
>
>
>
> def conlog(severity, LM, msg):
>
> global _trace, _debug, _quiet, LOGF
>
> if msg:
>
> LOGF(severity, "%s-%s" % (LM,msg))
>
>
>
> if not _quiet:
>
> sys.stdout.write(msg + '\n')
>
> sys.stdout.flush()
>
>
>
> pgm_ = 'monitorblobs'
>
> ver_ = '1.1.1.0'
>
> LM = 'main'
>
> #
>
> # V1.0.0.1 13 JUL 08 LAB Wrapped call to S3 to get HEAD after uploading (to
>
> # access last_modified meta-data) in while-try block
>
> # to work around boto issue #125)
>
> #
>
> # V1.0.0.2 13 JUL 08 LAB Added .strip() to arguments to work with
>
> # supervisord
>
> #
>
> # V1.0.0.3 14 JUL 08 LAB Added start program separator to logfile, fixed
>
> # typo (Xattrs instead of xattrs)
>
> #
>
> # V1.0.0.4 14 JUL 08 LAB Wrapped all boto calls inside _mexe framework
>
> # as a workaround to the bug #125.
>
> #
>
> # V1.0.0.5 15 JUL 08 LAB Removed the IN_MOVED_TO handler because every
> upload
>
> # triggered the IN_ATTRIB handler as well.
>
> #
>
> # V1.1.0.0 17 JUL 08 LAB Upgrded to V0.8 of pyinotify, replaced hard-coded
>
> # checking of the inotify variables with newly
>
> # provided SysCtlNotify class.
>
> #
>
> # V1.1.0.1 26 JUL 08 LAB Added hack to log len(_eventq) for debugging
>
> #
>
> # V1.1.0.2 29 JUL 08 LAB Added dbTableCache so I can display member info
>
> # in logfile as files are processed (unfinished
> until
>
> # fileid xattr is set from upload processing code).
>
>
>
> # V1.1.0.3 07 MAR 09 LAB Introduced getFromINI and _<runtimeparm> naming,
>
> # removed _mexe from class because it is now inside
> boto
>
> #
>
> # V1.1.0.4 12 MAR 09 LAB Shortened some logging messages, put others under
>
> # _trace control
>
> #
>
> # V1.1.0.5 30 MAR 09 LAB Added code to _upload that skips files that start
>
> # with a period (.). These files are generated when
>
> # doing an rsync recovery on a monitored branch.
>
> #
>
> # V1.1.0.6 11 DEC 09 LAB Introduced the VES_STORAGE_S3 environment variable
>
> # that points to bucket_name
>
> #
>
> # V1.1.0.7 27 DEC 09 LAB Added copy to backup method to Watcher class (this
>
> # is temporary because I'm moving to GlusterFS auto
>
> # replication).
>
> #
>
> # V1.1.0.8 04 JAN 10 LAB Cleaned up upload code, eliminated xattr handlers
>
> #
>
> # V1.1.0.9 10 JAN 10 LAB Better logging for _delete, _upload. Cleaned up
>
> # code for making/skipping backups (which won't be
>
> # needed when I switch to GlusterFS).
>
> #
>
> # V1.1.1.0 23 JAN 10 LAB Introduced globalconfig, removed BACKUPS because
>
> # of cutover to GlusterFS
>
> #
>
> # Register the signal handler
>
> #
>
> signal.signal(signal.SIGTERM, sigTERM)
>
> stopnow = False
>
>
>
> PGM_usage='''
>
> Usage: %s [OPTIONS]
>
> Monitor blobdata folders for changes and process files
>
>
>
> -t, --trace run in trace mode
>
> -d, --debug= run at debug level= (e.g 1,2,3)
>
> -q, --quiet run in quiet mode (minimal output)
>
> -D, --dryrun dry run, no changes
>
> -l, --logfilename= specify the path/filename to .LOG file
>
> -i, --inifilename= specify the path/filename to .INI file
>
> -h, --help help (this screen)
>
> -V, --version output version information and stop
>
> ''' % pgm_
>
>
>
> #
>
> # Get the options that the user may have passed on processor call line
>
> #
>
> short_args = "td:ql:i:hVD"
>
> long_args = ["trace", "debug=", "quiet", "logfilename=",
>
> "inifilename=", "help", "version", "dryrun",
>
> ]
>
>
>
> #
>
> # Process command line options
>
> #
>
> opts, args = getopt.gnu_getopt(sys.argv[1:], short_args, long_args)
>
> #
>
> # Set inital values for potential argv parameters
>
> #
>
> _trace = False
>
> _debug = 0
>
> _quiet = False
>
> _dryrun = False
>
> _logfilename = None
>
> _inifilename = None
>
> #
>
> # Parse command line options
>
> #
>
> for option, value in opts:
>
> if option in ("-h", "--help"):
>
> sys.exit(PGM_usage)
>
>
>
> if option in ("-V", "--version"):
>
> print '%s Version %s' % (pgm_, ver_)
>
> sys.exit(0)
>
>
>
> if option in ("-t", "--trace"): _trace = True
>
> elif option in ("-d", "--debug"): _debug = int(value)
>
> elif option in ("-q", "--quiet"): _quiet = 1
>
> elif option in ("-D", "--dryrun"): _dryrun = True
>
> elif option in ("-l", "--logfilename"): _logfilename = value.strip()
>
> elif option in ("-i", "--inifilename"): _inifilename = value.strip()
>
>
>
> #
>
> # If user didn't specify inifilename on processor call, use default
>
> #
>
> if _inifilename is None:
>
> _inifilename = "%s.ini" % pgm_
>
>
>
> if not os.path.exists(globalinifilename):
>
> msg = "inifilename=%s, not found, aborting" % globalinifilename
>
> raise RuntimeError(msg)
>
>
>
> if not os.path.exists(_inifilename):
>
> msg = "inifilename=%s, not found, aborting" % _inifilename
>
> #print "%s-%s.%s" % ("E", LM, msg)
>
> raise RuntimeError(msg)
>
>
>
> #
>
> # Create ConfigParser instance to read .INI information
>
> #
>
> INI = ConfigParser.ConfigParser()
>
> #
>
> # Read .INI file
>
> #
>
> INI.read([globalinifilename, _inifilename])
>
>
>
> _logfilename = getFromINI(INI.get, 'init', 'logfilename',
>
> _logfilename, "%s.log" % pgm_)
>
>
>
> logf = loggerClass(_logfilename, 'monitor', 1<<26) #64Mb max
>
> logf.initsessionlog()
>
> LOGF = logf.writelines
>
> LOGF("I", ("------%s V%s begin" % (pgm_, ver_)).ljust(50, '-'))
>
> #
>
> # Make sure there isn't another copy of me running already
>
> #
>
> _pidPath = getFromINI(INI.get, 'init', 'pidPath', None, None)
>
> myapp = singleinstance(pgm_, _pidPath)
>
> if myapp.alreadyrunning():
>
> msg = "%s already running, exiting" % pgm_
>
> raise RuntimeError(msg)
>
>
>
>
>
> _trace = getFromINI(INI.getboolean, 'init', 'trace', _trace, False)
>
> _debug = getFromINI(INI.getint, 'init', 'debug', _debug, 0)
>
> _quiet = getFromINI(INI.getboolean, 'init', 'quiet', _quiet, False)
>
> _dryrun = getFromINI(INI.getboolean, 'init', 'dryrun', _dryrun, False)
>
>
>
> signon(pgm_, ver_, _quiet=_quiet)
>
> #
>
> # More items to get from the .INI file (or environment)
>
> #
>
> _STORAGE = getFromINI(INI.get, 'init', 'storage', None, None)
>
> _bucketname = getFromINI(INI.get, 'init', 'bucketname', None, None)
>
>
>
> #
>
> # Get files from .INI to read AWS credentials from
>
> #
>
> _accessKeyFile = getFromINI(INI.get, 'init', 'accesskeyfile', None, None)
>
> _secretKeyFile = getFromINI(INI.get, 'init', 'secretkeyfile', None, None)
>
>
>
> if _debug:
>
> conlog("I", LM, "-----Options".ljust(50, '-'))
>
> conlog("I", LM, "trace..........%s" % _trace)
>
> conlog("I", LM, "debug..........%i" % _debug)
>
> conlog("I", LM, "quiet..........%s" % _quiet)
>
> conlog("I", LM, "dryrun.........%s" % _dryrun)
>
> conlog("I", LM, "STORAGE........%s" % _STORAGE)
>
> conlog("I", LM, "pidPath........%s" % _pidPath)
>
> conlog("I", LM, "bucketname.....%s" % _bucketname)
>
> conlog("I", LM, "accessKeyFile..%s" % _accessKeyFile)
>
> conlog("I", LM, "secretKeyFile..%s" % _secretKeyFile)
>
>
>
> _PWMfile = getFromINI(INI.get, 'init', 'pwmfile', None, None)
>
> _host = getFromINI(INI.get, 'database', 'host', None, None)
>
> _database = getFromINI(INI.get, 'database', 'database', None, None)
>
> _dbport = getFromINI(INI.getint, 'database', 'port', None, None)
>
> _user = getFromINI(INI.get, 'database', 'user', None, None)
>
>
>
> conlog("I", LM, "PWMfile........%s" % _PWMfile)
>
> conlog("I", LM, "host...........%s" % _host)
>
> conlog("I", LM, "database.......%s" % _database)
>
> conlog("I", LM, "dbport.........%i" % _dbport)
>
> conlog("I", LM, "user...........%s" % _user)
>
>
>
> if not _quiet:
>
> print
>
>
>
> #
>
> # Get database password from file
>
> #
>
> _password = open(_PWMfile, 'r').readline().rstrip()
>
> conn = psycopg2.connect(host=_host, database=_database, port=_dbport,
>
> user=_user, password=_password)
>
> #
>
> # Get the AccessKey and SecretAccessKey info
>
> #
>
> aws_ak = open(_accessKeyFile,'r').readline().rstrip()
>
> aws_sak = open(_secretKeyFile,'r').readline().rstrip()
>
> #
>
> # Create an instance of boto S3Connection using these credentials
>
> #
>
> S3obj = S3Connection(aws_ak, aws_sak)
>
> if _trace:
>
> conlog("T", LM, "S3 connection object created")
>
> conlog("T", LM, "Retrieving bucketname=%s from S3" % _bucketname)
>
>
>
> bucket = S3obj.get_bucket(_bucketname)
>
> if _trace:
>
> conlog("T", LM, "bucketname = %s, retrieved" % _bucketname)
>
>
>
> #
>
> # Watch for move/delete events
>
> #
>
> #mask = pyinotify.IN_DELETE | pyinotify.IN_ATTRIB | pyinotify.IN_Q_OVERFLOW
>
> mask = pyinotify.IN_ATTRIB
>
> mask |= pyinotify.IN_Q_OVERFLOW
>
> mask |= pyinotify.IN_MOVED_TO
>
> #
>
> # Create instance of WatchManager class and notifier class
>
> #
>
> wm = pyinotify.WatchManager()
>
> if _trace:
>
> conlog("T", LM, "Creating Watcher instance")
>
>
>
> Wobj = Watcher(conn, bucket, logf = logf,
>
> _trace = _trace, _debug = _debug, _dryrun = _dryrun)
>
>
>
> if _trace:
>
> conlog("T", LM, "Watcher instance created")
>
> conlog("T", LM, "Creating Notifier instance")
>
>
>
> notifier = pyinotify.Notifier(wm, Wobj)
>
>
>
> if _trace:
>
> conlog("T", LM, "Notifier instance created")
>
>
>
> #
>
> # If I'm debugging, get a loggerClass instance into notifier class instance
>
> # to log _eventq depth.
>
> #
>
> if _debug:
>
> notifier.LOGF = logf.writelines
>
>
>
> if not _quiet:
>
> print
>
> #
>
> # Folders to watch (this way I won't watch any folders except the ones
>
> # that actually hold blobs 00-ff->00-ff). This keeps me from watching
>
> # temp/junk folders that might accidentally get created.
>
> #
>
> ##flist = ['%02x' % i for i in xrange(0, 256)] #00-ff
>
> flist = ['%02x' % i for i in xrange(int('00',16), int('ff',16) + 1)]
>
> conlog("I", LM, "Watchlist STORAGE..%s (recursive)" % _STORAGE)
>
> conlog("I", LM, "Registering folders to watch...")
>
> foldersRegistered = 0
>
> for n, i in enumerate(flist):
>
> l = n + 1
>
> if (n % 16) == 0:
>
> if not _quiet:
>
> if n != 0:
>
> sys.stdout.write('(recursive)\n')
>
>
>
> sys.stdout.write("Watchlist adding....%s " % i)
>
> sys.stdout.flush()
>
>
>
> else:
>
> if not _quiet:
>
> sys.stdout.write("%s " % i)
>
> sys.stdout.flush()
>
>
>
> for j in flist:
>
> watch_folder = os.path.join(_STORAGE, i, j)
>
> wm.add_watch(watch_folder, mask)
>
> foldersRegistered +=1
>
>
>
> if not _quiet:
>
> sys.stdout.write('(recursive)\n')
>
> print
>
>
>
> conlog("I", LM, "%s folder monitors registered" % FMC(foldersRegistered))
>
> if _trace:
>
> conlog("T", LM, "Entering notifier.loop")
>
>
>
> try:
>
> notifier.loop(callback = notifyCallback)
>
>
>
> except KeyboardInterrupt:
>
> print "KeyboardInterrupt, stopping..."
>
> stopnow = True
>
> #
>
> # destroy the inotify's instance on this interrupt (stop monitoring)
>
> #
>
> notifier.stop()
>
>
>
> del myapp
>
> if _dryrun:
>
> conlog("I", LM, "WARNING-dryrun = True, nothing committed")
>
>
>
>
>
> Hope this helps.
>
>
>
> -Larry
>
>
>
>
>
> *From:* harshavardhanacool at gmail.com [mailto:harshavardhanacool at gmail.com]
> *On Behalf Of *Harshavardhana
> *Sent:* Tuesday, January 26, 2010 2:01 AM
> *To:* Larry Bates
> *Cc:* gluster-users at gluster.org
> *Subject:* Re: [Gluster-users] GlusterFS and inotify (more info)
>
>
>
> Hi Larry,
>
> Can you share with us the volume files you are using with GlusterFS?.
> Also the scripts you are trying to run.
>
> Thanks
> --
> Harshavardhana
> Gluster - http://www.gluster.com
>
> On Tue, Jan 26, 2010 at 3:31 AM, Larry Bates <larry.bates at vitalesafe.com>
> wrote:
>
> Well it seems that I can register, but the registration of the subfolders
> is so
> slow that I thought it was not working. I have subfolders 00-ff and
> subfolders
> under them 00-ff (64K total folder structure). Registering on normal
> storage
> took about 30 seconds. Registering inotify watcher (recursive=True) on
> GlusterFS mount takes over 1 hr, 15 min! Walking the tree and registering
> them
> each individually takes 6 minutes.
>
> -Larry
>
> -----Original Message-----
> From: Larry Bates [mailto:larry.bates at vitalesafe.com]
> Sent: Monday, January 25, 2010 1:51 PM
> To: 'gluster-users at gluster.org'
> Subject: GlusterFS and inotify
>
> I recently moved my backend storage to GlusterFS V3.0 and with one
> exception
> everything is running great. That exception is that I had a daemon using
> inotify that was watching my storage for new files. Upon arrival this
> watcher
> uploaded a copy of the file to Amazon S3. This daemon had been running
> just
> fine for well over a year. Moving to GlusterFS seems to indicate that
> inotify
> doesn't work on GlusterFS volumes. I don't know if it is a Gluster, Fuse,
> or
> some other problem. inotify just refuses to allow me to register the top
> level
> folder to be watched. Before I spend a lot of time on this, I thought I'd
> bounce it off of the "experts" on this list.
>
> Anyone have any ideas?
>
> Thanks in advance,
> Larry Bates
> vitalEsafe, Inc.
>
> _______________________________________________
> Gluster-users mailing list
> Gluster-users at gluster.org
> http://gluster.org/cgi-bin/mailman/listinfo/gluster-users
>
>
>
More information about the Gluster-users
mailing list