# Source code for pyvolume.zkfuse

# -*- coding: utf-8 -*-
""" Module providing ZKFileSystem implementation."""
from __future__ import unicode_literals

from __future__ import absolute_import
import logging
import os
import os.path

from plumbum import ProcessExecutionError

from pyvolume.exceptions import NeedOptionsException

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# The plumbum `docker` command object is looked up at import time. If that
# import fails (presumably because no `docker` executable can be found --
# TODO confirm against plumbum), only a warning is logged and the name
# `docker` is left undefined, so any later code path that evaluates
# `docker[...]` will raise NameError.
try:
    from plumbum.cmd import docker
except ImportError:
    log.warning("This can be ignored during testing or on travis")

# Image that is run (and optionally pulled) for every volume.
DOCKER_IMAGE = "ronin/zkfuse:latest"
# Fixed name passed to `docker run --name` and used by `docker stop` / `docker rm`.
CONTAINER_NAME = "zkfuse"

class ZKFileSystem(object):
    """Volume backend that mounts ZooKeeper through zkfuse run in Docker.

    Every volume gets a directory ``<base>/zkmount/<volname>`` on the host.
    Mounting a volume starts a container from ``DOCKER_IMAGE`` that
    bind-mounts ``<base>/zkmount`` at ``/zkmount`` (``shared`` propagation)
    and runs ``/usr/bin/zkfuse -m /zkmount/<volname> -z <zookeeper_string>``.

    ``vol_dict`` maps each volume name to a dict with the keys ``"Local"``
    (host path), ``"cmdline"`` (argument list handed to ``docker``) and
    ``"mounted"`` (bool).

    NOTE(review): every volume's command line uses ``--name CONTAINER_NAME``
    and ``umount`` stops/removes that same name, so presumably only one
    volume can be mounted at a time -- confirm before relying on several
    simultaneous mounts.
    """

    def __init__(self, base, zkfuse_opt="-d"):
        """Prepare the mount root ``<base>/zkmount``.

        :param base: existing directory under which ``zkmount`` is created.
        :param zkfuse_opt: whitespace-separated options passed to zkfuse.
        :raises OSError: if the mount root cannot be created.
        """
        self.base = os.path.join(base, "zkmount")
        # Tolerate a mount root left over from a previous run instead of
        # failing on start-up; any other problem (missing parent, a regular
        # file in the way) still raises from os.mkdir.
        if not os.path.isdir(self.base):
            os.mkdir(self.base)
        self.zkfuse_options = zkfuse_opt
        self.vol_dict = {}
        self.docker_opt = (
            "run --name {0} --device /dev/fuse --cap-add SYS_ADMIN -d ".format(
                CONTAINER_NAME
            )
        )

    def create(self, volname, options):
        """Create the volume directory and record its mount command.

        Nothing is mounted yet; see :meth:`mount`.

        :param volname: name of the volume (also the directory name).
        :param options: dict of volume options. ``"zookeeper_string"`` is
            required. ``"docker_opt"`` (optional) holds extra whitespace
            separated ``docker run`` options for this volume only, e.g.
            ``--net=host`` when ZooKeeper runs on the local host. If the key
            ``"pull_early"`` is present the image is pulled immediately.
        :raises NeedOptionsException: if ``"zookeeper_string"`` is missing.
        :raises OSError: if the volume directory cannot be created.
        """
        if "zookeeper_string" not in options:
            raise NeedOptionsException("zookeeper_string needed")
        zkstring = options["zookeeper_string"]

        local_path = os.path.join(self.base, volname)
        log.info("Creating directory " + local_path)
        os.mkdir(local_path)

        # Start from the shared defaults but never modify self.docker_opt:
        # per-volume options must not leak into volumes created later.
        cmdline = self.docker_opt.split()
        if "docker_opt" in options:
            cmdline += options["docker_opt"].split()
        # Paths and the connection string are passed as single arguments so
        # that whitespace inside them cannot split the command line.
        cmdline += [
            "-v",
            "{0}:/zkmount:shared".format(self.base),
            DOCKER_IMAGE,
            "/usr/bin/zkfuse",
        ]
        cmdline += self.zkfuse_options.split()
        cmdline += ["-m", "/zkmount/{0}".format(volname), "-z", zkstring]

        self.vol_dict[volname] = {
            "Local": local_path,
            "cmdline": cmdline,
            "mounted": False,
        }

        if "pull_early" in options:
            pull_cmd = docker[["pull", DOCKER_IMAGE]]
            pull_cmd()

    def list(self):
        """Return the names of the volumes currently being managed."""
        # `list` here is the builtin: method names are not visible as bare
        # names inside method bodies.
        return list(self.vol_dict)

    def mount_check(self, volname):
        """Return the local path of ``volname`` if mounted, else ``None``.

        :raises KeyError: if ``volname`` is not a managed volume.
        """
        volume = self.vol_dict[volname]
        if not volume["mounted"]:
            log.error("Volume %s is not mounted", volname)
            return None
        return volume["Local"]

    def path(self, volname):
        """Return the local path of ``volname`` if mounted, else ``None``.

        :raises KeyError: if ``volname`` is not a managed volume.
        """
        return self.mount_check(volname)

    def remove(self, volname):
        """Unmount ``volname`` if needed, delete its directory and forget it.

        A ``ProcessExecutionError`` with return code 1 raised while
        unmounting is tolerated; any other return code is re-raised.

        :returns: ``True`` on success.
        :raises KeyError: if ``volname`` is not a managed volume.
        """
        local_path = self.vol_dict[volname]["Local"]
        try:
            self.umount(volname)
        except ProcessExecutionError as e:
            if e.retcode != 1:
                raise
        log.info("Removing local path " + local_path)
        if os.path.exists(local_path):
            os.rmdir(local_path)
        # Drop the bookkeeping entry only once the directory is gone, so a
        # failure above leaves the volume visible and removable again.
        self.vol_dict.pop(volname, None)
        return True

    def mount(self, volname):
        """Start the zkfuse container for ``volname`` and return its path.

        If the volume is already marked as mounted, its path is returned
        without running anything.

        :raises KeyError: if ``volname`` is not a managed volume.
        """
        already_mounted = self.mount_check(volname)
        if already_mounted:
            return already_mounted
        volume = self.vol_dict[volname]
        mount_cmd = docker[volume["cmdline"]]
        mount_cmd()
        volume["mounted"] = True
        return volume["Local"]

    def umount(self, volname):
        """Stop and remove the zkfuse container backing ``volname``.

        :returns: ``True`` after unmounting, ``None`` if the volume was not
            marked as mounted.
        :raises KeyError: if ``volname`` is not a managed volume.
        """
        if not self.mount_check(volname):
            return None
        stop_cmd = docker[["stop", CONTAINER_NAME]]
        stop_cmd()
        remove_cmd = docker[["rm", "-f", CONTAINER_NAME]]
        remove_cmd()
        self.vol_dict[volname]["mounted"] = False
        return True

    def cleanup(self):
        """Remove every managed volume, then the mount root itself."""
        # Iterate over a snapshot: remove() deletes entries from vol_dict.
        for volname in list(self.vol_dict):
            self.remove(volname)
        os.rmdir(self.base)

    def scope(self):
        """Return the scope of this driver, which is always ``"global"``."""
        return "global"