intermidiate

This commit is contained in:
l1npengtul
2025-03-28 05:23:21 +09:00
parent 8a38293978
commit eee01e6a2e
9 changed files with 324 additions and 205 deletions
Generated
+22
View File
@@ -1307,6 +1307,7 @@ version = "0.2.0"
dependencies = [
"async-trait",
"flume",
"futures-core",
"image",
"num-rational",
"num-traits",
@@ -1315,6 +1316,7 @@ dependencies = [
"serde",
"small-map",
"thiserror 2.0.0",
"typed-builder",
"wgpu",
]
@@ -1949,6 +1951,26 @@ dependencies = [
"winnow",
]
[[package]]
name = "typed-builder"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e14ed59dc8b7b26cacb2a92bad2e8b1f098806063898ab42a3bd121d7d45e75"
dependencies = [
"typed-builder-macro",
]
[[package]]
name = "typed-builder-macro"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "560b82d656506509d43abe30e0ba64c56b1953ab3d4fe7ba5902747a7a3cedd5"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "udev"
version = "0.5.0"
+60 -14
View File
@@ -4,13 +4,14 @@ use nokhwa_core::error::{NokhwaError, NokhwaResult};
use nokhwa_core::frame_format::FrameFormat;
use nokhwa_core::platform::{Backends, PlatformTrait};
use nokhwa_core::ranges::Range;
use nokhwa_core::stream::Stream;
use nokhwa_core::stream::{StreamHandle, StreamConfiguration, StreamInnerTrait};
use nokhwa_core::types::{CameraFormat, CameraIndex, CameraInformation, FrameRate, Resolution};
use std::collections::hash_map::{Keys, Values};
use std::collections::{HashMap, HashSet};
use std::num::NonZeroI32;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::thread::{sleep, JoinHandle};
use std::time::Duration;
use flume::{Sender, Receiver, unbounded, bounded};
use v4l::context::enum_devices;
use v4l::control::{Description, Flags, MenuItem, Type, Value};
@@ -348,7 +349,7 @@ pub struct V4L2Camera {
camera_format: Option<CameraFormat>,
camera_index: CameraIndex,
controls: Controls,
stream: Option<Stream>,
stream: Option<Arc<StreamHandle>>,
}
impl Setting for V4L2Camera {
@@ -503,7 +504,7 @@ impl Setting for V4L2Camera {
struct V4L2Stream {
thread: JoinHandle<()>,
control: Sender<()>,
receiver: Arc<Receiver<FrameBuffer>>,
receiver: Arc<Receiver<NokhwaResult<FrameBuffer>>>,
}
impl Drop for V4L2Stream {
@@ -512,14 +513,41 @@ impl Drop for V4L2Stream {
}
}
impl StreamInnerTrait for V4L2Stream {
fn configuration(&self) -> &Option<StreamConfiguration> {
&None
}
fn receiver(&self) -> Arc<Receiver<NokhwaResult<FrameBuffer>>> {
self.receiver.clone()
}
fn stop(&mut self) -> NokhwaResult<()> {
self.control.send(()).map_err(|why| NokhwaError::StreamShutdownError(why.to_string()))?;
loop {
if self.thread.is_finished() {
break;
}
sleep(Duration::from_millis(1))
}
Ok(())
}
}
impl Capture for V4L2Camera {
fn open_stream(&mut self) -> Result<Stream, NokhwaError> {
fn open_stream(&mut self) -> Result<Arc<StreamHandle>, NokhwaError> {
if self.stream.is_some() {
return Err(NokhwaError::OpenStreamError("Stream Already Open".to_string()))
}
let format = match self.camera_format {
Some(fmt) => fmt,
None => return Err(NokhwaError::OpenStreamError("No Format".to_string()))
};
let (control, ctrl_recv) = bounded(1);
let (control, ctrl_recv) = bounded::<()>(1);
let (sender, receiver) = unbounded();
let receiver = Arc::new(receiver);
@@ -530,7 +558,6 @@ impl Capture for V4L2Camera {
})?;
let thread = std::thread::spawn(move || {
loop {
if ctrl_recv.is_disconnected() || sender.is_disconnected() {
return;
@@ -540,18 +567,37 @@ impl Capture for V4L2Camera {
}
match mmap_stream.next() {
Ok((data, meta)) => {
FrameBuffer::new()
}
Err(_) => {}
Ok((data, _meta)) => { // TODO: Add metadata
if let Err(_why) = sender.send(Ok(FrameBuffer::new(data))) {
return ();
}
}
()
})
Err(why) => {
if let Err(_why) = sender.send(Err(NokhwaError::ReadFrameError(why.to_string()))) {
return ();
}
}
}
}
return ();
});
let stream = Arc::new(StreamHandle::new(Box::new(V4L2Stream {
thread,
control,
receiver,
})));
self.stream = Some(stream.clone());
Ok(stream)
}
fn close_stream(&mut self) -> Result<(), NokhwaError> {
todo!()
if let Some(stream) = self.stream.clone() {
stream.stop_stream()?;
}
Ok(())
}
}
+5 -2
View File
@@ -16,7 +16,7 @@ serialize = ["serde"]
wgpu-types = ["wgpu"]
opencv-mat = ["opencv", "opencv/clang-runtime"]
docs-features = ["serialize", "wgpu-types"]
async = ["async-trait", "flume/async"]
async = ["async-trait", "flume/async", "futures-core"]
test-fail-warnings = []
@@ -25,6 +25,7 @@ thiserror = "2.0"
flume = "0.11"
num-traits = "0.2"
ordered-float = "5"
typed-builder = "0.20"
[dependencies.num-rational]
version = "0.4"
@@ -58,7 +59,9 @@ optional = true
version = "0.1"
optional = true
[dependencies.futures-core]
version = "0.3"
optional = true
[package.metadata.docs.rs]
features = ["docs-features"]
+4 -6
View File
@@ -1,10 +1,11 @@
use crate::control::{ControlDescription, ControlId, ControlValue, Controls};
use crate::error::NokhwaError;
use crate::frame_format::FrameFormat;
use crate::stream::Stream;
use crate::stream::StreamHandle;
use crate::types::{CameraFormat, FrameRate, Resolution};
use std::collections::hash_map::{Keys, Values};
use std::collections::HashMap;
use std::sync::Arc;
pub trait Setting {
fn enumerate_formats(&self) -> Result<Vec<CameraFormat>, NokhwaError>;
@@ -33,7 +34,6 @@ pub trait Setting {
}
#[cfg(feature = "async")]
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait AsyncSetting {
async fn enumerate_formats_async(&self) -> Result<Vec<CameraFormat>, NokhwaError>;
@@ -55,16 +55,15 @@ pub trait AsyncSetting {
pub trait Capture {
// Implementations MUST guarantee that there can only ever be one stream open at once.
fn open_stream(&mut self) -> Result<Stream, NokhwaError>;
fn open_stream(&mut self) -> Result<Arc<StreamHandle>, NokhwaError>;
// Implementations MUST be multi-close tolerant.
fn close_stream(&mut self) -> Result<(), NokhwaError>;
}
#[cfg(feature = "async")]
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait AsyncStream {
async fn open_stream_async(&mut self) -> Result<Stream, NokhwaError>;
async fn open_stream_async(&mut self) -> Result<StreamHandle, NokhwaError>;
async fn close_stream_async(&mut self) -> Result<(), NokhwaError>;
}
@@ -72,5 +71,4 @@ pub trait AsyncStream {
pub trait Camera: Setting + Capture {}
#[cfg(feature = "async")]
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait AsyncCamera: Camera + AsyncSetting + AsyncStream {}
+46 -86
View File
@@ -1,99 +1,59 @@
use crate::{error::NokhwaError, frame_buffer::FrameBuffer, frame_format::FrameFormat};
use image::{ImageBuffer, Pixel};
use std::ops::{ControlFlow, Deref};
use std::borrow::Cow;
use std::fmt::Debug;
use crate::error::NokhwaError;
use crate::frame_buffer::FrameBuffer;
use crate::frame_format::FrameFormat;
use crate::stream::{StreamHandle};
use crate::types::{CameraFormat, FrameRate, Resolution};
/// Trait to define a struct that can decode a [`FrameBuffer`]
pub trait Decoder<OutputPixel: Pixel> {
/// Formats that the decoder can decode.
#[derive(Debug)]
pub struct Decoder<'stream, Video> where
Video: Codec {
video: Video,
stream: &'stream mut StreamHandle
}
impl<'stream, Video> Decoder<'stream, Video> where Video: Codec {
pub fn new(stream: &'stream mut StreamHandle, decoder: Video) -> Result<Self, NokhwaError> {
let format = stream.format();
let mut decoder = decoder;
decoder.initialize(format)?;
Ok(Self { video: decoder, stream })
}
pub fn
}
#[cfg(feature = "async")]
#[derive(Debug)]
pub struct DecoderAsync<'stream, Video> where
Video: CodecAsync {
video: Video,
stream_handle: &'stream mut StreamHandle
}
pub trait Codec: Debug {
const ALLOWED_FORMATS: &'static [FrameFormat];
/// Container type for the decoder. Will be used for ImageBuffer
type PixelContainer: Deref<Target = [OutputPixel::Subpixel]>;
fn initialize(&mut self, camera_format: CameraFormat) -> Result<(), NokhwaError>;
fn check_format(buffer: &FrameBuffer) -> ControlFlow<NokhwaError> {
if !Self::ALLOWED_FORMATS.contains(&buffer.source_frame_format()) {
return ControlFlow::Break(NokhwaError::ConversionError("unsupported".to_string()));
}
fn stop(&mut self) -> Result<(), NokhwaError>;
ControlFlow::Continue(())
}
fn frame_format(&self) -> Result<FrameFormat, NokhwaError>;
/// Decode function.
fn decode(
&mut self,
buffer: &FrameBuffer,
) -> Result<ImageBuffer<OutputPixel, Self::PixelContainer>, NokhwaError>;
fn resolution(&self) -> Result<Resolution, NokhwaError>;
/// Decode to user-provided Buffer
///
/// Incase that the buffer is not large enough this should error.
fn decode_buffer(
&mut self,
buffer: &FrameBuffer,
output: &mut [OutputPixel::Subpixel],
) -> Result<(), NokhwaError>;
fn frame_rate(&self) -> Result<FrameRate, NokhwaError>;
/// Decoder Predicted Size
fn predicted_size_of_frame(buffer: &FrameBuffer) -> Option<usize> {
if !Self::ALLOWED_FORMATS.contains(&buffer.source_frame_format()) {
return None;
}
let res = buffer.resolution();
Some(
res.x() as usize
* res.y() as usize
* size_of::<OutputPixel::Subpixel>()
* OutputPixel::CHANNEL_COUNT as usize,
)
}
}
fn set_frame_format(&mut self, frame_format: FrameFormat) -> Result<(), NokhwaError>;
/// Decoder that can be used statically (struct contains no state)
///
/// This is useful for times that a simple function is all that is required.
pub trait StaticDecoder<OutputPixel: Pixel>: Decoder<OutputPixel> {
fn decode_static(
buffer: &FrameBuffer,
) -> Result<ImageBuffer<OutputPixel, Self::PixelContainer>, NokhwaError>;
fn set_resolution(&mut self, resolution: Resolution) -> Result<(), NokhwaError>;
fn decode_static_to_buffer(
buffer: &FrameBuffer,
output: &mut [OutputPixel::Subpixel],
) -> Result<(), NokhwaError>;
fn set_frame_rate(&mut self, frame_rate: FrameRate) -> Result<(), NokhwaError>;
fn decode_frame(&mut self, buffer: &FrameBuffer) -> Result<Cow<'_, [u8]>, NokhwaError>;
}
#[cfg(feature = "async")]
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait AsyncDecoder<OutputPixel: Pixel>: Decoder<OutputPixel> {
/// Asynchronous decoder
async fn decode_async(
&mut self,
buffer: &FrameBuffer,
) -> Result<ImageBuffer<OutputPixel, Self::PixelContainer>, NokhwaError>;
/// Asynchronous decoder to user buffer.
async fn decode_buffer(
&mut self,
buffer: &FrameBuffer,
output: &mut [OutputPixel::Subpixel],
) -> Result<(), NokhwaError>;
}
#[cfg(feature = "async")]
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait AsyncStaticDecoder<OutputPixel: Pixel>:
Decoder<OutputPixel> + AsyncDecoder<OutputPixel>
{
/// Asynchronous decoder
async fn decode_static_async(
buffer: &FrameBuffer,
) -> Result<ImageBuffer<OutputPixel, Self::PixelContainer>, NokhwaError>;
/// Asynchronous decoder to user buffer.
async fn decode_static_buffer_async(
buffer: &FrameBuffer,
output: &mut [OutputPixel::Subpixel],
) -> Result<(), NokhwaError>;
}
// #[cfg(feature = "decoders")]
pub trait CodecAsync: Codec + Debug {}
+1 -17
View File
@@ -15,7 +15,6 @@
*/
use std::hash::{Hash, Hasher};
use crate::frame_format::FrameFormat;
use crate::types::Resolution;
use small_map::{FxSmallMap, Iter};
use crate::control::ControlValue;
@@ -76,9 +75,7 @@ impl PartialEq for Metadata {
/// Note that decoding on the main thread **will** decrease your performance and lead to dropped frames.
#[derive(Clone, Debug, Hash, PartialEq)]
pub struct FrameBuffer {
resolution: Resolution,
buffer: Vec<u8>,
source_frame_format: FrameFormat,
metadata: Option<Metadata>,
}
@@ -86,21 +83,13 @@ impl FrameBuffer {
/// Creates a new buffer with a [`&[u8]`].
#[must_use]
#[inline]
pub fn new(resolution: Resolution, buffer: Vec<u8>, source_frame_format: FrameFormat, metadata: Option<Metadata>) -> Self {
pub fn new(buffer: Vec<u8>, metadata: Option<Metadata>) -> Self {
Self {
resolution,
buffer,
source_frame_format,
metadata,
}
}
/// Get the [`Resolution`] of this buffer.
#[must_use]
pub fn resolution(&self) -> Resolution {
self.resolution
}
/// Get the data of this buffer.
#[must_use]
pub fn buffer(&self) -> &[u8] {
@@ -117,9 +106,4 @@ impl FrameBuffer {
self.metadata.as_ref()
}
/// Get the [`SourceFrameFormat`] of this buffer.
#[must_use]
pub fn source_frame_format(&self) -> FrameFormat {
self.source_frame_format
}
}
+1 -4
View File
@@ -1,10 +1,7 @@
#![deny(clippy::pedantic)]
#![warn(clippy::all)]
#![allow(clippy::cast_precision_loss)]
#![allow(clippy::cast_sign_loss)]
#![allow(clippy::cast_possible_truncation)]
#![cfg_attr(feature = "test-fail-warning", deny(warnings))]
// #![cfg_attr(feature = "docs-features", feature(doc_cfg))]
#![cfg_attr(feature = "docs-features", feature(doc_cfg))]
/*
* Copyright 2022 l1npengtul <l1npengtul@protonmail.com> / The Nokhwa Contributors
*
-1
View File
@@ -37,7 +37,6 @@ pub trait PlatformTrait {
}
#[cfg(feature = "async")]
#[cfg_attr(feature = "async", async_trait::async_trait)]
pub trait AsyncPlatformTrait {
const PLATFORM: Backends;
type AsyncCamera: AsyncCamera;
+171 -61
View File
@@ -1,95 +1,205 @@
use crate::error::{NokhwaError, NokhwaResult};
use std::cell::Cell;
use std::time::Duration;
use flume::{Receiver, Sender, TryRecvError};
use typed_builder::TypedBuilder;
use crate::error::NokhwaError;
use crate::frame_buffer::FrameBuffer;
use std::sync::Arc;
use derive_builder::Builder;
use flume::{Receiver, TryRecvError};
use crate::types::CameraFormat;
#[derive(Clone, Debug, Default, PartialEq, Builder)]
/// What receiving behaviour the stream should observe.
///
/// Note that [`StreamHandleTrait::poll_frame`] does not respect [`StreamReceiverBehaviour::Timeout`] -
/// it will either immediately return (try once) or block until the next frame or error.
///
/// The default behaviour is to block until a new event is sent.
#[derive(Clone, Debug, Default, PartialOrd, PartialEq)]
pub enum StreamReceiverBehaviour {
/// Blocks until a new event is sent to the Stream.
#[default]
Blocking,
/// Only waits [duration] amount of time for a new event, returning [`Event::NotReady`] otherwise.
Timeout(Duration),
/// Immediately return. If there is no event waiting for the stream, it will return an [`Event::NotReady`] instead.
Try,
}
/// How many events a stream can hold. By default, it is **one**.
///
/// This means that streams will be blocked until the stream handle is emptied.
#[derive(Clone, Debug, PartialOrd, PartialEq)]
pub enum StreamBounds {
Bounded(u32),
Unbounded,
}
impl Default for StreamBounds {
fn default() -> Self {
StreamBounds::Bounded(1)
}
}
#[derive(Clone, Default, Debug, PartialOrd, PartialEq)]
pub enum ControlFlowOnOther {
Continue,
#[default]
Break,
}
/// Configuration for a [`StreamHandle`].
#[derive(Clone, Debug, Default, PartialOrd, PartialEq, TypedBuilder)]
pub struct StreamConfiguration {
buffer_size: Option<u32>,
#[builder(default)]
pub receiver: StreamReceiverBehaviour,
#[builder(default)]
pub bound: StreamBounds,
#[builder(default)]
pub on_other: ControlFlowOnOther,
}
pub trait StreamInnerTrait {
fn configuration(&self) -> &StreamConfiguration;
fn receiver(&self) -> Arc<Receiver<FrameBuffer>>;
fn stop(&mut self) -> NokhwaResult<()>;
/// Possible events to receive from an active stream.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
/// A new frame.
NewFrame(FrameBuffer),
/// Camera Format Changed.
///
/// This will usually require the reset of a buffer, or be followed by a [`Event::Terminated`],
/// depending on the backend used.
FormatChange(CameraFormat),
/// This stream is not ready for another event. This is **never** sent by the stream itself, but
/// instead a [`StreamHandle`] construct for when the user sets [`StreamReceiverBehaviour`] to either
/// [`StreamReceiverBehaviour::Timeout`] or [`StreamReceiverBehaviour::Try`] but the stream does not
/// have the data ready.
///
/// (This can be ignored when iterating, or using the [`StreamReceiverBehaviour::Blocking`] approach.)
NotReady,
/// The stream will be ended shortly. Users should call [`StreamHandleTrait::close_stream`] afterwards.
Terminating,
/// The stream is closed.
Closed,
/// Some other message sent by the driver. This can be ignored, although logging this is preferable.
Other(String)
}
pub struct Stream {
inner: Box<dyn StreamInnerTrait>,
/// Represents a handle to a currently open stream.
///
/// Streams are only valid as long as the camera is live. Any Stream that is living past a camera
/// is invalid to use. (This doesn't cause UB, it will just kindly tell you that the stream has
/// already closed.)
///
/// Streams may unexpectedly close due to unforeseen consequences e.g. webcam undergoes spontaneous
/// deconstruction.
///
/// The async methods [`StreamHandle::poll_event`] and [`StreamHandle::poll_frame`] **do not** respect the [`StreamReceiverBehaviour`] setting.
///
/// You may also close the stream from the handle side using
#[derive(Debug)]
pub struct StreamHandle {
frame: Receiver<Event>,
control: Sender<()>,
configuration: StreamConfiguration,
format: Cell<CameraFormat>,
}
impl Stream {
pub fn new(inner: Box<dyn StreamInnerTrait>) -> Self {
impl StreamHandle {
/// You shouldn't be here.
pub fn new(recv: Receiver<Event>, control: Sender<()>, configuration: StreamConfiguration, format: CameraFormat) -> Self {
Self {
inner,
frame: recv,
control,
configuration,
format: Cell::new(format),
}
}
pub fn check_disconnected(&self) -> NokhwaResult<()> {
if self.inner.receiver().is_disconnected() {
return Err(NokhwaError::ReadFrameError(
"stream is disconnected!".to_string(),
))
}
Ok(())
pub fn configuration(&self) -> &StreamConfiguration {
&self.configuration
}
pub fn poll_frame(&self) -> NokhwaResult<FrameBuffer> {
self.check_disconnected()?;
self.inner
.receiver()
.recv()
.map_err(|why| NokhwaError::ReadFrameError(why.to_string()))
pub fn format(&self) -> CameraFormat {
self.format.get()
}
pub fn try_poll_frame(&self) -> NokhwaResult<Option<FrameBuffer>> {
self.check_disconnected()?;
if self.inner.receiver().is_empty() {
return Ok(None);
pub fn next_event(&self) -> Result<Event, NokhwaError> {
let event = match self.configuration.receiver {
StreamReceiverBehaviour::Blocking => {
self.frame.recv().map_or_else(|_| { Event::Closed }, |e| { e })
}
let possible_frame = self.inner
.receiver()
.try_recv();
match possible_frame {
Ok(f) => Ok(Some(f)),
Err(why) => {
StreamReceiverBehaviour::Timeout(time) => {
self.frame.recv_timeout(time).map_or_else(|_| { Event::NotReady }, |e| { e })
}
StreamReceiverBehaviour::Try => {
self.frame.try_recv().map_or_else(|why| {
match why {
TryRecvError::Empty => Ok(None),
TryRecvError::Disconnected => Err(NokhwaError::ReadFrameError(
"stream is disconnected!".to_string(),
))
TryRecvError::Empty => Event::NotReady,
TryRecvError::Disconnected => Event::Closed,
}
}, |e| { e })
}
};
if let Event::FormatChange(fmt) = event {
self.format.set(fmt);
}
return Ok(event)
}
pub fn next_frame(&self) -> Result<FrameBuffer, NokhwaError> {
loop {
let event = self.next_event()?;
match event {
Event::NewFrame(f) => return Ok(f),
Event::FormatChange(_) | Event::NotReady => continue,
Event::Terminating | Event::Closed => {
let _ = self.control.try_send(());
return Err(NokhwaError::ReadFrameError("Stream Closed.".to_string()))
}
Event::Other(why) => {
match self.configuration.on_other {
ControlFlowOnOther::Continue => continue,
ControlFlowOnOther::Break => return Err(NokhwaError::ReadFrameError(why))
}
}
}
}
}
#[cfg(feature = "async")]
pub async fn await_frame(&self) -> NokhwaResult<FrameBuffer> {
use futures::TryFutureExt;
self.check_disconnected()?;
self.inner
.receiver()
.recv_async()
.map_err(|why| NokhwaError::ReadFrameError(why.to_string())).await
pub async fn poll_event(&self) -> Result<Event, NokhwaError> {
Ok(self.frame.recv_async().await.map_or_else(|_| { Event::Closed }, |e| { if let Event::FormatChange(fmt) = e {
self.format.set(fmt);
}
e
}))
}
pub fn stop_stream(mut self) -> NokhwaResult<()> {
self.inner.stop()?;
Ok(())
// TODO: a smarter implementation? maybe?
#[cfg(feature = "async")]
pub async fn poll_next_frame(&self) -> Result<FrameBuffer, NokhwaError> {
loop {
let event = self.poll_event().await?;
match event {
Event::NewFrame(f) => return Ok(f),
Event::FormatChange(_) | Event::NotReady => continue,
Event::Terminating | Event::Closed => {
let _ = self.control.try_send(());
return Err(NokhwaError::ReadFrameError("Stream Closed.".to_string()))
}
Event::Other(why) => {
match self.configuration.on_other {
ControlFlowOnOther::Continue => continue,
ControlFlowOnOther::Break => return Err(NokhwaError::ReadFrameError(why))
}
}
}
}
}
}
impl Drop for Stream {
impl Drop for StreamHandle {
fn drop(&mut self) {
let _ = self.inner.stop();
let _ = self.control.try_send(());
}
}