diff --git a/Cargo.lock b/Cargo.lock index fd46a63..88584e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1307,6 +1307,7 @@ version = "0.2.0" dependencies = [ "async-trait", "flume", + "futures-core", "image", "num-rational", "num-traits", @@ -1315,6 +1316,7 @@ dependencies = [ "serde", "small-map", "thiserror 2.0.0", + "typed-builder", "wgpu", ] @@ -1949,6 +1951,26 @@ dependencies = [ "winnow", ] +[[package]] +name = "typed-builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e14ed59dc8b7b26cacb2a92bad2e8b1f098806063898ab42a3bd121d7d45e75" +dependencies = [ + "typed-builder-macro", +] + +[[package]] +name = "typed-builder-macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "560b82d656506509d43abe30e0ba64c56b1953ab3d4fe7ba5902747a7a3cedd5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "udev" version = "0.5.0" diff --git a/nokhwa-bindings-linux/src/v4l2.rs b/nokhwa-bindings-linux/src/v4l2.rs index 6abae03..a886223 100644 --- a/nokhwa-bindings-linux/src/v4l2.rs +++ b/nokhwa-bindings-linux/src/v4l2.rs @@ -4,13 +4,14 @@ use nokhwa_core::error::{NokhwaError, NokhwaResult}; use nokhwa_core::frame_format::FrameFormat; use nokhwa_core::platform::{Backends, PlatformTrait}; use nokhwa_core::ranges::Range; -use nokhwa_core::stream::Stream; +use nokhwa_core::stream::{StreamHandle, StreamConfiguration, StreamInnerTrait}; use nokhwa_core::types::{CameraFormat, CameraIndex, CameraInformation, FrameRate, Resolution}; use std::collections::hash_map::{Keys, Values}; use std::collections::{HashMap, HashSet}; use std::num::NonZeroI32; use std::sync::Arc; -use std::thread::JoinHandle; +use std::thread::{sleep, JoinHandle}; +use std::time::Duration; use flume::{Sender, Receiver, unbounded, bounded}; use v4l::context::enum_devices; use v4l::control::{Description, Flags, MenuItem, Type, Value}; @@ -348,7 +349,7 @@ pub struct V4L2Camera { camera_format: Option, camera_index: CameraIndex, controls: Controls, - stream: Option, + stream: Option>, } impl Setting for V4L2Camera { @@ -503,7 +504,7 @@ impl Setting for V4L2Camera { struct V4L2Stream { thread: JoinHandle<()>, control: Sender<()>, - receiver: Arc>, + receiver: Arc>>, } impl Drop for V4L2Stream { @@ -512,14 +513,41 @@ impl Drop for V4L2Stream { } } +impl StreamInnerTrait for V4L2Stream { + fn configuration(&self) -> &Option { + &None + } + + + fn receiver(&self) -> Arc>> { + self.receiver.clone() + } + + fn stop(&mut self) -> NokhwaResult<()> { + self.control.send(()).map_err(|why| NokhwaError::StreamShutdownError(why.to_string()))?; + loop { + if self.thread.is_finished() { + break; + } + sleep(Duration::from_millis(1)) + } + Ok(()) + } +} + impl Capture for V4L2Camera { - fn open_stream(&mut self) -> Result { + fn open_stream(&mut self) -> Result, NokhwaError> { + if self.stream.is_some() { + return Err(NokhwaError::OpenStreamError("Stream Already Open".to_string())) + } + + let format = match self.camera_format { Some(fmt) => fmt, None => return Err(NokhwaError::OpenStreamError("No Format".to_string())) }; - let (control, ctrl_recv) = bounded(1); + let (control, ctrl_recv) = bounded::<()>(1); let (sender, receiver) = unbounded(); let receiver = Arc::new(receiver); @@ -530,7 +558,6 @@ impl Capture for V4L2Camera { })?; let thread = std::thread::spawn(move || { - loop { if ctrl_recv.is_disconnected() || sender.is_disconnected() { return; @@ -540,18 +567,37 @@ impl Capture for V4L2Camera { } match mmap_stream.next() { - Ok((data, meta)) => { - FrameBuffer::new() + Ok((data, _meta)) => { // TODO: Add metadata + if let Err(_why) = sender.send(Ok(FrameBuffer::new(data))) { + return (); + } + } + Err(why) => { + if let Err(_why) = sender.send(Err(NokhwaError::ReadFrameError(why.to_string()))) { + return (); + } } - Err(_) => {} } } - () - }) + return (); + }); + + let stream = Arc::new(StreamHandle::new(Box::new(V4L2Stream { + thread, + control, + receiver, + }))); + + self.stream = Some(stream.clone()); + Ok(stream) } fn close_stream(&mut self) -> Result<(), NokhwaError> { - todo!() + if let Some(stream) = self.stream.clone() { + stream.stop_stream()?; + + } + Ok(()) } } diff --git a/nokhwa-core/Cargo.toml b/nokhwa-core/Cargo.toml index 7a2d651..b5265d0 100644 --- a/nokhwa-core/Cargo.toml +++ b/nokhwa-core/Cargo.toml @@ -16,7 +16,7 @@ serialize = ["serde"] wgpu-types = ["wgpu"] opencv-mat = ["opencv", "opencv/clang-runtime"] docs-features = ["serialize", "wgpu-types"] -async = ["async-trait", "flume/async"] +async = ["async-trait", "flume/async", "futures-core"] test-fail-warnings = [] @@ -25,6 +25,7 @@ thiserror = "2.0" flume = "0.11" num-traits = "0.2" ordered-float = "5" +typed-builder = "0.20" [dependencies.num-rational] version = "0.4" @@ -58,7 +59,9 @@ optional = true version = "0.1" optional = true - +[dependencies.futures-core] +version = "0.3" +optional = true [package.metadata.docs.rs] features = ["docs-features"] diff --git a/nokhwa-core/src/camera.rs b/nokhwa-core/src/camera.rs index 5578562..d73fc5c 100644 --- a/nokhwa-core/src/camera.rs +++ b/nokhwa-core/src/camera.rs @@ -1,10 +1,11 @@ use crate::control::{ControlDescription, ControlId, ControlValue, Controls}; use crate::error::NokhwaError; use crate::frame_format::FrameFormat; -use crate::stream::Stream; +use crate::stream::StreamHandle; use crate::types::{CameraFormat, FrameRate, Resolution}; use std::collections::hash_map::{Keys, Values}; use std::collections::HashMap; +use std::sync::Arc; pub trait Setting { fn enumerate_formats(&self) -> Result, NokhwaError>; @@ -33,7 +34,6 @@ pub trait Setting { } #[cfg(feature = "async")] -#[cfg_attr(feature = "async", async_trait::async_trait)] pub trait AsyncSetting { async fn enumerate_formats_async(&self) -> Result, NokhwaError>; @@ -55,16 +55,15 @@ pub trait AsyncSetting { pub trait Capture { // Implementations MUST guarantee that there can only ever be one stream open at once. - fn open_stream(&mut self) -> Result; + fn open_stream(&mut self) -> Result, NokhwaError>; // Implementations MUST be multi-close tolerant. fn close_stream(&mut self) -> Result<(), NokhwaError>; } #[cfg(feature = "async")] -#[cfg_attr(feature = "async", async_trait::async_trait)] pub trait AsyncStream { - async fn open_stream_async(&mut self) -> Result; + async fn open_stream_async(&mut self) -> Result; async fn close_stream_async(&mut self) -> Result<(), NokhwaError>; } @@ -72,5 +71,4 @@ pub trait AsyncStream { pub trait Camera: Setting + Capture {} #[cfg(feature = "async")] -#[cfg_attr(feature = "async", async_trait::async_trait)] pub trait AsyncCamera: Camera + AsyncSetting + AsyncStream {} diff --git a/nokhwa-core/src/decoder.rs b/nokhwa-core/src/decoder.rs index 0dd2606..f59d6f7 100644 --- a/nokhwa-core/src/decoder.rs +++ b/nokhwa-core/src/decoder.rs @@ -1,99 +1,59 @@ -use crate::{error::NokhwaError, frame_buffer::FrameBuffer, frame_format::FrameFormat}; -use image::{ImageBuffer, Pixel}; -use std::ops::{ControlFlow, Deref}; +use std::borrow::Cow; +use std::fmt::Debug; +use crate::error::NokhwaError; +use crate::frame_buffer::FrameBuffer; +use crate::frame_format::FrameFormat; +use crate::stream::{StreamHandle}; +use crate::types::{CameraFormat, FrameRate, Resolution}; -/// Trait to define a struct that can decode a [`FrameBuffer`] -pub trait Decoder { - /// Formats that the decoder can decode. +#[derive(Debug)] +pub struct Decoder<'stream, Video> where + Video: Codec { + video: Video, + stream: &'stream mut StreamHandle +} + +impl<'stream, Video> Decoder<'stream, Video> where Video: Codec { + pub fn new(stream: &'stream mut StreamHandle, decoder: Video) -> Result { + let format = stream.format(); + + let mut decoder = decoder; + decoder.initialize(format)?; + Ok(Self { video: decoder, stream }) + } + + pub fn +} + +#[cfg(feature = "async")] +#[derive(Debug)] +pub struct DecoderAsync<'stream, Video> where + Video: CodecAsync { + video: Video, + stream_handle: &'stream mut StreamHandle +} + +pub trait Codec: Debug { const ALLOWED_FORMATS: &'static [FrameFormat]; - - /// Container type for the decoder. Will be used for ImageBuffer - type PixelContainer: Deref; - - fn check_format(buffer: &FrameBuffer) -> ControlFlow { - if !Self::ALLOWED_FORMATS.contains(&buffer.source_frame_format()) { - return ControlFlow::Break(NokhwaError::ConversionError("unsupported".to_string())); - } - - ControlFlow::Continue(()) - } - - /// Decode function. - fn decode( - &mut self, - buffer: &FrameBuffer, - ) -> Result, NokhwaError>; - - /// Decode to user-provided Buffer - /// - /// Incase that the buffer is not large enough this should error. - fn decode_buffer( - &mut self, - buffer: &FrameBuffer, - output: &mut [OutputPixel::Subpixel], - ) -> Result<(), NokhwaError>; - - /// Decoder Predicted Size - fn predicted_size_of_frame(buffer: &FrameBuffer) -> Option { - if !Self::ALLOWED_FORMATS.contains(&buffer.source_frame_format()) { - return None; - } - let res = buffer.resolution(); - Some( - res.x() as usize - * res.y() as usize - * size_of::() - * OutputPixel::CHANNEL_COUNT as usize, - ) - } -} - -/// Decoder that can be used statically (struct contains no state) -/// -/// This is useful for times that a simple function is all that is required. -pub trait StaticDecoder: Decoder { - fn decode_static( - buffer: &FrameBuffer, - ) -> Result, NokhwaError>; - - fn decode_static_to_buffer( - buffer: &FrameBuffer, - output: &mut [OutputPixel::Subpixel], - ) -> Result<(), NokhwaError>; + + fn initialize(&mut self, camera_format: CameraFormat) -> Result<(), NokhwaError>; + + fn stop(&mut self) -> Result<(), NokhwaError>; + + fn frame_format(&self) -> Result; + + fn resolution(&self) -> Result; + + fn frame_rate(&self) -> Result; + + fn set_frame_format(&mut self, frame_format: FrameFormat) -> Result<(), NokhwaError>; + + fn set_resolution(&mut self, resolution: Resolution) -> Result<(), NokhwaError>; + + fn set_frame_rate(&mut self, frame_rate: FrameRate) -> Result<(), NokhwaError>; + + fn decode_frame(&mut self, buffer: &FrameBuffer) -> Result, NokhwaError>; } #[cfg(feature = "async")] -#[cfg_attr(feature = "async", async_trait::async_trait)] -pub trait AsyncDecoder: Decoder { - /// Asynchronous decoder - async fn decode_async( - &mut self, - buffer: &FrameBuffer, - ) -> Result, NokhwaError>; - - /// Asynchronous decoder to user buffer. - async fn decode_buffer( - &mut self, - buffer: &FrameBuffer, - output: &mut [OutputPixel::Subpixel], - ) -> Result<(), NokhwaError>; -} - -#[cfg(feature = "async")] -#[cfg_attr(feature = "async", async_trait::async_trait)] -pub trait AsyncStaticDecoder: - Decoder + AsyncDecoder -{ - /// Asynchronous decoder - async fn decode_static_async( - buffer: &FrameBuffer, - ) -> Result, NokhwaError>; - - /// Asynchronous decoder to user buffer. - async fn decode_static_buffer_async( - buffer: &FrameBuffer, - output: &mut [OutputPixel::Subpixel], - ) -> Result<(), NokhwaError>; -} - -// #[cfg(feature = "decoders")] +pub trait CodecAsync: Codec + Debug {} diff --git a/nokhwa-core/src/frame_buffer.rs b/nokhwa-core/src/frame_buffer.rs index 7535d5b..b440353 100644 --- a/nokhwa-core/src/frame_buffer.rs +++ b/nokhwa-core/src/frame_buffer.rs @@ -15,7 +15,6 @@ */ use std::hash::{Hash, Hasher}; use crate::frame_format::FrameFormat; -use crate::types::Resolution; use small_map::{FxSmallMap, Iter}; use crate::control::ControlValue; @@ -76,9 +75,7 @@ impl PartialEq for Metadata { /// Note that decoding on the main thread **will** decrease your performance and lead to dropped frames. #[derive(Clone, Debug, Hash, PartialEq)] pub struct FrameBuffer { - resolution: Resolution, buffer: Vec, - source_frame_format: FrameFormat, metadata: Option, } @@ -86,21 +83,13 @@ impl FrameBuffer { /// Creates a new buffer with a [`&[u8]`]. #[must_use] #[inline] - pub fn new(resolution: Resolution, buffer: Vec, source_frame_format: FrameFormat, metadata: Option) -> Self { + pub fn new(buffer: Vec, metadata: Option) -> Self { Self { - resolution, buffer, - source_frame_format, metadata, } } - - /// Get the [`Resolution`] of this buffer. - #[must_use] - pub fn resolution(&self) -> Resolution { - self.resolution - } - + /// Get the data of this buffer. #[must_use] pub fn buffer(&self) -> &[u8] { @@ -117,9 +106,4 @@ impl FrameBuffer { self.metadata.as_ref() } - /// Get the [`SourceFrameFormat`] of this buffer. - #[must_use] - pub fn source_frame_format(&self) -> FrameFormat { - self.source_frame_format - } } diff --git a/nokhwa-core/src/lib.rs b/nokhwa-core/src/lib.rs index 4f9e163..d8ff647 100644 --- a/nokhwa-core/src/lib.rs +++ b/nokhwa-core/src/lib.rs @@ -1,10 +1,7 @@ #![deny(clippy::pedantic)] #![warn(clippy::all)] -#![allow(clippy::cast_precision_loss)] -#![allow(clippy::cast_sign_loss)] -#![allow(clippy::cast_possible_truncation)] #![cfg_attr(feature = "test-fail-warning", deny(warnings))] -// #![cfg_attr(feature = "docs-features", feature(doc_cfg))] +#![cfg_attr(feature = "docs-features", feature(doc_cfg))] /* * Copyright 2022 l1npengtul / The Nokhwa Contributors * diff --git a/nokhwa-core/src/platform.rs b/nokhwa-core/src/platform.rs index b1df303..ed2a66f 100644 --- a/nokhwa-core/src/platform.rs +++ b/nokhwa-core/src/platform.rs @@ -37,7 +37,6 @@ pub trait PlatformTrait { } #[cfg(feature = "async")] -#[cfg_attr(feature = "async", async_trait::async_trait)] pub trait AsyncPlatformTrait { const PLATFORM: Backends; type AsyncCamera: AsyncCamera; diff --git a/nokhwa-core/src/stream.rs b/nokhwa-core/src/stream.rs index 143d67b..96ea8cb 100644 --- a/nokhwa-core/src/stream.rs +++ b/nokhwa-core/src/stream.rs @@ -1,95 +1,205 @@ -use crate::error::{NokhwaError, NokhwaResult}; +use std::cell::Cell; +use std::time::Duration; +use flume::{Receiver, Sender, TryRecvError}; +use typed_builder::TypedBuilder; +use crate::error::NokhwaError; use crate::frame_buffer::FrameBuffer; -use std::sync::Arc; -use derive_builder::Builder; -use flume::{Receiver, TryRecvError}; +use crate::types::CameraFormat; -#[derive(Clone, Debug, Default, PartialEq, Builder)] +/// What receiving behaviour the stream should observe. +/// +/// Note that [`StreamHandleTrait::poll_frame`] does not respect [`StreamReceiverBehaviour::Timeout`] - +/// it will either immediately return (try once) or block until the next frame or error. +/// +/// The default behaviour is to block until a new event is sent. +#[derive(Clone, Debug, Default, PartialOrd, PartialEq)] +pub enum StreamReceiverBehaviour { + /// Blocks until a new event is sent to the Stream. + #[default] + Blocking, + /// Only waits [duration] amount of time for a new event, returning [`Event::NotReady`] otherwise. + Timeout(Duration), + /// Immediately return. If there is no event waiting for the stream, it will return an [`Event::NotReady`] instead. + Try, +} + +/// How many events a stream can hold. By default, it is **one**. +/// +/// This means that streams will be blocked until the stream handle is emptied. +#[derive(Clone, Debug, PartialOrd, PartialEq)] +pub enum StreamBounds { + Bounded(u32), + Unbounded, +} + +impl Default for StreamBounds { + fn default() -> Self { + StreamBounds::Bounded(1) + } +} + +#[derive(Clone, Default, Debug, PartialOrd, PartialEq)] +pub enum ControlFlowOnOther { + Continue, + #[default] + Break, +} + +/// Configuration for a [`StreamHandle`]. +#[derive(Clone, Debug, Default, PartialOrd, PartialEq, TypedBuilder)] pub struct StreamConfiguration { - buffer_size: Option, - + #[builder(default)] + pub receiver: StreamReceiverBehaviour, + #[builder(default)] + pub bound: StreamBounds, + #[builder(default)] + pub on_other: ControlFlowOnOther, } -pub trait StreamInnerTrait { - fn configuration(&self) -> &StreamConfiguration; - fn receiver(&self) -> Arc>; - fn stop(&mut self) -> NokhwaResult<()>; +/// Possible events to receive from an active stream. +#[derive(Clone, Debug, PartialEq)] +pub enum Event { + /// A new frame. + NewFrame(FrameBuffer), + /// Camera Format Changed. + /// + /// This will usually require the reset of a buffer, or be followed by a [`Event::Terminated`], + /// depending on the backend used. + FormatChange(CameraFormat), + /// This stream is not ready for another event. This is **never** sent by the stream itself, but + /// instead a [`StreamHandle`] construct for when the user sets [`StreamReceiverBehaviour`] to either + /// [`StreamReceiverBehaviour::Timeout`] or [`StreamReceiverBehaviour::Try`] but the stream does not + /// have the data ready. + /// + /// (This can be ignored when iterating, or using the [`StreamReceiverBehaviour::Blocking`] approach.) + NotReady, + /// The stream will be ended shortly. Users should call [`StreamHandleTrait::close_stream`] afterwards. + Terminating, + /// The stream is closed. + Closed, + /// Some other message sent by the driver. This can be ignored, although logging this is preferable. + Other(String) } -pub struct Stream { - inner: Box, +/// Represents a handle to a currently open stream. +/// +/// Streams are only valid as long as the camera is live. Any Stream that is living past a camera +/// is invalid to use. (This doesn't cause UB, it will just kindly tell you that the stream has +/// already closed.) +/// +/// Streams may unexpectedly close due to unforeseen consequences e.g. webcam undergoes spontaneous +/// deconstruction. +/// +/// The async methods [`StreamHandle::poll_event`] and [`StreamHandle::poll_frame`] **do not** respect the [`StreamReceiverBehaviour`] setting. +/// +/// You may also close the stream from the handle side using +#[derive(Debug)] +pub struct StreamHandle { + frame: Receiver, + control: Sender<()>, + configuration: StreamConfiguration, + format: Cell, } -impl Stream { - pub fn new(inner: Box) -> Self { +impl StreamHandle { + /// You shouldn't be here. + pub fn new(recv: Receiver, control: Sender<()>, configuration: StreamConfiguration, format: CameraFormat) -> Self { Self { - inner, + frame: recv, + control, + configuration, + format: Cell::new(format), } } - - pub fn check_disconnected(&self) -> NokhwaResult<()> { - if self.inner.receiver().is_disconnected() { - return Err(NokhwaError::ReadFrameError( - "stream is disconnected!".to_string(), - )) + + pub fn configuration(&self) -> &StreamConfiguration { + &self.configuration + } + + pub fn format(&self) -> CameraFormat { + self.format.get() + } + + pub fn next_event(&self) -> Result { + let event = match self.configuration.receiver { + StreamReceiverBehaviour::Blocking => { + self.frame.recv().map_or_else(|_| { Event::Closed }, |e| { e }) + } + StreamReceiverBehaviour::Timeout(time) => { + self.frame.recv_timeout(time).map_or_else(|_| { Event::NotReady }, |e| { e }) + } + StreamReceiverBehaviour::Try => { + self.frame.try_recv().map_or_else(|why| { + match why { + TryRecvError::Empty => Event::NotReady, + TryRecvError::Disconnected => Event::Closed, + } + }, |e| { e }) + } + }; + + if let Event::FormatChange(fmt) = event { + self.format.set(fmt); } - Ok(()) + + return Ok(event) } - pub fn poll_frame(&self) -> NokhwaResult { - self.check_disconnected()?; - - self.inner - .receiver() - .recv() - .map_err(|why| NokhwaError::ReadFrameError(why.to_string())) - } - - pub fn try_poll_frame(&self) -> NokhwaResult> { - self.check_disconnected()?; - - if self.inner.receiver().is_empty() { - return Ok(None); - } - - let possible_frame = self.inner - .receiver() - .try_recv(); - - match possible_frame { - Ok(f) => Ok(Some(f)), - Err(why) => { - match why { - TryRecvError::Empty => Ok(None), - TryRecvError::Disconnected => Err(NokhwaError::ReadFrameError( - "stream is disconnected!".to_string(), - )) + pub fn next_frame(&self) -> Result { + loop { + let event = self.next_event()?; + match event { + Event::NewFrame(f) => return Ok(f), + Event::FormatChange(_) | Event::NotReady => continue, + Event::Terminating | Event::Closed => { + let _ = self.control.try_send(()); + return Err(NokhwaError::ReadFrameError("Stream Closed.".to_string())) + } + Event::Other(why) => { + match self.configuration.on_other { + ControlFlowOnOther::Continue => continue, + ControlFlowOnOther::Break => return Err(NokhwaError::ReadFrameError(why)) + } } } } - } #[cfg(feature = "async")] - pub async fn await_frame(&self) -> NokhwaResult { - use futures::TryFutureExt; - - self.check_disconnected()?; - - self.inner - .receiver() - .recv_async() - .map_err(|why| NokhwaError::ReadFrameError(why.to_string())).await + pub async fn poll_event(&self) -> Result { + Ok(self.frame.recv_async().await.map_or_else(|_| { Event::Closed }, |e| { if let Event::FormatChange(fmt) = e { + self.format.set(fmt); + } + e + })) } - - pub fn stop_stream(mut self) -> NokhwaResult<()> { - self.inner.stop()?; - Ok(()) + + // TODO: a smarter implementation? maybe? + #[cfg(feature = "async")] + pub async fn poll_next_frame(&self) -> Result { + loop { + let event = self.poll_event().await?; + match event { + Event::NewFrame(f) => return Ok(f), + Event::FormatChange(_) | Event::NotReady => continue, + Event::Terminating | Event::Closed => { + let _ = self.control.try_send(()); + return Err(NokhwaError::ReadFrameError("Stream Closed.".to_string())) + } + Event::Other(why) => { + match self.configuration.on_other { + ControlFlowOnOther::Continue => continue, + ControlFlowOnOther::Break => return Err(NokhwaError::ReadFrameError(why)) + } + } + } + } } } -impl Drop for Stream { +impl Drop for StreamHandle { fn drop(&mut self) { - let _ = self.inner.stop(); + let _ = self.control.try_send(()); } } +