Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'utils/gst-plugin-togglerecord')
-rw-r--r--utils/gst-plugin-togglerecord/Cargo.toml34
-rw-r--r--utils/gst-plugin-togglerecord/LICENSE502
-rw-r--r--utils/gst-plugin-togglerecord/build.rs5
-rw-r--r--utils/gst-plugin-togglerecord/examples/gtk_recording.rs358
-rw-r--r--utils/gst-plugin-togglerecord/src/lib.rs48
-rw-r--r--utils/gst-plugin-togglerecord/src/togglerecord.rs1749
-rw-r--r--utils/gst-plugin-togglerecord/tests/tests.rs1173
7 files changed, 3869 insertions, 0 deletions
diff --git a/utils/gst-plugin-togglerecord/Cargo.toml b/utils/gst-plugin-togglerecord/Cargo.toml
new file mode 100644
index 000000000..ac25dba01
--- /dev/null
+++ b/utils/gst-plugin-togglerecord/Cargo.toml
@@ -0,0 +1,34 @@
+[package]
+name = "gst-plugin-togglerecord"
+version = "0.6.0"
+authors = ["Sebastian Dröge <sebastian@centricular.com>"]
+license = "LGPL-2.1+"
+description = "Toggle Record Plugin"
+repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
+edition = "2018"
+
+[dependencies]
+glib = { git = "https://github.com/gtk-rs/glib" }
+gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gstreamer-audio = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
+gtk = { git = "https://github.com/gtk-rs/gtk", optional = true }
+gio = { git = "https://github.com/gtk-rs/gio", optional = true }
+parking_lot = "0.10"
+lazy_static = "1.0"
+
+[dev-dependencies]
+either = "1.0"
+
+[lib]
+name = "gsttogglerecord"
+crate-type = ["cdylib", "rlib"]
+path = "src/lib.rs"
+
+[[example]]
+name = "gtk-recording"
+path = "examples/gtk_recording.rs"
+required-features = ["gtk", "gio"]
+
+[build-dependencies]
+gst-plugin-version-helper = { path="../../gst-plugin-version-helper" }
diff --git a/utils/gst-plugin-togglerecord/LICENSE b/utils/gst-plugin-togglerecord/LICENSE
new file mode 100644
index 000000000..4362b4915
--- /dev/null
+++ b/utils/gst-plugin-togglerecord/LICENSE
@@ -0,0 +1,502 @@
+ GNU LESSER GENERAL PUBLIC LICENSE
+ Version 2.1, February 1999
+
+ Copyright (C) 1991, 1999 Free Software Foundation, Inc.
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+[This is the first released version of the Lesser GPL. It also counts
+ as the successor of the GNU Library Public License, version 2, hence
+ the version number 2.1.]
+
+ Preamble
+
+ The licenses for most software are designed to take away your
+freedom to share and change it. By contrast, the GNU General Public
+Licenses are intended to guarantee your freedom to share and change
+free software--to make sure the software is free for all its users.
+
+ This license, the Lesser General Public License, applies to some
+specially designated software packages--typically libraries--of the
+Free Software Foundation and other authors who decide to use it. You
+can use it too, but we suggest you first think carefully about whether
+this license or the ordinary General Public License is the better
+strategy to use in any particular case, based on the explanations below.
+
+ When we speak of free software, we are referring to freedom of use,
+not price. Our General Public Licenses are designed to make sure that
+you have the freedom to distribute copies of free software (and charge
+for this service if you wish); that you receive source code or can get
+it if you want it; that you can change the software and use pieces of
+it in new free programs; and that you are informed that you can do
+these things.
+
+ To protect your rights, we need to make restrictions that forbid
+distributors to deny you these rights or to ask you to surrender these
+rights. These restrictions translate to certain responsibilities for
+you if you distribute copies of the library or if you modify it.
+
+ For example, if you distribute copies of the library, whether gratis
+or for a fee, you must give the recipients all the rights that we gave
+you. You must make sure that they, too, receive or can get the source
+code. If you link other code with the library, you must provide
+complete object files to the recipients, so that they can relink them
+with the library after making changes to the library and recompiling
+it. And you must show them these terms so they know their rights.
+
+ We protect your rights with a two-step method: (1) we copyright the
+library, and (2) we offer you this license, which gives you legal
+permission to copy, distribute and/or modify the library.
+
+ To protect each distributor, we want to make it very clear that
+there is no warranty for the free library. Also, if the library is
+modified by someone else and passed on, the recipients should know
+that what they have is not the original version, so that the original
+author's reputation will not be affected by problems that might be
+introduced by others.
+
+ Finally, software patents pose a constant threat to the existence of
+any free program. We wish to make sure that a company cannot
+effectively restrict the users of a free program by obtaining a
+restrictive license from a patent holder. Therefore, we insist that
+any patent license obtained for a version of the library must be
+consistent with the full freedom of use specified in this license.
+
+ Most GNU software, including some libraries, is covered by the
+ordinary GNU General Public License. This license, the GNU Lesser
+General Public License, applies to certain designated libraries, and
+is quite different from the ordinary General Public License. We use
+this license for certain libraries in order to permit linking those
+libraries into non-free programs.
+
+ When a program is linked with a library, whether statically or using
+a shared library, the combination of the two is legally speaking a
+combined work, a derivative of the original library. The ordinary
+General Public License therefore permits such linking only if the
+entire combination fits its criteria of freedom. The Lesser General
+Public License permits more lax criteria for linking other code with
+the library.
+
+ We call this license the "Lesser" General Public License because it
+does Less to protect the user's freedom than the ordinary General
+Public License. It also provides other free software developers Less
+of an advantage over competing non-free programs. These disadvantages
+are the reason we use the ordinary General Public License for many
+libraries. However, the Lesser license provides advantages in certain
+special circumstances.
+
+ For example, on rare occasions, there may be a special need to
+encourage the widest possible use of a certain library, so that it becomes
+a de-facto standard. To achieve this, non-free programs must be
+allowed to use the library. A more frequent case is that a free
+library does the same job as widely used non-free libraries. In this
+case, there is little to gain by limiting the free library to free
+software only, so we use the Lesser General Public License.
+
+ In other cases, permission to use a particular library in non-free
+programs enables a greater number of people to use a large body of
+free software. For example, permission to use the GNU C Library in
+non-free programs enables many more people to use the whole GNU
+operating system, as well as its variant, the GNU/Linux operating
+system.
+
+ Although the Lesser General Public License is Less protective of the
+users' freedom, it does ensure that the user of a program that is
+linked with the Library has the freedom and the wherewithal to run
+that program using a modified version of the Library.
+
+ The precise terms and conditions for copying, distribution and
+modification follow. Pay close attention to the difference between a
+"work based on the library" and a "work that uses the library". The
+former contains code derived from the library, whereas the latter must
+be combined with the library in order to run.
+
+ GNU LESSER GENERAL PUBLIC LICENSE
+ TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
+
+ 0. This License Agreement applies to any software library or other
+program which contains a notice placed by the copyright holder or
+other authorized party saying it may be distributed under the terms of
+this Lesser General Public License (also called "this License").
+Each licensee is addressed as "you".
+
+ A "library" means a collection of software functions and/or data
+prepared so as to be conveniently linked with application programs
+(which use some of those functions and data) to form executables.
+
+ The "Library", below, refers to any such software library or work
+which has been distributed under these terms. A "work based on the
+Library" means either the Library or any derivative work under
+copyright law: that is to say, a work containing the Library or a
+portion of it, either verbatim or with modifications and/or translated
+straightforwardly into another language. (Hereinafter, translation is
+included without limitation in the term "modification".)
+
+ "Source code" for a work means the preferred form of the work for
+making modifications to it. For a library, complete source code means
+all the source code for all modules it contains, plus any associated
+interface definition files, plus the scripts used to control compilation
+and installation of the library.
+
+ Activities other than copying, distribution and modification are not
+covered by this License; they are outside its scope. The act of
+running a program using the Library is not restricted, and output from
+such a program is covered only if its contents constitute a work based
+on the Library (independent of the use of the Library in a tool for
+writing it). Whether that is true depends on what the Library does
+and what the program that uses the Library does.
+
+ 1. You may copy and distribute verbatim copies of the Library's
+complete source code as you receive it, in any medium, provided that
+you conspicuously and appropriately publish on each copy an
+appropriate copyright notice and disclaimer of warranty; keep intact
+all the notices that refer to this License and to the absence of any
+warranty; and distribute a copy of this License along with the
+Library.
+
+ You may charge a fee for the physical act of transferring a copy,
+and you may at your option offer warranty protection in exchange for a
+fee.
+
+ 2. You may modify your copy or copies of the Library or any portion
+of it, thus forming a work based on the Library, and copy and
+distribute such modifications or work under the terms of Section 1
+above, provided that you also meet all of these conditions:
+
+ a) The modified work must itself be a software library.
+
+ b) You must cause the files modified to carry prominent notices
+ stating that you changed the files and the date of any change.
+
+ c) You must cause the whole of the work to be licensed at no
+ charge to all third parties under the terms of this License.
+
+ d) If a facility in the modified Library refers to a function or a
+ table of data to be supplied by an application program that uses
+ the facility, other than as an argument passed when the facility
+ is invoked, then you must make a good faith effort to ensure that,
+ in the event an application does not supply such function or
+ table, the facility still operates, and performs whatever part of
+ its purpose remains meaningful.
+
+ (For example, a function in a library to compute square roots has
+ a purpose that is entirely well-defined independent of the
+ application. Therefore, Subsection 2d requires that any
+ application-supplied function or table used by this function must
+ be optional: if the application does not supply it, the square
+ root function must still compute square roots.)
+
+These requirements apply to the modified work as a whole. If
+identifiable sections of that work are not derived from the Library,
+and can be reasonably considered independent and separate works in
+themselves, then this License, and its terms, do not apply to those
+sections when you distribute them as separate works. But when you
+distribute the same sections as part of a whole which is a work based
+on the Library, the distribution of the whole must be on the terms of
+this License, whose permissions for other licensees extend to the
+entire whole, and thus to each and every part regardless of who wrote
+it.
+
+Thus, it is not the intent of this section to claim rights or contest
+your rights to work written entirely by you; rather, the intent is to
+exercise the right to control the distribution of derivative or
+collective works based on the Library.
+
+In addition, mere aggregation of another work not based on the Library
+with the Library (or with a work based on the Library) on a volume of
+a storage or distribution medium does not bring the other work under
+the scope of this License.
+
+ 3. You may opt to apply the terms of the ordinary GNU General Public
+License instead of this License to a given copy of the Library. To do
+this, you must alter all the notices that refer to this License, so
+that they refer to the ordinary GNU General Public License, version 2,
+instead of to this License. (If a newer version than version 2 of the
+ordinary GNU General Public License has appeared, then you can specify
+that version instead if you wish.) Do not make any other change in
+these notices.
+
+ Once this change is made in a given copy, it is irreversible for
+that copy, so the ordinary GNU General Public License applies to all
+subsequent copies and derivative works made from that copy.
+
+ This option is useful when you wish to copy part of the code of
+the Library into a program that is not a library.
+
+ 4. You may copy and distribute the Library (or a portion or
+derivative of it, under Section 2) in object code or executable form
+under the terms of Sections 1 and 2 above provided that you accompany
+it with the complete corresponding machine-readable source code, which
+must be distributed under the terms of Sections 1 and 2 above on a
+medium customarily used for software interchange.
+
+ If distribution of object code is made by offering access to copy
+from a designated place, then offering equivalent access to copy the
+source code from the same place satisfies the requirement to
+distribute the source code, even though third parties are not
+compelled to copy the source along with the object code.
+
+ 5. A program that contains no derivative of any portion of the
+Library, but is designed to work with the Library by being compiled or
+linked with it, is called a "work that uses the Library". Such a
+work, in isolation, is not a derivative work of the Library, and
+therefore falls outside the scope of this License.
+
+ However, linking a "work that uses the Library" with the Library
+creates an executable that is a derivative of the Library (because it
+contains portions of the Library), rather than a "work that uses the
+library". The executable is therefore covered by this License.
+Section 6 states terms for distribution of such executables.
+
+ When a "work that uses the Library" uses material from a header file
+that is part of the Library, the object code for the work may be a
+derivative work of the Library even though the source code is not.
+Whether this is true is especially significant if the work can be
+linked without the Library, or if the work is itself a library. The
+threshold for this to be true is not precisely defined by law.
+
+ If such an object file uses only numerical parameters, data
+structure layouts and accessors, and small macros and small inline
+functions (ten lines or less in length), then the use of the object
+file is unrestricted, regardless of whether it is legally a derivative
+work. (Executables containing this object code plus portions of the
+Library will still fall under Section 6.)
+
+ Otherwise, if the work is a derivative of the Library, you may
+distribute the object code for the work under the terms of Section 6.
+Any executables containing that work also fall under Section 6,
+whether or not they are linked directly with the Library itself.
+
+ 6. As an exception to the Sections above, you may also combine or
+link a "work that uses the Library" with the Library to produce a
+work containing portions of the Library, and distribute that work
+under terms of your choice, provided that the terms permit
+modification of the work for the customer's own use and reverse
+engineering for debugging such modifications.
+
+ You must give prominent notice with each copy of the work that the
+Library is used in it and that the Library and its use are covered by
+this License. You must supply a copy of this License. If the work
+during execution displays copyright notices, you must include the
+copyright notice for the Library among them, as well as a reference
+directing the user to the copy of this License. Also, you must do one
+of these things:
+
+ a) Accompany the work with the complete corresponding
+ machine-readable source code for the Library including whatever
+ changes were used in the work (which must be distributed under
+ Sections 1 and 2 above); and, if the work is an executable linked
+ with the Library, with the complete machine-readable "work that
+ uses the Library", as object code and/or source code, so that the
+ user can modify the Library and then relink to produce a modified
+ executable containing the modified Library. (It is understood
+ that the user who changes the contents of definitions files in the
+ Library will not necessarily be able to recompile the application
+ to use the modified definitions.)
+
+ b) Use a suitable shared library mechanism for linking with the
+ Library. A suitable mechanism is one that (1) uses at run time a
+ copy of the library already present on the user's computer system,
+ rather than copying library functions into the executable, and (2)
+ will operate properly with a modified version of the library, if
+ the user installs one, as long as the modified version is
+ interface-compatible with the version that the work was made with.
+
+ c) Accompany the work with a written offer, valid for at
+ least three years, to give the same user the materials
+ specified in Subsection 6a, above, for a charge no more
+ than the cost of performing this distribution.
+
+ d) If distribution of the work is made by offering access to copy
+ from a designated place, offer equivalent access to copy the above
+ specified materials from the same place.
+
+ e) Verify that the user has already received a copy of these
+ materials or that you have already sent this user a copy.
+
+ For an executable, the required form of the "work that uses the
+Library" must include any data and utility programs needed for
+reproducing the executable from it. However, as a special exception,
+the materials to be distributed need not include anything that is
+normally distributed (in either source or binary form) with the major
+components (compiler, kernel, and so on) of the operating system on
+which the executable runs, unless that component itself accompanies
+the executable.
+
+ It may happen that this requirement contradicts the license
+restrictions of other proprietary libraries that do not normally
+accompany the operating system. Such a contradiction means you cannot
+use both them and the Library together in an executable that you
+distribute.
+
+ 7. You may place library facilities that are a work based on the
+Library side-by-side in a single library together with other library
+facilities not covered by this License, and distribute such a combined
+library, provided that the separate distribution of the work based on
+the Library and of the other library facilities is otherwise
+permitted, and provided that you do these two things:
+
+ a) Accompany the combined library with a copy of the same work
+ based on the Library, uncombined with any other library
+ facilities. This must be distributed under the terms of the
+ Sections above.
+
+ b) Give prominent notice with the combined library of the fact
+ that part of it is a work based on the Library, and explaining
+ where to find the accompanying uncombined form of the same work.
+
+ 8. You may not copy, modify, sublicense, link with, or distribute
+the Library except as expressly provided under this License. Any
+attempt otherwise to copy, modify, sublicense, link with, or
+distribute the Library is void, and will automatically terminate your
+rights under this License. However, parties who have received copies,
+or rights, from you under this License will not have their licenses
+terminated so long as such parties remain in full compliance.
+
+ 9. You are not required to accept this License, since you have not
+signed it. However, nothing else grants you permission to modify or
+distribute the Library or its derivative works. These actions are
+prohibited by law if you do not accept this License. Therefore, by
+modifying or distributing the Library (or any work based on the
+Library), you indicate your acceptance of this License to do so, and
+all its terms and conditions for copying, distributing or modifying
+the Library or works based on it.
+
+ 10. Each time you redistribute the Library (or any work based on the
+Library), the recipient automatically receives a license from the
+original licensor to copy, distribute, link with or modify the Library
+subject to these terms and conditions. You may not impose any further
+restrictions on the recipients' exercise of the rights granted herein.
+You are not responsible for enforcing compliance by third parties with
+this License.
+
+ 11. If, as a consequence of a court judgment or allegation of patent
+infringement or for any other reason (not limited to patent issues),
+conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot
+distribute so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you
+may not distribute the Library at all. For example, if a patent
+license would not permit royalty-free redistribution of the Library by
+all those who receive copies directly or indirectly through you, then
+the only way you could satisfy both it and this License would be to
+refrain entirely from distribution of the Library.
+
+If any portion of this section is held invalid or unenforceable under any
+particular circumstance, the balance of the section is intended to apply,
+and the section as a whole is intended to apply in other circumstances.
+
+It is not the purpose of this section to induce you to infringe any
+patents or other property right claims or to contest validity of any
+such claims; this section has the sole purpose of protecting the
+integrity of the free software distribution system which is
+implemented by public license practices. Many people have made
+generous contributions to the wide range of software distributed
+through that system in reliance on consistent application of that
+system; it is up to the author/donor to decide if he or she is willing
+to distribute software through any other system and a licensee cannot
+impose that choice.
+
+This section is intended to make thoroughly clear what is believed to
+be a consequence of the rest of this License.
+
+ 12. If the distribution and/or use of the Library is restricted in
+certain countries either by patents or by copyrighted interfaces, the
+original copyright holder who places the Library under this License may add
+an explicit geographical distribution limitation excluding those countries,
+so that distribution is permitted only in or among countries not thus
+excluded. In such case, this License incorporates the limitation as if
+written in the body of this License.
+
+ 13. The Free Software Foundation may publish revised and/or new
+versions of the Lesser General Public License from time to time.
+Such new versions will be similar in spirit to the present version,
+but may differ in detail to address new problems or concerns.
+
+Each version is given a distinguishing version number. If the Library
+specifies a version number of this License which applies to it and
+"any later version", you have the option of following the terms and
+conditions either of that version or of any later version published by
+the Free Software Foundation. If the Library does not specify a
+license version number, you may choose any version ever published by
+the Free Software Foundation.
+
+ 14. If you wish to incorporate parts of the Library into other free
+programs whose distribution conditions are incompatible with these,
+write to the author to ask for permission. For software which is
+copyrighted by the Free Software Foundation, write to the Free
+Software Foundation; we sometimes make exceptions for this. Our
+decision will be guided by the two goals of preserving the free status
+of all derivatives of our free software and of promoting the sharing
+and reuse of software generally.
+
+ NO WARRANTY
+
+ 15. BECAUSE THE LIBRARY IS LICENSED FREE OF CHARGE, THERE IS NO
+WARRANTY FOR THE LIBRARY, TO THE EXTENT PERMITTED BY APPLICABLE LAW.
+EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR
+OTHER PARTIES PROVIDE THE LIBRARY "AS IS" WITHOUT WARRANTY OF ANY
+KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE
+LIBRARY IS WITH YOU. SHOULD THE LIBRARY PROVE DEFECTIVE, YOU ASSUME
+THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION.
+
+ 16. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN
+WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY
+AND/OR REDISTRIBUTE THE LIBRARY AS PERMITTED ABOVE, BE LIABLE TO YOU
+FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR
+CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE
+LIBRARY (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING
+RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A
+FAILURE OF THE LIBRARY TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF
+SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
+DAMAGES.
+
+ END OF TERMS AND CONDITIONS
+
+ How to Apply These Terms to Your New Libraries
+
+ If you develop a new library, and you want it to be of the greatest
+possible use to the public, we recommend making it free software that
+everyone can redistribute and change. You can do so by permitting
+redistribution under these terms (or, alternatively, under the terms of the
+ordinary General Public License).
+
+ To apply these terms, attach the following notices to the library. It is
+safest to attach them to the start of each source file to most effectively
+convey the exclusion of warranty; and each file should have at least the
+"copyright" line and a pointer to where the full notice is found.
+
+ <one line to give the library's name and a brief idea of what it does.>
+ Copyright (C) <year> <name of author>
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 2.1 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+Also add information on how to contact you by electronic and paper mail.
+
+You should also get your employer (if you work as a programmer) or your
+school, if any, to sign a "copyright disclaimer" for the library, if
+necessary. Here is a sample; alter the names:
+
+ Yoyodyne, Inc., hereby disclaims all copyright interest in the
+ library `Frob' (a library for tweaking knobs) written by James Random Hacker.
+
+ <signature of Ty Coon>, 1 April 1990
+ Ty Coon, President of Vice
+
+That's all there is to it!
diff --git a/utils/gst-plugin-togglerecord/build.rs b/utils/gst-plugin-togglerecord/build.rs
new file mode 100644
index 000000000..0d1ddb61d
--- /dev/null
+++ b/utils/gst-plugin-togglerecord/build.rs
@@ -0,0 +1,5 @@
+extern crate gst_plugin_version_helper;
+
+fn main() {
+ gst_plugin_version_helper::get_info()
+}
diff --git a/utils/gst-plugin-togglerecord/examples/gtk_recording.rs b/utils/gst-plugin-togglerecord/examples/gtk_recording.rs
new file mode 100644
index 000000000..8f0bc0bb1
--- /dev/null
+++ b/utils/gst-plugin-togglerecord/examples/gtk_recording.rs
@@ -0,0 +1,358 @@
+// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+extern crate glib;
+use glib::prelude::*;
+extern crate gio;
+use gio::prelude::*;
+
+extern crate gstreamer as gst;
+use gst::prelude::*;
+
+extern crate gsttogglerecord;
+
+extern crate gtk;
+use gtk::prelude::*;
+use std::cell::RefCell;
+use std::env;
+
+fn create_pipeline() -> (
+ gst::Pipeline,
+ gst::Pad,
+ gst::Pad,
+ gst::Element,
+ gst::Element,
+ gtk::Widget,
+) {
+ let pipeline = gst::Pipeline::new(None);
+
+ let video_src = gst::ElementFactory::make("videotestsrc", None).unwrap();
+ video_src.set_property("is-live", &true).unwrap();
+ video_src.set_property_from_str("pattern", "ball");
+
+ let timeoverlay = gst::ElementFactory::make("timeoverlay", None).unwrap();
+ timeoverlay
+ .set_property("font-desc", &"Monospace 20")
+ .unwrap();
+
+ let video_tee = gst::ElementFactory::make("tee", None).unwrap();
+ let video_queue1 = gst::ElementFactory::make("queue", None).unwrap();
+ let video_queue2 = gst::ElementFactory::make("queue", None).unwrap();
+
+ let video_convert1 = gst::ElementFactory::make("videoconvert", None).unwrap();
+ let video_convert2 = gst::ElementFactory::make("videoconvert", None).unwrap();
+
+ let (video_sink, video_widget) =
+ if let Ok(gtkglsink) = gst::ElementFactory::make("gtkglsink", None) {
+ let glsinkbin = gst::ElementFactory::make("glsinkbin", None).unwrap();
+ glsinkbin.set_property("sink", &gtkglsink).unwrap();
+
+ let widget = gtkglsink.get_property("widget").unwrap();
+ (glsinkbin, widget.get::<gtk::Widget>().unwrap().unwrap())
+ } else {
+ let sink = gst::ElementFactory::make("gtksink", None).unwrap();
+ let widget = sink.get_property("widget").unwrap();
+ (sink, widget.get::<gtk::Widget>().unwrap().unwrap())
+ };
+
+ let video_enc = gst::ElementFactory::make("x264enc", None).unwrap();
+ video_enc.set_property("rc-lookahead", &10i32).unwrap();
+ video_enc.set_property("key-int-max", &30u32).unwrap();
+ let video_parse = gst::ElementFactory::make("h264parse", None).unwrap();
+
+ let audio_src = gst::ElementFactory::make("audiotestsrc", None).unwrap();
+ audio_src.set_property("is-live", &true).unwrap();
+ audio_src.set_property_from_str("wave", "ticks");
+
+ let audio_tee = gst::ElementFactory::make("tee", None).unwrap();
+ let audio_queue1 = gst::ElementFactory::make("queue", None).unwrap();
+ let audio_queue2 = gst::ElementFactory::make("queue", None).unwrap();
+
+ let audio_convert1 = gst::ElementFactory::make("audioconvert", None).unwrap();
+ let audio_convert2 = gst::ElementFactory::make("audioconvert", None).unwrap();
+
+ let audio_sink = gst::ElementFactory::make("autoaudiosink", None).unwrap();
+
+ let audio_enc = gst::ElementFactory::make("lamemp3enc", None).unwrap();
+ let audio_parse = gst::ElementFactory::make("mpegaudioparse", None).unwrap();
+
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+
+ let mux_queue1 = gst::ElementFactory::make("queue", None).unwrap();
+ let mux_queue2 = gst::ElementFactory::make("queue", None).unwrap();
+
+ let mux = gst::ElementFactory::make("mp4mux", None).unwrap();
+
+ let file_sink = gst::ElementFactory::make("filesink", None).unwrap();
+ file_sink
+ .set_property("location", &"recording.mp4")
+ .unwrap();
+ file_sink.set_property("async", &false).unwrap();
+ file_sink.set_property("sync", &false).unwrap();
+
+ pipeline
+ .add_many(&[
+ &video_src,
+ &timeoverlay,
+ &video_tee,
+ &video_queue1,
+ &video_queue2,
+ &video_convert1,
+ &video_convert2,
+ &video_sink,
+ &video_enc,
+ &video_parse,
+ &audio_src,
+ &audio_tee,
+ &audio_queue1,
+ &audio_queue2,
+ &audio_convert1,
+ &audio_convert2,
+ &audio_sink,
+ &audio_enc,
+ &audio_parse,
+ &togglerecord,
+ &mux_queue1,
+ &mux_queue2,
+ &mux,
+ &file_sink,
+ ])
+ .unwrap();
+
+ gst::Element::link_many(&[
+ &video_src,
+ &timeoverlay,
+ &video_tee,
+ &video_queue1,
+ &video_convert1,
+ &video_sink,
+ ])
+ .unwrap();
+
+ gst::Element::link_many(&[
+ &video_tee,
+ &video_queue2,
+ &video_convert2,
+ &video_enc,
+ &video_parse,
+ ])
+ .unwrap();
+
+ video_parse
+ .link_pads(Some("src"), &togglerecord, Some("sink"))
+ .unwrap();
+ togglerecord
+ .link_pads(Some("src"), &mux_queue1, Some("sink"))
+ .unwrap();
+ mux_queue1
+ .link_pads(Some("src"), &mux, Some("video_%u"))
+ .unwrap();
+
+ gst::Element::link_many(&[
+ &audio_src,
+ &audio_tee,
+ &audio_queue1,
+ &audio_convert1,
+ &audio_sink,
+ ])
+ .unwrap();
+
+ gst::Element::link_many(&[
+ &audio_tee,
+ &audio_queue2,
+ &audio_convert2,
+ &audio_enc,
+ &audio_parse,
+ ])
+ .unwrap();
+
+ audio_parse
+ .link_pads(Some("src"), &togglerecord, Some("sink_0"))
+ .unwrap();
+ togglerecord
+ .link_pads(Some("src_0"), &mux_queue2, Some("sink"))
+ .unwrap();
+ mux_queue2
+ .link_pads(Some("src"), &mux, Some("audio_%u"))
+ .unwrap();
+
+ gst::Element::link_many(&[&mux, &file_sink]).unwrap();
+
+ (
+ pipeline,
+ video_queue2.get_static_pad("sink").unwrap(),
+ audio_queue2.get_static_pad("sink").unwrap(),
+ togglerecord,
+ video_sink,
+ video_widget,
+ )
+}
+
+fn create_ui(app: &gtk::Application) {
+ let (pipeline, video_pad, audio_pad, togglerecord, video_sink, video_widget) =
+ create_pipeline();
+
+ let window = gtk::Window::new(gtk::WindowType::Toplevel);
+ window.set_default_size(320, 240);
+ let vbox = gtk::Box::new(gtk::Orientation::Vertical, 0);
+ vbox.pack_start(&video_widget, true, true, 0);
+
+ let hbox = gtk::Box::new(gtk::Orientation::Horizontal, 0);
+ let position_label = gtk::Label::new(Some("Position: 00:00:00"));
+ hbox.pack_start(&position_label, true, true, 5);
+ let recorded_duration_label = gtk::Label::new(Some("Recorded: 00:00:00"));
+ hbox.pack_start(&recorded_duration_label, true, true, 5);
+ vbox.pack_start(&hbox, false, false, 5);
+
+ let hbox = gtk::Box::new(gtk::Orientation::Horizontal, 0);
+ let record_button = gtk::Button::new_with_label("Record");
+ hbox.pack_start(&record_button, true, true, 5);
+ let finish_button = gtk::Button::new_with_label("Finish");
+ hbox.pack_start(&finish_button, true, true, 5);
+ vbox.pack_start(&hbox, false, false, 5);
+
+ window.add(&vbox);
+ window.show_all();
+
+ app.add_window(&window);
+
+ let video_sink_weak = video_sink.downgrade();
+ let togglerecord_weak = togglerecord.downgrade();
+ let timeout_id = gtk::timeout_add(100, move || {
+ let video_sink = match video_sink_weak.upgrade() {
+ Some(video_sink) => video_sink,
+ None => return glib::Continue(true),
+ };
+
+ let togglerecord = match togglerecord_weak.upgrade() {
+ Some(togglerecord) => togglerecord,
+ None => return glib::Continue(true),
+ };
+
+ let position = video_sink
+ .query_position::<gst::ClockTime>()
+ .unwrap_or_else(|| 0.into());
+ position_label.set_text(&format!("Position: {:.1}", position));
+
+ let recording_duration = togglerecord
+ .get_static_pad("src")
+ .unwrap()
+ .query_position::<gst::ClockTime>()
+ .unwrap_or_else(|| 0.into());
+ recorded_duration_label.set_text(&format!("Recorded: {:.1}", recording_duration));
+
+ glib::Continue(true)
+ });
+
+ let togglerecord_weak = togglerecord.downgrade();
+ record_button.connect_clicked(move |button| {
+ let togglerecord = match togglerecord_weak.upgrade() {
+ Some(togglerecord) => togglerecord,
+ None => return,
+ };
+
+ let recording = !togglerecord
+ .get_property("record")
+ .unwrap()
+ .get_some::<bool>()
+ .unwrap();
+ togglerecord.set_property("record", &recording).unwrap();
+
+ button.set_label(if recording { "Stop" } else { "Record" });
+ });
+
+ let record_button_weak = record_button.downgrade();
+ finish_button.connect_clicked(move |button| {
+ let record_button = match record_button_weak.upgrade() {
+ Some(record_button) => record_button,
+ None => return,
+ };
+
+ record_button.set_sensitive(false);
+ button.set_sensitive(false);
+
+ video_pad.send_event(gst::Event::new_eos().build());
+ audio_pad.send_event(gst::Event::new_eos().build());
+ });
+
+ let app_weak = app.downgrade();
+ window.connect_delete_event(move |_, _| {
+ let app = match app_weak.upgrade() {
+ Some(app) => app,
+ None => return Inhibit(false),
+ };
+
+ app.quit();
+ Inhibit(false)
+ });
+
+ let bus = pipeline.get_bus().unwrap();
+ let app_weak = app.downgrade();
+ bus.add_watch_local(move |_, msg| {
+ use gst::MessageView;
+
+ let app = match app_weak.upgrade() {
+ Some(app) => app,
+ None => return glib::Continue(false),
+ };
+
+ match msg.view() {
+ MessageView::Eos(..) => app.quit(),
+ MessageView::Error(err) => {
+ println!(
+ "Error from {:?}: {} ({:?})",
+ msg.get_src().map(|s| s.get_path_string()),
+ err.get_error(),
+ err.get_debug()
+ );
+ app.quit();
+ }
+ _ => (),
+ };
+
+ glib::Continue(true)
+ })
+ .expect("Failed to add bus watch");
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ // Pipeline reference is owned by the closure below, so will be
+ // destroyed once the app is destroyed
+ let timeout_id = RefCell::new(Some(timeout_id));
+ app.connect_shutdown(move |_| {
+ pipeline.set_state(gst::State::Null).unwrap();
+
+ bus.remove_watch().unwrap();
+
+ if let Some(timeout_id) = timeout_id.borrow_mut().take() {
+ glib::source_remove(timeout_id);
+ }
+ });
+}
+
+fn main() {
+ gst::init().unwrap();
+ gtk::init().unwrap();
+
+ gsttogglerecord::plugin_register_static().expect("Failed to register togglerecord plugin");
+
+ let app = gtk::Application::new(None, gio::ApplicationFlags::FLAGS_NONE).unwrap();
+
+ app.connect_activate(create_ui);
+ let args = env::args().collect::<Vec<_>>();
+ app.run(&args);
+}
diff --git a/utils/gst-plugin-togglerecord/src/lib.rs b/utils/gst-plugin-togglerecord/src/lib.rs
new file mode 100644
index 000000000..9f2154662
--- /dev/null
+++ b/utils/gst-plugin-togglerecord/src/lib.rs
@@ -0,0 +1,48 @@
+// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+#![crate_type = "cdylib"]
+
+#[macro_use]
+extern crate glib;
+#[macro_use]
+extern crate gstreamer as gst;
+extern crate gstreamer_audio as gst_audio;
+extern crate gstreamer_video as gst_video;
+
+#[macro_use]
+extern crate lazy_static;
+
+extern crate parking_lot;
+
+mod togglerecord;
+
+fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ togglerecord::register(plugin)
+}
+
+gst_plugin_define!(
+ togglerecord,
+ env!("CARGO_PKG_DESCRIPTION"),
+ plugin_init,
+ concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
+ "LGPL",
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_NAME"),
+ env!("CARGO_PKG_REPOSITORY"),
+ env!("BUILD_REL_DATE")
+);
diff --git a/utils/gst-plugin-togglerecord/src/togglerecord.rs b/utils/gst-plugin-togglerecord/src/togglerecord.rs
new file mode 100644
index 000000000..07872b73d
--- /dev/null
+++ b/utils/gst-plugin-togglerecord/src/togglerecord.rs
@@ -0,0 +1,1749 @@
+// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use glib;
+use glib::prelude::*;
+use glib::subclass;
+use glib::subclass::prelude::*;
+use gst;
+use gst::prelude::*;
+use gst::subclass::prelude::*;
+use gst_audio;
+use gst_video;
+
+use parking_lot::{Condvar, Mutex};
+use std::cmp;
+use std::collections::HashMap;
+use std::f64;
+use std::iter;
+use std::sync::Arc;
+
+const DEFAULT_RECORD: bool = false;
+
+#[derive(Debug, Clone, Copy)]
+struct Settings {
+ record: bool,
+}
+
+impl Default for Settings {
+ fn default() -> Self {
+ Settings {
+ record: DEFAULT_RECORD,
+ }
+ }
+}
+
+static PROPERTIES: [subclass::Property; 2] = [
+ subclass::Property("record", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "Record",
+ "Enable/disable recording",
+ DEFAULT_RECORD,
+ glib::ParamFlags::READWRITE,
+ )
+ }),
+ subclass::Property("recording", |name| {
+ glib::ParamSpec::boolean(
+ name,
+ "Recording",
+ "Whether recording is currently taking place",
+ DEFAULT_RECORD,
+ glib::ParamFlags::READABLE,
+ )
+ }),
+];
+
+#[derive(Clone)]
+struct Stream {
+ sinkpad: gst::Pad,
+ srcpad: gst::Pad,
+ state: Arc<Mutex<StreamState>>,
+}
+
+impl PartialEq for Stream {
+ fn eq(&self, other: &Self) -> bool {
+ self.sinkpad == other.sinkpad && self.srcpad == other.srcpad
+ }
+}
+
+impl Eq for Stream {}
+
+impl Stream {
+ fn new(sinkpad: gst::Pad, srcpad: gst::Pad) -> Self {
+ Self {
+ sinkpad,
+ srcpad,
+ state: Arc::new(Mutex::new(StreamState::default())),
+ }
+ }
+}
+
+struct StreamState {
+ in_segment: gst::FormattedSegment<gst::ClockTime>,
+ out_segment: gst::FormattedSegment<gst::ClockTime>,
+ segment_seqnum: gst::Seqnum,
+ current_running_time: gst::ClockTime,
+ eos: bool,
+ flushing: bool,
+ segment_pending: bool,
+ pending_events: Vec<gst::Event>,
+ audio_info: Option<gst_audio::AudioInfo>,
+ video_info: Option<gst_video::VideoInfo>,
+}
+
+impl Default for StreamState {
+ fn default() -> Self {
+ Self {
+ in_segment: gst::FormattedSegment::new(),
+ out_segment: gst::FormattedSegment::new(),
+ segment_seqnum: gst::Seqnum::next(),
+ current_running_time: gst::CLOCK_TIME_NONE,
+ eos: false,
+ flushing: false,
+ segment_pending: false,
+ pending_events: Vec::new(),
+ audio_info: None,
+ video_info: None,
+ }
+ }
+}
+
+// Recording behaviour:
+//
+// Secondary streams are *always* behind main stream
+// Main stream EOS stops recording (-> Stopping), makes secondary streams go EOS
+//
+// Recording: Passing through all data
+// Stopping: Main stream remembering current last_recording_stop, waiting for all
+// other streams to reach this position
+// Stopped: Dropping all data
+// Starting: Main stream waiting until next keyframe and setting last_recording_start, waiting
+// for all other streams to reach this position
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+enum RecordingState {
+ Recording,
+ Stopping,
+ Stopped,
+ Starting,
+}
+
+#[derive(Debug)]
+struct State {
+ recording_state: RecordingState,
+ last_recording_start: gst::ClockTime,
+ last_recording_stop: gst::ClockTime,
+ // Accumulated duration of previous recording segments,
+ // updated whenever going to Stopped
+ recording_duration: gst::ClockTime,
+ // Updated whenever going to Recording
+ running_time_offset: gst::ClockTime,
+}
+
+impl Default for State {
+ fn default() -> Self {
+ Self {
+ recording_state: RecordingState::Stopped,
+ last_recording_start: gst::CLOCK_TIME_NONE,
+ last_recording_stop: gst::CLOCK_TIME_NONE,
+ recording_duration: 0.into(),
+ running_time_offset: gst::CLOCK_TIME_NONE,
+ }
+ }
+}
+
+#[derive(Debug, PartialEq, Eq)]
+enum HandleResult<T> {
+ Pass(T),
+ Drop,
+ Eos,
+ Flushing,
+}
+
+trait HandleData: Sized {
+ fn get_pts(&self) -> gst::ClockTime;
+ fn get_dts(&self) -> gst::ClockTime;
+ fn get_dts_or_pts(&self) -> gst::ClockTime {
+ let dts = self.get_dts();
+ if dts.is_some() {
+ dts
+ } else {
+ self.get_pts()
+ }
+ }
+ fn get_duration(&self, state: &StreamState) -> gst::ClockTime;
+ fn is_keyframe(&self) -> bool;
+ fn can_clip(&self, state: &StreamState) -> bool;
+ fn clip(
+ self,
+ state: &StreamState,
+ segment: &gst::FormattedSegment<gst::ClockTime>,
+ ) -> Option<Self>;
+}
+
+impl HandleData for (gst::ClockTime, gst::ClockTime) {
+ fn get_pts(&self) -> gst::ClockTime {
+ self.0
+ }
+
+ fn get_dts(&self) -> gst::ClockTime {
+ self.0
+ }
+
+ fn get_duration(&self, _state: &StreamState) -> gst::ClockTime {
+ self.1
+ }
+
+ fn is_keyframe(&self) -> bool {
+ true
+ }
+
+ fn can_clip(&self, _state: &StreamState) -> bool {
+ true
+ }
+
+ fn clip(
+ self,
+ _state: &StreamState,
+ segment: &gst::FormattedSegment<gst::ClockTime>,
+ ) -> Option<Self> {
+ let stop = if self.1.is_some() {
+ self.0 + self.1
+ } else {
+ self.0
+ };
+
+ segment
+ .clip(self.0, stop)
+ .map(|(start, stop)| (start, stop - start))
+ }
+}
+
+impl HandleData for gst::Buffer {
+ fn get_pts(&self) -> gst::ClockTime {
+ gst::BufferRef::get_pts(self)
+ }
+
+ fn get_dts(&self) -> gst::ClockTime {
+ gst::BufferRef::get_dts(self)
+ }
+
+ fn get_duration(&self, state: &StreamState) -> gst::ClockTime {
+ let duration = gst::BufferRef::get_duration(self);
+
+ if duration.is_some() {
+ duration
+ } else if let Some(ref video_info) = state.video_info {
+ if video_info.fps() != 0.into() {
+ gst::SECOND
+ .mul_div_floor(
+ *video_info.fps().denom() as u64,
+ *video_info.fps().numer() as u64,
+ )
+ .unwrap_or(gst::CLOCK_TIME_NONE)
+ } else {
+ gst::CLOCK_TIME_NONE
+ }
+ } else if let Some(ref audio_info) = state.audio_info {
+ if audio_info.bpf() == 0 || audio_info.rate() == 0 {
+ return gst::CLOCK_TIME_NONE;
+ }
+
+ let size = self.get_size() as u64;
+ let num_samples = size / audio_info.bpf() as u64;
+ gst::SECOND
+ .mul_div_floor(num_samples, audio_info.rate() as u64)
+ .unwrap_or(gst::CLOCK_TIME_NONE)
+ } else {
+ gst::CLOCK_TIME_NONE
+ }
+ }
+
+ fn is_keyframe(&self) -> bool {
+ !gst::BufferRef::get_flags(self).contains(gst::BufferFlags::DELTA_UNIT)
+ }
+
+ fn can_clip(&self, state: &StreamState) -> bool {
+ // Only do actual clipping for raw audio/video
+ if let Some(ref audio_info) = state.audio_info {
+ if audio_info.format() == gst_audio::AudioFormat::Unknown
+ || audio_info.format() == gst_audio::AudioFormat::Encoded
+ || audio_info.rate() == 0
+ || audio_info.bpf() == 0
+ {
+ return false;
+ }
+ } else if let Some(ref video_info) = state.video_info {
+ if video_info.format() == gst_video::VideoFormat::Unknown
+ || video_info.format() == gst_video::VideoFormat::Encoded
+ || self.get_dts_or_pts() != self.get_pts()
+ {
+ return false;
+ }
+ } else {
+ return false;
+ }
+
+ true
+ }
+
+ fn clip(
+ mut self,
+ state: &StreamState,
+ segment: &gst::FormattedSegment<gst::ClockTime>,
+ ) -> Option<Self> {
+ // Only do actual clipping for raw audio/video
+ if !self.can_clip(state) {
+ return Some(self);
+ }
+
+ let pts = HandleData::get_pts(&self);
+ let duration = HandleData::get_duration(&self, state);
+ let stop = if duration.is_some() {
+ pts + duration
+ } else {
+ pts
+ };
+
+ if let Some(ref audio_info) = state.audio_info {
+ gst_audio::audio_buffer_clip(
+ self,
+ segment.upcast_ref(),
+ audio_info.rate(),
+ audio_info.bpf(),
+ )
+ } else if state.video_info.is_some() {
+ segment.clip(pts, stop).map(move |(start, stop)| {
+ {
+ let buffer = self.make_mut();
+ buffer.set_pts(start);
+ buffer.set_dts(start);
+ buffer.set_duration(stop - start);
+ }
+
+ self
+ })
+ } else {
+ unreachable!();
+ }
+ }
+}
+
+struct ToggleRecord {
+ settings: Mutex<Settings>,
+ state: Mutex<State>,
+ main_stream: Stream,
+ // Always must have main_stream.state locked!
+ // If multiple stream states have to be locked, the
+ // main_stream always comes first
+ main_stream_cond: Condvar,
+ other_streams: Mutex<(Vec<Stream>, u32)>,
+ pads: Mutex<HashMap<gst::Pad, Stream>>,
+}
+
+lazy_static! {
+ static ref CAT: gst::DebugCategory = gst::DebugCategory::new(
+ "togglerecord",
+ gst::DebugColorFlags::empty(),
+ Some("Toggle Record Element"),
+ );
+}
+
+impl ToggleRecord {
+ fn set_pad_functions(sinkpad: &gst::Pad, srcpad: &gst::Pad) {
+ sinkpad.set_chain_function(|pad, parent, buffer| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || Err(gst::FlowError::Error),
+ |togglerecord, element| togglerecord.sink_chain(pad, element, buffer),
+ )
+ });
+ sinkpad.set_event_function(|pad, parent, event| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || false,
+ |togglerecord, element| togglerecord.sink_event(pad, element, event),
+ )
+ });
+ sinkpad.set_query_function(|pad, parent, query| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || false,
+ |togglerecord, element| togglerecord.sink_query(pad, element, query),
+ )
+ });
+ sinkpad.set_iterate_internal_links_function(|pad, parent| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || gst::Iterator::from_vec(vec![]),
+ |togglerecord, element| togglerecord.iterate_internal_links(pad, element),
+ )
+ });
+
+ srcpad.set_event_function(|pad, parent, event| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || false,
+ |togglerecord, element| togglerecord.src_event(pad, element, event),
+ )
+ });
+ srcpad.set_query_function(|pad, parent, query| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || false,
+ |togglerecord, element| togglerecord.src_query(pad, element, query),
+ )
+ });
+ srcpad.set_iterate_internal_links_function(|pad, parent| {
+ ToggleRecord::catch_panic_pad_function(
+ parent,
+ || gst::Iterator::from_vec(vec![]),
+ |togglerecord, element| togglerecord.iterate_internal_links(pad, element),
+ )
+ });
+ }
+
+ fn handle_main_stream<T: HandleData>(
+ &self,
+ element: &gst::Element,
+ pad: &gst::Pad,
+ stream: &Stream,
+ data: T,
+ ) -> Result<HandleResult<T>, gst::FlowError> {
+ let mut state = stream.state.lock();
+
+ let mut dts_or_pts = data.get_dts_or_pts();
+ let duration = data.get_duration(&state);
+
+ if !dts_or_pts.is_some() {
+ gst_element_error!(
+ element,
+ gst::StreamError::Format,
+ ["Buffer without DTS or PTS"]
+ );
+ return Err(gst::FlowError::Error);
+ }
+
+ let mut dts_or_pts_end = if duration.is_some() {
+ dts_or_pts + duration
+ } else {
+ dts_or_pts
+ };
+
+ let data = match data.clip(&state, &state.in_segment) {
+ None => {
+ gst_log!(CAT, obj: pad, "Dropping raw data outside segment");
+ return Ok(HandleResult::Drop);
+ }
+ Some(data) => data,
+ };
+
+ // This will only do anything for non-raw data
+ dts_or_pts = cmp::max(state.in_segment.get_start(), dts_or_pts);
+ dts_or_pts_end = cmp::max(state.in_segment.get_start(), dts_or_pts_end);
+ if state.in_segment.get_stop().is_some() {
+ dts_or_pts = cmp::min(state.in_segment.get_stop(), dts_or_pts);
+ dts_or_pts_end = cmp::min(state.in_segment.get_stop(), dts_or_pts_end);
+ }
+
+ let current_running_time = state.in_segment.to_running_time(dts_or_pts);
+ let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end);
+ state.current_running_time = cmp::max(current_running_time_end, state.current_running_time);
+
+ // Wake up everybody, we advanced a bit
+ // Important: They will only be able to advance once we're done with this
+ // function or waiting for them to catch up below, otherwise they might
+ // get the wrong state
+ self.main_stream_cond.notify_all();
+
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Main stream current running time {}-{} (position: {}-{})",
+ current_running_time,
+ current_running_time_end,
+ dts_or_pts,
+ dts_or_pts_end
+ );
+
+ let settings = *self.settings.lock();
+
+ // First check if we have to update our recording state
+ let mut rec_state = self.state.lock();
+ let settings_changed = match rec_state.recording_state {
+ RecordingState::Recording if !settings.record => {
+ gst_debug!(CAT, obj: pad, "Stopping recording");
+ rec_state.recording_state = RecordingState::Stopping;
+ true
+ }
+ RecordingState::Stopped if settings.record => {
+ gst_debug!(CAT, obj: pad, "Starting recording");
+ rec_state.recording_state = RecordingState::Starting;
+ true
+ }
+ _ => false,
+ };
+
+ match rec_state.recording_state {
+ RecordingState::Recording => {
+ // Remember where we stopped last, in case of EOS
+ rec_state.last_recording_stop = current_running_time_end;
+ gst_log!(CAT, obj: pad, "Passing buffer (recording)");
+ Ok(HandleResult::Pass(data))
+ }
+ RecordingState::Stopping => {
+ if !data.is_keyframe() {
+ // Remember where we stopped last, in case of EOS
+ rec_state.last_recording_stop = current_running_time_end;
+ gst_log!(CAT, obj: pad, "Passing non-keyframe buffer (stopping)");
+
+ drop(rec_state);
+ drop(state);
+ if settings_changed {
+ gst_debug!(CAT, obj: pad, "Requesting a new keyframe");
+ stream
+ .sinkpad
+ .push_event(gst_video::new_upstream_force_key_unit_event().build());
+ }
+
+ return Ok(HandleResult::Pass(data));
+ }
+
+ // Remember the time when we stopped: now, i.e. right before the current buffer!
+ rec_state.last_recording_stop = current_running_time;
+ gst_debug!(CAT, obj: pad, "Stopping at {}", current_running_time);
+
+ // Then unlock and wait for all other streams to reach it or go EOS instead.
+ drop(rec_state);
+
+ while !self.other_streams.lock().0.iter().all(|s| {
+ let s = s.state.lock();
+ s.eos
+ || (s.current_running_time.is_some()
+ && s.current_running_time >= current_running_time_end)
+ }) {
+ gst_log!(CAT, obj: pad, "Waiting for other streams to stop");
+ self.main_stream_cond.wait(&mut state);
+ }
+
+ if state.flushing {
+ gst_debug!(CAT, obj: pad, "Flushing");
+ return Ok(HandleResult::Flushing);
+ }
+
+ let mut rec_state = self.state.lock();
+ rec_state.recording_state = RecordingState::Stopped;
+ let advance_by = rec_state.last_recording_stop - rec_state.last_recording_start;
+ rec_state.recording_duration += advance_by;
+ rec_state.last_recording_start = gst::CLOCK_TIME_NONE;
+ rec_state.last_recording_stop = gst::CLOCK_TIME_NONE;
+
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Stopped at {}, recording duration {}",
+ current_running_time,
+ rec_state.recording_duration
+ );
+
+ // Then become Stopped and drop this buffer. We always stop right before
+ // a keyframe
+ gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
+
+ drop(rec_state);
+ drop(state);
+ element.notify("recording");
+
+ Ok(HandleResult::Drop)
+ }
+ RecordingState::Stopped => {
+ gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
+ Ok(HandleResult::Drop)
+ }
+ RecordingState::Starting => {
+ // If this is no keyframe, we can directly go out again here and drop the frame
+ if !data.is_keyframe() {
+ gst_log!(CAT, obj: pad, "Dropping non-keyframe buffer (starting)");
+
+ drop(rec_state);
+ drop(state);
+ if settings_changed {
+ gst_debug!(CAT, obj: pad, "Requesting a new keyframe");
+ stream
+ .sinkpad
+ .push_event(gst_video::new_upstream_force_key_unit_event().build());
+ }
+
+ return Ok(HandleResult::Drop);
+ }
+
+ // Remember the time when we started: now!
+ rec_state.last_recording_start = current_running_time;
+ rec_state.running_time_offset = current_running_time - rec_state.recording_duration;
+ gst_debug!(CAT, obj: pad, "Starting at {}", current_running_time);
+
+ state.segment_pending = true;
+ for other_stream in &self.other_streams.lock().0 {
+ other_stream.state.lock().segment_pending = true;
+ }
+
+ // Then unlock and wait for all other streams to reach
+ // it or go EOS instead
+ drop(rec_state);
+
+ while !self.other_streams.lock().0.iter().all(|s| {
+ let s = s.state.lock();
+ s.eos
+ || (s.current_running_time.is_some()
+ && s.current_running_time >= current_running_time_end)
+ }) {
+ gst_log!(CAT, obj: pad, "Waiting for other streams to start");
+ self.main_stream_cond.wait(&mut state);
+ }
+
+ if state.flushing {
+ gst_debug!(CAT, obj: pad, "Flushing");
+ return Ok(HandleResult::Flushing);
+ }
+
+ let mut rec_state = self.state.lock();
+ rec_state.recording_state = RecordingState::Recording;
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Started at {}, recording duration {}",
+ current_running_time,
+ rec_state.recording_duration
+ );
+
+ gst_log!(CAT, obj: pad, "Passing buffer (recording)");
+
+ drop(rec_state);
+ drop(state);
+ element.notify("recording");
+
+ Ok(HandleResult::Pass(data))
+ }
+ }
+ }
+
+ fn handle_secondary_stream<T: HandleData>(
+ &self,
+ element: &gst::Element,
+ pad: &gst::Pad,
+ stream: &Stream,
+ data: T,
+ ) -> Result<HandleResult<T>, gst::FlowError> {
+ // Calculate end pts & current running time and make sure we stay in the segment
+ let mut state = stream.state.lock();
+
+ let mut pts = data.get_pts();
+ let duration = data.get_duration(&state);
+
+ if pts.is_none() {
+ gst_element_error!(element, gst::StreamError::Format, ["Buffer without PTS"]);
+ return Err(gst::FlowError::Error);
+ }
+
+ let dts = data.get_dts();
+ if dts.is_some() && pts.is_some() && dts != pts {
+ gst_element_error!(
+ element,
+ gst::StreamError::Format,
+ ["DTS != PTS not supported for secondary streams"]
+ );
+ return Err(gst::FlowError::Error);
+ }
+
+ if !data.is_keyframe() {
+ gst_element_error!(
+ element,
+ gst::StreamError::Format,
+ ["Delta-units not supported for secondary streams"]
+ );
+ return Err(gst::FlowError::Error);
+ }
+
+ let mut pts_end = if duration.is_some() {
+ pts + duration
+ } else {
+ pts
+ };
+
+ let data = match data.clip(&state, &state.in_segment) {
+ None => {
+ gst_log!(CAT, obj: pad, "Dropping raw data outside segment");
+ return Ok(HandleResult::Drop);
+ }
+ Some(data) => data,
+ };
+
+ // This will only do anything for non-raw data
+ pts = cmp::max(state.in_segment.get_start(), pts);
+ pts_end = cmp::max(state.in_segment.get_start(), pts_end);
+ if state.in_segment.get_stop().is_some() {
+ pts = cmp::min(state.in_segment.get_stop(), pts);
+ pts_end = cmp::min(state.in_segment.get_stop(), pts_end);
+ }
+
+ let current_running_time = state.in_segment.to_running_time(pts);
+ let current_running_time_end = state.in_segment.to_running_time(pts_end);
+ state.current_running_time = cmp::max(current_running_time_end, state.current_running_time);
+
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Secondary stream current running time {}-{} (position: {}-{}",
+ current_running_time,
+ current_running_time_end,
+ pts,
+ pts_end
+ );
+
+ drop(state);
+
+ let mut main_state = self.main_stream.state.lock();
+
+ // Wake up, in case the main stream is waiting for us to progress up to here. We progressed
+ // above but all notifying must happen while the main_stream state is locked as per above.
+ self.main_stream_cond.notify_all();
+
+ while (main_state.current_running_time == gst::CLOCK_TIME_NONE
+ || main_state.current_running_time < current_running_time_end)
+ && !main_state.eos
+ && !stream.state.lock().flushing
+ {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Waiting for reaching {} / EOS / flushing, main stream at {}",
+ current_running_time,
+ main_state.current_running_time
+ );
+
+ self.main_stream_cond.wait(&mut main_state);
+ }
+
+ state = stream.state.lock();
+
+ if state.flushing {
+ gst_debug!(CAT, obj: pad, "Flushing");
+ return Ok(HandleResult::Flushing);
+ }
+
+ let rec_state = self.state.lock();
+
+ // If the main stream is EOS, we are also EOS unless we are
+ // before the final last recording stop running time
+ if main_state.eos {
+ // If we have no start or stop position (we never recorded) then we're EOS too now
+ if rec_state.last_recording_stop.is_none() || rec_state.last_recording_start.is_none() {
+ gst_debug!(CAT, obj: pad, "Main stream EOS and recording never started",);
+ return Ok(HandleResult::Eos);
+ } else if data.can_clip(&*state)
+ && current_running_time < rec_state.last_recording_start
+ && current_running_time_end > rec_state.last_recording_start
+ {
+ // Otherwise if we're before the recording start but the end of the buffer is after
+ // the start and we can clip, clip the buffer and pass it onwards.
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Main stream EOS and we're not EOS yet (overlapping recording start, {} < {} < {})",
+ current_running_time,
+ rec_state.last_recording_start,
+ current_running_time_end
+ );
+
+ let mut clip_start = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_start);
+ if clip_start.is_none() {
+ clip_start = state.in_segment.get_start();
+ }
+ let mut clip_stop = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_stop);
+ if clip_stop.is_none() {
+ clip_stop = state.in_segment.get_stop();
+ }
+ let mut segment = state.in_segment.clone();
+ segment.set_start(clip_start);
+ segment.set_stop(clip_stop);
+
+ gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
+
+ if let Some(data) = data.clip(&*state, &segment) {
+ return Ok(HandleResult::Pass(data));
+ } else {
+ gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
+ return Ok(HandleResult::Drop);
+ }
+ } else if current_running_time < rec_state.last_recording_start {
+ // Otherwise if the buffer starts before the recording start, drop it. This
+ // means that we either can't clip, or that the end is also before the
+ // recording start
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Main stream EOS and we're not EOS yet (before recording start, {} < {})",
+ current_running_time,
+ rec_state.last_recording_start
+ );
+ return Ok(HandleResult::Drop);
+ } else if data.can_clip(&*state)
+ && current_running_time < rec_state.last_recording_stop
+ && current_running_time_end > rec_state.last_recording_stop
+ {
+ // Similarly if the end is after the recording stop but the start is before and we
+ // can clip, clip the buffer and pass it through.
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Main stream EOS and we're not EOS yet (overlapping recording end, {} < {} < {})",
+ current_running_time,
+ rec_state.last_recording_stop,
+ current_running_time_end
+ );
+
+ let mut clip_start = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_start);
+ if clip_start.is_none() {
+ clip_start = state.in_segment.get_start();
+ }
+ let mut clip_stop = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_stop);
+ if clip_stop.is_none() {
+ clip_stop = state.in_segment.get_stop();
+ }
+ let mut segment = state.in_segment.clone();
+ segment.set_start(clip_start);
+ segment.set_stop(clip_stop);
+
+ gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
+
+ if let Some(data) = data.clip(&*state, &segment) {
+ return Ok(HandleResult::Pass(data));
+ } else {
+ gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
+ return Ok(HandleResult::Eos);
+ }
+ } else if current_running_time_end > rec_state.last_recording_stop {
+ // Otherwise if the end of the buffer is after the recording stop, we're EOS
+ // now. This means that we either couldn't clip or that the start is also after
+ // the recording stop
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Main stream EOS and we're EOS too (after recording end, {} > {})",
+ current_running_time_end,
+ rec_state.last_recording_stop
+ );
+ return Ok(HandleResult::Eos);
+ } else {
+ // In all other cases the buffer is fully between recording start and end and
+ // can be passed through as is
+ assert!(current_running_time >= rec_state.last_recording_start);
+ assert!(current_running_time_end <= rec_state.last_recording_stop);
+
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Main stream EOS and we're not EOS yet (before recording end, {} <= {} <= {})",
+ rec_state.last_recording_start,
+ current_running_time,
+ rec_state.last_recording_stop
+ );
+ return Ok(HandleResult::Pass(data));
+ }
+ }
+
+ // The end of our buffer is before the end of the previous buffer of the main stream
+ assert!(main_state.current_running_time >= current_running_time_end);
+
+ match rec_state.recording_state {
+ RecordingState::Recording => {
+ // We're properly started, must have a start position and
+ // be actually after that start position
+ assert!(rec_state.last_recording_start.is_some());
+ assert!(current_running_time >= rec_state.last_recording_start);
+ gst_log!(CAT, obj: pad, "Passing buffer (recording)");
+ Ok(HandleResult::Pass(data))
+ }
+ RecordingState::Stopping => {
+ // If we have no start position yet, the main stream is waiting for a key-frame
+ if rec_state.last_recording_stop.is_none() {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Passing buffer (stopping: waiting for keyframe)",
+ );
+ Ok(HandleResult::Pass(data))
+ } else if current_running_time_end <= rec_state.last_recording_stop {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Passing buffer (stopping: {} <= {})",
+ current_running_time_end,
+ rec_state.last_recording_stop
+ );
+ Ok(HandleResult::Pass(data))
+ } else if data.can_clip(&*state)
+ && current_running_time < rec_state.last_recording_stop
+ && current_running_time_end > rec_state.last_recording_stop
+ {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Passing buffer (stopping: {} < {} < {})",
+ current_running_time,
+ rec_state.last_recording_stop,
+ current_running_time_end,
+ );
+
+ let mut clip_stop = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_stop);
+ if clip_stop.is_none() {
+ clip_stop = state.in_segment.get_stop();
+ }
+ let mut segment = state.in_segment.clone();
+ segment.set_stop(clip_stop);
+
+ gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
+
+ if let Some(data) = data.clip(&*state, &segment) {
+ Ok(HandleResult::Pass(data))
+ } else {
+ gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
+ Ok(HandleResult::Drop)
+ }
+ } else {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Dropping buffer (stopping: {} > {})",
+ current_running_time_end,
+ rec_state.last_recording_stop
+ );
+ Ok(HandleResult::Drop)
+ }
+ }
+ RecordingState::Stopped => {
+ // We're properly stopped
+ gst_log!(CAT, obj: pad, "Dropping buffer (stopped)");
+ Ok(HandleResult::Drop)
+ }
+ RecordingState::Starting => {
+ // If we have no start position yet, the main stream is waiting for a key-frame
+ if rec_state.last_recording_start.is_none() {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Dropping buffer (starting: waiting for keyframe)",
+ );
+ Ok(HandleResult::Drop)
+ } else if current_running_time >= rec_state.last_recording_start {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Passing buffer (starting: {} >= {})",
+ current_running_time,
+ rec_state.last_recording_start
+ );
+ Ok(HandleResult::Pass(data))
+ } else if data.can_clip(&*state)
+ && current_running_time < rec_state.last_recording_start
+ && current_running_time_end > rec_state.last_recording_start
+ {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Passing buffer (starting: {} < {} < {})",
+ current_running_time,
+ rec_state.last_recording_start,
+ current_running_time_end,
+ );
+
+ let mut clip_start = state
+ .in_segment
+ .position_from_running_time(rec_state.last_recording_start);
+ if clip_start.is_none() {
+ clip_start = state.in_segment.get_start();
+ }
+ let mut segment = state.in_segment.clone();
+ segment.set_start(clip_start);
+
+ gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,);
+
+ if let Some(data) = data.clip(&*state, &segment) {
+ Ok(HandleResult::Pass(data))
+ } else {
+ gst_warning!(CAT, obj: pad, "Complete buffer clipped!");
+ Ok(HandleResult::Drop)
+ }
+ } else {
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Dropping buffer (starting: {} < {})",
+ current_running_time,
+ rec_state.last_recording_start
+ );
+ Ok(HandleResult::Drop)
+ }
+ }
+ }
+ }
+
+ fn sink_chain(
+ &self,
+ pad: &gst::Pad,
+ element: &gst::Element,
+ buffer: gst::Buffer,
+ ) -> Result<gst::FlowSuccess, gst::FlowError> {
+ let stream = self.pads.lock().get(pad).cloned().ok_or_else(|| {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ gst::FlowError::Error
+ })?;
+
+ gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
+
+ {
+ let state = stream.state.lock();
+ if state.eos {
+ return Err(gst::FlowError::Eos);
+ }
+ }
+
+ let handle_result = if stream != self.main_stream {
+ self.handle_secondary_stream(element, pad, &stream, buffer)
+ } else {
+ self.handle_main_stream(element, pad, &stream, buffer)
+ }?;
+
+ let buffer = match handle_result {
+ HandleResult::Drop => {
+ return Ok(gst::FlowSuccess::Ok);
+ }
+ HandleResult::Flushing => {
+ return Err(gst::FlowError::Flushing);
+ }
+ HandleResult::Eos => {
+ stream.srcpad.push_event(
+ gst::Event::new_eos()
+ .seqnum(stream.state.lock().segment_seqnum)
+ .build(),
+ );
+ return Err(gst::FlowError::Eos);
+ }
+ HandleResult::Pass(buffer) => {
+ // Pass through and actually push the buffer
+ buffer
+ }
+ };
+
+ let out_running_time = {
+ let mut state = stream.state.lock();
+ let mut events = Vec::with_capacity(state.pending_events.len() + 1);
+
+ if state.segment_pending {
+ let rec_state = self.state.lock();
+
+ // Adjust so that last_recording_start has running time of
+ // recording_duration
+
+ state.out_segment = state.in_segment.clone();
+ let offset = rec_state.running_time_offset.unwrap_or(0);
+ state
+ .out_segment
+ .offset_running_time(-(offset as i64))
+ .expect("Adjusting record duration");
+ events.push(
+ gst::Event::new_segment(&state.out_segment)
+ .seqnum(state.segment_seqnum)
+ .build(),
+ );
+ state.segment_pending = false;
+ gst_debug!(CAT, obj: pad, "Pending Segment {:?}", &state.out_segment);
+ }
+
+ if !state.pending_events.is_empty() {
+ gst_debug!(CAT, obj: pad, "Pushing pending events");
+ }
+
+ events.append(&mut state.pending_events);
+
+ let out_running_time = state.out_segment.to_running_time(buffer.get_pts());
+
+ // Unlock before pushing
+ drop(state);
+
+ for e in events.drain(..) {
+ stream.srcpad.push_event(e);
+ }
+
+ out_running_time
+ };
+
+ gst_log!(
+ CAT,
+ obj: pad,
+ "Pushing buffer with running time {}: {:?}",
+ out_running_time,
+ buffer
+ );
+ stream.srcpad.push(buffer)
+ }
+
+ fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
+ use gst::EventView;
+
+ let stream = match self.pads.lock().get(pad) {
+ None => {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ return false;
+ }
+ Some(stream) => stream.clone(),
+ };
+
+ gst_log!(CAT, obj: pad, "Handling event {:?}", event);
+
+ let mut forward = true;
+ let mut send_pending = false;
+
+ match event.view() {
+ EventView::FlushStart(..) => {
+ let _main_state = if stream != self.main_stream {
+ Some(self.main_stream.state.lock())
+ } else {
+ None
+ };
+ let mut state = stream.state.lock();
+
+ state.flushing = true;
+ self.main_stream_cond.notify_all();
+ }
+ EventView::FlushStop(..) => {
+ let mut state = stream.state.lock();
+
+ state.eos = false;
+ state.flushing = false;
+ state.segment_pending = false;
+ state.current_running_time = gst::CLOCK_TIME_NONE;
+ }
+ EventView::Caps(c) => {
+ let mut state = stream.state.lock();
+ let caps = c.get_caps();
+ let s = caps.get_structure(0).unwrap();
+ if s.get_name().starts_with("audio/") {
+ state.audio_info = gst_audio::AudioInfo::from_caps(caps).ok();
+ gst_log!(CAT, obj: pad, "Got audio caps {:?}", state.audio_info);
+ state.video_info = None;
+ } else if s.get_name().starts_with("video/") {
+ state.audio_info = None;
+ state.video_info = gst_video::VideoInfo::from_caps(caps).ok();
+ gst_log!(CAT, obj: pad, "Got video caps {:?}", state.video_info);
+ } else {
+ state.audio_info = None;
+ state.video_info = None;
+ }
+ }
+ EventView::Segment(e) => {
+ let mut state = stream.state.lock();
+
+ let segment = match e.get_segment().clone().downcast::<gst::ClockTime>() {
+ Err(segment) => {
+ gst_element_error!(
+ element,
+ gst::StreamError::Format,
+ [
+ "Only Time segments supported, got {:?}",
+ segment.get_format(),
+ ]
+ );
+ return false;
+ }
+ Ok(segment) => segment,
+ };
+
+ if (segment.get_rate() - 1.0).abs() > f64::EPSILON {
+ gst_element_error!(
+ element,
+ gst::StreamError::Format,
+ [
+ "Only rate==1.0 segments supported, got {:?}",
+ segment.get_rate(),
+ ]
+ );
+ return false;
+ }
+
+ state.in_segment = segment;
+ state.segment_seqnum = event.get_seqnum();
+ state.segment_pending = true;
+ state.current_running_time = gst::CLOCK_TIME_NONE;
+
+ gst_debug!(CAT, obj: pad, "Got new Segment {:?}", state.in_segment);
+
+ forward = false;
+ }
+ EventView::Gap(e) => {
+ gst_debug!(CAT, obj: pad, "Handling Gap event {:?}", event);
+ let (pts, duration) = e.get();
+ let handle_result = if stream == self.main_stream {
+ self.handle_main_stream(element, pad, &stream, (pts, duration))
+ } else {
+ self.handle_secondary_stream(element, pad, &stream, (pts, duration))
+ };
+
+ forward = match handle_result {
+ Ok(HandleResult::Pass((new_pts, new_duration))) if new_pts.is_some() => {
+ if new_pts != pts || new_duration != duration {
+ event = gst::Event::new_gap(new_pts, new_duration).build();
+ }
+ true
+ }
+ Ok(_) => false,
+ Err(_) => false,
+ };
+ }
+ EventView::Eos(..) => {
+ let _main_state = if stream != self.main_stream {
+ Some(self.main_stream.state.lock())
+ } else {
+ None
+ };
+ let mut state = stream.state.lock();
+
+ state.eos = true;
+ self.main_stream_cond.notify_all();
+ gst_debug!(
+ CAT,
+ obj: pad,
+ "Stream is EOS now, sending any pending events"
+ );
+
+ send_pending = true;
+ }
+ _ => (),
+ };
+
+ // If a serialized event and coming after Segment and a new Segment is pending,
+ // queue up and send at a later time (buffer/gap) after we sent the Segment
+ let type_ = event.get_type();
+ if forward
+ && type_ != gst::EventType::Eos
+ && type_.is_serialized()
+ && type_.partial_cmp(&gst::EventType::Segment) == Some(cmp::Ordering::Greater)
+ {
+ let mut state = stream.state.lock();
+ if state.segment_pending {
+ gst_log!(CAT, obj: pad, "Storing event for later pushing");
+ state.pending_events.push(event);
+ return true;
+ }
+ }
+
+ if send_pending {
+ let mut state = stream.state.lock();
+ let mut events = Vec::with_capacity(state.pending_events.len() + 1);
+
+ // Got not a single buffer on this stream before EOS, forward
+ // the input segment
+ if state.segment_pending {
+ events.push(
+ gst::Event::new_segment(&state.in_segment)
+ .seqnum(state.segment_seqnum)
+ .build(),
+ );
+ }
+ events.append(&mut state.pending_events);
+ drop(state);
+
+ for e in events.drain(..) {
+ stream.srcpad.push_event(e);
+ }
+ }
+
+ if forward {
+ gst_log!(CAT, obj: pad, "Forwarding event {:?}", event);
+ stream.srcpad.push_event(event)
+ } else {
+ gst_log!(CAT, obj: pad, "Dropping event {:?}", event);
+ true
+ }
+ }
+
+ fn sink_query(
+ &self,
+ pad: &gst::Pad,
+ element: &gst::Element,
+ query: &mut gst::QueryRef,
+ ) -> bool {
+ let stream = match self.pads.lock().get(pad) {
+ None => {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ return false;
+ }
+ Some(stream) => stream.clone(),
+ };
+
+ gst_log!(CAT, obj: pad, "Handling query {:?}", query);
+
+ stream.srcpad.peer_query(query)
+ }
+
+ fn src_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool {
+ use gst::EventView;
+
+ let stream = match self.pads.lock().get(pad) {
+ None => {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ return false;
+ }
+ Some(stream) => stream.clone(),
+ };
+
+ gst_log!(CAT, obj: pad, "Handling event {:?}", event);
+
+ let forward = match event.view() {
+ EventView::Seek(..) => false,
+ _ => true,
+ };
+
+ let rec_state = self.state.lock();
+ let running_time_offset = rec_state.running_time_offset.unwrap_or(0) as i64;
+ let offset = event.get_running_time_offset();
+ event
+ .make_mut()
+ .set_running_time_offset(offset + running_time_offset);
+ drop(rec_state);
+
+ if forward {
+ gst_log!(CAT, obj: pad, "Forwarding event {:?}", event);
+ stream.sinkpad.push_event(event)
+ } else {
+ gst_log!(CAT, obj: pad, "Dropping event {:?}", event);
+ false
+ }
+ }
+
+ fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool {
+ use gst::QueryView;
+
+ let stream = match self.pads.lock().get(pad) {
+ None => {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ return false;
+ }
+ Some(stream) => stream.clone(),
+ };
+
+ gst_log!(CAT, obj: pad, "Handling query {:?}", query);
+ match query.view_mut() {
+ QueryView::Scheduling(ref mut q) => {
+ let mut new_query = gst::Query::new_scheduling();
+ let res = stream.sinkpad.peer_query(&mut new_query);
+ if !res {
+ return res;
+ }
+
+ gst_log!(CAT, obj: pad, "Downstream returned {:?}", new_query);
+
+ let (flags, min, max, align) = new_query.get_result();
+ q.set(flags, min, max, align);
+ q.add_scheduling_modes(
+ &new_query
+ .get_scheduling_modes()
+ .iter()
+ .cloned()
+ .filter(|m| m != &gst::PadMode::Pull)
+ .collect::<Vec<_>>(),
+ );
+ gst_log!(CAT, obj: pad, "Returning {:?}", q.get_mut_query());
+ true
+ }
+ QueryView::Seeking(ref mut q) => {
+ // Seeking is not possible here
+ let format = q.get_format();
+ q.set(
+ false,
+ gst::GenericFormattedValue::new(format, -1),
+ gst::GenericFormattedValue::new(format, -1),
+ );
+
+ gst_log!(CAT, obj: pad, "Returning {:?}", q.get_mut_query());
+ true
+ }
+ // Position and duration is always the current recording position
+ QueryView::Position(ref mut q) => {
+ if q.get_format() == gst::Format::Time {
+ let state = stream.state.lock();
+ let rec_state = self.state.lock();
+ let mut recording_duration = rec_state.recording_duration;
+ if rec_state.recording_state == RecordingState::Recording
+ || rec_state.recording_state == RecordingState::Stopping
+ {
+ recording_duration +=
+ state.current_running_time - rec_state.last_recording_start;
+ }
+ q.set(recording_duration);
+ true
+ } else {
+ false
+ }
+ }
+ QueryView::Duration(ref mut q) => {
+ if q.get_format() == gst::Format::Time {
+ let state = stream.state.lock();
+ let rec_state = self.state.lock();
+ let mut recording_duration = rec_state.recording_duration;
+ if rec_state.recording_state == RecordingState::Recording
+ || rec_state.recording_state == RecordingState::Stopping
+ {
+ recording_duration +=
+ state.current_running_time - rec_state.last_recording_start;
+ }
+ q.set(recording_duration);
+ true
+ } else {
+ false
+ }
+ }
+ _ => {
+ gst_log!(CAT, obj: pad, "Forwarding query {:?}", query);
+ stream.sinkpad.peer_query(query)
+ }
+ }
+ }
+
+ fn iterate_internal_links(
+ &self,
+ pad: &gst::Pad,
+ element: &gst::Element,
+ ) -> gst::Iterator<gst::Pad> {
+ let stream = match self.pads.lock().get(pad) {
+ None => {
+ gst_element_error!(
+ element,
+ gst::CoreError::Pad,
+ ["Unknown pad {:?}", pad.get_name()]
+ );
+ return gst::Iterator::from_vec(vec![]);
+ }
+ Some(stream) => stream.clone(),
+ };
+
+ if pad == &stream.srcpad {
+ gst::Iterator::from_vec(vec![stream.sinkpad])
+ } else {
+ gst::Iterator::from_vec(vec![stream.srcpad])
+ }
+ }
+}
+
+impl ObjectSubclass for ToggleRecord {
+ const NAME: &'static str = "RsToggleRecord";
+ type ParentType = gst::Element;
+ type Instance = gst::subclass::ElementInstanceStruct<Self>;
+ type Class = subclass::simple::ClassStruct<Self>;
+
+ glib_object_subclass!();
+
+ fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self {
+ let templ = klass.get_pad_template("sink").unwrap();
+ let sinkpad = gst::Pad::new_from_template(&templ, Some("sink"));
+ let templ = klass.get_pad_template("src").unwrap();
+ let srcpad = gst::Pad::new_from_template(&templ, Some("src"));
+
+ ToggleRecord::set_pad_functions(&sinkpad, &srcpad);
+
+ let main_stream = Stream::new(sinkpad, srcpad);
+
+ let mut pads = HashMap::new();
+ pads.insert(main_stream.sinkpad.clone(), main_stream.clone());
+ pads.insert(main_stream.srcpad.clone(), main_stream.clone());
+
+ Self {
+ settings: Mutex::new(Settings::default()),
+ state: Mutex::new(State::default()),
+ main_stream,
+ main_stream_cond: Condvar::new(),
+ other_streams: Mutex::new((Vec::new(), 0)),
+ pads: Mutex::new(pads),
+ }
+ }
+
+ fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) {
+ klass.install_properties(&PROPERTIES);
+
+ klass.set_metadata(
+ "Toggle Record",
+ "Generic",
+ "Valve that ensures multiple streams start/end at the same time",
+ "Sebastian Dröge <sebastian@centricular.com>",
+ );
+
+ let caps = gst::Caps::new_any();
+ let src_pad_template = gst::PadTemplate::new(
+ "src",
+ gst::PadDirection::Src,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(src_pad_template);
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Always,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(sink_pad_template);
+
+ let src_pad_template = gst::PadTemplate::new(
+ "src_%u",
+ gst::PadDirection::Src,
+ gst::PadPresence::Sometimes,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(src_pad_template);
+
+ let sink_pad_template = gst::PadTemplate::new(
+ "sink_%u",
+ gst::PadDirection::Sink,
+ gst::PadPresence::Request,
+ &caps,
+ )
+ .unwrap();
+ klass.add_pad_template(sink_pad_template);
+ }
+}
+
+impl ObjectImpl for ToggleRecord {
+ glib_object_impl!();
+
+ fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) {
+ let prop = &PROPERTIES[id];
+ let element = obj.downcast_ref::<gst::Element>().unwrap();
+
+ match *prop {
+ subclass::Property("record", ..) => {
+ let mut settings = self.settings.lock();
+ let record = value.get_some().expect("type checked upstream");
+ gst_debug!(
+ CAT,
+ obj: element,
+ "Setting record from {:?} to {:?}",
+ settings.record,
+ record
+ );
+ settings.record = record;
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> {
+ let prop = &PROPERTIES[id];
+
+ match *prop {
+ subclass::Property("record", ..) => {
+ let settings = self.settings.lock();
+ Ok(settings.record.to_value())
+ }
+ subclass::Property("recording", ..) => {
+ let rec_state = self.state.lock();
+ Ok((rec_state.recording_state == RecordingState::Recording).to_value())
+ }
+ _ => unimplemented!(),
+ }
+ }
+
+ fn constructed(&self, obj: &glib::Object) {
+ self.parent_constructed(obj);
+
+ let element = obj.downcast_ref::<gst::Element>().unwrap();
+ element.add_pad(&self.main_stream.sinkpad).unwrap();
+ element.add_pad(&self.main_stream.srcpad).unwrap();
+ }
+}
+
+impl ElementImpl for ToggleRecord {
+ fn change_state(
+ &self,
+ element: &gst::Element,
+ transition: gst::StateChange,
+ ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
+ gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
+
+ match transition {
+ gst::StateChange::ReadyToPaused => {
+ for s in self
+ .other_streams
+ .lock()
+ .0
+ .iter()
+ .chain(iter::once(&self.main_stream))
+ {
+ let mut state = s.state.lock();
+ *state = StreamState::default();
+ }
+
+ let mut rec_state = self.state.lock();
+ *rec_state = State::default();
+ }
+ gst::StateChange::PausedToReady => {
+ for s in &self.other_streams.lock().0 {
+ let mut state = s.state.lock();
+ state.flushing = true;
+ }
+
+ let mut state = self.main_stream.state.lock();
+ state.flushing = true;
+ self.main_stream_cond.notify_all();
+ }
+ _ => (),
+ }
+
+ let success = self.parent_change_state(element, transition)?;
+
+ if transition == gst::StateChange::PausedToReady {
+ for s in self
+ .other_streams
+ .lock()
+ .0
+ .iter()
+ .chain(iter::once(&self.main_stream))
+ {
+ let mut state = s.state.lock();
+
+ state.pending_events.clear();
+ }
+
+ let mut rec_state = self.state.lock();
+ *rec_state = State::default();
+ drop(rec_state);
+ element.notify("recording");
+ }
+
+ Ok(success)
+ }
+
+ fn request_new_pad(
+ &self,
+ element: &gst::Element,
+ _templ: &gst::PadTemplate,
+ _name: Option<String>,
+ _caps: Option<&gst::Caps>,
+ ) -> Option<gst::Pad> {
+ let mut other_streams_guard = self.other_streams.lock();
+ let (ref mut other_streams, ref mut pad_count) = *other_streams_guard;
+ let mut pads = self.pads.lock();
+
+ let id = *pad_count;
+ *pad_count += 1;
+
+ let templ = element.get_pad_template("sink_%u").unwrap();
+ let sinkpad = gst::Pad::new_from_template(&templ, Some(format!("sink_{}", id).as_str()));
+
+ let templ = element.get_pad_template("src_%u").unwrap();
+ let srcpad = gst::Pad::new_from_template(&templ, Some(format!("src_{}", id).as_str()));
+
+ ToggleRecord::set_pad_functions(&sinkpad, &srcpad);
+
+ sinkpad.set_active(true).unwrap();
+ srcpad.set_active(true).unwrap();
+
+ let stream = Stream::new(sinkpad.clone(), srcpad.clone());
+
+ pads.insert(stream.sinkpad.clone(), stream.clone());
+ pads.insert(stream.srcpad.clone(), stream.clone());
+
+ other_streams.push(stream);
+
+ drop(pads);
+ drop(other_streams_guard);
+
+ element.add_pad(&sinkpad).unwrap();
+ element.add_pad(&srcpad).unwrap();
+
+ Some(sinkpad)
+ }
+
+ fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) {
+ let mut other_streams_guard = self.other_streams.lock();
+ let (ref mut other_streams, _) = *other_streams_guard;
+ let mut pads = self.pads.lock();
+
+ let stream = match pads.get(pad) {
+ None => return,
+ Some(stream) => stream.clone(),
+ };
+
+ stream.srcpad.set_active(false).unwrap();
+ stream.sinkpad.set_active(false).unwrap();
+
+ pads.remove(&stream.sinkpad).unwrap();
+ pads.remove(&stream.srcpad).unwrap();
+
+ // TODO: Replace with Vec::remove_item() once stable
+ let pos = other_streams.iter().position(|x| *x == stream);
+ pos.map(|pos| other_streams.swap_remove(pos));
+
+ drop(pads);
+ drop(other_streams_guard);
+
+ element.remove_pad(&stream.sinkpad).unwrap();
+ element.remove_pad(&stream.srcpad).unwrap();
+ }
+}
+
+pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
+ gst::Element::register(
+ Some(plugin),
+ "togglerecord",
+ gst::Rank::None,
+ ToggleRecord::get_type(),
+ )
+}
diff --git a/utils/gst-plugin-togglerecord/tests/tests.rs b/utils/gst-plugin-togglerecord/tests/tests.rs
new file mode 100644
index 000000000..c0eaf6745
--- /dev/null
+++ b/utils/gst-plugin-togglerecord/tests/tests.rs
@@ -0,0 +1,1173 @@
+// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+extern crate glib;
+use glib::prelude::*;
+
+extern crate gstreamer as gst;
+use gst::prelude::*;
+
+extern crate either;
+use either::*;
+
+use std::sync::{mpsc, Mutex};
+use std::thread;
+
+extern crate gsttogglerecord;
+
+fn init() {
+ use std::sync::Once;
+ static INIT: Once = Once::new();
+
+ INIT.call_once(|| {
+ gst::init().unwrap();
+ gsttogglerecord::plugin_register_static().expect("gsttogglerecord tests");
+ });
+}
+
+enum SendData {
+ Buffers(usize),
+ BuffersDelta(usize),
+ Gaps(usize),
+ Eos,
+}
+
+#[allow(clippy::type_complexity)]
+fn setup_sender_receiver(
+ pipeline: &gst::Pipeline,
+ togglerecord: &gst::Element,
+ pad: &str,
+ offset: gst::ClockTime,
+) -> (
+ mpsc::Sender<SendData>,
+ mpsc::Receiver<()>,
+ mpsc::Receiver<Either<gst::Buffer, gst::Event>>,
+ thread::JoinHandle<()>,
+) {
+ let fakesink = gst::ElementFactory::make("fakesink", None).unwrap();
+ fakesink.set_property("async", &false).unwrap();
+ pipeline.add(&fakesink).unwrap();
+
+ let main_stream = pad == "src";
+
+ let (srcpad, sinkpad) = if main_stream {
+ (
+ togglerecord.get_static_pad("src").unwrap(),
+ togglerecord.get_static_pad("sink").unwrap(),
+ )
+ } else {
+ let sinkpad = togglerecord.get_request_pad("sink_%u").unwrap();
+ let srcpad = sinkpad.iterate_internal_links().next().unwrap().unwrap();
+ (srcpad, sinkpad)
+ };
+
+ let fakesink_sinkpad = fakesink.get_static_pad("sink").unwrap();
+ srcpad.link(&fakesink_sinkpad).unwrap();
+
+ let (sender_output, receiver_output) = mpsc::channel::<Either<gst::Buffer, gst::Event>>();
+ let sender_output = Mutex::new(sender_output);
+ srcpad.add_probe(
+ gst::PadProbeType::BUFFER | gst::PadProbeType::EVENT_DOWNSTREAM,
+ move |_, ref probe_info| {
+ match probe_info.data {
+ Some(gst::PadProbeData::Buffer(ref buffer)) => {
+ sender_output
+ .lock()
+ .unwrap()
+ .send(Left(buffer.clone()))
+ .unwrap();
+ }
+ Some(gst::PadProbeData::Event(ref event)) => {
+ sender_output
+ .lock()
+ .unwrap()
+ .send(Right(event.clone()))
+ .unwrap();
+ }
+ _ => {
+ unreachable!();
+ }
+ }
+
+ gst::PadProbeReturn::Ok
+ },
+ );
+
+ let (sender_input, receiver_input) = mpsc::channel::<SendData>();
+ let (sender_input_done, receiver_input_done) = mpsc::channel::<()>();
+ let thread = thread::spawn(move || {
+ let mut i = 0;
+ let mut first = true;
+ while let Ok(send_data) = receiver_input.recv() {
+ if first {
+ assert!(sinkpad.send_event(gst::Event::new_stream_start("test").build()));
+ let caps = if main_stream {
+ gst::Caps::builder("video/x-raw")
+ .field("format", &"ARGB")
+ .field("width", &320i32)
+ .field("height", &240i32)
+ .field("framerate", &gst::Fraction::new(50, 1))
+ .build()
+ } else {
+ gst::Caps::builder("audio/x-raw")
+ .field("format", &"U8")
+ .field("layout", &"interleaved")
+ .field("rate", &8000i32)
+ .field("channels", &1i32)
+ .build()
+ };
+ assert!(sinkpad.send_event(gst::Event::new_caps(&caps).build()));
+
+ let segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ assert!(sinkpad.send_event(gst::Event::new_segment(&segment).build()));
+
+ let mut tags = gst::TagList::new();
+ tags.get_mut()
+ .unwrap()
+ .add::<gst::tags::Title>(&"some title", gst::TagMergeMode::Append);
+ assert!(sinkpad.send_event(gst::Event::new_tag(tags).build()));
+
+ first = false;
+ }
+
+ let buffer = if main_stream {
+ gst::Buffer::with_size(320 * 240 * 4).unwrap()
+ } else {
+ gst::Buffer::with_size(160).unwrap()
+ };
+
+ match send_data {
+ SendData::Eos => {
+ break;
+ }
+ SendData::Buffers(n) => {
+ for _ in 0..n {
+ let mut buffer = buffer.clone();
+ {
+ let buffer = buffer.make_mut();
+ buffer.set_pts(offset + i * 20 * gst::MSECOND);
+ buffer.set_duration(20 * gst::MSECOND);
+ }
+ let _ = sinkpad.chain(buffer);
+ i += 1;
+ }
+ }
+ SendData::BuffersDelta(n) => {
+ for _ in 0..n {
+ let mut buffer = gst::Buffer::new();
+ buffer
+ .get_mut()
+ .unwrap()
+ .set_pts(offset + i * 20 * gst::MSECOND);
+ buffer.get_mut().unwrap().set_duration(20 * gst::MSECOND);
+ buffer
+ .get_mut()
+ .unwrap()
+ .set_flags(gst::BufferFlags::DELTA_UNIT);
+ let _ = sinkpad.chain(buffer);
+ i += 1;
+ }
+ }
+ SendData::Gaps(n) => {
+ for _ in 0..n {
+ let event =
+ gst::Event::new_gap(offset + i * 20 * gst::MSECOND, 20 * gst::MSECOND)
+ .build();
+ let _ = sinkpad.send_event(event);
+ i += 1;
+ }
+ }
+ }
+
+ let _ = sender_input_done.send(());
+ }
+
+ let _ = sinkpad.send_event(gst::Event::new_eos().build());
+ let _ = sender_input_done.send(());
+ });
+
+ (sender_input, receiver_input_done, receiver_output, thread)
+}
+
+fn recv_buffers(
+ receiver_output: &mpsc::Receiver<Either<gst::Buffer, gst::Event>>,
+ segment: &mut gst::FormattedSegment<gst::ClockTime>,
+ wait_buffers: usize,
+) -> Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)> {
+ let mut res = Vec::new();
+ let mut n_buffers = 0;
+ while let Ok(val) = receiver_output.recv() {
+ match val {
+ Left(buffer) => {
+ res.push((
+ segment.to_running_time(buffer.get_pts()),
+ buffer.get_pts(),
+ buffer.get_duration(),
+ ));
+ n_buffers += 1;
+ if wait_buffers > 0 && n_buffers == wait_buffers {
+ return res;
+ }
+ }
+ Right(event) => {
+ use gst::EventView;
+
+ match event.view() {
+ EventView::Gap(ref e) => {
+ let (ts, duration) = e.get();
+
+ res.push((segment.to_running_time(ts), ts, duration));
+ n_buffers += 1;
+ if wait_buffers > 0 && n_buffers == wait_buffers {
+ return res;
+ }
+ }
+ EventView::Eos(..) => {
+ return res;
+ }
+ EventView::Segment(ref e) => {
+ *segment = e.get_segment().clone().downcast().unwrap();
+ }
+ _ => (),
+ }
+ }
+ }
+ }
+
+ res
+}
+
+#[test]
+fn test_create() {
+ init();
+ assert!(gst::ElementFactory::make("togglerecord", None).is_ok());
+}
+
+#[test]
+fn test_create_pads() {
+ init();
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+
+ let sinkpad = togglerecord.get_request_pad("sink_%u").unwrap();
+ let srcpad = sinkpad.iterate_internal_links().next().unwrap().unwrap();
+
+ assert_eq!(sinkpad.get_name(), "sink_0");
+ assert_eq!(srcpad.get_name(), "src_0");
+
+ togglerecord.release_request_pad(&sinkpad);
+ assert!(sinkpad.get_parent().is_none());
+ assert!(srcpad.get_parent().is_none());
+}
+
+#[test]
+fn test_one_stream_open() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input, _, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ drop(sender_input);
+
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 10);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+
+ thread.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_one_stream_gaps_open() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input, _, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(5)).unwrap();
+ sender_input.send(SendData::Gaps(5)).unwrap();
+ drop(sender_input);
+
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 10);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+
+ thread.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_one_stream_close_open() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input, receiver_input_done, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done.recv().unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ drop(sender_input);
+
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 10);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+
+ thread.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_one_stream_open_close() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input, receiver_input_done, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done.recv().unwrap();
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ drop(sender_input);
+
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 10);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+
+ thread.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_one_stream_open_close_open() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input, receiver_input_done, receiver_output, thread) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done.recv().unwrap();
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done.recv().unwrap();
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input.send(SendData::Buffers(10)).unwrap();
+ drop(sender_input);
+
+ let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers = recv_buffers(&receiver_output, &mut segment, 0);
+ assert_eq!(buffers.len(), 20);
+ for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+
+ thread.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_two_stream_open() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_1.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 10);
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_two_stream_open_shift() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5 * gst::MSECOND);
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_1.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+
+ // Second to last buffer should be clipped from second stream, last should be dropped
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
+ assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
+ if index == 9 {
+ assert_eq!(duration, 15 * gst::MSECOND);
+ } else {
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ }
+ assert_eq!(buffers_2.len(), 10);
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_two_stream_open_shift_main() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 5 * gst::MSECOND);
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(12)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_1.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // PTS 5 maps to running time 0 now
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+
+ // First and second last buffer should be clipped from second stream,
+ // last buffer should be dropped
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ if index == 0 {
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 15 * gst::MSECOND);
+ } else if index == 10 {
+ assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 5 * gst::MSECOND);
+ } else {
+ assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ }
+ assert_eq!(buffers_2.len(), 11);
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_two_stream_open_close() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv().unwrap();
+
+ // Stop recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 10);
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_two_stream_close_open() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &false).unwrap();
+
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv().unwrap();
+
+ // Start recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, (10 + index) * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 10);
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_two_stream_open_close_open() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv().unwrap();
+
+ // Stop recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send another buffer to sender 2, this will block until sender 1 advances
+ // but must not be dropped, although we're not recording (yet)
+ sender_input_2.send(SendData::Buffers(1)).unwrap();
+
+ // Start recording again and send another set of buffers to both senders
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ // The single buffer above for sender 1 should be handled now
+ receiver_input_done_2.recv().unwrap();
+
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 20);
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 20);
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_two_stream_open_close_open_gaps() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+
+ sender_input_1.send(SendData::Buffers(3)).unwrap();
+ sender_input_1.send(SendData::Gaps(3)).unwrap();
+ sender_input_1.send(SendData::Buffers(4)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_1.recv().unwrap();
+
+ // Stop recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send another 4 gaps and 5 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Gaps(4)).unwrap();
+ sender_input_2.send(SendData::Buffers(5)).unwrap();
+
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send another gap to sender 2, this will block until sender 1 advances
+ // but must not be dropped, although we're not recording (yet)
+ sender_input_2.send(SendData::Gaps(1)).unwrap();
+
+ // Start recording again and send another set of buffers to both senders
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ // The single buffer above for sender 1 should be handled now
+ receiver_input_done_2.recv().unwrap();
+
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 20);
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 20);
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_two_stream_close_open_close_delta() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &false).unwrap();
+
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+
+ // Sender 2 is waiting for sender 1 to continue, sender 1 is finished
+ receiver_input_done_1.recv().unwrap();
+
+ // Start recording and push new buffers to sender 1. The first one is a delta frame,
+ // so will be dropped, and as such the next frame of sender 2 will also be dropped
+ // Sender 2 is empty now
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::BuffersDelta(1)).unwrap();
+ sender_input_1.send(SendData::Buffers(9)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send another 9 buffers to sender 2, both are the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+
+ // Wait until all 20 buffers of both senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send another buffer to sender 2, this will block until sender 1 advances
+ // but must not be dropped, and we're still recording
+ sender_input_2.send(SendData::Buffers(1)).unwrap();
+
+ // Stop recording again and send another set of buffers to both senders
+ // The first one is a delta frame, so we only actually stop recording
+ // after recording another frame
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::BuffersDelta(1)).unwrap();
+ sender_input_1.send(SendData::Buffers(9)).unwrap();
+ sender_input_2.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_1.recv().unwrap();
+ // The single buffer above for sender 1 should be handled now
+ receiver_input_done_2.recv().unwrap();
+
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, (11 + index) * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 10);
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, (11 + index) * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 10);
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}
+
+#[test]
+fn test_three_stream_open_close_open() {
+ init();
+
+ let pipeline = gst::Pipeline::new(None);
+ let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap();
+ pipeline.add(&togglerecord).unwrap();
+
+ let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into());
+ let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+ let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) =
+ setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into());
+
+ pipeline.set_state(gst::State::Playing).unwrap();
+
+ togglerecord.set_property("record", &true).unwrap();
+
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(11)).unwrap();
+ sender_input_3.send(SendData::Buffers(10)).unwrap();
+
+ // Sender 2 is waiting for sender 1 to continue, sender 1/3 are finished
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_3.recv().unwrap();
+
+ // Stop recording and push new buffers to sender 1, which will advance
+ // it and release the 11th buffer of sender 2 above
+ togglerecord.set_property("record", &false).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ receiver_input_done_2.recv().unwrap();
+
+ // Send another 9 buffers to sender 2, 1/2 are at the same position now
+ sender_input_2.send(SendData::Buffers(9)).unwrap();
+
+ // Send the remaining 10 buffers to sender 3, all are at the same position now
+ sender_input_3.send(SendData::Buffers(10)).unwrap();
+
+ // Wait until all 20 buffers of all senders are done
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_3.recv().unwrap();
+
+ // Send another buffer to sender 2, this will block until sender 1 advances
+ // but must not be dropped, although we're not recording (yet)
+ sender_input_2.send(SendData::Buffers(1)).unwrap();
+
+ // Start recording again and send another set of buffers to both senders
+ togglerecord.set_property("record", &true).unwrap();
+ sender_input_1.send(SendData::Buffers(10)).unwrap();
+ sender_input_2.send(SendData::Buffers(10)).unwrap();
+ sender_input_3.send(SendData::Buffers(5)).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ // The single buffer above for sender 1 should be handled now
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_3.recv().unwrap();
+
+ sender_input_3.send(SendData::Buffers(5)).unwrap();
+ receiver_input_done_3.recv().unwrap();
+
+ // Send EOS and wait for it to be handled
+ sender_input_1.send(SendData::Eos).unwrap();
+ sender_input_2.send(SendData::Eos).unwrap();
+ sender_input_3.send(SendData::Eos).unwrap();
+ receiver_input_done_1.recv().unwrap();
+ receiver_input_done_2.recv().unwrap();
+ receiver_input_done_3.recv().unwrap();
+
+ let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0);
+ for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_1.len(), 20);
+
+ // Last buffer should be dropped from second stream
+ let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0);
+ for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_2.len(), 20);
+
+ let mut segment_3 = gst::FormattedSegment::<gst::ClockTime>::new();
+ let buffers_3 = recv_buffers(&receiver_output_3, &mut segment_3, 0);
+ for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() {
+ let pts_off = if index >= 10 {
+ 10 * 20 * gst::MSECOND
+ } else {
+ 0.into()
+ };
+
+ let index = index as u64;
+ assert_eq!(running_time, index * 20 * gst::MSECOND);
+ assert_eq!(pts, pts_off + index * 20 * gst::MSECOND);
+ assert_eq!(duration, 20 * gst::MSECOND);
+ }
+ assert_eq!(buffers_3.len(), 20);
+
+ thread_1.join().unwrap();
+ thread_2.join().unwrap();
+ thread_3.join().unwrap();
+
+ pipeline.set_state(gst::State::Null).unwrap();
+}