diff options
Diffstat (limited to 'utils/gst-plugin-togglerecord')
-rw-r--r-- | utils/gst-plugin-togglerecord/Cargo.toml | 34 | ||||
-rw-r--r-- | utils/gst-plugin-togglerecord/LICENSE | 502 | ||||
-rw-r--r-- | utils/gst-plugin-togglerecord/build.rs | 5 | ||||
-rw-r--r-- | utils/gst-plugin-togglerecord/examples/gtk_recording.rs | 358 | ||||
-rw-r--r-- | utils/gst-plugin-togglerecord/src/lib.rs | 48 | ||||
-rw-r--r-- | utils/gst-plugin-togglerecord/src/togglerecord.rs | 1749 | ||||
-rw-r--r-- | utils/gst-plugin-togglerecord/tests/tests.rs | 1173 |
7 files changed, 3869 insertions, 0 deletions
diff --git a/utils/gst-plugin-togglerecord/Cargo.toml b/utils/gst-plugin-togglerecord/Cargo.toml new file mode 100644 index 000000000..ac25dba01 --- /dev/null +++ b/utils/gst-plugin-togglerecord/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "gst-plugin-togglerecord" +version = "0.6.0" +authors = ["Sebastian Dröge <sebastian@centricular.com>"] +license = "LGPL-2.1+" +description = "Toggle Record Plugin" +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +edition = "2018" + +[dependencies] +glib = { git = "https://github.com/gtk-rs/glib" } +gstreamer = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gstreamer-audio = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gstreamer-video = { git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gtk = { git = "https://github.com/gtk-rs/gtk", optional = true } +gio = { git = "https://github.com/gtk-rs/gio", optional = true } +parking_lot = "0.10" +lazy_static = "1.0" + +[dev-dependencies] +either = "1.0" + +[lib] +name = "gsttogglerecord" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[[example]] +name = "gtk-recording" +path = "examples/gtk_recording.rs" +required-features = ["gtk", "gio"] + +[build-dependencies] +gst-plugin-version-helper = { path="../../gst-plugin-version-helper" } diff --git a/utils/gst-plugin-togglerecord/LICENSE b/utils/gst-plugin-togglerecord/LICENSE new file mode 100644 index 000000000..4362b4915 --- /dev/null +++ b/utils/gst-plugin-togglerecord/LICENSE @@ -0,0 +1,502 @@ + GNU LESSER GENERAL PUBLIC LICENSE + Version 2.1, February 1999 + + Copyright (C) 1991, 1999 Free Software Foundation, Inc. + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + +[This is the first released version of the Lesser GPL. It also counts + as the successor of the GNU Library Public License, version 2, hence + the version number 2.1.] + + Preamble + + The licenses for most software are designed to take away your +freedom to share and change it. By contrast, the GNU General Public +Licenses are intended to guarantee your freedom to share and change +free software--to make sure the software is free for all its users. + + This license, the Lesser General Public License, applies to some +specially designated software packages--typically libraries--of the +Free Software Foundation and other authors who decide to use it. You +can use it too, but we suggest you first think carefully about whether +this license or the ordinary General Public License is the better +strategy to use in any particular case, based on the explanations below. + + When we speak of free software, we are referring to freedom of use, +not price. Our General Public Licenses are designed to make sure that +you have the freedom to distribute copies of free software (and charge +for this service if you wish); that you receive source code or can get +it if you want it; that you can change the software and use pieces of +it in new free programs; and that you are informed that you can do +these things. + + To protect your rights, we need to make restrictions that forbid +distributors to deny you these rights or to ask you to surrender these +rights. These restrictions translate to certain responsibilities for +you if you distribute copies of the library or if you modify it. + + For example, if you distribute copies of the library, whether gratis +or for a fee, you must give the recipients all the rights that we gave +you. You must make sure that they, too, receive or can get the source +code. If you link other code with the library, you must provide +complete object files to the recipients, so that they can relink them +with the library after making changes to the library and recompiling +it. And you must show them these terms so they know their rights. + + We protect your rights with a two-step method: (1) we copyright the +library, and (2) we offer you this license, which gives you legal +permission to copy, distribute and/or modify the library. + + To protect each distributor, we want to make it very clear that +there is no warranty for the free library. Also, if the library is +modified by someone else and passed on, the recipients should know +that what they have is not the original version, so that the original +author's reputation will not be affected by problems that might be +introduced by others. + + Finally, software patents pose a constant threat to the existence of +any free program. We wish to make sure that a company cannot +effectively restrict the users of a free program by obtaining a +restrictive license from a patent holder. Therefore, we insist that +any patent license obtained for a version of the library must be +consistent with the full freedom of use specified in this license. + + Most GNU software, including some libraries, is covered by the +ordinary GNU General Public License. This license, the GNU Lesser +General Public License, applies to certain designated libraries, and +is quite different from the ordinary General Public License. We use +this license for certain libraries in order to permit linking those +libraries into non-free programs. + + When a program is linked with a library, whether statically or using +a shared library, the combination of the two is legally speaking a +combined work, a derivative of the original library. The ordinary +General Public License therefore permits such linking only if the +entire combination fits its criteria of freedom. The Lesser General +Public License permits more lax criteria for linking other code with +the library. + + We call this license the "Lesser" General Public License because it +does Less to protect the user's freedom than the ordinary General +Public License. It also provides other free software developers Less +of an advantage over competing non-free programs. These disadvantages +are the reason we use the ordinary General Public License for many +libraries. However, the Lesser license provides advantages in certain +special circumstances. + + For example, on rare occasions, there may be a special need to +encourage the widest possible use of a certain library, so that it becomes +a de-facto standard. To achieve this, non-free programs must be +allowed to use the library. A more frequent case is that a free +library does the same job as widely used non-free libraries. In this +case, there is little to gain by limiting the free library to free +software only, so we use the Lesser General Public License. + + In other cases, permission to use a particular library in non-free +programs enables a greater number of people to use a large body of +free software. For example, permission to use the GNU C Library in +non-free programs enables many more people to use the whole GNU +operating system, as well as its variant, the GNU/Linux operating +system. + + Although the Lesser General Public License is Less protective of the +users' freedom, it does ensure that the user of a program that is +linked with the Library has the freedom and the wherewithal to run +that program using a modified version of the Library. + + The precise terms and conditions for copying, distribution and +modification follow. Pay close attention to the difference between a +"work based on the library" and a "work that uses the library". The +former contains code derived from the library, whereas the latter must +be combined with the library in order to run. + + GNU LESSER GENERAL PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. This License Agreement applies to any software library or other +program which contains a notice placed by the copyright holder or +other authorized party saying it may be distributed under the terms of +this Lesser General Public License (also called "this License"). +Each licensee is addressed as "you". + + A "library" means a collection of software functions and/or data +prepared so as to be conveniently linked with application programs +(which use some of those functions and data) to form executables. + + The "Library", below, refers to any such software library or work +which has been distributed under these terms. A "work based on the +Library" means either the Library or any derivative work under +copyright law: that is to say, a work containing the Library or a +portion of it, either verbatim or with modifications and/or translated +straightforwardly into another language. (Hereinafter, translation is +included without limitation in the term "modification".) + + "Source code" for a work means the preferred form of the work for +making modifications to it. For a library, complete source code means +all the source code for all modules it contains, plus any associated +interface definition files, plus the scripts used to control compilation +and installation of the library. + + Activities other than copying, distribution and modification are not +covered by this License; they are outside its scope. The act of +running a program using the Library is not restricted, and output from +such a program is covered only if its contents constitute a work based +on the Library (independent of the use of the Library in a tool for +writing it). Whether that is true depends on what the Library does +and what the program that uses the Library does. + + 1. You may copy and distribute verbatim copies of the Library's +complete source code as you receive it, in any medium, provided that +you conspicuously and appropriately publish on each copy an +appropriate copyright notice and disclaimer of warranty; keep intact +all the notices that refer to this License and to the absence of any +warranty; and distribute a copy of this License along with the +Library. + + You may charge a fee for the physical act of transferring a copy, +and you may at your option offer warranty protection in exchange for a +fee. + + 2. You may modify your copy or copies of the Library or any portion +of it, thus forming a work based on the Library, and copy and +distribute such modifications or work under the terms of Section 1 +above, provided that you also meet all of these conditions: + + a) The modified work must itself be a software library. + + b) You must cause the files modified to carry prominent notices + stating that you changed the files and the date of any change. + + c) You must cause the whole of the work to be licensed at no + charge to all third parties under the terms of this License. + + d) If a facility in the modified Library refers to a function or a + table of data to be supplied by an application program that uses + the facility, other than as an argument passed when the facility + is invoked, then you must make a good faith effort to ensure that, + in the event an application does not supply such function or + table, the facility still operates, and performs whatever part of + its purpose remains meaningful. + + (For example, a function in a library to compute square roots has + a purpose that is entirely well-defined independent of the + application. Therefore, Subsection 2d requires that any + application-supplied function or table used by this function must + be optional: if the application does not supply it, the square + root function must still compute square roots.) + +These requirements apply to the modified work as a whole. If +identifiable sections of that work are not derived from the Library, +and can be reasonably considered independent and separate works in +themselves, then this License, and its terms, do not apply to those +sections when you distribute them as separate works. But when you +distribute the same sections as part of a whole which is a work based +on the Library, the distribution of the whole must be on the terms of +this License, whose permissions for other licensees extend to the +entire whole, and thus to each and every part regardless of who wrote +it. + +Thus, it is not the intent of this section to claim rights or contest +your rights to work written entirely by you; rather, the intent is to +exercise the right to control the distribution of derivative or +collective works based on the Library. + +In addition, mere aggregation of another work not based on the Library +with the Library (or with a work based on the Library) on a volume of +a storage or distribution medium does not bring the other work under +the scope of this License. + + 3. You may opt to apply the terms of the ordinary GNU General Public +License instead of this License to a given copy of the Library. To do +this, you must alter all the notices that refer to this License, so +that they refer to the ordinary GNU General Public License, version 2, +instead of to this License. (If a newer version than version 2 of the +ordinary GNU General Public License has appeared, then you can specify +that version instead if you wish.) Do not make any other change in +these notices. + + Once this change is made in a given copy, it is irreversible for +that copy, so the ordinary GNU General Public License applies to all +subsequent copies and derivative works made from that copy. + + This option is useful when you wish to copy part of the code of +the Library into a program that is not a library. + + 4. You may copy and distribute the Library (or a portion or +derivative of it, under Section 2) in object code or executable form +under the terms of Sections 1 and 2 above provided that you accompany +it with the complete corresponding machine-readable source code, which +must be distributed under the terms of Sections 1 and 2 above on a +medium customarily used for software interchange. + + If distribution of object code is made by offering access to copy +from a designated place, then offering equivalent access to copy the +source code from the same place satisfies the requirement to +distribute the source code, even though third parties are not +compelled to copy the source along with the object code. + + 5. A program that contains no derivative of any portion of the +Library, but is designed to work with the Library by being compiled or +linked with it, is called a "work that uses the Library". Such a +work, in isolation, is not a derivative work of the Library, and +therefore falls outside the scope of this License. + + However, linking a "work that uses the Library" with the Library +creates an executable that is a derivative of the Library (because it +contains portions of the Library), rather than a "work that uses the +library". The executable is therefore covered by this License. +Section 6 states terms for distribution of such executables. + + When a "work that uses the Library" uses material from a header file +that is part of the Library, the object code for the work may be a +derivative work of the Library even though the source code is not. +Whether this is true is especially significant if the work can be +linked without the Library, or if the work is itself a library. The +threshold for this to be true is not precisely defined by law. + + If such an object file uses only numerical parameters, data +structure layouts and accessors, and small macros and small inline +functions (ten lines or less in length), then the use of the object +file is unrestricted, regardless of whether it is legally a derivative +work. (Executables containing this object code plus portions of the +Library will still fall under Section 6.) + + Otherwise, if the work is a derivative of the Library, you may +distribute the object code for the work under the terms of Section 6. +Any executables containing that work also fall under Section 6, +whether or not they are linked directly with the Library itself. + + 6. As an exception to the Sections above, you may also combine or +link a "work that uses the Library" with the Library to produce a +work containing portions of the Library, and distribute that work +under terms of your choice, provided that the terms permit +modification of the work for the customer's own use and reverse +engineering for debugging such modifications. + + You must give prominent notice with each copy of the work that the +Library is used in it and that the Library and its use are covered by +this License. You must supply a copy of this License. If the work +during execution displays copyright notices, you must include the +copyright notice for the Library among them, as well as a reference +directing the user to the copy of this License. Also, you must do one +of these things: + + a) Accompany the work with the complete corresponding + machine-readable source code for the Library including whatever + changes were used in the work (which must be distributed under + Sections 1 and 2 above); and, if the work is an executable linked + with the Library, with the complete machine-readable "work that + uses the Library", as object code and/or source code, so that the + user can modify the Library and then relink to produce a modified + executable containing the modified Library. (It is understood + that the user who changes the contents of definitions files in the + Library will not necessarily be able to recompile the application + to use the modified definitions.) + + b) Use a suitable shared library mechanism for linking with the + Library. A suitable mechanism is one that (1) uses at run time a + copy of the library already present on the user's computer system, + rather than copying library functions into the executable, and (2) + will operate properly with a modified version of the library, if + the user installs one, as long as the modified version is + interface-compatible with the version that the work was made with. + + c) Accompany the work with a written offer, valid for at + least three years, to give the same user the materials + specified in Subsection 6a, above, for a charge no more + than the cost of performing this distribution. + + d) If distribution of the work is made by offering access to copy + from a designated place, offer equivalent access to copy the above + specified materials from the same place. + + e) Verify that the user has already received a copy of these + materials or that you have already sent this user a copy. + + For an executable, the required form of the "work that uses the +Library" must include any data and utility programs needed for +reproducing the executable from it. However, as a special exception, +the materials to be distributed need not include anything that is +normally distributed (in either source or binary form) with the major +components (compiler, kernel, and so on) of the operating system on +which the executable runs, unless that component itself accompanies +the executable. + + It may happen that this requirement contradicts the license +restrictions of other proprietary libraries that do not normally +accompany the operating system. Such a contradiction means you cannot +use both them and the Library together in an executable that you +distribute. + + 7. You may place library facilities that are a work based on the +Library side-by-side in a single library together with other library +facilities not covered by this License, and distribute such a combined +library, provided that the separate distribution of the work based on +the Library and of the other library facilities is otherwise +permitted, and provided that you do these two things: + + a) Accompany the combined library with a copy of the same work + based on the Library, uncombined with any other library + facilities. This must be distributed under the terms of the + Sections above. + + b) Give prominent notice with the combined library of the fact + that part of it is a work based on the Library, and explaining + where to find the accompanying uncombined form of the same work. + + 8. You may not copy, modify, sublicense, link with, or distribute +the Library except as expressly provided under this License. Any +attempt otherwise to copy, modify, sublicense, link with, or +distribute the Library is void, and will automatically terminate your +rights under this License. However, parties who have received copies, +or rights, from you under this License will not have their licenses +terminated so long as such parties remain in full compliance. + + 9. You are not required to accept this License, since you have not +signed it. However, nothing else grants you permission to modify or +distribute the Library or its derivative works. These actions are +prohibited by law if you do not accept this License. Therefore, by +modifying or distributing the Library (or any work based on the +Library), you indicate your acceptance of this License to do so, and +all its terms and conditions for copying, distributing or modifying +the Library or works based on it. + + 10. Each time you redistribute the Library (or any work based on the +Library), the recipient automatically receives a license from the +original licensor to copy, distribute, link with or modify the Library +subject to these terms and conditions. You may not impose any further +restrictions on the recipients' exercise of the rights granted herein. +You are not responsible for enforcing compliance by third parties with +this License. + + 11. If, as a consequence of a court judgment or allegation of patent +infringement or for any other reason (not limited to patent issues), +conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot +distribute so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you +may not distribute the Library at all. For example, if a patent +license would not permit royalty-free redistribution of the Library by +all those who receive copies directly or indirectly through you, then +the only way you could satisfy both it and this License would be to +refrain entirely from distribution of the Library. + +If any portion of this section is held invalid or unenforceable under any +particular circumstance, the balance of the section is intended to apply, +and the section as a whole is intended to apply in other circumstances. + +It is not the purpose of this section to induce you to infringe any +patents or other property right claims or to contest validity of any +such claims; this section has the sole purpose of protecting the +integrity of the free software distribution system which is +implemented by public license practices. Many people have made +generous contributions to the wide range of software distributed +through that system in reliance on consistent application of that +system; it is up to the author/donor to decide if he or she is willing +to distribute software through any other system and a licensee cannot +impose that choice. + +This section is intended to make thoroughly clear what is believed to +be a consequence of the rest of this License. + + 12. If the distribution and/or use of the Library is restricted in +certain countries either by patents or by copyrighted interfaces, the +original copyright holder who places the Library under this License may add +an explicit geographical distribution limitation excluding those countries, +so that distribution is permitted only in or among countries not thus +excluded. In such case, this License incorporates the limitation as if +written in the body of this License. + + 13. The Free Software Foundation may publish revised and/or new +versions of the Lesser General Public License from time to time. +Such new versions will be similar in spirit to the present version, +but may differ in detail to address new problems or concerns. + +Each version is given a distinguishing version number. If the Library +specifies a version number of this License which applies to it and +"any later version", you have the option of following the terms and +conditions either of that version or of any later version published by +the Free Software Foundation. If the Library does not specify a +license version number, you may choose any version ever published by +the Free Software Foundation. + + 14. If you wish to incorporate parts of the Library into other free +programs whose distribution conditions are incompatible with these, +write to the author to ask for permission. For software which is +copyrighted by the Free Software Foundation, write to the Free +Software Foundation; we sometimes make exceptions for this. Our +decision will be guided by the two goals of preserving the free status +of all derivatives of our free software and of promoting the sharing +and reuse of software generally. + + NO WARRANTY + + 15. BECAUSE THE LIBRARY IS LICENSED FREE OF CHARGE, THERE IS NO +WARRANTY FOR THE LIBRARY, TO THE EXTENT PERMITTED BY APPLICABLE LAW. +EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR +OTHER PARTIES PROVIDE THE LIBRARY "AS IS" WITHOUT WARRANTY OF ANY +KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE +LIBRARY IS WITH YOU. SHOULD THE LIBRARY PROVE DEFECTIVE, YOU ASSUME +THE COST OF ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN +WRITING WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY +AND/OR REDISTRIBUTE THE LIBRARY AS PERMITTED ABOVE, BE LIABLE TO YOU +FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR +CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE +LIBRARY (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING +RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A +FAILURE OF THE LIBRARY TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF +SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH +DAMAGES. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Libraries + + If you develop a new library, and you want it to be of the greatest +possible use to the public, we recommend making it free software that +everyone can redistribute and change. You can do so by permitting +redistribution under these terms (or, alternatively, under the terms of the +ordinary General Public License). + + To apply these terms, attach the following notices to the library. It is +safest to attach them to the start of each source file to most effectively +convey the exclusion of warranty; and each file should have at least the +"copyright" line and a pointer to where the full notice is found. + + <one line to give the library's name and a brief idea of what it does.> + Copyright (C) <year> <name of author> + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +Also add information on how to contact you by electronic and paper mail. + +You should also get your employer (if you work as a programmer) or your +school, if any, to sign a "copyright disclaimer" for the library, if +necessary. Here is a sample; alter the names: + + Yoyodyne, Inc., hereby disclaims all copyright interest in the + library `Frob' (a library for tweaking knobs) written by James Random Hacker. + + <signature of Ty Coon>, 1 April 1990 + Ty Coon, President of Vice + +That's all there is to it! diff --git a/utils/gst-plugin-togglerecord/build.rs b/utils/gst-plugin-togglerecord/build.rs new file mode 100644 index 000000000..0d1ddb61d --- /dev/null +++ b/utils/gst-plugin-togglerecord/build.rs @@ -0,0 +1,5 @@ +extern crate gst_plugin_version_helper; + +fn main() { + gst_plugin_version_helper::get_info() +} diff --git a/utils/gst-plugin-togglerecord/examples/gtk_recording.rs b/utils/gst-plugin-togglerecord/examples/gtk_recording.rs new file mode 100644 index 000000000..8f0bc0bb1 --- /dev/null +++ b/utils/gst-plugin-togglerecord/examples/gtk_recording.rs @@ -0,0 +1,358 @@ +// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com> +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +extern crate glib; +use glib::prelude::*; +extern crate gio; +use gio::prelude::*; + +extern crate gstreamer as gst; +use gst::prelude::*; + +extern crate gsttogglerecord; + +extern crate gtk; +use gtk::prelude::*; +use std::cell::RefCell; +use std::env; + +fn create_pipeline() -> ( + gst::Pipeline, + gst::Pad, + gst::Pad, + gst::Element, + gst::Element, + gtk::Widget, +) { + let pipeline = gst::Pipeline::new(None); + + let video_src = gst::ElementFactory::make("videotestsrc", None).unwrap(); + video_src.set_property("is-live", &true).unwrap(); + video_src.set_property_from_str("pattern", "ball"); + + let timeoverlay = gst::ElementFactory::make("timeoverlay", None).unwrap(); + timeoverlay + .set_property("font-desc", &"Monospace 20") + .unwrap(); + + let video_tee = gst::ElementFactory::make("tee", None).unwrap(); + let video_queue1 = gst::ElementFactory::make("queue", None).unwrap(); + let video_queue2 = gst::ElementFactory::make("queue", None).unwrap(); + + let video_convert1 = gst::ElementFactory::make("videoconvert", None).unwrap(); + let video_convert2 = gst::ElementFactory::make("videoconvert", None).unwrap(); + + let (video_sink, video_widget) = + if let Ok(gtkglsink) = gst::ElementFactory::make("gtkglsink", None) { + let glsinkbin = gst::ElementFactory::make("glsinkbin", None).unwrap(); + glsinkbin.set_property("sink", >kglsink).unwrap(); + + let widget = gtkglsink.get_property("widget").unwrap(); + (glsinkbin, widget.get::<gtk::Widget>().unwrap().unwrap()) + } else { + let sink = gst::ElementFactory::make("gtksink", None).unwrap(); + let widget = sink.get_property("widget").unwrap(); + (sink, widget.get::<gtk::Widget>().unwrap().unwrap()) + }; + + let video_enc = gst::ElementFactory::make("x264enc", None).unwrap(); + video_enc.set_property("rc-lookahead", &10i32).unwrap(); + video_enc.set_property("key-int-max", &30u32).unwrap(); + let video_parse = gst::ElementFactory::make("h264parse", None).unwrap(); + + let audio_src = gst::ElementFactory::make("audiotestsrc", None).unwrap(); + audio_src.set_property("is-live", &true).unwrap(); + audio_src.set_property_from_str("wave", "ticks"); + + let audio_tee = gst::ElementFactory::make("tee", None).unwrap(); + let audio_queue1 = gst::ElementFactory::make("queue", None).unwrap(); + let audio_queue2 = gst::ElementFactory::make("queue", None).unwrap(); + + let audio_convert1 = gst::ElementFactory::make("audioconvert", None).unwrap(); + let audio_convert2 = gst::ElementFactory::make("audioconvert", None).unwrap(); + + let audio_sink = gst::ElementFactory::make("autoaudiosink", None).unwrap(); + + let audio_enc = gst::ElementFactory::make("lamemp3enc", None).unwrap(); + let audio_parse = gst::ElementFactory::make("mpegaudioparse", None).unwrap(); + + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + + let mux_queue1 = gst::ElementFactory::make("queue", None).unwrap(); + let mux_queue2 = gst::ElementFactory::make("queue", None).unwrap(); + + let mux = gst::ElementFactory::make("mp4mux", None).unwrap(); + + let file_sink = gst::ElementFactory::make("filesink", None).unwrap(); + file_sink + .set_property("location", &"recording.mp4") + .unwrap(); + file_sink.set_property("async", &false).unwrap(); + file_sink.set_property("sync", &false).unwrap(); + + pipeline + .add_many(&[ + &video_src, + &timeoverlay, + &video_tee, + &video_queue1, + &video_queue2, + &video_convert1, + &video_convert2, + &video_sink, + &video_enc, + &video_parse, + &audio_src, + &audio_tee, + &audio_queue1, + &audio_queue2, + &audio_convert1, + &audio_convert2, + &audio_sink, + &audio_enc, + &audio_parse, + &togglerecord, + &mux_queue1, + &mux_queue2, + &mux, + &file_sink, + ]) + .unwrap(); + + gst::Element::link_many(&[ + &video_src, + &timeoverlay, + &video_tee, + &video_queue1, + &video_convert1, + &video_sink, + ]) + .unwrap(); + + gst::Element::link_many(&[ + &video_tee, + &video_queue2, + &video_convert2, + &video_enc, + &video_parse, + ]) + .unwrap(); + + video_parse + .link_pads(Some("src"), &togglerecord, Some("sink")) + .unwrap(); + togglerecord + .link_pads(Some("src"), &mux_queue1, Some("sink")) + .unwrap(); + mux_queue1 + .link_pads(Some("src"), &mux, Some("video_%u")) + .unwrap(); + + gst::Element::link_many(&[ + &audio_src, + &audio_tee, + &audio_queue1, + &audio_convert1, + &audio_sink, + ]) + .unwrap(); + + gst::Element::link_many(&[ + &audio_tee, + &audio_queue2, + &audio_convert2, + &audio_enc, + &audio_parse, + ]) + .unwrap(); + + audio_parse + .link_pads(Some("src"), &togglerecord, Some("sink_0")) + .unwrap(); + togglerecord + .link_pads(Some("src_0"), &mux_queue2, Some("sink")) + .unwrap(); + mux_queue2 + .link_pads(Some("src"), &mux, Some("audio_%u")) + .unwrap(); + + gst::Element::link_many(&[&mux, &file_sink]).unwrap(); + + ( + pipeline, + video_queue2.get_static_pad("sink").unwrap(), + audio_queue2.get_static_pad("sink").unwrap(), + togglerecord, + video_sink, + video_widget, + ) +} + +fn create_ui(app: >k::Application) { + let (pipeline, video_pad, audio_pad, togglerecord, video_sink, video_widget) = + create_pipeline(); + + let window = gtk::Window::new(gtk::WindowType::Toplevel); + window.set_default_size(320, 240); + let vbox = gtk::Box::new(gtk::Orientation::Vertical, 0); + vbox.pack_start(&video_widget, true, true, 0); + + let hbox = gtk::Box::new(gtk::Orientation::Horizontal, 0); + let position_label = gtk::Label::new(Some("Position: 00:00:00")); + hbox.pack_start(&position_label, true, true, 5); + let recorded_duration_label = gtk::Label::new(Some("Recorded: 00:00:00")); + hbox.pack_start(&recorded_duration_label, true, true, 5); + vbox.pack_start(&hbox, false, false, 5); + + let hbox = gtk::Box::new(gtk::Orientation::Horizontal, 0); + let record_button = gtk::Button::new_with_label("Record"); + hbox.pack_start(&record_button, true, true, 5); + let finish_button = gtk::Button::new_with_label("Finish"); + hbox.pack_start(&finish_button, true, true, 5); + vbox.pack_start(&hbox, false, false, 5); + + window.add(&vbox); + window.show_all(); + + app.add_window(&window); + + let video_sink_weak = video_sink.downgrade(); + let togglerecord_weak = togglerecord.downgrade(); + let timeout_id = gtk::timeout_add(100, move || { + let video_sink = match video_sink_weak.upgrade() { + Some(video_sink) => video_sink, + None => return glib::Continue(true), + }; + + let togglerecord = match togglerecord_weak.upgrade() { + Some(togglerecord) => togglerecord, + None => return glib::Continue(true), + }; + + let position = video_sink + .query_position::<gst::ClockTime>() + .unwrap_or_else(|| 0.into()); + position_label.set_text(&format!("Position: {:.1}", position)); + + let recording_duration = togglerecord + .get_static_pad("src") + .unwrap() + .query_position::<gst::ClockTime>() + .unwrap_or_else(|| 0.into()); + recorded_duration_label.set_text(&format!("Recorded: {:.1}", recording_duration)); + + glib::Continue(true) + }); + + let togglerecord_weak = togglerecord.downgrade(); + record_button.connect_clicked(move |button| { + let togglerecord = match togglerecord_weak.upgrade() { + Some(togglerecord) => togglerecord, + None => return, + }; + + let recording = !togglerecord + .get_property("record") + .unwrap() + .get_some::<bool>() + .unwrap(); + togglerecord.set_property("record", &recording).unwrap(); + + button.set_label(if recording { "Stop" } else { "Record" }); + }); + + let record_button_weak = record_button.downgrade(); + finish_button.connect_clicked(move |button| { + let record_button = match record_button_weak.upgrade() { + Some(record_button) => record_button, + None => return, + }; + + record_button.set_sensitive(false); + button.set_sensitive(false); + + video_pad.send_event(gst::Event::new_eos().build()); + audio_pad.send_event(gst::Event::new_eos().build()); + }); + + let app_weak = app.downgrade(); + window.connect_delete_event(move |_, _| { + let app = match app_weak.upgrade() { + Some(app) => app, + None => return Inhibit(false), + }; + + app.quit(); + Inhibit(false) + }); + + let bus = pipeline.get_bus().unwrap(); + let app_weak = app.downgrade(); + bus.add_watch_local(move |_, msg| { + use gst::MessageView; + + let app = match app_weak.upgrade() { + Some(app) => app, + None => return glib::Continue(false), + }; + + match msg.view() { + MessageView::Eos(..) => app.quit(), + MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + msg.get_src().map(|s| s.get_path_string()), + err.get_error(), + err.get_debug() + ); + app.quit(); + } + _ => (), + }; + + glib::Continue(true) + }) + .expect("Failed to add bus watch"); + + pipeline.set_state(gst::State::Playing).unwrap(); + + // Pipeline reference is owned by the closure below, so will be + // destroyed once the app is destroyed + let timeout_id = RefCell::new(Some(timeout_id)); + app.connect_shutdown(move |_| { + pipeline.set_state(gst::State::Null).unwrap(); + + bus.remove_watch().unwrap(); + + if let Some(timeout_id) = timeout_id.borrow_mut().take() { + glib::source_remove(timeout_id); + } + }); +} + +fn main() { + gst::init().unwrap(); + gtk::init().unwrap(); + + gsttogglerecord::plugin_register_static().expect("Failed to register togglerecord plugin"); + + let app = gtk::Application::new(None, gio::ApplicationFlags::FLAGS_NONE).unwrap(); + + app.connect_activate(create_ui); + let args = env::args().collect::<Vec<_>>(); + app.run(&args); +} diff --git a/utils/gst-plugin-togglerecord/src/lib.rs b/utils/gst-plugin-togglerecord/src/lib.rs new file mode 100644 index 000000000..9f2154662 --- /dev/null +++ b/utils/gst-plugin-togglerecord/src/lib.rs @@ -0,0 +1,48 @@ +// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com> +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +#![crate_type = "cdylib"] + +#[macro_use] +extern crate glib; +#[macro_use] +extern crate gstreamer as gst; +extern crate gstreamer_audio as gst_audio; +extern crate gstreamer_video as gst_video; + +#[macro_use] +extern crate lazy_static; + +extern crate parking_lot; + +mod togglerecord; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + togglerecord::register(plugin) +} + +gst_plugin_define!( + togglerecord, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "LGPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/utils/gst-plugin-togglerecord/src/togglerecord.rs b/utils/gst-plugin-togglerecord/src/togglerecord.rs new file mode 100644 index 000000000..07872b73d --- /dev/null +++ b/utils/gst-plugin-togglerecord/src/togglerecord.rs @@ -0,0 +1,1749 @@ +// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com> +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +use glib; +use glib::prelude::*; +use glib::subclass; +use glib::subclass::prelude::*; +use gst; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst_audio; +use gst_video; + +use parking_lot::{Condvar, Mutex}; +use std::cmp; +use std::collections::HashMap; +use std::f64; +use std::iter; +use std::sync::Arc; + +const DEFAULT_RECORD: bool = false; + +#[derive(Debug, Clone, Copy)] +struct Settings { + record: bool, +} + +impl Default for Settings { + fn default() -> Self { + Settings { + record: DEFAULT_RECORD, + } + } +} + +static PROPERTIES: [subclass::Property; 2] = [ + subclass::Property("record", |name| { + glib::ParamSpec::boolean( + name, + "Record", + "Enable/disable recording", + DEFAULT_RECORD, + glib::ParamFlags::READWRITE, + ) + }), + subclass::Property("recording", |name| { + glib::ParamSpec::boolean( + name, + "Recording", + "Whether recording is currently taking place", + DEFAULT_RECORD, + glib::ParamFlags::READABLE, + ) + }), +]; + +#[derive(Clone)] +struct Stream { + sinkpad: gst::Pad, + srcpad: gst::Pad, + state: Arc<Mutex<StreamState>>, +} + +impl PartialEq for Stream { + fn eq(&self, other: &Self) -> bool { + self.sinkpad == other.sinkpad && self.srcpad == other.srcpad + } +} + +impl Eq for Stream {} + +impl Stream { + fn new(sinkpad: gst::Pad, srcpad: gst::Pad) -> Self { + Self { + sinkpad, + srcpad, + state: Arc::new(Mutex::new(StreamState::default())), + } + } +} + +struct StreamState { + in_segment: gst::FormattedSegment<gst::ClockTime>, + out_segment: gst::FormattedSegment<gst::ClockTime>, + segment_seqnum: gst::Seqnum, + current_running_time: gst::ClockTime, + eos: bool, + flushing: bool, + segment_pending: bool, + pending_events: Vec<gst::Event>, + audio_info: Option<gst_audio::AudioInfo>, + video_info: Option<gst_video::VideoInfo>, +} + +impl Default for StreamState { + fn default() -> Self { + Self { + in_segment: gst::FormattedSegment::new(), + out_segment: gst::FormattedSegment::new(), + segment_seqnum: gst::Seqnum::next(), + current_running_time: gst::CLOCK_TIME_NONE, + eos: false, + flushing: false, + segment_pending: false, + pending_events: Vec::new(), + audio_info: None, + video_info: None, + } + } +} + +// Recording behaviour: +// +// Secondary streams are *always* behind main stream +// Main stream EOS stops recording (-> Stopping), makes secondary streams go EOS +// +// Recording: Passing through all data +// Stopping: Main stream remembering current last_recording_stop, waiting for all +// other streams to reach this position +// Stopped: Dropping all data +// Starting: Main stream waiting until next keyframe and setting last_recording_start, waiting +// for all other streams to reach this position +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum RecordingState { + Recording, + Stopping, + Stopped, + Starting, +} + +#[derive(Debug)] +struct State { + recording_state: RecordingState, + last_recording_start: gst::ClockTime, + last_recording_stop: gst::ClockTime, + // Accumulated duration of previous recording segments, + // updated whenever going to Stopped + recording_duration: gst::ClockTime, + // Updated whenever going to Recording + running_time_offset: gst::ClockTime, +} + +impl Default for State { + fn default() -> Self { + Self { + recording_state: RecordingState::Stopped, + last_recording_start: gst::CLOCK_TIME_NONE, + last_recording_stop: gst::CLOCK_TIME_NONE, + recording_duration: 0.into(), + running_time_offset: gst::CLOCK_TIME_NONE, + } + } +} + +#[derive(Debug, PartialEq, Eq)] +enum HandleResult<T> { + Pass(T), + Drop, + Eos, + Flushing, +} + +trait HandleData: Sized { + fn get_pts(&self) -> gst::ClockTime; + fn get_dts(&self) -> gst::ClockTime; + fn get_dts_or_pts(&self) -> gst::ClockTime { + let dts = self.get_dts(); + if dts.is_some() { + dts + } else { + self.get_pts() + } + } + fn get_duration(&self, state: &StreamState) -> gst::ClockTime; + fn is_keyframe(&self) -> bool; + fn can_clip(&self, state: &StreamState) -> bool; + fn clip( + self, + state: &StreamState, + segment: &gst::FormattedSegment<gst::ClockTime>, + ) -> Option<Self>; +} + +impl HandleData for (gst::ClockTime, gst::ClockTime) { + fn get_pts(&self) -> gst::ClockTime { + self.0 + } + + fn get_dts(&self) -> gst::ClockTime { + self.0 + } + + fn get_duration(&self, _state: &StreamState) -> gst::ClockTime { + self.1 + } + + fn is_keyframe(&self) -> bool { + true + } + + fn can_clip(&self, _state: &StreamState) -> bool { + true + } + + fn clip( + self, + _state: &StreamState, + segment: &gst::FormattedSegment<gst::ClockTime>, + ) -> Option<Self> { + let stop = if self.1.is_some() { + self.0 + self.1 + } else { + self.0 + }; + + segment + .clip(self.0, stop) + .map(|(start, stop)| (start, stop - start)) + } +} + +impl HandleData for gst::Buffer { + fn get_pts(&self) -> gst::ClockTime { + gst::BufferRef::get_pts(self) + } + + fn get_dts(&self) -> gst::ClockTime { + gst::BufferRef::get_dts(self) + } + + fn get_duration(&self, state: &StreamState) -> gst::ClockTime { + let duration = gst::BufferRef::get_duration(self); + + if duration.is_some() { + duration + } else if let Some(ref video_info) = state.video_info { + if video_info.fps() != 0.into() { + gst::SECOND + .mul_div_floor( + *video_info.fps().denom() as u64, + *video_info.fps().numer() as u64, + ) + .unwrap_or(gst::CLOCK_TIME_NONE) + } else { + gst::CLOCK_TIME_NONE + } + } else if let Some(ref audio_info) = state.audio_info { + if audio_info.bpf() == 0 || audio_info.rate() == 0 { + return gst::CLOCK_TIME_NONE; + } + + let size = self.get_size() as u64; + let num_samples = size / audio_info.bpf() as u64; + gst::SECOND + .mul_div_floor(num_samples, audio_info.rate() as u64) + .unwrap_or(gst::CLOCK_TIME_NONE) + } else { + gst::CLOCK_TIME_NONE + } + } + + fn is_keyframe(&self) -> bool { + !gst::BufferRef::get_flags(self).contains(gst::BufferFlags::DELTA_UNIT) + } + + fn can_clip(&self, state: &StreamState) -> bool { + // Only do actual clipping for raw audio/video + if let Some(ref audio_info) = state.audio_info { + if audio_info.format() == gst_audio::AudioFormat::Unknown + || audio_info.format() == gst_audio::AudioFormat::Encoded + || audio_info.rate() == 0 + || audio_info.bpf() == 0 + { + return false; + } + } else if let Some(ref video_info) = state.video_info { + if video_info.format() == gst_video::VideoFormat::Unknown + || video_info.format() == gst_video::VideoFormat::Encoded + || self.get_dts_or_pts() != self.get_pts() + { + return false; + } + } else { + return false; + } + + true + } + + fn clip( + mut self, + state: &StreamState, + segment: &gst::FormattedSegment<gst::ClockTime>, + ) -> Option<Self> { + // Only do actual clipping for raw audio/video + if !self.can_clip(state) { + return Some(self); + } + + let pts = HandleData::get_pts(&self); + let duration = HandleData::get_duration(&self, state); + let stop = if duration.is_some() { + pts + duration + } else { + pts + }; + + if let Some(ref audio_info) = state.audio_info { + gst_audio::audio_buffer_clip( + self, + segment.upcast_ref(), + audio_info.rate(), + audio_info.bpf(), + ) + } else if state.video_info.is_some() { + segment.clip(pts, stop).map(move |(start, stop)| { + { + let buffer = self.make_mut(); + buffer.set_pts(start); + buffer.set_dts(start); + buffer.set_duration(stop - start); + } + + self + }) + } else { + unreachable!(); + } + } +} + +struct ToggleRecord { + settings: Mutex<Settings>, + state: Mutex<State>, + main_stream: Stream, + // Always must have main_stream.state locked! + // If multiple stream states have to be locked, the + // main_stream always comes first + main_stream_cond: Condvar, + other_streams: Mutex<(Vec<Stream>, u32)>, + pads: Mutex<HashMap<gst::Pad, Stream>>, +} + +lazy_static! { + static ref CAT: gst::DebugCategory = gst::DebugCategory::new( + "togglerecord", + gst::DebugColorFlags::empty(), + Some("Toggle Record Element"), + ); +} + +impl ToggleRecord { + fn set_pad_functions(sinkpad: &gst::Pad, srcpad: &gst::Pad) { + sinkpad.set_chain_function(|pad, parent, buffer| { + ToggleRecord::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |togglerecord, element| togglerecord.sink_chain(pad, element, buffer), + ) + }); + sinkpad.set_event_function(|pad, parent, event| { + ToggleRecord::catch_panic_pad_function( + parent, + || false, + |togglerecord, element| togglerecord.sink_event(pad, element, event), + ) + }); + sinkpad.set_query_function(|pad, parent, query| { + ToggleRecord::catch_panic_pad_function( + parent, + || false, + |togglerecord, element| togglerecord.sink_query(pad, element, query), + ) + }); + sinkpad.set_iterate_internal_links_function(|pad, parent| { + ToggleRecord::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |togglerecord, element| togglerecord.iterate_internal_links(pad, element), + ) + }); + + srcpad.set_event_function(|pad, parent, event| { + ToggleRecord::catch_panic_pad_function( + parent, + || false, + |togglerecord, element| togglerecord.src_event(pad, element, event), + ) + }); + srcpad.set_query_function(|pad, parent, query| { + ToggleRecord::catch_panic_pad_function( + parent, + || false, + |togglerecord, element| togglerecord.src_query(pad, element, query), + ) + }); + srcpad.set_iterate_internal_links_function(|pad, parent| { + ToggleRecord::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |togglerecord, element| togglerecord.iterate_internal_links(pad, element), + ) + }); + } + + fn handle_main_stream<T: HandleData>( + &self, + element: &gst::Element, + pad: &gst::Pad, + stream: &Stream, + data: T, + ) -> Result<HandleResult<T>, gst::FlowError> { + let mut state = stream.state.lock(); + + let mut dts_or_pts = data.get_dts_or_pts(); + let duration = data.get_duration(&state); + + if !dts_or_pts.is_some() { + gst_element_error!( + element, + gst::StreamError::Format, + ["Buffer without DTS or PTS"] + ); + return Err(gst::FlowError::Error); + } + + let mut dts_or_pts_end = if duration.is_some() { + dts_or_pts + duration + } else { + dts_or_pts + }; + + let data = match data.clip(&state, &state.in_segment) { + None => { + gst_log!(CAT, obj: pad, "Dropping raw data outside segment"); + return Ok(HandleResult::Drop); + } + Some(data) => data, + }; + + // This will only do anything for non-raw data + dts_or_pts = cmp::max(state.in_segment.get_start(), dts_or_pts); + dts_or_pts_end = cmp::max(state.in_segment.get_start(), dts_or_pts_end); + if state.in_segment.get_stop().is_some() { + dts_or_pts = cmp::min(state.in_segment.get_stop(), dts_or_pts); + dts_or_pts_end = cmp::min(state.in_segment.get_stop(), dts_or_pts_end); + } + + let current_running_time = state.in_segment.to_running_time(dts_or_pts); + let current_running_time_end = state.in_segment.to_running_time(dts_or_pts_end); + state.current_running_time = cmp::max(current_running_time_end, state.current_running_time); + + // Wake up everybody, we advanced a bit + // Important: They will only be able to advance once we're done with this + // function or waiting for them to catch up below, otherwise they might + // get the wrong state + self.main_stream_cond.notify_all(); + + gst_log!( + CAT, + obj: pad, + "Main stream current running time {}-{} (position: {}-{})", + current_running_time, + current_running_time_end, + dts_or_pts, + dts_or_pts_end + ); + + let settings = *self.settings.lock(); + + // First check if we have to update our recording state + let mut rec_state = self.state.lock(); + let settings_changed = match rec_state.recording_state { + RecordingState::Recording if !settings.record => { + gst_debug!(CAT, obj: pad, "Stopping recording"); + rec_state.recording_state = RecordingState::Stopping; + true + } + RecordingState::Stopped if settings.record => { + gst_debug!(CAT, obj: pad, "Starting recording"); + rec_state.recording_state = RecordingState::Starting; + true + } + _ => false, + }; + + match rec_state.recording_state { + RecordingState::Recording => { + // Remember where we stopped last, in case of EOS + rec_state.last_recording_stop = current_running_time_end; + gst_log!(CAT, obj: pad, "Passing buffer (recording)"); + Ok(HandleResult::Pass(data)) + } + RecordingState::Stopping => { + if !data.is_keyframe() { + // Remember where we stopped last, in case of EOS + rec_state.last_recording_stop = current_running_time_end; + gst_log!(CAT, obj: pad, "Passing non-keyframe buffer (stopping)"); + + drop(rec_state); + drop(state); + if settings_changed { + gst_debug!(CAT, obj: pad, "Requesting a new keyframe"); + stream + .sinkpad + .push_event(gst_video::new_upstream_force_key_unit_event().build()); + } + + return Ok(HandleResult::Pass(data)); + } + + // Remember the time when we stopped: now, i.e. right before the current buffer! + rec_state.last_recording_stop = current_running_time; + gst_debug!(CAT, obj: pad, "Stopping at {}", current_running_time); + + // Then unlock and wait for all other streams to reach it or go EOS instead. + drop(rec_state); + + while !self.other_streams.lock().0.iter().all(|s| { + let s = s.state.lock(); + s.eos + || (s.current_running_time.is_some() + && s.current_running_time >= current_running_time_end) + }) { + gst_log!(CAT, obj: pad, "Waiting for other streams to stop"); + self.main_stream_cond.wait(&mut state); + } + + if state.flushing { + gst_debug!(CAT, obj: pad, "Flushing"); + return Ok(HandleResult::Flushing); + } + + let mut rec_state = self.state.lock(); + rec_state.recording_state = RecordingState::Stopped; + let advance_by = rec_state.last_recording_stop - rec_state.last_recording_start; + rec_state.recording_duration += advance_by; + rec_state.last_recording_start = gst::CLOCK_TIME_NONE; + rec_state.last_recording_stop = gst::CLOCK_TIME_NONE; + + gst_debug!( + CAT, + obj: pad, + "Stopped at {}, recording duration {}", + current_running_time, + rec_state.recording_duration + ); + + // Then become Stopped and drop this buffer. We always stop right before + // a keyframe + gst_log!(CAT, obj: pad, "Dropping buffer (stopped)"); + + drop(rec_state); + drop(state); + element.notify("recording"); + + Ok(HandleResult::Drop) + } + RecordingState::Stopped => { + gst_log!(CAT, obj: pad, "Dropping buffer (stopped)"); + Ok(HandleResult::Drop) + } + RecordingState::Starting => { + // If this is no keyframe, we can directly go out again here and drop the frame + if !data.is_keyframe() { + gst_log!(CAT, obj: pad, "Dropping non-keyframe buffer (starting)"); + + drop(rec_state); + drop(state); + if settings_changed { + gst_debug!(CAT, obj: pad, "Requesting a new keyframe"); + stream + .sinkpad + .push_event(gst_video::new_upstream_force_key_unit_event().build()); + } + + return Ok(HandleResult::Drop); + } + + // Remember the time when we started: now! + rec_state.last_recording_start = current_running_time; + rec_state.running_time_offset = current_running_time - rec_state.recording_duration; + gst_debug!(CAT, obj: pad, "Starting at {}", current_running_time); + + state.segment_pending = true; + for other_stream in &self.other_streams.lock().0 { + other_stream.state.lock().segment_pending = true; + } + + // Then unlock and wait for all other streams to reach + // it or go EOS instead + drop(rec_state); + + while !self.other_streams.lock().0.iter().all(|s| { + let s = s.state.lock(); + s.eos + || (s.current_running_time.is_some() + && s.current_running_time >= current_running_time_end) + }) { + gst_log!(CAT, obj: pad, "Waiting for other streams to start"); + self.main_stream_cond.wait(&mut state); + } + + if state.flushing { + gst_debug!(CAT, obj: pad, "Flushing"); + return Ok(HandleResult::Flushing); + } + + let mut rec_state = self.state.lock(); + rec_state.recording_state = RecordingState::Recording; + gst_debug!( + CAT, + obj: pad, + "Started at {}, recording duration {}", + current_running_time, + rec_state.recording_duration + ); + + gst_log!(CAT, obj: pad, "Passing buffer (recording)"); + + drop(rec_state); + drop(state); + element.notify("recording"); + + Ok(HandleResult::Pass(data)) + } + } + } + + fn handle_secondary_stream<T: HandleData>( + &self, + element: &gst::Element, + pad: &gst::Pad, + stream: &Stream, + data: T, + ) -> Result<HandleResult<T>, gst::FlowError> { + // Calculate end pts & current running time and make sure we stay in the segment + let mut state = stream.state.lock(); + + let mut pts = data.get_pts(); + let duration = data.get_duration(&state); + + if pts.is_none() { + gst_element_error!(element, gst::StreamError::Format, ["Buffer without PTS"]); + return Err(gst::FlowError::Error); + } + + let dts = data.get_dts(); + if dts.is_some() && pts.is_some() && dts != pts { + gst_element_error!( + element, + gst::StreamError::Format, + ["DTS != PTS not supported for secondary streams"] + ); + return Err(gst::FlowError::Error); + } + + if !data.is_keyframe() { + gst_element_error!( + element, + gst::StreamError::Format, + ["Delta-units not supported for secondary streams"] + ); + return Err(gst::FlowError::Error); + } + + let mut pts_end = if duration.is_some() { + pts + duration + } else { + pts + }; + + let data = match data.clip(&state, &state.in_segment) { + None => { + gst_log!(CAT, obj: pad, "Dropping raw data outside segment"); + return Ok(HandleResult::Drop); + } + Some(data) => data, + }; + + // This will only do anything for non-raw data + pts = cmp::max(state.in_segment.get_start(), pts); + pts_end = cmp::max(state.in_segment.get_start(), pts_end); + if state.in_segment.get_stop().is_some() { + pts = cmp::min(state.in_segment.get_stop(), pts); + pts_end = cmp::min(state.in_segment.get_stop(), pts_end); + } + + let current_running_time = state.in_segment.to_running_time(pts); + let current_running_time_end = state.in_segment.to_running_time(pts_end); + state.current_running_time = cmp::max(current_running_time_end, state.current_running_time); + + gst_log!( + CAT, + obj: pad, + "Secondary stream current running time {}-{} (position: {}-{}", + current_running_time, + current_running_time_end, + pts, + pts_end + ); + + drop(state); + + let mut main_state = self.main_stream.state.lock(); + + // Wake up, in case the main stream is waiting for us to progress up to here. We progressed + // above but all notifying must happen while the main_stream state is locked as per above. + self.main_stream_cond.notify_all(); + + while (main_state.current_running_time == gst::CLOCK_TIME_NONE + || main_state.current_running_time < current_running_time_end) + && !main_state.eos + && !stream.state.lock().flushing + { + gst_log!( + CAT, + obj: pad, + "Waiting for reaching {} / EOS / flushing, main stream at {}", + current_running_time, + main_state.current_running_time + ); + + self.main_stream_cond.wait(&mut main_state); + } + + state = stream.state.lock(); + + if state.flushing { + gst_debug!(CAT, obj: pad, "Flushing"); + return Ok(HandleResult::Flushing); + } + + let rec_state = self.state.lock(); + + // If the main stream is EOS, we are also EOS unless we are + // before the final last recording stop running time + if main_state.eos { + // If we have no start or stop position (we never recorded) then we're EOS too now + if rec_state.last_recording_stop.is_none() || rec_state.last_recording_start.is_none() { + gst_debug!(CAT, obj: pad, "Main stream EOS and recording never started",); + return Ok(HandleResult::Eos); + } else if data.can_clip(&*state) + && current_running_time < rec_state.last_recording_start + && current_running_time_end > rec_state.last_recording_start + { + // Otherwise if we're before the recording start but the end of the buffer is after + // the start and we can clip, clip the buffer and pass it onwards. + gst_debug!( + CAT, + obj: pad, + "Main stream EOS and we're not EOS yet (overlapping recording start, {} < {} < {})", + current_running_time, + rec_state.last_recording_start, + current_running_time_end + ); + + let mut clip_start = state + .in_segment + .position_from_running_time(rec_state.last_recording_start); + if clip_start.is_none() { + clip_start = state.in_segment.get_start(); + } + let mut clip_stop = state + .in_segment + .position_from_running_time(rec_state.last_recording_stop); + if clip_stop.is_none() { + clip_stop = state.in_segment.get_stop(); + } + let mut segment = state.in_segment.clone(); + segment.set_start(clip_start); + segment.set_stop(clip_stop); + + gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,); + + if let Some(data) = data.clip(&*state, &segment) { + return Ok(HandleResult::Pass(data)); + } else { + gst_warning!(CAT, obj: pad, "Complete buffer clipped!"); + return Ok(HandleResult::Drop); + } + } else if current_running_time < rec_state.last_recording_start { + // Otherwise if the buffer starts before the recording start, drop it. This + // means that we either can't clip, or that the end is also before the + // recording start + gst_debug!( + CAT, + obj: pad, + "Main stream EOS and we're not EOS yet (before recording start, {} < {})", + current_running_time, + rec_state.last_recording_start + ); + return Ok(HandleResult::Drop); + } else if data.can_clip(&*state) + && current_running_time < rec_state.last_recording_stop + && current_running_time_end > rec_state.last_recording_stop + { + // Similarly if the end is after the recording stop but the start is before and we + // can clip, clip the buffer and pass it through. + gst_debug!( + CAT, + obj: pad, + "Main stream EOS and we're not EOS yet (overlapping recording end, {} < {} < {})", + current_running_time, + rec_state.last_recording_stop, + current_running_time_end + ); + + let mut clip_start = state + .in_segment + .position_from_running_time(rec_state.last_recording_start); + if clip_start.is_none() { + clip_start = state.in_segment.get_start(); + } + let mut clip_stop = state + .in_segment + .position_from_running_time(rec_state.last_recording_stop); + if clip_stop.is_none() { + clip_stop = state.in_segment.get_stop(); + } + let mut segment = state.in_segment.clone(); + segment.set_start(clip_start); + segment.set_stop(clip_stop); + + gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,); + + if let Some(data) = data.clip(&*state, &segment) { + return Ok(HandleResult::Pass(data)); + } else { + gst_warning!(CAT, obj: pad, "Complete buffer clipped!"); + return Ok(HandleResult::Eos); + } + } else if current_running_time_end > rec_state.last_recording_stop { + // Otherwise if the end of the buffer is after the recording stop, we're EOS + // now. This means that we either couldn't clip or that the start is also after + // the recording stop + gst_debug!( + CAT, + obj: pad, + "Main stream EOS and we're EOS too (after recording end, {} > {})", + current_running_time_end, + rec_state.last_recording_stop + ); + return Ok(HandleResult::Eos); + } else { + // In all other cases the buffer is fully between recording start and end and + // can be passed through as is + assert!(current_running_time >= rec_state.last_recording_start); + assert!(current_running_time_end <= rec_state.last_recording_stop); + + gst_debug!( + CAT, + obj: pad, + "Main stream EOS and we're not EOS yet (before recording end, {} <= {} <= {})", + rec_state.last_recording_start, + current_running_time, + rec_state.last_recording_stop + ); + return Ok(HandleResult::Pass(data)); + } + } + + // The end of our buffer is before the end of the previous buffer of the main stream + assert!(main_state.current_running_time >= current_running_time_end); + + match rec_state.recording_state { + RecordingState::Recording => { + // We're properly started, must have a start position and + // be actually after that start position + assert!(rec_state.last_recording_start.is_some()); + assert!(current_running_time >= rec_state.last_recording_start); + gst_log!(CAT, obj: pad, "Passing buffer (recording)"); + Ok(HandleResult::Pass(data)) + } + RecordingState::Stopping => { + // If we have no start position yet, the main stream is waiting for a key-frame + if rec_state.last_recording_stop.is_none() { + gst_log!( + CAT, + obj: pad, + "Passing buffer (stopping: waiting for keyframe)", + ); + Ok(HandleResult::Pass(data)) + } else if current_running_time_end <= rec_state.last_recording_stop { + gst_log!( + CAT, + obj: pad, + "Passing buffer (stopping: {} <= {})", + current_running_time_end, + rec_state.last_recording_stop + ); + Ok(HandleResult::Pass(data)) + } else if data.can_clip(&*state) + && current_running_time < rec_state.last_recording_stop + && current_running_time_end > rec_state.last_recording_stop + { + gst_log!( + CAT, + obj: pad, + "Passing buffer (stopping: {} < {} < {})", + current_running_time, + rec_state.last_recording_stop, + current_running_time_end, + ); + + let mut clip_stop = state + .in_segment + .position_from_running_time(rec_state.last_recording_stop); + if clip_stop.is_none() { + clip_stop = state.in_segment.get_stop(); + } + let mut segment = state.in_segment.clone(); + segment.set_stop(clip_stop); + + gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,); + + if let Some(data) = data.clip(&*state, &segment) { + Ok(HandleResult::Pass(data)) + } else { + gst_warning!(CAT, obj: pad, "Complete buffer clipped!"); + Ok(HandleResult::Drop) + } + } else { + gst_log!( + CAT, + obj: pad, + "Dropping buffer (stopping: {} > {})", + current_running_time_end, + rec_state.last_recording_stop + ); + Ok(HandleResult::Drop) + } + } + RecordingState::Stopped => { + // We're properly stopped + gst_log!(CAT, obj: pad, "Dropping buffer (stopped)"); + Ok(HandleResult::Drop) + } + RecordingState::Starting => { + // If we have no start position yet, the main stream is waiting for a key-frame + if rec_state.last_recording_start.is_none() { + gst_log!( + CAT, + obj: pad, + "Dropping buffer (starting: waiting for keyframe)", + ); + Ok(HandleResult::Drop) + } else if current_running_time >= rec_state.last_recording_start { + gst_log!( + CAT, + obj: pad, + "Passing buffer (starting: {} >= {})", + current_running_time, + rec_state.last_recording_start + ); + Ok(HandleResult::Pass(data)) + } else if data.can_clip(&*state) + && current_running_time < rec_state.last_recording_start + && current_running_time_end > rec_state.last_recording_start + { + gst_log!( + CAT, + obj: pad, + "Passing buffer (starting: {} < {} < {})", + current_running_time, + rec_state.last_recording_start, + current_running_time_end, + ); + + let mut clip_start = state + .in_segment + .position_from_running_time(rec_state.last_recording_start); + if clip_start.is_none() { + clip_start = state.in_segment.get_start(); + } + let mut segment = state.in_segment.clone(); + segment.set_start(clip_start); + + gst_log!(CAT, obj: pad, "Clipping to segment {:?}", segment,); + + if let Some(data) = data.clip(&*state, &segment) { + Ok(HandleResult::Pass(data)) + } else { + gst_warning!(CAT, obj: pad, "Complete buffer clipped!"); + Ok(HandleResult::Drop) + } + } else { + gst_log!( + CAT, + obj: pad, + "Dropping buffer (starting: {} < {})", + current_running_time, + rec_state.last_recording_start + ); + Ok(HandleResult::Drop) + } + } + } + } + + fn sink_chain( + &self, + pad: &gst::Pad, + element: &gst::Element, + buffer: gst::Buffer, + ) -> Result<gst::FlowSuccess, gst::FlowError> { + let stream = self.pads.lock().get(pad).cloned().ok_or_else(|| { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + gst::FlowError::Error + })?; + + gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); + + { + let state = stream.state.lock(); + if state.eos { + return Err(gst::FlowError::Eos); + } + } + + let handle_result = if stream != self.main_stream { + self.handle_secondary_stream(element, pad, &stream, buffer) + } else { + self.handle_main_stream(element, pad, &stream, buffer) + }?; + + let buffer = match handle_result { + HandleResult::Drop => { + return Ok(gst::FlowSuccess::Ok); + } + HandleResult::Flushing => { + return Err(gst::FlowError::Flushing); + } + HandleResult::Eos => { + stream.srcpad.push_event( + gst::Event::new_eos() + .seqnum(stream.state.lock().segment_seqnum) + .build(), + ); + return Err(gst::FlowError::Eos); + } + HandleResult::Pass(buffer) => { + // Pass through and actually push the buffer + buffer + } + }; + + let out_running_time = { + let mut state = stream.state.lock(); + let mut events = Vec::with_capacity(state.pending_events.len() + 1); + + if state.segment_pending { + let rec_state = self.state.lock(); + + // Adjust so that last_recording_start has running time of + // recording_duration + + state.out_segment = state.in_segment.clone(); + let offset = rec_state.running_time_offset.unwrap_or(0); + state + .out_segment + .offset_running_time(-(offset as i64)) + .expect("Adjusting record duration"); + events.push( + gst::Event::new_segment(&state.out_segment) + .seqnum(state.segment_seqnum) + .build(), + ); + state.segment_pending = false; + gst_debug!(CAT, obj: pad, "Pending Segment {:?}", &state.out_segment); + } + + if !state.pending_events.is_empty() { + gst_debug!(CAT, obj: pad, "Pushing pending events"); + } + + events.append(&mut state.pending_events); + + let out_running_time = state.out_segment.to_running_time(buffer.get_pts()); + + // Unlock before pushing + drop(state); + + for e in events.drain(..) { + stream.srcpad.push_event(e); + } + + out_running_time + }; + + gst_log!( + CAT, + obj: pad, + "Pushing buffer with running time {}: {:?}", + out_running_time, + buffer + ); + stream.srcpad.push(buffer) + } + + fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool { + use gst::EventView; + + let stream = match self.pads.lock().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return false; + } + Some(stream) => stream.clone(), + }; + + gst_log!(CAT, obj: pad, "Handling event {:?}", event); + + let mut forward = true; + let mut send_pending = false; + + match event.view() { + EventView::FlushStart(..) => { + let _main_state = if stream != self.main_stream { + Some(self.main_stream.state.lock()) + } else { + None + }; + let mut state = stream.state.lock(); + + state.flushing = true; + self.main_stream_cond.notify_all(); + } + EventView::FlushStop(..) => { + let mut state = stream.state.lock(); + + state.eos = false; + state.flushing = false; + state.segment_pending = false; + state.current_running_time = gst::CLOCK_TIME_NONE; + } + EventView::Caps(c) => { + let mut state = stream.state.lock(); + let caps = c.get_caps(); + let s = caps.get_structure(0).unwrap(); + if s.get_name().starts_with("audio/") { + state.audio_info = gst_audio::AudioInfo::from_caps(caps).ok(); + gst_log!(CAT, obj: pad, "Got audio caps {:?}", state.audio_info); + state.video_info = None; + } else if s.get_name().starts_with("video/") { + state.audio_info = None; + state.video_info = gst_video::VideoInfo::from_caps(caps).ok(); + gst_log!(CAT, obj: pad, "Got video caps {:?}", state.video_info); + } else { + state.audio_info = None; + state.video_info = None; + } + } + EventView::Segment(e) => { + let mut state = stream.state.lock(); + + let segment = match e.get_segment().clone().downcast::<gst::ClockTime>() { + Err(segment) => { + gst_element_error!( + element, + gst::StreamError::Format, + [ + "Only Time segments supported, got {:?}", + segment.get_format(), + ] + ); + return false; + } + Ok(segment) => segment, + }; + + if (segment.get_rate() - 1.0).abs() > f64::EPSILON { + gst_element_error!( + element, + gst::StreamError::Format, + [ + "Only rate==1.0 segments supported, got {:?}", + segment.get_rate(), + ] + ); + return false; + } + + state.in_segment = segment; + state.segment_seqnum = event.get_seqnum(); + state.segment_pending = true; + state.current_running_time = gst::CLOCK_TIME_NONE; + + gst_debug!(CAT, obj: pad, "Got new Segment {:?}", state.in_segment); + + forward = false; + } + EventView::Gap(e) => { + gst_debug!(CAT, obj: pad, "Handling Gap event {:?}", event); + let (pts, duration) = e.get(); + let handle_result = if stream == self.main_stream { + self.handle_main_stream(element, pad, &stream, (pts, duration)) + } else { + self.handle_secondary_stream(element, pad, &stream, (pts, duration)) + }; + + forward = match handle_result { + Ok(HandleResult::Pass((new_pts, new_duration))) if new_pts.is_some() => { + if new_pts != pts || new_duration != duration { + event = gst::Event::new_gap(new_pts, new_duration).build(); + } + true + } + Ok(_) => false, + Err(_) => false, + }; + } + EventView::Eos(..) => { + let _main_state = if stream != self.main_stream { + Some(self.main_stream.state.lock()) + } else { + None + }; + let mut state = stream.state.lock(); + + state.eos = true; + self.main_stream_cond.notify_all(); + gst_debug!( + CAT, + obj: pad, + "Stream is EOS now, sending any pending events" + ); + + send_pending = true; + } + _ => (), + }; + + // If a serialized event and coming after Segment and a new Segment is pending, + // queue up and send at a later time (buffer/gap) after we sent the Segment + let type_ = event.get_type(); + if forward + && type_ != gst::EventType::Eos + && type_.is_serialized() + && type_.partial_cmp(&gst::EventType::Segment) == Some(cmp::Ordering::Greater) + { + let mut state = stream.state.lock(); + if state.segment_pending { + gst_log!(CAT, obj: pad, "Storing event for later pushing"); + state.pending_events.push(event); + return true; + } + } + + if send_pending { + let mut state = stream.state.lock(); + let mut events = Vec::with_capacity(state.pending_events.len() + 1); + + // Got not a single buffer on this stream before EOS, forward + // the input segment + if state.segment_pending { + events.push( + gst::Event::new_segment(&state.in_segment) + .seqnum(state.segment_seqnum) + .build(), + ); + } + events.append(&mut state.pending_events); + drop(state); + + for e in events.drain(..) { + stream.srcpad.push_event(e); + } + } + + if forward { + gst_log!(CAT, obj: pad, "Forwarding event {:?}", event); + stream.srcpad.push_event(event) + } else { + gst_log!(CAT, obj: pad, "Dropping event {:?}", event); + true + } + } + + fn sink_query( + &self, + pad: &gst::Pad, + element: &gst::Element, + query: &mut gst::QueryRef, + ) -> bool { + let stream = match self.pads.lock().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return false; + } + Some(stream) => stream.clone(), + }; + + gst_log!(CAT, obj: pad, "Handling query {:?}", query); + + stream.srcpad.peer_query(query) + } + + fn src_event(&self, pad: &gst::Pad, element: &gst::Element, mut event: gst::Event) -> bool { + use gst::EventView; + + let stream = match self.pads.lock().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return false; + } + Some(stream) => stream.clone(), + }; + + gst_log!(CAT, obj: pad, "Handling event {:?}", event); + + let forward = match event.view() { + EventView::Seek(..) => false, + _ => true, + }; + + let rec_state = self.state.lock(); + let running_time_offset = rec_state.running_time_offset.unwrap_or(0) as i64; + let offset = event.get_running_time_offset(); + event + .make_mut() + .set_running_time_offset(offset + running_time_offset); + drop(rec_state); + + if forward { + gst_log!(CAT, obj: pad, "Forwarding event {:?}", event); + stream.sinkpad.push_event(event) + } else { + gst_log!(CAT, obj: pad, "Dropping event {:?}", event); + false + } + } + + fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool { + use gst::QueryView; + + let stream = match self.pads.lock().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return false; + } + Some(stream) => stream.clone(), + }; + + gst_log!(CAT, obj: pad, "Handling query {:?}", query); + match query.view_mut() { + QueryView::Scheduling(ref mut q) => { + let mut new_query = gst::Query::new_scheduling(); + let res = stream.sinkpad.peer_query(&mut new_query); + if !res { + return res; + } + + gst_log!(CAT, obj: pad, "Downstream returned {:?}", new_query); + + let (flags, min, max, align) = new_query.get_result(); + q.set(flags, min, max, align); + q.add_scheduling_modes( + &new_query + .get_scheduling_modes() + .iter() + .cloned() + .filter(|m| m != &gst::PadMode::Pull) + .collect::<Vec<_>>(), + ); + gst_log!(CAT, obj: pad, "Returning {:?}", q.get_mut_query()); + true + } + QueryView::Seeking(ref mut q) => { + // Seeking is not possible here + let format = q.get_format(); + q.set( + false, + gst::GenericFormattedValue::new(format, -1), + gst::GenericFormattedValue::new(format, -1), + ); + + gst_log!(CAT, obj: pad, "Returning {:?}", q.get_mut_query()); + true + } + // Position and duration is always the current recording position + QueryView::Position(ref mut q) => { + if q.get_format() == gst::Format::Time { + let state = stream.state.lock(); + let rec_state = self.state.lock(); + let mut recording_duration = rec_state.recording_duration; + if rec_state.recording_state == RecordingState::Recording + || rec_state.recording_state == RecordingState::Stopping + { + recording_duration += + state.current_running_time - rec_state.last_recording_start; + } + q.set(recording_duration); + true + } else { + false + } + } + QueryView::Duration(ref mut q) => { + if q.get_format() == gst::Format::Time { + let state = stream.state.lock(); + let rec_state = self.state.lock(); + let mut recording_duration = rec_state.recording_duration; + if rec_state.recording_state == RecordingState::Recording + || rec_state.recording_state == RecordingState::Stopping + { + recording_duration += + state.current_running_time - rec_state.last_recording_start; + } + q.set(recording_duration); + true + } else { + false + } + } + _ => { + gst_log!(CAT, obj: pad, "Forwarding query {:?}", query); + stream.sinkpad.peer_query(query) + } + } + } + + fn iterate_internal_links( + &self, + pad: &gst::Pad, + element: &gst::Element, + ) -> gst::Iterator<gst::Pad> { + let stream = match self.pads.lock().get(pad) { + None => { + gst_element_error!( + element, + gst::CoreError::Pad, + ["Unknown pad {:?}", pad.get_name()] + ); + return gst::Iterator::from_vec(vec![]); + } + Some(stream) => stream.clone(), + }; + + if pad == &stream.srcpad { + gst::Iterator::from_vec(vec![stream.sinkpad]) + } else { + gst::Iterator::from_vec(vec![stream.srcpad]) + } + } +} + +impl ObjectSubclass for ToggleRecord { + const NAME: &'static str = "RsToggleRecord"; + type ParentType = gst::Element; + type Instance = gst::subclass::ElementInstanceStruct<Self>; + type Class = subclass::simple::ClassStruct<Self>; + + glib_object_subclass!(); + + fn new_with_class(klass: &subclass::simple::ClassStruct<Self>) -> Self { + let templ = klass.get_pad_template("sink").unwrap(); + let sinkpad = gst::Pad::new_from_template(&templ, Some("sink")); + let templ = klass.get_pad_template("src").unwrap(); + let srcpad = gst::Pad::new_from_template(&templ, Some("src")); + + ToggleRecord::set_pad_functions(&sinkpad, &srcpad); + + let main_stream = Stream::new(sinkpad, srcpad); + + let mut pads = HashMap::new(); + pads.insert(main_stream.sinkpad.clone(), main_stream.clone()); + pads.insert(main_stream.srcpad.clone(), main_stream.clone()); + + Self { + settings: Mutex::new(Settings::default()), + state: Mutex::new(State::default()), + main_stream, + main_stream_cond: Condvar::new(), + other_streams: Mutex::new((Vec::new(), 0)), + pads: Mutex::new(pads), + } + } + + fn class_init(klass: &mut subclass::simple::ClassStruct<Self>) { + klass.install_properties(&PROPERTIES); + + klass.set_metadata( + "Toggle Record", + "Generic", + "Valve that ensures multiple streams start/end at the same time", + "Sebastian Dröge <sebastian@centricular.com>", + ); + + let caps = gst::Caps::new_any(); + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(src_pad_template); + + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + + let src_pad_template = gst::PadTemplate::new( + "src_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &caps, + ) + .unwrap(); + klass.add_pad_template(src_pad_template); + + let sink_pad_template = gst::PadTemplate::new( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &caps, + ) + .unwrap(); + klass.add_pad_template(sink_pad_template); + } +} + +impl ObjectImpl for ToggleRecord { + glib_object_impl!(); + + fn set_property(&self, obj: &glib::Object, id: usize, value: &glib::Value) { + let prop = &PROPERTIES[id]; + let element = obj.downcast_ref::<gst::Element>().unwrap(); + + match *prop { + subclass::Property("record", ..) => { + let mut settings = self.settings.lock(); + let record = value.get_some().expect("type checked upstream"); + gst_debug!( + CAT, + obj: element, + "Setting record from {:?} to {:?}", + settings.record, + record + ); + settings.record = record; + } + _ => unimplemented!(), + } + } + + fn get_property(&self, _obj: &glib::Object, id: usize) -> Result<glib::Value, ()> { + let prop = &PROPERTIES[id]; + + match *prop { + subclass::Property("record", ..) => { + let settings = self.settings.lock(); + Ok(settings.record.to_value()) + } + subclass::Property("recording", ..) => { + let rec_state = self.state.lock(); + Ok((rec_state.recording_state == RecordingState::Recording).to_value()) + } + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &glib::Object) { + self.parent_constructed(obj); + + let element = obj.downcast_ref::<gst::Element>().unwrap(); + element.add_pad(&self.main_stream.sinkpad).unwrap(); + element.add_pad(&self.main_stream.srcpad).unwrap(); + } +} + +impl ElementImpl for ToggleRecord { + fn change_state( + &self, + element: &gst::Element, + transition: gst::StateChange, + ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { + gst_trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::ReadyToPaused => { + for s in self + .other_streams + .lock() + .0 + .iter() + .chain(iter::once(&self.main_stream)) + { + let mut state = s.state.lock(); + *state = StreamState::default(); + } + + let mut rec_state = self.state.lock(); + *rec_state = State::default(); + } + gst::StateChange::PausedToReady => { + for s in &self.other_streams.lock().0 { + let mut state = s.state.lock(); + state.flushing = true; + } + + let mut state = self.main_stream.state.lock(); + state.flushing = true; + self.main_stream_cond.notify_all(); + } + _ => (), + } + + let success = self.parent_change_state(element, transition)?; + + if transition == gst::StateChange::PausedToReady { + for s in self + .other_streams + .lock() + .0 + .iter() + .chain(iter::once(&self.main_stream)) + { + let mut state = s.state.lock(); + + state.pending_events.clear(); + } + + let mut rec_state = self.state.lock(); + *rec_state = State::default(); + drop(rec_state); + element.notify("recording"); + } + + Ok(success) + } + + fn request_new_pad( + &self, + element: &gst::Element, + _templ: &gst::PadTemplate, + _name: Option<String>, + _caps: Option<&gst::Caps>, + ) -> Option<gst::Pad> { + let mut other_streams_guard = self.other_streams.lock(); + let (ref mut other_streams, ref mut pad_count) = *other_streams_guard; + let mut pads = self.pads.lock(); + + let id = *pad_count; + *pad_count += 1; + + let templ = element.get_pad_template("sink_%u").unwrap(); + let sinkpad = gst::Pad::new_from_template(&templ, Some(format!("sink_{}", id).as_str())); + + let templ = element.get_pad_template("src_%u").unwrap(); + let srcpad = gst::Pad::new_from_template(&templ, Some(format!("src_{}", id).as_str())); + + ToggleRecord::set_pad_functions(&sinkpad, &srcpad); + + sinkpad.set_active(true).unwrap(); + srcpad.set_active(true).unwrap(); + + let stream = Stream::new(sinkpad.clone(), srcpad.clone()); + + pads.insert(stream.sinkpad.clone(), stream.clone()); + pads.insert(stream.srcpad.clone(), stream.clone()); + + other_streams.push(stream); + + drop(pads); + drop(other_streams_guard); + + element.add_pad(&sinkpad).unwrap(); + element.add_pad(&srcpad).unwrap(); + + Some(sinkpad) + } + + fn release_pad(&self, element: &gst::Element, pad: &gst::Pad) { + let mut other_streams_guard = self.other_streams.lock(); + let (ref mut other_streams, _) = *other_streams_guard; + let mut pads = self.pads.lock(); + + let stream = match pads.get(pad) { + None => return, + Some(stream) => stream.clone(), + }; + + stream.srcpad.set_active(false).unwrap(); + stream.sinkpad.set_active(false).unwrap(); + + pads.remove(&stream.sinkpad).unwrap(); + pads.remove(&stream.srcpad).unwrap(); + + // TODO: Replace with Vec::remove_item() once stable + let pos = other_streams.iter().position(|x| *x == stream); + pos.map(|pos| other_streams.swap_remove(pos)); + + drop(pads); + drop(other_streams_guard); + + element.remove_pad(&stream.sinkpad).unwrap(); + element.remove_pad(&stream.srcpad).unwrap(); + } +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "togglerecord", + gst::Rank::None, + ToggleRecord::get_type(), + ) +} diff --git a/utils/gst-plugin-togglerecord/tests/tests.rs b/utils/gst-plugin-togglerecord/tests/tests.rs new file mode 100644 index 000000000..c0eaf6745 --- /dev/null +++ b/utils/gst-plugin-togglerecord/tests/tests.rs @@ -0,0 +1,1173 @@ +// Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com> +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Library General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Library General Public License for more details. +// +// You should have received a copy of the GNU Library General Public +// License along with this library; if not, write to the +// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, +// Boston, MA 02110-1335, USA. + +extern crate glib; +use glib::prelude::*; + +extern crate gstreamer as gst; +use gst::prelude::*; + +extern crate either; +use either::*; + +use std::sync::{mpsc, Mutex}; +use std::thread; + +extern crate gsttogglerecord; + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gsttogglerecord::plugin_register_static().expect("gsttogglerecord tests"); + }); +} + +enum SendData { + Buffers(usize), + BuffersDelta(usize), + Gaps(usize), + Eos, +} + +#[allow(clippy::type_complexity)] +fn setup_sender_receiver( + pipeline: &gst::Pipeline, + togglerecord: &gst::Element, + pad: &str, + offset: gst::ClockTime, +) -> ( + mpsc::Sender<SendData>, + mpsc::Receiver<()>, + mpsc::Receiver<Either<gst::Buffer, gst::Event>>, + thread::JoinHandle<()>, +) { + let fakesink = gst::ElementFactory::make("fakesink", None).unwrap(); + fakesink.set_property("async", &false).unwrap(); + pipeline.add(&fakesink).unwrap(); + + let main_stream = pad == "src"; + + let (srcpad, sinkpad) = if main_stream { + ( + togglerecord.get_static_pad("src").unwrap(), + togglerecord.get_static_pad("sink").unwrap(), + ) + } else { + let sinkpad = togglerecord.get_request_pad("sink_%u").unwrap(); + let srcpad = sinkpad.iterate_internal_links().next().unwrap().unwrap(); + (srcpad, sinkpad) + }; + + let fakesink_sinkpad = fakesink.get_static_pad("sink").unwrap(); + srcpad.link(&fakesink_sinkpad).unwrap(); + + let (sender_output, receiver_output) = mpsc::channel::<Either<gst::Buffer, gst::Event>>(); + let sender_output = Mutex::new(sender_output); + srcpad.add_probe( + gst::PadProbeType::BUFFER | gst::PadProbeType::EVENT_DOWNSTREAM, + move |_, ref probe_info| { + match probe_info.data { + Some(gst::PadProbeData::Buffer(ref buffer)) => { + sender_output + .lock() + .unwrap() + .send(Left(buffer.clone())) + .unwrap(); + } + Some(gst::PadProbeData::Event(ref event)) => { + sender_output + .lock() + .unwrap() + .send(Right(event.clone())) + .unwrap(); + } + _ => { + unreachable!(); + } + } + + gst::PadProbeReturn::Ok + }, + ); + + let (sender_input, receiver_input) = mpsc::channel::<SendData>(); + let (sender_input_done, receiver_input_done) = mpsc::channel::<()>(); + let thread = thread::spawn(move || { + let mut i = 0; + let mut first = true; + while let Ok(send_data) = receiver_input.recv() { + if first { + assert!(sinkpad.send_event(gst::Event::new_stream_start("test").build())); + let caps = if main_stream { + gst::Caps::builder("video/x-raw") + .field("format", &"ARGB") + .field("width", &320i32) + .field("height", &240i32) + .field("framerate", &gst::Fraction::new(50, 1)) + .build() + } else { + gst::Caps::builder("audio/x-raw") + .field("format", &"U8") + .field("layout", &"interleaved") + .field("rate", &8000i32) + .field("channels", &1i32) + .build() + }; + assert!(sinkpad.send_event(gst::Event::new_caps(&caps).build())); + + let segment = gst::FormattedSegment::<gst::ClockTime>::new(); + assert!(sinkpad.send_event(gst::Event::new_segment(&segment).build())); + + let mut tags = gst::TagList::new(); + tags.get_mut() + .unwrap() + .add::<gst::tags::Title>(&"some title", gst::TagMergeMode::Append); + assert!(sinkpad.send_event(gst::Event::new_tag(tags).build())); + + first = false; + } + + let buffer = if main_stream { + gst::Buffer::with_size(320 * 240 * 4).unwrap() + } else { + gst::Buffer::with_size(160).unwrap() + }; + + match send_data { + SendData::Eos => { + break; + } + SendData::Buffers(n) => { + for _ in 0..n { + let mut buffer = buffer.clone(); + { + let buffer = buffer.make_mut(); + buffer.set_pts(offset + i * 20 * gst::MSECOND); + buffer.set_duration(20 * gst::MSECOND); + } + let _ = sinkpad.chain(buffer); + i += 1; + } + } + SendData::BuffersDelta(n) => { + for _ in 0..n { + let mut buffer = gst::Buffer::new(); + buffer + .get_mut() + .unwrap() + .set_pts(offset + i * 20 * gst::MSECOND); + buffer.get_mut().unwrap().set_duration(20 * gst::MSECOND); + buffer + .get_mut() + .unwrap() + .set_flags(gst::BufferFlags::DELTA_UNIT); + let _ = sinkpad.chain(buffer); + i += 1; + } + } + SendData::Gaps(n) => { + for _ in 0..n { + let event = + gst::Event::new_gap(offset + i * 20 * gst::MSECOND, 20 * gst::MSECOND) + .build(); + let _ = sinkpad.send_event(event); + i += 1; + } + } + } + + let _ = sender_input_done.send(()); + } + + let _ = sinkpad.send_event(gst::Event::new_eos().build()); + let _ = sender_input_done.send(()); + }); + + (sender_input, receiver_input_done, receiver_output, thread) +} + +fn recv_buffers( + receiver_output: &mpsc::Receiver<Either<gst::Buffer, gst::Event>>, + segment: &mut gst::FormattedSegment<gst::ClockTime>, + wait_buffers: usize, +) -> Vec<(gst::ClockTime, gst::ClockTime, gst::ClockTime)> { + let mut res = Vec::new(); + let mut n_buffers = 0; + while let Ok(val) = receiver_output.recv() { + match val { + Left(buffer) => { + res.push(( + segment.to_running_time(buffer.get_pts()), + buffer.get_pts(), + buffer.get_duration(), + )); + n_buffers += 1; + if wait_buffers > 0 && n_buffers == wait_buffers { + return res; + } + } + Right(event) => { + use gst::EventView; + + match event.view() { + EventView::Gap(ref e) => { + let (ts, duration) = e.get(); + + res.push((segment.to_running_time(ts), ts, duration)); + n_buffers += 1; + if wait_buffers > 0 && n_buffers == wait_buffers { + return res; + } + } + EventView::Eos(..) => { + return res; + } + EventView::Segment(ref e) => { + *segment = e.get_segment().clone().downcast().unwrap(); + } + _ => (), + } + } + } + } + + res +} + +#[test] +fn test_create() { + init(); + assert!(gst::ElementFactory::make("togglerecord", None).is_ok()); +} + +#[test] +fn test_create_pads() { + init(); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + + let sinkpad = togglerecord.get_request_pad("sink_%u").unwrap(); + let srcpad = sinkpad.iterate_internal_links().next().unwrap().unwrap(); + + assert_eq!(sinkpad.get_name(), "sink_0"); + assert_eq!(srcpad.get_name(), "src_0"); + + togglerecord.release_request_pad(&sinkpad); + assert!(sinkpad.get_parent().is_none()); + assert!(srcpad.get_parent().is_none()); +} + +#[test] +fn test_one_stream_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, _, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + drop(sender_input); + + let mut segment = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 10); + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_one_stream_gaps_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, _, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(5)).unwrap(); + sender_input.send(SendData::Gaps(5)).unwrap(); + drop(sender_input); + + let mut segment = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 10); + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_one_stream_close_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, receiver_input_done, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + sender_input.send(SendData::Buffers(10)).unwrap(); + receiver_input_done.recv().unwrap(); + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + drop(sender_input); + + let mut segment = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 10); + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_one_stream_open_close() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, receiver_input_done, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + receiver_input_done.recv().unwrap(); + togglerecord.set_property("record", &false).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + drop(sender_input); + + let mut segment = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 10); + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_one_stream_open_close_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input, receiver_input_done, receiver_output, thread) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + receiver_input_done.recv().unwrap(); + togglerecord.set_property("record", &false).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + receiver_input_done.recv().unwrap(); + togglerecord.set_property("record", &true).unwrap(); + sender_input.send(SendData::Buffers(10)).unwrap(); + drop(sender_input); + + let mut segment = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers = recv_buffers(&receiver_output, &mut segment, 0); + assert_eq!(buffers.len(), 20); + for (index, &(running_time, pts, duration)) in buffers.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0.into() + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + + thread.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_two_stream_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 10); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_two_stream_open_shift() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 5 * gst::MSECOND); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // Second to last buffer should be clipped from second stream, last should be dropped + let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + if index == 9 { + assert_eq!(duration, 15 * gst::MSECOND); + } else { + assert_eq!(duration, 20 * gst::MSECOND); + } + } + assert_eq!(buffers_2.len(), 10); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_two_stream_open_shift_main() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 5 * gst::MSECOND); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(12)).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_1.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // PTS 5 maps to running time 0 now + let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // First and second last buffer should be clipped from second stream, + // last buffer should be dropped + let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + if index == 0 { + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, 5 * gst::MSECOND + index * 20 * gst::MSECOND); + assert_eq!(duration, 15 * gst::MSECOND); + } else if index == 10 { + assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 5 * gst::MSECOND); + } else { + assert_eq!(running_time, index * 20 * gst::MSECOND - 5 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + } + assert_eq!(buffers_2.len(), 11); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_two_stream_open_close() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv().unwrap(); + + // Stop recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", &false).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 10); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_two_stream_close_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &false).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv().unwrap(); + + // Start recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", &true).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, (10 + index) * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 10); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_two_stream_open_close_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv().unwrap(); + + // Stop recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", &false).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another buffer to sender 2, this will block until sender 1 advances + // but must not be dropped, although we're not recording (yet) + sender_input_2.send(SendData::Buffers(1)).unwrap(); + + // Start recording again and send another set of buffers to both senders + togglerecord.set_property("record", &true).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_1.recv().unwrap(); + // The single buffer above for sender 1 should be handled now + receiver_input_done_2.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0.into() + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 20); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0.into() + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 20); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_two_stream_open_close_open_gaps() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(3)).unwrap(); + sender_input_1.send(SendData::Gaps(3)).unwrap(); + sender_input_1.send(SendData::Buffers(4)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv().unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_1.recv().unwrap(); + + // Stop recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", &false).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 4 gaps and 5 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Gaps(4)).unwrap(); + sender_input_2.send(SendData::Buffers(5)).unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another gap to sender 2, this will block until sender 1 advances + // but must not be dropped, although we're not recording (yet) + sender_input_2.send(SendData::Gaps(1)).unwrap(); + + // Start recording again and send another set of buffers to both senders + togglerecord.set_property("record", &true).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_1.recv().unwrap(); + // The single buffer above for sender 1 should be handled now + receiver_input_done_2.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0.into() + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 20); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0.into() + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 20); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_two_stream_close_open_close_delta() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &false).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1 is finished + receiver_input_done_1.recv().unwrap(); + + // Start recording and push new buffers to sender 1. The first one is a delta frame, + // so will be dropped, and as such the next frame of sender 2 will also be dropped + // Sender 2 is empty now + togglerecord.set_property("record", &true).unwrap(); + sender_input_1.send(SendData::BuffersDelta(1)).unwrap(); + sender_input_1.send(SendData::Buffers(9)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 9 buffers to sender 2, both are the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Wait until all 20 buffers of both senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another buffer to sender 2, this will block until sender 1 advances + // but must not be dropped, and we're still recording + sender_input_2.send(SendData::Buffers(1)).unwrap(); + + // Stop recording again and send another set of buffers to both senders + // The first one is a delta frame, so we only actually stop recording + // after recording another frame + togglerecord.set_property("record", &false).unwrap(); + sender_input_1.send(SendData::BuffersDelta(1)).unwrap(); + sender_input_1.send(SendData::Buffers(9)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_1.recv().unwrap(); + // The single buffer above for sender 1 should be handled now + receiver_input_done_2.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + + let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, (11 + index) * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 10); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, (11 + index) * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 10); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} + +#[test] +fn test_three_stream_open_close_open() { + init(); + + let pipeline = gst::Pipeline::new(None); + let togglerecord = gst::ElementFactory::make("togglerecord", None).unwrap(); + pipeline.add(&togglerecord).unwrap(); + + let (sender_input_1, receiver_input_done_1, receiver_output_1, thread_1) = + setup_sender_receiver(&pipeline, &togglerecord, "src", 0.into()); + let (sender_input_2, receiver_input_done_2, receiver_output_2, thread_2) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + let (sender_input_3, receiver_input_done_3, receiver_output_3, thread_3) = + setup_sender_receiver(&pipeline, &togglerecord, "src_%u", 0.into()); + + pipeline.set_state(gst::State::Playing).unwrap(); + + togglerecord.set_property("record", &true).unwrap(); + + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(11)).unwrap(); + sender_input_3.send(SendData::Buffers(10)).unwrap(); + + // Sender 2 is waiting for sender 1 to continue, sender 1/3 are finished + receiver_input_done_1.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + // Stop recording and push new buffers to sender 1, which will advance + // it and release the 11th buffer of sender 2 above + togglerecord.set_property("record", &false).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + receiver_input_done_2.recv().unwrap(); + + // Send another 9 buffers to sender 2, 1/2 are at the same position now + sender_input_2.send(SendData::Buffers(9)).unwrap(); + + // Send the remaining 10 buffers to sender 3, all are at the same position now + sender_input_3.send(SendData::Buffers(10)).unwrap(); + + // Wait until all 20 buffers of all senders are done + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + // Send another buffer to sender 2, this will block until sender 1 advances + // but must not be dropped, although we're not recording (yet) + sender_input_2.send(SendData::Buffers(1)).unwrap(); + + // Start recording again and send another set of buffers to both senders + togglerecord.set_property("record", &true).unwrap(); + sender_input_1.send(SendData::Buffers(10)).unwrap(); + sender_input_2.send(SendData::Buffers(10)).unwrap(); + sender_input_3.send(SendData::Buffers(5)).unwrap(); + receiver_input_done_1.recv().unwrap(); + // The single buffer above for sender 1 should be handled now + receiver_input_done_2.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + sender_input_3.send(SendData::Buffers(5)).unwrap(); + receiver_input_done_3.recv().unwrap(); + + // Send EOS and wait for it to be handled + sender_input_1.send(SendData::Eos).unwrap(); + sender_input_2.send(SendData::Eos).unwrap(); + sender_input_3.send(SendData::Eos).unwrap(); + receiver_input_done_1.recv().unwrap(); + receiver_input_done_2.recv().unwrap(); + receiver_input_done_3.recv().unwrap(); + + let mut segment_1 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_1 = recv_buffers(&receiver_output_1, &mut segment_1, 0); + for (index, &(running_time, pts, duration)) in buffers_1.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0.into() + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_1.len(), 20); + + // Last buffer should be dropped from second stream + let mut segment_2 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_2 = recv_buffers(&receiver_output_2, &mut segment_2, 0); + for (index, &(running_time, pts, duration)) in buffers_2.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0.into() + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_2.len(), 20); + + let mut segment_3 = gst::FormattedSegment::<gst::ClockTime>::new(); + let buffers_3 = recv_buffers(&receiver_output_3, &mut segment_3, 0); + for (index, &(running_time, pts, duration)) in buffers_3.iter().enumerate() { + let pts_off = if index >= 10 { + 10 * 20 * gst::MSECOND + } else { + 0.into() + }; + + let index = index as u64; + assert_eq!(running_time, index * 20 * gst::MSECOND); + assert_eq!(pts, pts_off + index * 20 * gst::MSECOND); + assert_eq!(duration, 20 * gst::MSECOND); + } + assert_eq!(buffers_3.len(), 20); + + thread_1.join().unwrap(); + thread_2.join().unwrap(); + thread_3.join().unwrap(); + + pipeline.set_state(gst::State::Null).unwrap(); +} |