Welcome to mirror list, hosted at ThFree Co, Russian Federation.

line_reader.rs « src « closedcaption « video - gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 1d64d128f3a40817518b4d2397d48be3c841a68e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Library General Public
// License as published by the Free Software Foundation; either
// version 2 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public
// License along with this library; if not, write to the
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA.

use std::collections::VecDeque;

#[derive(Debug)]
pub struct LineReader<T: AsRef<[u8]>> {
    queue: VecDeque<T>,
    // Read position into the queue in bytes
    read_pos: usize,
    // Offset into queue where we have to look for a newline
    // All previous items don't contain a newline
    search_pos: usize,
    buf: Vec<u8>,
}

impl<T: AsRef<[u8]>> LineReader<T> {
    pub fn new() -> LineReader<T> {
        Self {
            queue: VecDeque::new(),
            read_pos: 0,
            search_pos: 0,
            buf: Vec::new(),
        }
    }

    pub fn push(&mut self, b: T) {
        self.queue.push_back(b);
    }

    /// Drops everything from the internal queue that was previously returned, i.e.
    /// if previously a line was returned we drop this whole line so that we can
    /// proceed with the next line
    fn drop_previous_line(&mut self) {
        // Drop everything we read the last time now
        while self.read_pos > 0
            && self.read_pos >= self.queue.front().map(|f| f.as_ref().len()).unwrap_or(0)
        {
            self.read_pos -= self.queue.front().map(|f| f.as_ref().len()).unwrap_or(0);
            self.queue.pop_front();
            if self.search_pos > 0 {
                self.search_pos -= 1;
            }
        }

        self.buf.clear();
    }

    #[allow(unused)]
    pub fn line_or_drain(&mut self) -> Option<&[u8]> {
        self.line_with_drain(true)
    }

    #[allow(unused)]
    pub fn line(&mut self) -> Option<&[u8]> {
        self.line_with_drain(false)
    }

    /// Searches the first '\n' in the currently queued buffers and returns the index in buffers
    /// inside the queue and the index in bytes from the beginning of the queue, or None.
    ///
    /// Also updates the search_pos so that we don't look again in the previous buffers on the next
    /// call
    fn find_newline(&mut self) -> Option<(usize, usize)> {
        let mut offset = 0;
        for (idx, buf) in self.queue.iter().enumerate() {
            let buf = buf.as_ref();

            // Fast skip-ahead
            if idx < self.search_pos {
                offset += buf.len();
                continue;
            }

            let pos = buf
                .iter()
                .enumerate()
                .skip(if idx == 0 { self.read_pos } else { 0 })
                .find(|(_, b)| **b == b'\n')
                .map(|(idx, _)| idx);

            if let Some(pos) = pos {
                // On the next call we have to search in this buffer again
                // as it might contain a second newline
                self.search_pos = idx;
                return Some((idx, offset + pos + 1));
            }

            // This buffer did not contain a newline so we don't have to look
            // in it again next time
            self.search_pos = idx + 1;
            offset += buf.len();
        }

        None
    }

    /// Copies length bytes from all buffers from the beginning until last_idx into our internal
    /// buffer, and skips the first offset bytes from the first buffer.
    fn copy_into_internal_buffer(&mut self, last_idx: usize, offset: usize, len: usize) {
        // Reserve space for the whole line beforehand
        if self.buf.capacity() < len {
            self.buf.reserve(len - self.buf.capacity());
        }

        // Then iterate over all buffers inside the queue until the one that contains
        // the newline character
        for (idx, buf) in self.queue.iter().enumerate().take(last_idx + 1) {
            let buf = buf.as_ref();

            // Calculate how much data we still have to copy
            let rem = len - self.buf.len();
            assert!(rem > 0);

            // For the first index we need to take into account the offset. The first
            // bytes might have to be skipped and as such we have fewer bytes available
            // than the whole length of the buffer
            let buf_len = if idx == 0 {
                assert!(offset < buf.len());
                buf.len() - offset
            } else {
                buf.len()
            };

            // Calculate how much we can copy from this buffer. At most the size of the buffer
            // itself, but never more than the amount we still have to copy overall
            let copy_len = if rem > buf_len { buf_len } else { rem };
            assert!(copy_len > 0);

            if idx == 0 {
                self.buf
                    .extend_from_slice(&buf[offset..(offset + copy_len)]);
            } else {
                self.buf.extend_from_slice(&buf[..copy_len]);
            }
        }

        assert_eq!(self.buf.len(), len);
    }

    pub fn line_with_drain(&mut self, drain: bool) -> Option<&[u8]> {
        // Drop all data from the previous line
        self.drop_previous_line();

        // read_pos must always be inside the first buffer of our queue here
        // or otherwise we went into an inconsistent state: the first buffer(s)
        // would've had to be dropped above then as they are not relevant anymore
        assert!(
            self.read_pos == 0
                || self.read_pos < self.queue.front().map(|f| f.as_ref().len()).unwrap_or(0)
        );

        // Find the next newline character from our previous position
        if let Some((idx, pos)) = self.find_newline() {
            // We now need to copy everything from the old read_pos to the new
            // pos, and on the next call we have to start from the new pos
            let old_read_pos = self.read_pos;
            self.read_pos = pos;

            assert!(self.read_pos > old_read_pos);
            assert!(idx < self.queue.len());

            // If the newline is found in the first buffer in our queue, we can directly return
            // the slice from it without doing any copying.
            //
            // On average this should be the most common case.
            if idx == 0 {
                let buf = self.queue.front().unwrap().as_ref();
                return Some(&buf[old_read_pos..self.read_pos]);
            } else {
                // Copy into our internal buffer as the current line spans multiple buffers
                let len = self.read_pos - old_read_pos;
                self.copy_into_internal_buffer(idx, old_read_pos, len);

                return Some(&self.buf[0..len]);
            }
        }

        // No newline found above and we're not draining, so let's wait until
        // more data is available that might contain a newline character
        if !drain {
            return None;
        }

        if self.queue.is_empty() {
            return None;
        }

        // When draining and we only have a single buffer in the queue we can
        // directly return a slice into it
        if self.queue.len() == 1 {
            let res = &self.queue.front().unwrap().as_ref()[self.read_pos..];
            self.read_pos += res.len();
            self.search_pos = 1;
            return Some(res);
        }

        // Otherwise we have to copy everything that is remaining into our
        // internal buffer and then return a slice from that
        let len = self.queue.iter().map(|v| v.as_ref().len()).sum();
        if self.buf.capacity() < len {
            self.buf.reserve(len - self.buf.capacity());
        }

        for (idx, ref v) in self.queue.iter().enumerate() {
            if idx == 0 {
                self.buf.extend_from_slice(&v.as_ref()[self.read_pos..]);
            } else {
                self.buf.extend_from_slice(v.as_ref());
            }
        }

        self.read_pos += self.buf.len();
        self.search_pos = self.queue.len();

        Some(self.buf.as_ref())
    }

    pub fn clear(&mut self) {
        self.queue.clear();
        self.read_pos = 0;
        self.search_pos = 0;
        self.buf.clear();
    }
}

#[cfg(test)]
mod tests {
    use super::LineReader;

    #[test]
    fn test_single_buffer() {
        let mut r = LineReader::new();
        r.push(Vec::from(b"abcd\nefgh\nijkl\n".as_ref()));

        assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
        assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
        assert_eq!(r.line(), Some(b"ijkl\n".as_ref()));
        assert_eq!(r.line(), None);
    }

    #[test]
    fn test_empty_line() {
        let mut r = LineReader::new();
        r.push(Vec::from(b"abcd\nefgh\n\nijkl\n".as_ref()));

        assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
        assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
        assert_eq!(r.line(), Some(b"\n".as_ref()));
        assert_eq!(r.line(), Some(b"ijkl\n".as_ref()));
        assert_eq!(r.line(), None);
    }

    #[test]
    fn test_multi_buffer_split() {
        let mut r = LineReader::new();
        r.push(Vec::from(b"abcd\nef".as_ref()));
        r.push(Vec::from(b"gh\nijkl\n".as_ref()));

        assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
        assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
        assert_eq!(r.line(), Some(b"ijkl\n".as_ref()));
        assert_eq!(r.line(), None);
    }

    #[test]
    fn test_multi_buffer_split_2() {
        let mut r = LineReader::new();
        r.push(Vec::from(b"abcd\ne".as_ref()));
        r.push(Vec::from(b"f".as_ref()));
        r.push(Vec::from(b"g".as_ref()));
        r.push(Vec::from(b"h\nijkl\n".as_ref()));

        assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
        assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
        assert_eq!(r.line(), Some(b"ijkl\n".as_ref()));
        assert_eq!(r.line(), None);
    }

    #[test]
    fn test_single_buffer_drain() {
        let mut r = LineReader::new();
        r.push(Vec::from(b"abcd\nefgh\nijkl".as_ref()));

        assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
        assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
        assert_eq!(r.line(), None);
        assert_eq!(r.line_or_drain(), Some(b"ijkl".as_ref()));
        assert_eq!(r.line_or_drain(), None);
    }

    #[test]
    fn test_single_buffer_drain_multi_line() {
        let mut r = LineReader::new();
        r.push(Vec::from(b"abcd\nefgh\n".as_ref()));
        r.push(Vec::from(b"ijkl".as_ref()));

        assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
        assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
        assert_eq!(r.line(), None);
        assert_eq!(r.line_or_drain(), Some(b"ijkl".as_ref()));
        assert_eq!(r.line_or_drain(), None);
    }

    #[test]
    fn test_single_buffer_drain_multi_line_2() {
        let mut r = LineReader::new();
        r.push(Vec::from(b"abcd\nefgh\ni".as_ref()));
        r.push(Vec::from(b"j".as_ref()));
        r.push(Vec::from(b"k".as_ref()));
        r.push(Vec::from(b"l".as_ref()));

        assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
        assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
        assert_eq!(r.line(), None);
        assert_eq!(r.line_or_drain(), Some(b"ijkl".as_ref()));
        assert_eq!(r.line_or_drain(), None);
    }
}