diff options
Diffstat (limited to 'extern/openvdb/internal/openvdb/io')
-rw-r--r-- | extern/openvdb/internal/openvdb/io/Archive.cc | 22 | ||||
-rw-r--r-- | extern/openvdb/internal/openvdb/io/Archive.h | 11 | ||||
-rw-r--r-- | extern/openvdb/internal/openvdb/io/Compression.h | 42 | ||||
-rw-r--r-- | extern/openvdb/internal/openvdb/io/File.cc | 35 | ||||
-rw-r--r-- | extern/openvdb/internal/openvdb/io/File.h | 40 | ||||
-rw-r--r-- | extern/openvdb/internal/openvdb/io/GridDescriptor.cc | 5 | ||||
-rw-r--r-- | extern/openvdb/internal/openvdb/io/Queue.cc | 337 | ||||
-rw-r--r-- | extern/openvdb/internal/openvdb/io/Queue.h | 277 | ||||
-rw-r--r-- | extern/openvdb/internal/openvdb/io/Stream.cc | 16 | ||||
-rw-r--r-- | extern/openvdb/internal/openvdb/io/Stream.h | 53 |
10 files changed, 794 insertions, 44 deletions
diff --git a/extern/openvdb/internal/openvdb/io/Archive.cc b/extern/openvdb/internal/openvdb/io/Archive.cc index 9b178ef99f1..c05ed0f8889 100644 --- a/extern/openvdb/internal/openvdb/io/Archive.cc +++ b/extern/openvdb/internal/openvdb/io/Archive.cc @@ -54,6 +54,7 @@ struct StreamState static const long MAGIC_NUMBER; StreamState(); + ~StreamState(); int magicNumber; int fileVersion; @@ -125,6 +126,14 @@ StreamState::StreamState(): magicNumber(std::ios_base::xalloc()) } +StreamState::~StreamState() +{ + // Ensure that this StreamState struct can no longer be accessed. + std::cout.iword(magicNumber) = 0; + std::cout.pword(magicNumber) = NULL; +} + + //////////////////////////////////////// @@ -146,6 +155,13 @@ Archive::~Archive() } +boost::shared_ptr<Archive> +Archive::copy() const +{ + return boost::shared_ptr<Archive>(new Archive(*this)); +} + + //////////////////////////////////////// @@ -596,8 +612,12 @@ Archive::write(std::ostream& os, const GridCPtrVec& grids, bool seekable, for (GridCPtrVecCIter i = grids.begin(), e = grids.end(); i != e; ++i) { if (const GridBase::ConstPtr& grid = *i) { - // Ensure that the grid's descriptor has a unique grid name. + // Ensure that the grid's descriptor has a unique grid name, by appending + // a number to it if a grid with the same name was already written. + // Always add a number if the grid name is empty, so that the grid can be + // properly identified as an instance parent, if necessary. std::string name = grid->getName(); + if (name.empty()) name = GridDescriptor::addSuffix(name, 0); for (int n = 1; uniqueNames.find(name) != uniqueNames.end(); ++n) { name = GridDescriptor::addSuffix(grid->getName(), n); } diff --git a/extern/openvdb/internal/openvdb/io/Archive.h b/extern/openvdb/internal/openvdb/io/Archive.h index ecfc65b6df0..03270a79415 100644 --- a/extern/openvdb/internal/openvdb/io/Archive.h +++ b/extern/openvdb/internal/openvdb/io/Archive.h @@ -37,6 +37,7 @@ #include <string> #include <boost/uuid/uuid.hpp> #include <boost/cstdint.hpp> +#include <boost/shared_ptr.hpp> #include <openvdb/Grid.h> #include <openvdb/metadata/MetaMap.h> #include <openvdb/version.h> // for VersionId @@ -117,6 +118,9 @@ public: Archive(); virtual ~Archive(); + /// @brief Return a copy of this archive. + virtual boost::shared_ptr<Archive> copy() const; + /// @brief Return the UUID that was most recently written (or read, /// if no UUID has been written yet). std::string getUniqueTag() const; @@ -163,6 +167,9 @@ public: /// bounding box, etc.) should be computed and written as grid metadata. void setGridStatsMetadataEnabled(bool b) { mEnableGridStats = b; } + /// @brief Write the grids in the given container to this archive's output stream. + virtual void write(const GridCPtrVec&, const MetaMap& = MetaMap()) const {} + protected: /// @brief Return @c true if the input stream contains grid offsets /// that allow for random access or partial reading. @@ -198,7 +205,7 @@ protected: void setWriteGridStatsMetadata(std::ostream&); /// Read in and return the number of grids on the input stream. - static int readGridCount(std::istream&); + static int32_t readGridCount(std::istream&); /// Populate the given grid from the input stream. static void readGrid(GridBase::Ptr, const GridDescriptor&, std::istream&); @@ -242,7 +249,7 @@ private: /// The version of the library that was used to create the file that was read VersionId mLibraryVersion; /// 16-byte (128-bit) UUID - mutable boost::uuids::uuid mUuid;// needs to mutable since writeHeader is const! + mutable boost::uuids::uuid mUuid;// needs to be mutable since writeHeader is const! /// Flag indicating whether the input stream contains grid offsets /// and therefore supports partial reading bool mInputHasGridOffsets; diff --git a/extern/openvdb/internal/openvdb/io/Compression.h b/extern/openvdb/internal/openvdb/io/Compression.h index acb5b72673f..ab2d9582afe 100644 --- a/extern/openvdb/internal/openvdb/io/Compression.h +++ b/extern/openvdb/internal/openvdb/io/Compression.h @@ -39,6 +39,7 @@ #include <string> #include <vector> + namespace openvdb { OPENVDB_USE_VERSION_NAMESPACE namespace OPENVDB_VERSION_NAME { @@ -85,7 +86,8 @@ enum { /*2*/ NO_MASK_AND_ONE_INACTIVE_VAL, // all inactive vals have the same non-background val /*3*/ MASK_AND_NO_INACTIVE_VALS, // mask selects between -background and +background /*4*/ MASK_AND_ONE_INACTIVE_VAL, // mask selects between backgd and one other inactive val - /*5*/ MASK_AND_TWO_INACTIVE_VALS // mask selects between two non-background inactive vals + /*5*/ MASK_AND_TWO_INACTIVE_VALS, // mask selects between two non-background inactive vals + /*6*/ NO_MASK_AND_ALL_VALS // > 2 inactive vals, so no mask compression at all }; @@ -281,7 +283,7 @@ readCompressedValues(std::istream& is, ValueT* destBuf, Index destCount, zipped = compression & COMPRESS_ZIP, maskCompressed = compression & COMPRESS_ACTIVE_MASK; - int8_t metadata = NO_MASK_OR_INACTIVE_VALS; + int8_t metadata = NO_MASK_AND_ALL_VALS; if (getFormatVersion(is) >= OPENVDB_FILE_VERSION_NODE_MASK_COMPRESSION) { // Read the flag that specifies what, if any, additional metadata // (selection mask and/or inactive value(s)) is saved. @@ -294,11 +296,11 @@ readCompressedValues(std::istream& is, ValueT* destBuf, Index destCount, } ValueT inactiveVal1 = background; ValueT inactiveVal0 = - ((metadata == NO_MASK_OR_INACTIVE_VALS) ? background : negative(background)); + ((metadata == NO_MASK_OR_INACTIVE_VALS) ? background : math::negative(background)); - if (metadata != NO_MASK_OR_INACTIVE_VALS && - metadata != NO_MASK_AND_MINUS_BG && - metadata != MASK_AND_NO_INACTIVE_VALS) + if (metadata == NO_MASK_AND_ONE_INACTIVE_VAL || + metadata == MASK_AND_ONE_INACTIVE_VAL || + metadata == MASK_AND_TWO_INACTIVE_VALS) { // Read one of at most two distinct inactive values. is.read(reinterpret_cast<char*>(&inactiveVal0), sizeof(ValueT)); @@ -309,9 +311,9 @@ readCompressedValues(std::istream& is, ValueT* destBuf, Index destCount, } MaskT selectionMask; - if (metadata != NO_MASK_OR_INACTIVE_VALS && - metadata != NO_MASK_AND_MINUS_BG && - metadata != NO_MASK_AND_ONE_INACTIVE_VAL) + if (metadata == MASK_AND_NO_INACTIVE_VALS || + metadata == MASK_AND_ONE_INACTIVE_VAL || + metadata == MASK_AND_TWO_INACTIVE_VALS) { // For use in mask compression (only), read the bitmask that selects // between two distinct inactive values. @@ -322,7 +324,9 @@ readCompressedValues(std::istream& is, ValueT* destBuf, Index destCount, boost::scoped_array<ValueT> scopedTempBuf; Index tempCount = destCount; - if (maskCompressed && getFormatVersion(is) >= OPENVDB_FILE_VERSION_NODE_MASK_COMPRESSION) { + if (maskCompressed && metadata != NO_MASK_AND_ALL_VALS + && getFormatVersion(is) >= OPENVDB_FILE_VERSION_NODE_MASK_COMPRESSION) + { tempCount = valueMask.countOn(); if (tempCount != destCount) { // If this node has inactive voxels, allocate a temporary buffer @@ -394,7 +398,7 @@ writeCompressedValues(std::ostream& os, ValueT* srcBuf, Index srcCount, ValueT* tempBuf = srcBuf; boost::scoped_array<ValueT> scopedTempBuf; - int8_t metadata = NO_MASK_OR_INACTIVE_VALS; + int8_t metadata = NO_MASK_AND_ALL_VALS; if (!maskCompress) { os.write(reinterpret_cast<const char*>(&metadata), /*bytes=*/1); @@ -437,7 +441,7 @@ writeCompressedValues(std::ostream& os, ValueT* srcBuf, Index srcCount, if (numUniqueInactiveVals == 1) { if (!Local::eq(inactiveVal[0], background)) { - if (Local::eq(inactiveVal[0], negative(background))) { + if (Local::eq(inactiveVal[0], math::negative(background))) { metadata = NO_MASK_AND_MINUS_BG; } else { metadata = NO_MASK_AND_ONE_INACTIVE_VAL; @@ -451,7 +455,7 @@ writeCompressedValues(std::ostream& os, ValueT* srcBuf, Index srcCount, metadata = MASK_AND_TWO_INACTIVE_VALS; } else if (Local::eq(inactiveVal[1], background)) { - if (Local::eq(inactiveVal[0], negative(background))) { + if (Local::eq(inactiveVal[0], math::negative(background))) { // If the second inactive value is equal to the background and // the first is equal to -background, neither value needs to be saved, // but save a mask that selects between -background and +background. @@ -463,7 +467,7 @@ writeCompressedValues(std::ostream& os, ValueT* srcBuf, Index srcCount, metadata = MASK_AND_ONE_INACTIVE_VAL; } } else if (Local::eq(inactiveVal[0], background)) { - if (Local::eq(inactiveVal[1], negative(background))) { + if (Local::eq(inactiveVal[1], math::negative(background))) { // If the first inactive value is equal to the background and // the second is equal to -background, neither value needs to be saved, // but save a mask that selects between -background and +background. @@ -477,13 +481,15 @@ writeCompressedValues(std::ostream& os, ValueT* srcBuf, Index srcCount, metadata = MASK_AND_ONE_INACTIVE_VAL; } } + } else if (numUniqueInactiveVals > 2) { + metadata = NO_MASK_AND_ALL_VALS; } os.write(reinterpret_cast<const char*>(&metadata), /*bytes=*/1); - if (metadata != NO_MASK_OR_INACTIVE_VALS && - metadata != NO_MASK_AND_MINUS_BG && - metadata != MASK_AND_NO_INACTIVE_VALS) + if (metadata == NO_MASK_AND_ONE_INACTIVE_VAL || + metadata == MASK_AND_ONE_INACTIVE_VAL || + metadata == MASK_AND_TWO_INACTIVE_VALS) { if (!toHalf) { // Write one of at most two distinct inactive values. @@ -504,7 +510,7 @@ writeCompressedValues(std::ostream& os, ValueT* srcBuf, Index srcCount, } } - if (metadata == NO_MASK_OR_INACTIVE_VALS && numUniqueInactiveVals > 2) { + if (metadata == NO_MASK_AND_ALL_VALS) { // If there are more than two unique inactive values, the entire input buffer // needs to be saved (both active and inactive values). /// @todo Save the selection mask as long as most of the inactive values diff --git a/extern/openvdb/internal/openvdb/io/File.cc b/extern/openvdb/internal/openvdb/io/File.cc index 5ca4c76ad3b..cb071560528 100644 --- a/extern/openvdb/internal/openvdb/io/File.cc +++ b/extern/openvdb/internal/openvdb/io/File.cc @@ -57,6 +57,41 @@ File::~File() } +File::File(const File& other) + : Archive(other) + , mFilename(other.mFilename) + , mMeta(other.mMeta) + , mIsOpen(false) // don't want two file objects reading from the same stream + , mGridDescriptors(other.mGridDescriptors) + , mNamedGrids(other.mNamedGrids) + , mGrids(other.mGrids) +{ +} + + +File& +File::operator=(const File& other) +{ + if (&other != this) { + Archive::operator=(other); + mFilename = other.mFilename; + mMeta = other.mMeta; + mIsOpen = false; // don't want two file objects reading from the same stream + mGridDescriptors = other.mGridDescriptors; + mNamedGrids = other.mNamedGrids; + mGrids = other.mGrids; + } + return *this; +} + + +boost::shared_ptr<Archive> +File::copy() const +{ + return boost::shared_ptr<Archive>(new File(*this)); +} + + //////////////////////////////////////// diff --git a/extern/openvdb/internal/openvdb/io/File.h b/extern/openvdb/internal/openvdb/io/File.h index a00fd0d7858..e1bb60b6704 100644 --- a/extern/openvdb/internal/openvdb/io/File.h +++ b/extern/openvdb/internal/openvdb/io/File.h @@ -57,8 +57,24 @@ public: typedef NameMap::const_iterator NameMapCIter; explicit File(const std::string& filename); - ~File(); + virtual ~File(); + /// @brief Copy constructor + /// @details The copy will be closed and will not reference the same + /// file descriptor as the original. + File(const File& other); + /// @brief Assignment + /// @details After assignment, this File will be closed and will not + /// reference the same file descriptor as the source File. + File& operator=(const File& other); + + /// @brief Return a copy of this archive. + /// @details The copy will be closed and will not reference the same + /// file descriptor as the original. + virtual boost::shared_ptr<Archive> copy() const; + + /// @brief Return the name of the file with which this archive is associated. + /// @details The file does not necessarily exist on disk yet. const std::string& filename() const { return mFilename; } /// Open the file, read the file header and the file-level metadata, and @@ -67,7 +83,7 @@ public: /// @return @c true if the file's UUID has changed since it was last read. bool open(); - /// Return @c true if the file has been opened for reading, false otherwise. + /// Return @c true if the file has been opened for reading. bool isOpen() const { return mIsOpen; } /// Close the file once we are done reading from it. @@ -109,6 +125,10 @@ public: /// @brief Write the grids in the given container to the file whose name /// was given in the constructor. + virtual void write(const GridCPtrVec&, const MetaMap& = MetaMap()) const; + + /// @brief Write the grids in the given container to the file whose name + /// was given in the constructor. template<typename GridPtrContainerT> void write(const GridPtrContainerT&, const MetaMap& = MetaMap()) const; @@ -165,10 +185,6 @@ private: void writeGrids(const GridCPtrVec&, const MetaMap&) const; - // Disallow copying of instances of this class. - File(const File& other); - File& operator=(const File& other); - friend class ::TestFile; friend class ::TestStream; @@ -193,19 +209,19 @@ private: //////////////////////////////////////// -template<typename GridPtrContainerT> inline void -File::write(const GridPtrContainerT& container, const MetaMap& metadata) const +File::write(const GridCPtrVec& grids, const MetaMap& metadata) const { - GridCPtrVec grids; - std::copy(container.begin(), container.end(), std::back_inserter(grids)); this->writeGrids(grids, metadata); } -template<> + +template<typename GridPtrContainerT> inline void -File::write<GridCPtrVec>(const GridCPtrVec& grids, const MetaMap& metadata) const +File::write(const GridPtrContainerT& container, const MetaMap& metadata) const { + GridCPtrVec grids; + std::copy(container.begin(), container.end(), std::back_inserter(grids)); this->writeGrids(grids, metadata); } diff --git a/extern/openvdb/internal/openvdb/io/GridDescriptor.cc b/extern/openvdb/internal/openvdb/io/GridDescriptor.cc index e0dcbcabbe5..93d87a1a756 100644 --- a/extern/openvdb/internal/openvdb/io/GridDescriptor.cc +++ b/extern/openvdb/internal/openvdb/io/GridDescriptor.cc @@ -211,8 +211,9 @@ GridDescriptor::stringAsUniqueName(const std::string& s) std::string::size_type pos = ret.find("["); // Replace "[N]" with SEP "N". if (pos != std::string::npos) { - if (ret.substr(pos) == "[0]") { - // "name[0]" is equivalent to "name". + if (pos != 0 && ret.substr(pos) == "[0]") { + // "name[0]" is equivalent to "name", except in the case of + // the empty name "[0]". ret.erase(pos); } else { ret.resize(ret.size() - 1); // drop trailing ']' diff --git a/extern/openvdb/internal/openvdb/io/Queue.cc b/extern/openvdb/internal/openvdb/io/Queue.cc new file mode 100644 index 00000000000..6d76cd66d6d --- /dev/null +++ b/extern/openvdb/internal/openvdb/io/Queue.cc @@ -0,0 +1,337 @@ +/////////////////////////////////////////////////////////////////////////// +// +// Copyright (c) 2012-2013 DreamWorks Animation LLC +// +// All rights reserved. This software is distributed under the +// Mozilla Public License 2.0 ( http://www.mozilla.org/MPL/2.0/ ) +// +// Redistributions of source code must retain the above copyright +// and license notice and the following restrictions and disclaimer. +// +// * Neither the name of DreamWorks Animation nor the names of +// its contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// IN NO EVENT SHALL THE COPYRIGHT HOLDERS' AND CONTRIBUTORS' AGGREGATE +// LIABILITY FOR ALL CLAIMS REGARDLESS OF THEIR BASIS EXCEED US$250.00. +// +/////////////////////////////////////////////////////////////////////////// +// +/// @file Queue.cc +/// @author Peter Cucka + +#include "Queue.h" + +#include "File.h" +#include "Stream.h" +#include <openvdb/Exceptions.h> +#include <openvdb/util/logging.h> +#include <boost/bind.hpp> +#include <tbb/atomic.h> +#include <tbb/concurrent_hash_map.h> +#include <tbb/mutex.h> +#include <tbb/task.h> +#include <tbb/tbb_thread.h> // for tbb::this_tbb_thread::sleep() +#include <tbb/tick_count.h> +#include <algorithm> // for std::max() +#include <iostream> +#include <map> + + +namespace openvdb { +OPENVDB_USE_VERSION_NAMESPACE +namespace OPENVDB_VERSION_NAME { +namespace io { + +namespace { + +typedef tbb::mutex Mutex; +typedef Mutex::scoped_lock Lock; + + +// Abstract base class for queuable TBB tasks that adds a task completion callback +class Task: public tbb::task +{ +public: + Task(Queue::Id id): mId(id) {} + virtual ~Task() {} + + Queue::Id id() const { return mId; } + + void setNotifier(Queue::Notifier& notifier) { mNotify = notifier; } + +protected: + void notify(Queue::Status status) { if (mNotify) mNotify(this->id(), status); } + +private: + Queue::Id mId; + Queue::Notifier mNotify; +}; + + +// Queuable TBB task that writes one or more grids to a .vdb file or an output stream +class OutputTask: public Task +{ +public: + OutputTask(Queue::Id id, const GridCPtrVec& grids, const Archive& archive, + const MetaMap& metadata) + : Task(id) + , mGrids(grids) + , mArchive(archive.copy()) + , mMetadata(metadata) + {} + + virtual tbb::task* execute() + { + Queue::Status status = Queue::FAILED; + try { + mArchive->write(mGrids, mMetadata); + status = Queue::SUCCEEDED; + } catch (std::exception& e) { + if (const char* msg = e.what()) { + OPENVDB_LOG_ERROR(msg); + } + } catch (...) { + } + this->notify(status); + return NULL; // no successor to this task + } + +private: + GridCPtrVec mGrids; + boost::shared_ptr<Archive> mArchive; + MetaMap mMetadata; +}; + +} // unnamed namespace + + +//////////////////////////////////////// + + +// Private implementation details of a Queue +struct Queue::Impl +{ + typedef std::map<Queue::Id, Queue::Notifier> NotifierMap; + /// @todo Provide more information than just "succeeded" or "failed"? + typedef tbb::concurrent_hash_map<Queue::Id, Queue::Status> StatusMap; + + + Impl() + : mTimeout(Queue::DEFAULT_TIMEOUT) + , mCapacity(Queue::DEFAULT_CAPACITY) + , mNextId(1) + , mNextNotifierId(1) + { + mNumTasks = 0; // note: must explicitly zero-initialize atomics + } + ~Impl() {} + + // Disallow copying of instances of this class. + Impl(const Impl&); + Impl& operator=(const Impl&); + + // This method might be called from multiple threads. + void setStatus(Queue::Id id, Queue::Status status) + { + StatusMap::accessor acc; + mStatus.insert(acc, id); + acc->second = status; + } + + // This method might be called from multiple threads. + void setStatusWithNotification(Queue::Id id, Queue::Status status) + { + const bool completed = (status == SUCCEEDED || status == FAILED); + + // Update the task's entry in the status map with the new status. + this->setStatus(id, status); + + // If the client registered any callbacks, call them now. + bool didNotify = false; + { + // tbb::concurrent_hash_map does not support concurrent iteration + // (i.e., iteration concurrent with insertion or deletion), + // so we use a mutex-protected STL map instead. But if a callback + // invokes a notifier method such as removeNotifier() on this queue, + // the result will be a deadlock. + /// @todo Is it worth trying to avoid such deadlocks? + Lock lock(mNotifierMutex); + if (!mNotifiers.empty()) { + didNotify = true; + for (NotifierMap::const_iterator it = mNotifiers.begin(); + it != mNotifiers.end(); ++it) + { + it->second(id, status); + } + } + } + // If the task completed and callbacks were called, remove + // the task's entry from the status map. + if (completed) { + if (didNotify) { + StatusMap::accessor acc; + if (mStatus.find(acc, id)) { + mStatus.erase(acc); + } + } + --mNumTasks; + } + } + + bool canEnqueue() const { return mNumTasks < Int64(mCapacity); } + + void enqueue(Task& task) + { + tbb::tick_count start = tbb::tick_count::now(); + while (!canEnqueue()) { + tbb::this_tbb_thread::sleep(tbb::tick_count::interval_t(0.5/*sec*/)); + if ((tbb::tick_count::now() - start).seconds() > double(mTimeout)) { + OPENVDB_THROW(RuntimeError, + "unable to queue I/O task; " << mTimeout << "-second time limit expired"); + } + } + Queue::Notifier notify = boost::bind(&Impl::setStatusWithNotification, this, _1, _2); + task.setNotifier(notify); + this->setStatus(task.id(), Queue::PENDING); + tbb::task::enqueue(task); + ++mNumTasks; + } + + Index32 mTimeout; + Index32 mCapacity; + tbb::atomic<Int32> mNumTasks; + Index32 mNextId; + StatusMap mStatus; + NotifierMap mNotifiers; + Index32 mNextNotifierId; + Mutex mNotifierMutex; +}; + + +//////////////////////////////////////// + + +Queue::Queue(Index32 capacity): mImpl(new Impl) +{ + mImpl->mCapacity = capacity; +} + + +Queue::~Queue() +{ + // Wait for all queued tasks to complete (successfully or unsuccessfully). + /// @todo Allow the queue to be destroyed while there are uncompleted tasks + /// (e.g., by keeping a static registry of queues that also dispatches + /// or blocks notifications)? + while (mImpl->mNumTasks > 0) { + tbb::this_tbb_thread::sleep(tbb::tick_count::interval_t(0.5/*sec*/)); + } +} + + +//////////////////////////////////////// + + +bool Queue::empty() const { return (mImpl->mNumTasks == 0); } +Index32 Queue::size() const { return Index32(std::max<Int32>(0, mImpl->mNumTasks)); } +Index32 Queue::capacity() const { return mImpl->mCapacity; } +void Queue::setCapacity(Index32 n) { mImpl->mCapacity = std::max<Index32>(1, n); } + +/// @todo void Queue::setCapacity(Index64 bytes); + +/// @todo Provide a way to limit the number of tasks in flight +/// (e.g., by enqueueing tbb::tasks that pop Tasks off a concurrent_queue)? + +/// @todo Remove any tasks from the queue that are not currently executing. +//void clear() const; + +Index32 Queue::timeout() const { return mImpl->mTimeout; } +void Queue::setTimeout(Index32 sec) { mImpl->mTimeout = sec; } + + +//////////////////////////////////////// + + +Queue::Status +Queue::status(Id id) const +{ + Impl::StatusMap::const_accessor acc; + if (mImpl->mStatus.find(acc, id)) { + const Status status = acc->second; + if (status == SUCCEEDED || status == FAILED) { + mImpl->mStatus.erase(acc); + } + return status; + } + return UNKNOWN; +} + + +Queue::Id +Queue::addNotifier(Notifier notify) +{ + Lock lock(mImpl->mNotifierMutex); + Queue::Id id = mImpl->mNextNotifierId++; + mImpl->mNotifiers[id] = notify; + return id; +} + + +void +Queue::removeNotifier(Id id) +{ + Lock lock(mImpl->mNotifierMutex); + Impl::NotifierMap::iterator it = mImpl->mNotifiers.find(id); + if (it != mImpl->mNotifiers.end()) { + mImpl->mNotifiers.erase(it); + } +} + + +void +Queue::clearNotifiers() +{ + Lock lock(mImpl->mNotifierMutex); + mImpl->mNotifiers.clear(); +} + + +//////////////////////////////////////// + + +Queue::Id +Queue::writeGrid(GridBase::ConstPtr grid, const Archive& archive, const MetaMap& metadata) +{ + return writeGridVec(GridCPtrVec(1, grid), archive, metadata); +} + + +Queue::Id +Queue::writeGridVec(const GridCPtrVec& grids, const Archive& archive, const MetaMap& metadata) +{ + // From the "GUI Thread" chapter in the TBB Design Patterns guide + OutputTask* task = + new(tbb::task::allocate_root()) OutputTask(mImpl->mNextId++, grids, archive, metadata); + mImpl->enqueue(*task); + return task->id(); +} + +} // namespace io +} // namespace OPENVDB_VERSION_NAME +} // namespace openvdb + +// Copyright (c) 2012-2013 DreamWorks Animation LLC +// All rights reserved. This software is distributed under the +// Mozilla Public License 2.0 ( http://www.mozilla.org/MPL/2.0/ ) diff --git a/extern/openvdb/internal/openvdb/io/Queue.h b/extern/openvdb/internal/openvdb/io/Queue.h new file mode 100644 index 00000000000..ac91feaa3e6 --- /dev/null +++ b/extern/openvdb/internal/openvdb/io/Queue.h @@ -0,0 +1,277 @@ +/////////////////////////////////////////////////////////////////////////// +// +// Copyright (c) 2012-2013 DreamWorks Animation LLC +// +// All rights reserved. This software is distributed under the +// Mozilla Public License 2.0 ( http://www.mozilla.org/MPL/2.0/ ) +// +// Redistributions of source code must retain the above copyright +// and license notice and the following restrictions and disclaimer. +// +// * Neither the name of DreamWorks Animation nor the names of +// its contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// IN NO EVENT SHALL THE COPYRIGHT HOLDERS' AND CONTRIBUTORS' AGGREGATE +// LIABILITY FOR ALL CLAIMS REGARDLESS OF THEIR BASIS EXCEED US$250.00. +// +/////////////////////////////////////////////////////////////////////////// +// +/// @file Queue.h +/// @author Peter Cucka + +#ifndef OPENVDB_IO_QUEUE_HAS_BEEN_INCLUDED +#define OPENVDB_IO_QUEUE_HAS_BEEN_INCLUDED + +#include <openvdb/Types.h> +#include <openvdb/Grid.h> +#include <boost/function.hpp> +#include <boost/shared_ptr.hpp> +#include <algorithm> // for std::copy +#include <iterator> // for std::back_inserter + + +namespace openvdb { +OPENVDB_USE_VERSION_NAMESPACE +namespace OPENVDB_VERSION_NAME { +namespace io { + +class Archive; + +/// @brief Queue for asynchronous output of grids to files or streams +/// +/// @warning The queue holds shared pointers to grids. It is not safe +/// to modify a grid that has been placed in the queue. Instead, +/// make a deep copy of the grid (Grid::deepCopy()). +/// +/// @par Example: +/// @code +/// #include <boost/bind.hpp> +/// #include <tbb/concurrent_hash_map.h> +/// #include <openvdb/openvdb.h> +/// #include <openvdb/io/Queue.h> +/// +/// using openvdb::io::Queue; +/// +/// struct MyNotifier +/// { +/// // Use a concurrent container, because queue callback functions +/// // must be thread-safe. +/// typedef tbb::concurrent_hash_map<Queue::Id, std::string> FilenameMap; +/// FilenameMap filenames; +/// +/// // Callback function that prints the status of a completed task. +/// void callback(Queue::Id id, Queue::Status status) +/// { +/// const bool ok = (status == Queue::SUCCEEDED); +/// FilenameMap::accessor acc; +/// if (filenames.find(acc, id)) { +/// std::cout << (ok ? "wrote " : "failed to write ") +/// << acc->second << std::endl; +/// filenames.erase(acc); +/// } +/// } +/// }; +/// +/// int main() +/// { +/// // Construct an object to receive notifications from the queue. +/// // The object's lifetime must exceed the queue's. +/// MyNotifier notifier; +/// +/// Queue queue; +/// +/// // Register the callback() method of the MyNotifier object +/// // to receive notifications of completed tasks. +/// queue.addNotifier(boost::bind(&MyNotifier::callback, ¬ifier, _1, _2)); +/// +/// // Queue grids for output (e.g., for each step of a simulation). +/// for (int step = 1; step <= 10; ++step) { +/// openvdb::FloatGrid::Ptr grid = ...; +/// +/// std::ostringstream os; +/// os << "mygrid." << step << ".vdb"; +/// const std::string filename = os.str(); +/// +/// Queue::Id id = queue.writeGrid(grid, openvdb::io::File(filename)); +/// +/// // Associate the filename with the ID of the queued task. +/// MyNotifier::FilenameMap::accessor acc; +/// notifier.filenames.insert(acc, id); +/// acc->second = filename; +/// } +/// } +/// @endcode +/// Output: +/// @code +/// wrote mygrid.1.vdb +/// wrote mygrid.2.vdb +/// wrote mygrid.4.vdb +/// wrote mygrid.3.vdb +/// ... +/// wrote mygrid.10.vdb +/// @endcode +/// Note that tasks do not necessarily complete in the order in which they were queued. +class OPENVDB_API Queue +{ +public: + /// Default maximum queue length (see setCapacity()) + static const Index32 DEFAULT_CAPACITY = 100; + /// @brief Default maximum time in seconds to wait to queue a task + /// when the queue is full (see setTimeout()) + static const Index32 DEFAULT_TIMEOUT = 120; // seconds + + /// ID number of a queued task or of a registered notification callback + typedef Index32 Id; + + /// Status of a queued task + enum Status { UNKNOWN, PENDING, SUCCEEDED, FAILED }; + + + /// Construct a queue with the given capacity. + explicit Queue(Index32 capacity = DEFAULT_CAPACITY); + /// Block until all queued tasks complete (successfully or unsuccessfully). + ~Queue(); + + /// @brief Return @c true if the queue is empty. + bool empty() const; + /// @brief Return the number of tasks currently in the queue. + Index32 size() const; + + /// @brief Return the maximum number of tasks allowed in the queue. + /// @details Once the queue has reached its maximum size, adding + /// a new task will block until an existing task has executed. + Index32 capacity() const; + /// Set the maximum number of tasks allowed in the queue. + void setCapacity(Index32); + + /// Return the maximum number of seconds to wait to queue a task when the queue is full. + Index32 timeout() const; + /// Set the maximum number of seconds to wait to queue a task when the queue is full. + void setTimeout(Index32 seconds = DEFAULT_TIMEOUT); + + /// @brief Return the status of the task with the given ID. + /// @note Querying the status of a task that has already completed + /// (whether successfully or not) removes the task from the status registry. + /// Subsequent queries of its status will return UNKNOWN. + Status status(Id) const; + + typedef boost::function<void (Id, Status)> Notifier; + /// @brief Register a function that will be called with a task's ID + /// and status when that task completes, whether successfully or not. + /// @return an ID that can be passed to removeNotifier() to deregister the function + /// @details When multiple notifiers are registered, they are called + /// in the order in which they were registered. + /// @warning Notifiers are called from worker threads, so they must be thread-safe + /// and their lifetimes must exceed that of the queue. They must also not call, + /// directly or indirectly, addNotifier(), removeNotifier() or clearNotifiers(), + /// as that can result in a deadlock. + Id addNotifier(Notifier); + /// Deregister the notifier with the given ID. + void removeNotifier(Id); + /// Deregister all notifiers. + void clearNotifiers(); + + /// @brief Queue a single grid for output to a file or stream. + /// @param grid the grid to be serialized + /// @param archive the io::File or io::Stream to which to output the grid + /// @param fileMetadata optional file-level metadata + /// @return an ID with which the status of the queued task can be queried + /// @throw RuntimeError if the task cannot be queued within the time limit + /// (see setTimeout()) because the queue is full + /// @par Example: + /// @code + /// openvdb::FloatGrid::Ptr grid = ...; + /// + /// openvdb::io::Queue queue; + /// + /// // Write the grid to the file mygrid.vdb. + /// queue.writeGrid(grid, openvdb::io::File("mygrid.vdb")); + /// + /// // Stream the grid to a binary string. + /// std::ostringstream ostr(std::ios_base::binary); + /// queue.writeGrid(grid, openvdb::io::Stream(ostr)); + /// @endcode + Id writeGrid(GridBase::ConstPtr grid, const Archive& archive, + const MetaMap& fileMetadata = MetaMap()); + + /// @brief Queue a container of grids for output to a file. + /// @param grids any iterable container of grid pointers + /// (e.g., a GridPtrVec or GridPtrSet) + /// @param archive the io::File or io::Stream to which to output the grids + /// @param fileMetadata optional file-level metadata + /// @return an ID with which the status of the queued task can be queried + /// @throw RuntimeError if the task cannot be queued within the time limit + /// (see setTimeout()) because the queue is full + /// @par Example: + /// @code + /// openvdb::FloatGrid::Ptr floatGrid = ...; + /// openvdb::BoolGrid::Ptr boolGrid = ...; + /// openvdb::GridPtrVec grids; + /// grids.push_back(floatGrid); + /// grids.push_back(boolGrid); + /// + /// openvdb::io::Queue queue; + /// + /// // Write the grids to the file mygrid.vdb. + /// queue.write(grids, openvdb::io::File("mygrid.vdb")); + /// + /// // Stream the grids to a (binary) string. + /// std::ostringstream ostr(std::ios_base::binary); + /// queue.write(grids, openvdb::io::Stream(ostr)); + /// @endcode + template<typename GridPtrContainer> + Id write(const GridPtrContainer& grids, const Archive& archive, + const MetaMap& fileMetadata = MetaMap()); + +private: + // Disallow copying of instances of this class. + Queue(const Queue&); + Queue& operator=(const Queue&); + + Id writeGridVec(const GridCPtrVec&, const Archive&, const MetaMap&); + + class Impl; + boost::shared_ptr<Impl> mImpl; +}; // class Queue + + +template<typename GridPtrContainer> +inline Queue::Id +Queue::write(const GridPtrContainer& container, + const Archive& archive, const MetaMap& metadata) +{ + GridCPtrVec grids; + std::copy(container.begin(), container.end(), std::back_inserter(grids)); + return this->writeGridVec(grids, archive, metadata); +} + +// Specialization for vectors of const Grid pointers; no copying necessary +template<> +inline Queue::Id +Queue::write<GridCPtrVec>(const GridCPtrVec& grids, + const Archive& archive, const MetaMap& metadata) +{ + return this->writeGridVec(grids, archive, metadata); +} + +} // namespace io +} // namespace OPENVDB_VERSION_NAME +} // namespace openvdb + +#endif // OPENVDB_IO_QUEUE_HAS_BEEN_INCLUDED + +// Copyright (c) 2012-2013 DreamWorks Animation LLC +// All rights reserved. This software is distributed under the +// Mozilla Public License 2.0 ( http://www.mozilla.org/MPL/2.0/ ) diff --git a/extern/openvdb/internal/openvdb/io/Stream.cc b/extern/openvdb/internal/openvdb/io/Stream.cc index 70bf486cee7..eb28cda631c 100644 --- a/extern/openvdb/internal/openvdb/io/Stream.cc +++ b/extern/openvdb/internal/openvdb/io/Stream.cc @@ -42,7 +42,7 @@ OPENVDB_USE_VERSION_NAMESPACE namespace OPENVDB_VERSION_NAME { namespace io { -Stream::Stream(std::istream& is) +Stream::Stream(std::istream& is): mOutputStream(NULL) { if (!is) return; @@ -81,7 +81,12 @@ Stream::Stream(std::istream& is) } -Stream::Stream() +Stream::Stream(): mOutputStream(NULL) +{ +} + + +Stream::Stream(std::ostream& os): mOutputStream(&os) { } @@ -91,6 +96,13 @@ Stream::~Stream() } +boost::shared_ptr<Archive> +Stream::copy() const +{ + return boost::shared_ptr<Archive>(new Stream(*this)); +} + + //////////////////////////////////////// diff --git a/extern/openvdb/internal/openvdb/io/Stream.h b/extern/openvdb/internal/openvdb/io/Stream.h index cd57779b245..c786e463d5d 100644 --- a/extern/openvdb/internal/openvdb/io/Stream.h +++ b/extern/openvdb/internal/openvdb/io/Stream.h @@ -49,8 +49,16 @@ class OPENVDB_API Stream: public Archive public: /// Read grids from an input stream. explicit Stream(std::istream&); + + /// Construct an archive for stream output. Stream(); - ~Stream(); + /// Construct an archive for output to the given stream. + explicit Stream(std::ostream&); + + virtual ~Stream(); + + /// @brief Return a copy of this archive. + virtual boost::shared_ptr<Archive> copy() const; /// Return the file-level metadata in a newly created MetaMap. MetaMap::Ptr getMetadata() const; @@ -58,9 +66,20 @@ public: /// Return pointers to the grids that were read from the input stream. GridPtrVecPtr getGrids() { return mGrids; } - /// Write the grids in the given container to an output stream. + /// @brief Write the grids in the given container to this archive's output stream. + /// @throw ValueError if this archive was constructed without specifying an output stream. + virtual void write(const GridCPtrVec&, const MetaMap& = MetaMap()) const; + + /// @brief Write the grids in the given container to this archive's output stream. + /// @throw ValueError if this archive was constructed without specifying an output stream. + template<typename GridPtrContainerT> + void write(const GridPtrContainerT&, const MetaMap& = MetaMap()) const; + + /// @brief Write the grids in the given container to an output stream. + /// @deprecated Use Stream(os).write(grids) instead. template<typename GridPtrContainerT> - void write(std::ostream&, const GridPtrContainerT&, const MetaMap& = MetaMap()) const; + OPENVDB_DEPRECATED void write(std::ostream&, + const GridPtrContainerT&, const MetaMap& = MetaMap()) const; private: /// Create a new grid of the type specified by the given descriptor, @@ -70,19 +89,39 @@ private: void writeGrids(std::ostream&, const GridCPtrVec&, const MetaMap&) const; - // Disallow copying of instances of this class. - Stream(const Stream&); - Stream& operator=(const Stream&); - MetaMap::Ptr mMeta; GridPtrVecPtr mGrids; + std::ostream* mOutputStream; }; //////////////////////////////////////// +inline void +Stream::write(const GridCPtrVec& grids, const MetaMap& metadata) const +{ + if (mOutputStream == NULL) { + OPENVDB_THROW(ValueError, "no output stream was specified"); + } + this->writeGrids(*mOutputStream, grids, metadata); +} + + +template<typename GridPtrContainerT> +inline void +Stream::write(const GridPtrContainerT& container, const MetaMap& metadata) const +{ + if (mOutputStream == NULL) { + OPENVDB_THROW(ValueError, "no output stream was specified"); + } + GridCPtrVec grids; + std::copy(container.begin(), container.end(), std::back_inserter(grids)); + this->writeGrids(*mOutputStream, grids, metadata); +} + + template<typename GridPtrContainerT> inline void Stream::write(std::ostream& os, const GridPtrContainerT& container, |