diff options
-rw-r--r-- | chroma/cache.py | 160 | ||||
-rw-r--r-- | test/test_cache.py | 169 |
2 files changed, 329 insertions, 0 deletions
diff --git a/chroma/cache.py b/chroma/cache.py new file mode 100644 index 0000000..fe7d180 --- /dev/null +++ b/chroma/cache.py @@ -0,0 +1,160 @@ +'''chroma.cache: On-disk geometry and bounding volume hierarchy cache. + +The ``Cache`` class is used to manage an on-disk cache of geometry and +BVH objects, which are slow to calculate. By default, the cache is in +$HOME/.chroma. + +Currently this cache is not thread or process-safe when a writing +process is present. +''' + +import os +import cPickle as pickle + +from chroma.log import logger + +class GeometryNotFoundError(Exception): + '''A requested geometry was not found in the on-disk cache.''' + def __init__(self, msg): + Exception.__init__(self, msg) + +def verify_or_create_dir(dirname, exception_msg, logger_msg=None): + '''Checks if ``dirname`` exists and is a directory. If it does not exist, + then it is created. If it does exist, but is not a directory, an IOError + is raised with ``exception_message`` as the description. + + If the directory is created, an info message will be sent to the + Chroma logger if ``logger_message`` is not None. + ''' + if not os.path.isdir(dirname): + if os.path.exists(dirname): + raise IOError(exception_msg) + else: + if logger_msg is not None: + logger.info(logger_msg) + os.mkdir(dirname) + +class Cache(object): + '''Class for manipulating a Chroma disk cache directory. + + Use this class to read and write cached geometries or bounding + volume hierarchies rather than reading and writing disk directly. + ''' + + def __init__(self, cache_dir=os.path.expanduser('~/.chroma/')): + '''Open a Chroma cache stored at ``cache_dir``. + + If ``cache_dir`` does not already exist, it will be created. By default, + the cache is in the ``.chroma`` directory under the user's home directory. + ''' + self.cache_dir = cache_dir + verify_or_create_dir(self.cache_dir, + exception_msg='Path for cache already exists, ' + 'but is not a directory: %s' % cache_dir, + logger_msg='Creating new Chroma cache directory at %s' + % cache_dir) + + + self.geo_dir = os.path.join(cache_dir, 'geo') + verify_or_create_dir(self.geo_dir, + exception_msg='Path for geometry directory in cache ' + 'already exists, but is not a directory: %s' + % self.geo_dir) + + self.bvh_dir = os.path.join(cache_dir, 'bvh') + verify_or_create_dir(self.bvh_dir, + exception_msg='Path for BVH directory in cache already ' + 'exists, but is not a directory: %s' % self.bvh_dir) + + + def get_geometry_filename(self, name): + '''Return the full pathname for the geometry file corresponding to + ``name``. + ''' + return os.path.join(self.geo_dir, name) + + def list_geometry(self): + '''Returns a list of all geometry names in the cache.''' + return os.listdir(self.geo_dir) + + def save_geometry(self, name, geometry): + '''Save ``geometry`` in the cache with the name ``name``.''' + geo_file = self.get_geometry_filename(name) + with open(geo_file, 'wb') as output_file: + pickle.dump(geometry.mesh.md5(), output_file, + pickle.HIGHEST_PROTOCOL) + pickle.dump(geometry, output_file, + pickle.HIGHEST_PROTOCOL) + + def load_geometry(self, name): + '''Returns the chroma.geometry.Geometry object associated with + ``name`` in the cache. + + Raises ``GeometryNotFoundError`` if ``name`` is not in the cache. + ''' + geo_file = self.get_geometry_filename(name) + if not os.path.exists(geo_file): + raise GeometryNotFoundError(name) + with open(geo_file, 'rb') as input_file: + _ = pickle.load(input_file) # skip mesh hash + geo = pickle.load(input_file) + return geo + + def remove_geometry(self, name): + '''Remove the geometry file associated with ``name`` from the cache. + + If ``name`` does not exist, no action is taken. + ''' + geo_file = self.get_geometry_filename(name) + if os.path.exists(geo_file): + os.remove(geo_file) + + def get_geometry_hash(self, name): + '''Get the mesh hash for the geometry associated with ``name``. + + This is faster than loading the entire geometry file and calling the + hash() method on the mesh. + ''' + geo_file = self.get_geometry_filename(name) + if not os.path.exists(geo_file): + raise GeometryNotFoundError(name) + with open(geo_file, 'rb') as input_file: + return pickle.load(input_file) + + def load_default_geometry(self): + '''Load the default geometry as set by previous call to + set_default_geometry(). + + If no geometry has been designated the default, raise + GeometryNotFoundError. + ''' + return self.load_geometry('.default') + + def set_default_geometry(self, name): + '''Set the geometry in the cache corresponding to ``name`` to + be the default geometry returned by load_default_geometry(). + ''' + default_geo_file = self.get_geometry_filename('.default') + geo_file = self.get_geometry_filename(name) + if not os.path.exists(geo_file): + raise GeometryNotFoundError(name) + + if os.path.exists(default_geo_file): + if os.path.islink(default_geo_file): + os.remove(default_geo_file) + else: + raise IOError('Non-symlink found where expected a symlink: ' + +default_geo_file) + os.symlink(geo_file, default_geo_file) + + + def list_bvh(self, mesh_hash): + pass + + def save_bvh(self, mesh_hash, name=None): + pass + + def load_bvh(self, mesh_hash, name=None): + pass + + diff --git a/test/test_cache.py b/test/test_cache.py new file mode 100644 index 0000000..e0162c8 --- /dev/null +++ b/test/test_cache.py @@ -0,0 +1,169 @@ +import unittest +import os +import shutil +import tempfile +import binascii + +from chroma.cache import verify_or_create_dir, Cache, GeometryNotFoundError +from chroma.geometry import Geometry, Solid +from chroma.make import box + +def random_tempdir(prefix): + '''Select a random directory name inside the $TMP directory that + starts with ``prefix`` and ends with a random hex string.''' + subdir = prefix + '_' + binascii.b2a_hex(os.urandom(8)) + return os.path.join(tempfile.gettempdir(), subdir) + +def remove_path(path): + '''If path is a file, delete it. If it is a directory, remove it. + If it doesn't exist, do nothing.''' + if os.path.isfile(path): + os.remove(path) + elif os.path.isdir(path): + shutil.rmtree(path) + + +class TestVerifyOrCreateDir(unittest.TestCase): + def setUp(self): + self.test_dir = random_tempdir('vcd') + + def test_no_dir(self): + assert not os.path.isdir(self.test_dir) + verify_or_create_dir(self.test_dir, 'msg') + assert os.path.isdir(self.test_dir) + + def test_exist_dir(self): + os.mkdir(self.test_dir) + assert os.path.isdir(self.test_dir) + verify_or_create_dir(self.test_dir, 'msg') + assert os.path.isdir(self.test_dir) + + def test_exist_file(self): + f = open(self.test_dir, 'w') + f.write('foo') + f.close() + with self.assertRaises(IOError): + verify_or_create_dir(self.test_dir, 'msg') + + def tearDown(self): + remove_path(self.test_dir) + +class TestCacheCreation(unittest.TestCase): + def setUp(self): + self.cache_dir = random_tempdir('chroma_cache_test') + + def test_creation(self): + assert not os.path.isdir(self.cache_dir) + cache = Cache(self.cache_dir) + assert os.path.isdir(self.cache_dir) + + def test_recreation(self): + assert not os.path.isdir(self.cache_dir) + cache = Cache(self.cache_dir) + del cache + assert os.path.isdir(self.cache_dir) + cache = Cache(self.cache_dir) + assert os.path.isdir(self.cache_dir) + + def tearDown(self): + remove_path(self.cache_dir) + +class TestCacheGeometry(unittest.TestCase): + def setUp(self): + self.cache_dir = random_tempdir('chroma_cache_test') + self.cache = Cache(self.cache_dir) + + self.a = Geometry() + self.a.add_solid(Solid(box(1,1,1))) + self.a.add_solid(Solid(box(1,1,1)), displacement=(10,10,10)) + self.a.flatten() + + self.b = Geometry() + self.b.add_solid(Solid(box(2,2,2))) + self.b.add_solid(Solid(box(2,2,2)), displacement=(10,10,10)) + self.b.add_solid(Solid(box(2,2,2)), displacement=(-10,-10,-10)) + self.b.flatten() + + def test_list_geometry(self): + self.assertEqual(len(self.cache.list_geometry()), 0) + + self.cache.save_geometry('a', self.a) + l = self.cache.list_geometry() + self.assertEqual(len(l), 1) + self.assertIn('a', l) + + self.cache.save_geometry('b', self.b) + l = self.cache.list_geometry() + self.assertEquals(len(l), 2) + self.assertIn('a', l) + self.assertIn('b', l) + + self.cache.save_geometry('a', self.a) + l = self.cache.list_geometry() + self.assertEquals(len(l), 2) + self.assertIn('a', l) + self.assertIn('b', l) + + def test_load_geometry_not_found(self): + with self.assertRaises(GeometryNotFoundError): + self.cache.load_geometry('a') + + def test_save_load_new_geometry(self): + self.cache.save_geometry('b', self.b) + b = self.cache.load_geometry('b') + + def test_replace_geometry(self): + self.cache.save_geometry('b', self.b) + b = self.cache.load_geometry('b') + self.assertEqual(b.mesh.md5(), self.b.mesh.md5()) + + self.cache.save_geometry('b', self.b) + b = self.cache.load_geometry('b') + self.assertEqual(b.mesh.md5(), self.b.mesh.md5()) + + def test_remove_geometry(self): + self.cache.save_geometry('b', self.b) + self.assertIn('b', self.cache.list_geometry()) + self.cache.remove_geometry('b') + self.assertNotIn('b', self.cache.list_geometry()) + + def test_get_geometry_hash(self): + self.cache.save_geometry('b', self.b) + self.assertEqual(self.cache.get_geometry_hash('b'), self.b.mesh.md5()) + + def test_get_geometry_hash_not_found(self): + with self.assertRaises(GeometryNotFoundError): + self.cache.get_geometry_hash('a') + + def test_default_geometry(self): + self.cache.save_geometry('a', self.a) + self.cache.save_geometry('b', self.b) + + with self.assertRaises(GeometryNotFoundError): + self.cache.set_default_geometry('c') + + self.cache.set_default_geometry('b') + b = self.cache.load_default_geometry() + + self.cache.set_default_geometry('a') + a = self.cache.load_default_geometry() + + def test_default_geometry_corruption(self): + self.cache.save_geometry('a', self.a) + self.cache.save_geometry('b', self.b) + + # Put a file where a symlink should be + default_symlink_path = self.cache.get_geometry_filename('.default') + with open(default_symlink_path, 'w') as f: + f.write('foo') + + with self.assertRaises(IOError): + self.cache.set_default_geometry('b') + + # Verify file not modified + assert os.path.isfile(default_symlink_path) + with open(default_symlink_path) as f: + self.assertEqual(f.read(), 'foo') + + def tearDown(self): + remove_path(self.cache_dir) |