Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/text/json
diff options
context:
space:
mode:
authorSebastian Dröge <sebastian@centricular.com>2021-09-22 16:38:58 +0300
committerSebastian Dröge <sebastian@centricular.com>2021-09-22 16:38:58 +0300
commit7708b65e97605da8ae5d5e7dd9dec9e8d746141c (patch)
tree23fea56872df6d183ba18d908f0e0ebf7ef4403d /text/json
parentafd736dfec029b9fdc43147bc735fc05401dfed4 (diff)
json: Replace line_reader.rs symlink with a copy for now
Windows doesn't handle symlinks very well. Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/166
Diffstat (limited to 'text/json')
-rw-r--r--[l---------]text/json/src/line_reader.rs334
1 files changed, 333 insertions, 1 deletions
diff --git a/text/json/src/line_reader.rs b/text/json/src/line_reader.rs
index 95af5864..1d64d128 120000..100644
--- a/text/json/src/line_reader.rs
+++ b/text/json/src/line_reader.rs
@@ -1 +1,333 @@
-../../../video/closedcaption/src/line_reader.rs \ No newline at end of file
+// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
+//
+// This library is free software; you can redistribute it and/or
+// modify it under the terms of the GNU Library General Public
+// License as published by the Free Software Foundation; either
+// version 2 of the License, or (at your option) any later version.
+//
+// This library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// Library General Public License for more details.
+//
+// You should have received a copy of the GNU Library General Public
+// License along with this library; if not, write to the
+// Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
+// Boston, MA 02110-1335, USA.
+
+use std::collections::VecDeque;
+
+#[derive(Debug)]
+pub struct LineReader<T: AsRef<[u8]>> {
+ queue: VecDeque<T>,
+ // Read position into the queue in bytes
+ read_pos: usize,
+ // Offset into queue where we have to look for a newline
+ // All previous items don't contain a newline
+ search_pos: usize,
+ buf: Vec<u8>,
+}
+
+impl<T: AsRef<[u8]>> LineReader<T> {
+ pub fn new() -> LineReader<T> {
+ Self {
+ queue: VecDeque::new(),
+ read_pos: 0,
+ search_pos: 0,
+ buf: Vec::new(),
+ }
+ }
+
+ pub fn push(&mut self, b: T) {
+ self.queue.push_back(b);
+ }
+
+ /// Drops everything from the internal queue that was previously returned, i.e.
+ /// if previously a line was returned we drop this whole line so that we can
+ /// proceed with the next line
+ fn drop_previous_line(&mut self) {
+ // Drop everything we read the last time now
+ while self.read_pos > 0
+ && self.read_pos >= self.queue.front().map(|f| f.as_ref().len()).unwrap_or(0)
+ {
+ self.read_pos -= self.queue.front().map(|f| f.as_ref().len()).unwrap_or(0);
+ self.queue.pop_front();
+ if self.search_pos > 0 {
+ self.search_pos -= 1;
+ }
+ }
+
+ self.buf.clear();
+ }
+
+ #[allow(unused)]
+ pub fn line_or_drain(&mut self) -> Option<&[u8]> {
+ self.line_with_drain(true)
+ }
+
+ #[allow(unused)]
+ pub fn line(&mut self) -> Option<&[u8]> {
+ self.line_with_drain(false)
+ }
+
+ /// Searches the first '\n' in the currently queued buffers and returns the index in buffers
+ /// inside the queue and the index in bytes from the beginning of the queue, or None.
+ ///
+ /// Also updates the search_pos so that we don't look again in the previous buffers on the next
+ /// call
+ fn find_newline(&mut self) -> Option<(usize, usize)> {
+ let mut offset = 0;
+ for (idx, buf) in self.queue.iter().enumerate() {
+ let buf = buf.as_ref();
+
+ // Fast skip-ahead
+ if idx < self.search_pos {
+ offset += buf.len();
+ continue;
+ }
+
+ let pos = buf
+ .iter()
+ .enumerate()
+ .skip(if idx == 0 { self.read_pos } else { 0 })
+ .find(|(_, b)| **b == b'\n')
+ .map(|(idx, _)| idx);
+
+ if let Some(pos) = pos {
+ // On the next call we have to search in this buffer again
+ // as it might contain a second newline
+ self.search_pos = idx;
+ return Some((idx, offset + pos + 1));
+ }
+
+ // This buffer did not contain a newline so we don't have to look
+ // in it again next time
+ self.search_pos = idx + 1;
+ offset += buf.len();
+ }
+
+ None
+ }
+
+ /// Copies length bytes from all buffers from the beginning until last_idx into our internal
+ /// buffer, and skips the first offset bytes from the first buffer.
+ fn copy_into_internal_buffer(&mut self, last_idx: usize, offset: usize, len: usize) {
+ // Reserve space for the whole line beforehand
+ if self.buf.capacity() < len {
+ self.buf.reserve(len - self.buf.capacity());
+ }
+
+ // Then iterate over all buffers inside the queue until the one that contains
+ // the newline character
+ for (idx, buf) in self.queue.iter().enumerate().take(last_idx + 1) {
+ let buf = buf.as_ref();
+
+ // Calculate how much data we still have to copy
+ let rem = len - self.buf.len();
+ assert!(rem > 0);
+
+ // For the first index we need to take into account the offset. The first
+ // bytes might have to be skipped and as such we have fewer bytes available
+ // than the whole length of the buffer
+ let buf_len = if idx == 0 {
+ assert!(offset < buf.len());
+ buf.len() - offset
+ } else {
+ buf.len()
+ };
+
+ // Calculate how much we can copy from this buffer. At most the size of the buffer
+ // itself, but never more than the amount we still have to copy overall
+ let copy_len = if rem > buf_len { buf_len } else { rem };
+ assert!(copy_len > 0);
+
+ if idx == 0 {
+ self.buf
+ .extend_from_slice(&buf[offset..(offset + copy_len)]);
+ } else {
+ self.buf.extend_from_slice(&buf[..copy_len]);
+ }
+ }
+
+ assert_eq!(self.buf.len(), len);
+ }
+
+ pub fn line_with_drain(&mut self, drain: bool) -> Option<&[u8]> {
+ // Drop all data from the previous line
+ self.drop_previous_line();
+
+ // read_pos must always be inside the first buffer of our queue here
+ // or otherwise we went into an inconsistent state: the first buffer(s)
+ // would've had to be dropped above then as they are not relevant anymore
+ assert!(
+ self.read_pos == 0
+ || self.read_pos < self.queue.front().map(|f| f.as_ref().len()).unwrap_or(0)
+ );
+
+ // Find the next newline character from our previous position
+ if let Some((idx, pos)) = self.find_newline() {
+ // We now need to copy everything from the old read_pos to the new
+ // pos, and on the next call we have to start from the new pos
+ let old_read_pos = self.read_pos;
+ self.read_pos = pos;
+
+ assert!(self.read_pos > old_read_pos);
+ assert!(idx < self.queue.len());
+
+ // If the newline is found in the first buffer in our queue, we can directly return
+ // the slice from it without doing any copying.
+ //
+ // On average this should be the most common case.
+ if idx == 0 {
+ let buf = self.queue.front().unwrap().as_ref();
+ return Some(&buf[old_read_pos..self.read_pos]);
+ } else {
+ // Copy into our internal buffer as the current line spans multiple buffers
+ let len = self.read_pos - old_read_pos;
+ self.copy_into_internal_buffer(idx, old_read_pos, len);
+
+ return Some(&self.buf[0..len]);
+ }
+ }
+
+ // No newline found above and we're not draining, so let's wait until
+ // more data is available that might contain a newline character
+ if !drain {
+ return None;
+ }
+
+ if self.queue.is_empty() {
+ return None;
+ }
+
+ // When draining and we only have a single buffer in the queue we can
+ // directly return a slice into it
+ if self.queue.len() == 1 {
+ let res = &self.queue.front().unwrap().as_ref()[self.read_pos..];
+ self.read_pos += res.len();
+ self.search_pos = 1;
+ return Some(res);
+ }
+
+ // Otherwise we have to copy everything that is remaining into our
+ // internal buffer and then return a slice from that
+ let len = self.queue.iter().map(|v| v.as_ref().len()).sum();
+ if self.buf.capacity() < len {
+ self.buf.reserve(len - self.buf.capacity());
+ }
+
+ for (idx, ref v) in self.queue.iter().enumerate() {
+ if idx == 0 {
+ self.buf.extend_from_slice(&v.as_ref()[self.read_pos..]);
+ } else {
+ self.buf.extend_from_slice(v.as_ref());
+ }
+ }
+
+ self.read_pos += self.buf.len();
+ self.search_pos = self.queue.len();
+
+ Some(self.buf.as_ref())
+ }
+
+ pub fn clear(&mut self) {
+ self.queue.clear();
+ self.read_pos = 0;
+ self.search_pos = 0;
+ self.buf.clear();
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::LineReader;
+
+ #[test]
+ fn test_single_buffer() {
+ let mut r = LineReader::new();
+ r.push(Vec::from(b"abcd\nefgh\nijkl\n".as_ref()));
+
+ assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
+ assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
+ assert_eq!(r.line(), Some(b"ijkl\n".as_ref()));
+ assert_eq!(r.line(), None);
+ }
+
+ #[test]
+ fn test_empty_line() {
+ let mut r = LineReader::new();
+ r.push(Vec::from(b"abcd\nefgh\n\nijkl\n".as_ref()));
+
+ assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
+ assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
+ assert_eq!(r.line(), Some(b"\n".as_ref()));
+ assert_eq!(r.line(), Some(b"ijkl\n".as_ref()));
+ assert_eq!(r.line(), None);
+ }
+
+ #[test]
+ fn test_multi_buffer_split() {
+ let mut r = LineReader::new();
+ r.push(Vec::from(b"abcd\nef".as_ref()));
+ r.push(Vec::from(b"gh\nijkl\n".as_ref()));
+
+ assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
+ assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
+ assert_eq!(r.line(), Some(b"ijkl\n".as_ref()));
+ assert_eq!(r.line(), None);
+ }
+
+ #[test]
+ fn test_multi_buffer_split_2() {
+ let mut r = LineReader::new();
+ r.push(Vec::from(b"abcd\ne".as_ref()));
+ r.push(Vec::from(b"f".as_ref()));
+ r.push(Vec::from(b"g".as_ref()));
+ r.push(Vec::from(b"h\nijkl\n".as_ref()));
+
+ assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
+ assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
+ assert_eq!(r.line(), Some(b"ijkl\n".as_ref()));
+ assert_eq!(r.line(), None);
+ }
+
+ #[test]
+ fn test_single_buffer_drain() {
+ let mut r = LineReader::new();
+ r.push(Vec::from(b"abcd\nefgh\nijkl".as_ref()));
+
+ assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
+ assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
+ assert_eq!(r.line(), None);
+ assert_eq!(r.line_or_drain(), Some(b"ijkl".as_ref()));
+ assert_eq!(r.line_or_drain(), None);
+ }
+
+ #[test]
+ fn test_single_buffer_drain_multi_line() {
+ let mut r = LineReader::new();
+ r.push(Vec::from(b"abcd\nefgh\n".as_ref()));
+ r.push(Vec::from(b"ijkl".as_ref()));
+
+ assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
+ assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
+ assert_eq!(r.line(), None);
+ assert_eq!(r.line_or_drain(), Some(b"ijkl".as_ref()));
+ assert_eq!(r.line_or_drain(), None);
+ }
+
+ #[test]
+ fn test_single_buffer_drain_multi_line_2() {
+ let mut r = LineReader::new();
+ r.push(Vec::from(b"abcd\nefgh\ni".as_ref()));
+ r.push(Vec::from(b"j".as_ref()));
+ r.push(Vec::from(b"k".as_ref()));
+ r.push(Vec::from(b"l".as_ref()));
+
+ assert_eq!(r.line(), Some(b"abcd\n".as_ref()));
+ assert_eq!(r.line(), Some(b"efgh\n".as_ref()));
+ assert_eq!(r.line(), None);
+ assert_eq!(r.line_or_drain(), Some(b"ijkl".as_ref()));
+ assert_eq!(r.line_or_drain(), None);
+ }
+}