summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--chroma/cache.py160
-rw-r--r--test/test_cache.py169
2 files changed, 329 insertions, 0 deletions
diff --git a/chroma/cache.py b/chroma/cache.py
new file mode 100644
index 0000000..fe7d180
--- /dev/null
+++ b/chroma/cache.py
@@ -0,0 +1,160 @@
+'''chroma.cache: On-disk geometry and bounding volume hierarchy cache.
+
+The ``Cache`` class is used to manage an on-disk cache of geometry and
+BVH objects, which are slow to calculate. By default, the cache is in
+$HOME/.chroma.
+
+Currently this cache is not thread or process-safe when a writing
+process is present.
+'''
+
+import os
+import cPickle as pickle
+
+from chroma.log import logger
+
+class GeometryNotFoundError(Exception):
+ '''A requested geometry was not found in the on-disk cache.'''
+ def __init__(self, msg):
+ Exception.__init__(self, msg)
+
+def verify_or_create_dir(dirname, exception_msg, logger_msg=None):
+ '''Checks if ``dirname`` exists and is a directory. If it does not exist,
+ then it is created. If it does exist, but is not a directory, an IOError
+ is raised with ``exception_message`` as the description.
+
+ If the directory is created, an info message will be sent to the
+ Chroma logger if ``logger_message`` is not None.
+ '''
+ if not os.path.isdir(dirname):
+ if os.path.exists(dirname):
+ raise IOError(exception_msg)
+ else:
+ if logger_msg is not None:
+ logger.info(logger_msg)
+ os.mkdir(dirname)
+
+class Cache(object):
+ '''Class for manipulating a Chroma disk cache directory.
+
+ Use this class to read and write cached geometries or bounding
+ volume hierarchies rather than reading and writing disk directly.
+ '''
+
+ def __init__(self, cache_dir=os.path.expanduser('~/.chroma/')):
+ '''Open a Chroma cache stored at ``cache_dir``.
+
+ If ``cache_dir`` does not already exist, it will be created. By default,
+ the cache is in the ``.chroma`` directory under the user's home directory.
+ '''
+ self.cache_dir = cache_dir
+ verify_or_create_dir(self.cache_dir,
+ exception_msg='Path for cache already exists, '
+ 'but is not a directory: %s' % cache_dir,
+ logger_msg='Creating new Chroma cache directory at %s'
+ % cache_dir)
+
+
+ self.geo_dir = os.path.join(cache_dir, 'geo')
+ verify_or_create_dir(self.geo_dir,
+ exception_msg='Path for geometry directory in cache '
+ 'already exists, but is not a directory: %s'
+ % self.geo_dir)
+
+ self.bvh_dir = os.path.join(cache_dir, 'bvh')
+ verify_or_create_dir(self.bvh_dir,
+ exception_msg='Path for BVH directory in cache already '
+ 'exists, but is not a directory: %s' % self.bvh_dir)
+
+
+ def get_geometry_filename(self, name):
+ '''Return the full pathname for the geometry file corresponding to
+ ``name``.
+ '''
+ return os.path.join(self.geo_dir, name)
+
+ def list_geometry(self):
+ '''Returns a list of all geometry names in the cache.'''
+ return os.listdir(self.geo_dir)
+
+ def save_geometry(self, name, geometry):
+ '''Save ``geometry`` in the cache with the name ``name``.'''
+ geo_file = self.get_geometry_filename(name)
+ with open(geo_file, 'wb') as output_file:
+ pickle.dump(geometry.mesh.md5(), output_file,
+ pickle.HIGHEST_PROTOCOL)
+ pickle.dump(geometry, output_file,
+ pickle.HIGHEST_PROTOCOL)
+
+ def load_geometry(self, name):
+ '''Returns the chroma.geometry.Geometry object associated with
+ ``name`` in the cache.
+
+ Raises ``GeometryNotFoundError`` if ``name`` is not in the cache.
+ '''
+ geo_file = self.get_geometry_filename(name)
+ if not os.path.exists(geo_file):
+ raise GeometryNotFoundError(name)
+ with open(geo_file, 'rb') as input_file:
+ _ = pickle.load(input_file) # skip mesh hash
+ geo = pickle.load(input_file)
+ return geo
+
+ def remove_geometry(self, name):
+ '''Remove the geometry file associated with ``name`` from the cache.
+
+ If ``name`` does not exist, no action is taken.
+ '''
+ geo_file = self.get_geometry_filename(name)
+ if os.path.exists(geo_file):
+ os.remove(geo_file)
+
+ def get_geometry_hash(self, name):
+ '''Get the mesh hash for the geometry associated with ``name``.
+
+ This is faster than loading the entire geometry file and calling the
+ hash() method on the mesh.
+ '''
+ geo_file = self.get_geometry_filename(name)
+ if not os.path.exists(geo_file):
+ raise GeometryNotFoundError(name)
+ with open(geo_file, 'rb') as input_file:
+ return pickle.load(input_file)
+
+ def load_default_geometry(self):
+ '''Load the default geometry as set by previous call to
+ set_default_geometry().
+
+ If no geometry has been designated the default, raise
+ GeometryNotFoundError.
+ '''
+ return self.load_geometry('.default')
+
+ def set_default_geometry(self, name):
+ '''Set the geometry in the cache corresponding to ``name`` to
+ be the default geometry returned by load_default_geometry().
+ '''
+ default_geo_file = self.get_geometry_filename('.default')
+ geo_file = self.get_geometry_filename(name)
+ if not os.path.exists(geo_file):
+ raise GeometryNotFoundError(name)
+
+ if os.path.exists(default_geo_file):
+ if os.path.islink(default_geo_file):
+ os.remove(default_geo_file)
+ else:
+ raise IOError('Non-symlink found where expected a symlink: '
+ +default_geo_file)
+ os.symlink(geo_file, default_geo_file)
+
+
+ def list_bvh(self, mesh_hash):
+ pass
+
+ def save_bvh(self, mesh_hash, name=None):
+ pass
+
+ def load_bvh(self, mesh_hash, name=None):
+ pass
+
+
diff --git a/test/test_cache.py b/test/test_cache.py
new file mode 100644
index 0000000..e0162c8
--- /dev/null
+++ b/test/test_cache.py
@@ -0,0 +1,169 @@
+import unittest
+import os
+import shutil
+import tempfile
+import binascii
+
+from chroma.cache import verify_or_create_dir, Cache, GeometryNotFoundError
+from chroma.geometry import Geometry, Solid
+from chroma.make import box
+
+def random_tempdir(prefix):
+ '''Select a random directory name inside the $TMP directory that
+ starts with ``prefix`` and ends with a random hex string.'''
+ subdir = prefix + '_' + binascii.b2a_hex(os.urandom(8))
+ return os.path.join(tempfile.gettempdir(), subdir)
+
+def remove_path(path):
+ '''If path is a file, delete it. If it is a directory, remove it.
+ If it doesn't exist, do nothing.'''
+ if os.path.isfile(path):
+ os.remove(path)
+ elif os.path.isdir(path):
+ shutil.rmtree(path)
+
+
+class TestVerifyOrCreateDir(unittest.TestCase):
+ def setUp(self):
+ self.test_dir = random_tempdir('vcd')
+
+ def test_no_dir(self):
+ assert not os.path.isdir(self.test_dir)
+ verify_or_create_dir(self.test_dir, 'msg')
+ assert os.path.isdir(self.test_dir)
+
+ def test_exist_dir(self):
+ os.mkdir(self.test_dir)
+ assert os.path.isdir(self.test_dir)
+ verify_or_create_dir(self.test_dir, 'msg')
+ assert os.path.isdir(self.test_dir)
+
+ def test_exist_file(self):
+ f = open(self.test_dir, 'w')
+ f.write('foo')
+ f.close()
+ with self.assertRaises(IOError):
+ verify_or_create_dir(self.test_dir, 'msg')
+
+ def tearDown(self):
+ remove_path(self.test_dir)
+
+class TestCacheCreation(unittest.TestCase):
+ def setUp(self):
+ self.cache_dir = random_tempdir('chroma_cache_test')
+
+ def test_creation(self):
+ assert not os.path.isdir(self.cache_dir)
+ cache = Cache(self.cache_dir)
+ assert os.path.isdir(self.cache_dir)
+
+ def test_recreation(self):
+ assert not os.path.isdir(self.cache_dir)
+ cache = Cache(self.cache_dir)
+ del cache
+ assert os.path.isdir(self.cache_dir)
+ cache = Cache(self.cache_dir)
+ assert os.path.isdir(self.cache_dir)
+
+ def tearDown(self):
+ remove_path(self.cache_dir)
+
+class TestCacheGeometry(unittest.TestCase):
+ def setUp(self):
+ self.cache_dir = random_tempdir('chroma_cache_test')
+ self.cache = Cache(self.cache_dir)
+
+ self.a = Geometry()
+ self.a.add_solid(Solid(box(1,1,1)))
+ self.a.add_solid(Solid(box(1,1,1)), displacement=(10,10,10))
+ self.a.flatten()
+
+ self.b = Geometry()
+ self.b.add_solid(Solid(box(2,2,2)))
+ self.b.add_solid(Solid(box(2,2,2)), displacement=(10,10,10))
+ self.b.add_solid(Solid(box(2,2,2)), displacement=(-10,-10,-10))
+ self.b.flatten()
+
+ def test_list_geometry(self):
+ self.assertEqual(len(self.cache.list_geometry()), 0)
+
+ self.cache.save_geometry('a', self.a)
+ l = self.cache.list_geometry()
+ self.assertEqual(len(l), 1)
+ self.assertIn('a', l)
+
+ self.cache.save_geometry('b', self.b)
+ l = self.cache.list_geometry()
+ self.assertEquals(len(l), 2)
+ self.assertIn('a', l)
+ self.assertIn('b', l)
+
+ self.cache.save_geometry('a', self.a)
+ l = self.cache.list_geometry()
+ self.assertEquals(len(l), 2)
+ self.assertIn('a', l)
+ self.assertIn('b', l)
+
+ def test_load_geometry_not_found(self):
+ with self.assertRaises(GeometryNotFoundError):
+ self.cache.load_geometry('a')
+
+ def test_save_load_new_geometry(self):
+ self.cache.save_geometry('b', self.b)
+ b = self.cache.load_geometry('b')
+
+ def test_replace_geometry(self):
+ self.cache.save_geometry('b', self.b)
+ b = self.cache.load_geometry('b')
+ self.assertEqual(b.mesh.md5(), self.b.mesh.md5())
+
+ self.cache.save_geometry('b', self.b)
+ b = self.cache.load_geometry('b')
+ self.assertEqual(b.mesh.md5(), self.b.mesh.md5())
+
+ def test_remove_geometry(self):
+ self.cache.save_geometry('b', self.b)
+ self.assertIn('b', self.cache.list_geometry())
+ self.cache.remove_geometry('b')
+ self.assertNotIn('b', self.cache.list_geometry())
+
+ def test_get_geometry_hash(self):
+ self.cache.save_geometry('b', self.b)
+ self.assertEqual(self.cache.get_geometry_hash('b'), self.b.mesh.md5())
+
+ def test_get_geometry_hash_not_found(self):
+ with self.assertRaises(GeometryNotFoundError):
+ self.cache.get_geometry_hash('a')
+
+ def test_default_geometry(self):
+ self.cache.save_geometry('a', self.a)
+ self.cache.save_geometry('b', self.b)
+
+ with self.assertRaises(GeometryNotFoundError):
+ self.cache.set_default_geometry('c')
+
+ self.cache.set_default_geometry('b')
+ b = self.cache.load_default_geometry()
+
+ self.cache.set_default_geometry('a')
+ a = self.cache.load_default_geometry()
+
+ def test_default_geometry_corruption(self):
+ self.cache.save_geometry('a', self.a)
+ self.cache.save_geometry('b', self.b)
+
+ # Put a file where a symlink should be
+ default_symlink_path = self.cache.get_geometry_filename('.default')
+ with open(default_symlink_path, 'w') as f:
+ f.write('foo')
+
+ with self.assertRaises(IOError):
+ self.cache.set_default_geometry('b')
+
+ # Verify file not modified
+ assert os.path.isfile(default_symlink_path)
+ with open(default_symlink_path) as f:
+ self.assertEqual(f.read(), 'foo')
+
+ def tearDown(self):
+ remove_path(self.cache_dir)