Welcome to mirror list, hosted at ThFree Co, Russian Federation.

dash_vod.rs « examples « fmp4 « mux - github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 71938f4d372df703aebc09c3c47236e32244f3d8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
// Copyright (C) 2021 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0

// This creates a VoD DASH manifest based on the output of `cmafmux`. The media header
// ("initialization segment") is written into a separate file as the segments, and each segment is
// its own file too.
//
// All segments that are created are exactly 10s, expect for the last one which is only 3.333s.

use gst::prelude::*;

use std::path::PathBuf;
use std::sync::{Arc, Mutex};

use anyhow::Error;

struct Segment {
    start_time: gst::ClockTime,
    duration: gst::ClockTime,
}

struct State {
    start_time: Option<gst::ClockTime>,
    end_time: Option<gst::ClockTime>,
    segments: Vec<Segment>,
    path: PathBuf,
}

fn main() -> Result<(), Error> {
    gst::init()?;

    gstfmp4::plugin_register_static()?;

    let state = Arc::new(Mutex::new(State {
        start_time: None,
        end_time: None,
        segments: Vec::new(),
        path: PathBuf::from("dash_stream"),
    }));

    let pipeline = gst::parse_launch("videotestsrc num-buffers=2500 ! timecodestamper ! video/x-raw,format=I420,width=1280,height=720,framerate=30/1 ! timeoverlay ! x264enc bframes=0 bitrate=2048 ! video/x-h264,profile=main ! cmafmux fragment-duration=10000000000 header-update-mode=update write-mehd=true ! appsink name=sink").unwrap().downcast::<gst::Pipeline>().unwrap();

    let sink = pipeline
        .by_name("sink")
        .unwrap()
        .dynamic_cast::<gst_app::AppSink>()
        .unwrap();
    sink.set_buffer_list(true);

    let state_clone = state.clone();
    sink.set_callbacks(
        gst_app::AppSinkCallbacks::builder()
            .new_sample(move |sink| {
                let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
                let mut state = state.lock().unwrap();

                // The muxer only outputs non-empty buffer lists
                let mut buffer_list = sample.buffer_list_owned().expect("no buffer list");
                assert!(!buffer_list.is_empty());

                let mut first = buffer_list.get(0).unwrap();

                // Each list contains a full segment, i.e. does not start with a DELTA_UNIT
                assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT));

                // If the buffer has the DISCONT and HEADER flag set then it contains the media
                // header, i.e. the `ftyp`, `moov` and other media boxes.
                //
                // This might be the initial header or the updated header at the end of the stream.
                if first
                    .flags()
                    .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER)
                {
                    let mut path = state.path.clone();
                    std::fs::create_dir_all(&path).expect("failed to create directory");
                    path.push("init.cmfi");

                    println!("writing header to {}", path.display());
                    let map = first.map_readable().unwrap();
                    std::fs::write(path, &map).expect("failed to write header");
                    drop(map);

                    // Remove the header from the buffer list
                    buffer_list.make_mut().remove(0, 1);

                    // If the list is now empty then it only contained the media header and nothing
                    // else.
                    if buffer_list.is_empty() {
                        return Ok(gst::FlowSuccess::Ok);
                    }

                    // Otherwise get the next buffer and continue working with that.
                    first = buffer_list.get(0).unwrap();
                }

                // If the buffer only has the HEADER flag set then this is a segment header that is
                // followed by one or more actual media buffers.
                assert!(first.flags().contains(gst::BufferFlags::HEADER));

                let segment = sample
                    .segment()
                    .expect("no segment")
                    .downcast_ref::<gst::ClockTime>()
                    .expect("no time segment");

                // Initialize the start time with the first PTS we observed. This will be used
                // later for calculating the duration of the whole media for the DASH manifest.
                //
                // The PTS of the segment header is equivalent to the earliest PTS of the whole
                // segment.
                let pts = segment
                    .to_running_time(first.pts().unwrap())
                    .expect("can't get running time");
                if state.start_time.is_none() {
                    state.start_time = Some(pts);
                }

                // The metadata of the first media buffer is duplicated to the segment header.
                // Based on this we can know the timecode of the first frame in this segment.
                let meta = first
                    .meta::<gst_video::VideoTimeCodeMeta>()
                    .expect("no timecode meta");

                let mut path = state.path.clone();
                path.push(format!("segment_{}.cmfv", state.segments.len() + 1));
                println!(
                    "writing segment with timecode {} to {}",
                    meta.tc(),
                    path.display()
                );

                // Calculate the end time at this point. The duration of the segment header is set
                // to the whole duration of this segment.
                let duration = first.duration().unwrap();
                let end_time = first.pts().unwrap() + first.duration().unwrap();
                state.end_time = Some(
                    segment
                        .to_running_time(end_time)
                        .expect("can't get running time"),
                );

                let mut file = std::fs::File::create(path).expect("failed to open fragment");
                for buffer in &*buffer_list {
                    use std::io::prelude::*;

                    let map = buffer.map_readable().unwrap();
                    file.write_all(&map).expect("failed to write fragment");
                }

                state.segments.push(Segment {
                    start_time: pts,
                    duration,
                });

                Ok(gst::FlowSuccess::Ok)
            })
            .eos(move |_sink| {
                let state = state_clone.lock().unwrap();

                // Now write the manifest
                let mut path = state.path.clone();
                path.push("manifest.mpd");

                println!("writing manifest to {}", path.display());

                let duration = state
                    .end_time
                    .opt_checked_sub(state.start_time)
                    .ok()
                    .flatten()
                    .unwrap()
                    .mseconds();

                // Write the whole segment timeline out here, compressing multiple segments with
                // the same duration to a repeated segment.
                let mut segments = vec![];
                let mut write_segment =
                    |start: gst::ClockTime, duration: gst::ClockTime, repeat: usize| {
                        let mut s = dash_mpd::S {
                            t: Some(start.mseconds() as i64),
                            d: duration.mseconds() as i64,
                            ..Default::default()
                        };
                        if repeat > 0 {
                            s.r = Some(repeat as i64);
                        }

                        segments.push(s);
                    };

                let mut start = None;
                let mut num_segments = 0;
                let mut last_duration = None;
                for segment in &state.segments {
                    if start.is_none() {
                        start = Some(segment.start_time);
                    }
                    if last_duration.is_none() {
                        last_duration = Some(segment.duration);
                    }

                    // If the duration of this segment is different from the previous one then we
                    // have to write out the segment now.
                    if last_duration != Some(segment.duration) {
                        write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1);
                        start = Some(segment.start_time);
                        last_duration = Some(segment.duration);
                        num_segments = 1;
                    } else {
                        num_segments += 1;
                    }
                }

                // Write the last segment if any
                if num_segments > 0 {
                    write_segment(start.unwrap(), last_duration.unwrap(), num_segments - 1);
                }

                let segment_template = dash_mpd::SegmentTemplate {
                    timescale: Some(1000),
                    initialization: Some("init.cmfi".to_string()),
                    media: Some("segment_$Number$.cmfv".to_string()),
                    SegmentTimeline: Some(dash_mpd::SegmentTimeline { segments }),
                    ..Default::default()
                };

                let rep = dash_mpd::Representation {
                    id: Some("A".to_string()),
                    width: Some(1280),
                    height: Some(720),
                    bandwidth: Some(2048000),
                    SegmentTemplate: Some(segment_template),
                    ..Default::default()
                };

                let adapt = dash_mpd::AdaptationSet {
                    contentType: Some("video".to_string()),
                    mimeType: Some("video/mp4".to_string()),
                    codecs: Some("avc1.4d0228".to_string()),
                    frameRate: Some("30/1".to_string()),
                    segmentAlignment: Some(true),
                    subsegmentStartsWithSAP: Some(1),
                    representations: vec![rep],
                    ..Default::default()
                };

                let period = dash_mpd::Period {
                    adaptations: vec![adapt],
                    ..Default::default()
                };

                let mpd = dash_mpd::MPD {
                    mpdtype: Some("static".to_string()),
                    xmlns: Some("urn:mpeg:dash:schema:mpd:2011".to_string()),
                    schemaLocation: Some("urn:mpeg:dash:schema:mpd:2011 DASH-MPD.xsd".to_string()),
                    profiles: Some("urn:mpeg:dash:profile:isoff-on-demand:2011".to_string()),
                    periods: vec![period],
                    mediaPresentationDuration: Some(std::time::Duration::from_millis(duration)),
                    minBufferTime: Some(std::time::Duration::from_secs(1)),
                    ..Default::default()
                };

                use serde::ser::Serialize;

                let mut xml = String::new();
                let mut ser = quick_xml::se::Serializer::new(&mut xml);
                ser.indent(' ', 4);
                mpd.serialize(ser).unwrap();

                let manifest = format!(
                    r###"<?xml version="1.0" encoding="UTF-8"?>
{xml}
"###
                );

                std::fs::write(path, manifest).expect("failed to write manifest");
            })
            .build(),
    );

    pipeline.set_state(gst::State::Playing)?;

    let bus = pipeline
        .bus()
        .expect("Pipeline without bus. Shouldn't happen!");

    for msg in bus.iter_timed(gst::ClockTime::NONE) {
        use gst::MessageView;

        match msg.view() {
            MessageView::Eos(..) => {
                println!("EOS");
                break;
            }
            MessageView::Error(err) => {
                pipeline.set_state(gst::State::Null)?;
                eprintln!(
                    "Got error from {}: {} ({})",
                    msg.src()
                        .map(|s| String::from(s.path_string()))
                        .unwrap_or_else(|| "None".into()),
                    err.error(),
                    err.debug().unwrap_or_else(|| "".into()),
                );
                break;
            }
            _ => (),
        }
    }

    pipeline.set_state(gst::State::Null)?;

    Ok(())
}