Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/sdroege/gst-plugin-rs.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
authorrajneeshksoni <soni.rajneesh@gmail.com>2022-12-19 15:46:54 +0300
committerGStreamer Marge Bot <gitlab-merge-bot@gstreamer-foundation.org>2023-01-04 15:36:13 +0300
commitd846f527af8ed8bf9072aa1b32268390fede16e6 (patch)
treec83508f10a7a0f4124600230ee8067066388dbd2 /net
parentde23ea7f2935fd537ce38455ce0907e4fabcfa62 (diff)
awss3hlssink: Add stats property.
application can monitor the progress of hls segment generation and upload progress. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1022>
Diffstat (limited to 'net')
-rw-r--r--net/aws/src/s3hlssink/imp.rs106
1 files changed, 88 insertions, 18 deletions
diff --git a/net/aws/src/s3hlssink/imp.rs b/net/aws/src/s3hlssink/imp.rs
index 0645a103..d47162d3 100644
--- a/net/aws/src/s3hlssink/imp.rs
+++ b/net/aws/src/s3hlssink/imp.rs
@@ -85,6 +85,7 @@ impl Default for Settings {
pub struct S3HlsSink {
settings: Mutex<Settings>,
+ state: Mutex<State>,
hlssink: gst::Element,
canceller: Mutex<Option<future::AbortHandle>>,
}
@@ -132,6 +133,18 @@ enum S3RequestControl {
Pause,
}
+enum State {
+ Stopped,
+ Started(Started),
+}
+
+#[derive(Default)]
+struct Started {
+ num_uploads_started: usize,
+ num_uploads_completed: usize,
+ num_bytes_uploaded: usize,
+}
+
impl S3Upload {
fn new(
s3_client: Client,
@@ -258,8 +271,9 @@ impl S3HlsSink {
let put_object_req_future = put_object_req.send();
let result = s3utils::wait(&self.canceller, put_object_req_future);
- if let Err(err) = result {
- gst::error!(
+ match result {
+ Err(err) => {
+ gst::error!(
CAT,
imp: self,
"Put object request for S3 key {} of data length {} failed with error {:?}",
@@ -267,12 +281,25 @@ impl S3HlsSink {
s3_data_len,
err,
);
- element_imp_error!(
- self,
- gst::ResourceError::Write,
- ["Put object request failed"]
- );
- break;
+ element_imp_error!(
+ self,
+ gst::ResourceError::Write,
+ ["Put object request failed"]
+ );
+ break;
+ }
+ Ok(_) => {
+ let mut state = self.state.lock().unwrap();
+ match *state {
+ State::Started(ref mut state) => {
+ state.num_bytes_uploaded += s3_data_len;
+ state.num_uploads_completed += 1;
+ }
+ State::Stopped => {
+ unreachable!("State not started yet")
+ }
+ };
+ }
};
}
Ok(S3Request::Delete(data)) => {
@@ -400,6 +427,24 @@ impl S3HlsSink {
};
};
}
+
+ fn create_stats(&self) -> gst::Structure {
+ let state = self.state.lock().unwrap();
+
+ match &*state {
+ State::Started(state) => gst::Structure::builder("stats")
+ .field("num-uploads-started", state.num_uploads_started as u32)
+ .field("num-uploads-completed", state.num_uploads_completed as u32)
+ .field("num-bytes-uploaded", state.num_bytes_uploaded as u32)
+ .build(),
+
+ State::Stopped => gst::Structure::builder("stats")
+ .field("num-uploads-started", 0)
+ .field("num-uploads-completed", 0)
+ .field("num-bytes-uploaded", 0)
+ .build(),
+ }
+ }
}
#[glib::object_subclass]
@@ -423,6 +468,7 @@ impl ObjectSubclass for S3HlsSink {
Self {
settings: Mutex::new(Settings::default()),
+ state: Mutex::new(State::Stopped),
hlssink,
canceller: Mutex::new(None),
}
@@ -488,6 +534,13 @@ impl ObjectImpl for S3HlsSink {
.minimum(1)
.default_value(DEFAULT_TIMEOUT_IN_MSECS)
.build(),
+ glib::ParamSpecBoxed::new(
+ "stats",
+ "Various statistics",
+ "Various statistics",
+ gst::Structure::static_type(),
+ glib::ParamFlags::READABLE,
+ ),
glib::ParamSpecString::builder("endpoint-uri")
.nick("S3 endpoint URI")
.blurb("The S3 endpoint URI to use")
@@ -568,6 +621,7 @@ impl ObjectImpl for S3HlsSink {
"acl" => settings.s3_acl.as_str().to_value(),
"retry-attempts" => settings.retry_attempts.to_value(),
"request-timeout" => (settings.request_timeout.as_millis() as u64).to_value(),
+ "stats" => self.create_stats().to_value(),
"endpoint-uri" => settings.endpoint_uri.to_value(),
_ => unimplemented!(),
}
@@ -610,6 +664,11 @@ impl ObjectImpl for S3HlsSink {
let s3client = self_.s3client_from_settings();
let settings = self_.settings.lock().unwrap();
+ let mut state = self_.state.lock().unwrap();
+ match *state {
+ State::Started(ref mut state) => state.num_uploads_started += 1,
+ State::Stopped => unreachable!("State not started yet"),
+ };
let s3_location = args[1].get::<&str>().unwrap();
let upload = S3Upload::new(
@@ -639,6 +698,11 @@ impl ObjectImpl for S3HlsSink {
let s3client = self_.s3client_from_settings();
let settings = self_.settings.lock().unwrap();
+ let mut state = self_.state.lock().unwrap();
+ match *state {
+ State::Started(ref mut state) => state.num_uploads_started += 1,
+ State::Stopped => unreachable!("State not started yet"),
+ };
let s3_location = args[1].get::<&str>().unwrap();
let upload = S3Upload::new(
@@ -764,36 +828,40 @@ impl ElementImpl for S3HlsSink {
* in turn will require the settings lock.
*/
let settings = self.settings.lock().unwrap();
+ let mut state = self.state.lock().unwrap();
match transition {
- gst::StateChange::PlayingToPaused => {
+ gst::StateChange::ReadyToPaused => *state = State::Started(Started::default()),
+ gst::StateChange::PausedToPlaying => {
let s3_txc = settings.s3_txc.clone();
if let Some(tx) = s3_txc {
gst::debug!(
CAT,
imp: self,
- "Sending pause request to S3 request thread."
+ "Sending continue request to S3 request thread."
);
- if settings.s3_upload_handle.is_some()
- && tx.send(S3RequestControl::Pause).is_err()
- {
- gst::error!(CAT, imp: self, "Could not send pause request.");
+ if tx.send(S3RequestControl::Continue).is_err() {
+ gst::error!(CAT, imp: self, "Could not send continue request.");
}
}
}
- gst::StateChange::PausedToPlaying => {
+
+ gst::StateChange::PlayingToPaused => {
let s3_txc = settings.s3_txc.clone();
if let Some(tx) = s3_txc {
gst::debug!(
CAT,
imp: self,
- "Sending continue request to S3 request thread."
+ "Sending pause request to S3 request thread."
);
- if tx.send(S3RequestControl::Continue).is_err() {
- gst::error!(CAT, imp: self, "Could not send continue request.");
+ if settings.s3_upload_handle.is_some()
+ && tx.send(S3RequestControl::Pause).is_err()
+ {
+ gst::error!(CAT, imp: self, "Could not send pause request.");
}
}
}
+
gst::StateChange::ReadyToNull => {
drop(settings);
/*
@@ -801,6 +869,8 @@ impl ElementImpl for S3HlsSink {
* pending requests.
*/
self.stop();
+
+ *state = State::Stopped
}
_ => (),
}