Module std::sync::mpscStable
[-] [+]
[src]
Multi-producer, single-consumer FIFO queue communication primitives.
This module provides message-based communication over channels, concretely defined among three types:
Sender
SyncSender
Receiver
A Sender
or SyncSender
is used to send data to a Receiver
. Both
senders are clone-able (multi-producer) such that many threads can send
simultaneously to one receiver (single-consumer).
These channels come in two flavors:
An asynchronous, infinitely buffered channel. The
channel()
function will return a(Sender, Receiver)
tuple where all sends will be asynchronous (they never block). The channel conceptually has an infinite buffer.A synchronous, bounded channel. The
sync_channel()
function will return a(SyncSender, Receiver)
tuple where the storage for pending messages is a pre-allocated buffer of a fixed size. All sends will be synchronous by blocking until there is buffer space available. Note that a bound of 0 is allowed, causing the channel to become a "rendezvous" channel where each sender atomically hands off a message to a receiver.
Disconnection
The send and receive operations on channels will all return a Result
indicating whether the operation succeeded or not. An unsuccessful operation
is normally indicative of the other half of a channel having "hung up" by
being dropped in its corresponding thread.
Once half of a channel has been deallocated, most operations can no longer
continue to make progress, so Err
will be returned. Many applications will
continue to unwrap()
the results returned from this module, instigating a
propagation of failure among threads if one unexpectedly dies.
Examples
Simple usage:
fn main() { use std::thread; use std::sync::mpsc::channel; // Create a simple streaming channel let (tx, rx) = channel(); thread::spawn(move|| { tx.send(10).unwrap(); }); assert_eq!(rx.recv().unwrap(), 10); }use std::thread; use std::sync::mpsc::channel; // Create a simple streaming channel let (tx, rx) = channel(); thread::spawn(move|| { tx.send(10).unwrap(); }); assert_eq!(rx.recv().unwrap(), 10);
Shared usage:
fn main() { use std::thread; use std::sync::mpsc::channel; // Create a shared channel that can be sent along from many threads // where tx is the sending half (tx for transmission), and rx is the receiving // half (rx for receiving). let (tx, rx) = channel(); for i in 0..10 { let tx = tx.clone(); thread::spawn(move|| { tx.send(i).unwrap(); }); } for _ in 0..10 { let j = rx.recv().unwrap(); assert!(0 <= j && j < 10); } }use std::thread; use std::sync::mpsc::channel; // Create a shared channel that can be sent along from many threads // where tx is the sending half (tx for transmission), and rx is the receiving // half (rx for receiving). let (tx, rx) = channel(); for i in 0..10 { let tx = tx.clone(); thread::spawn(move|| { tx.send(i).unwrap(); }); } for _ in 0..10 { let j = rx.recv().unwrap(); assert!(0 <= j && j < 10); }
Propagating panics:
fn main() { use std::sync::mpsc::channel; // The call to recv() will return an error because the channel has already // hung up (or been deallocated) let (tx, rx) = channel::<int>(); drop(tx); assert!(rx.recv().is_err()); }use std::sync::mpsc::channel; // The call to recv() will return an error because the channel has already // hung up (or been deallocated) let (tx, rx) = channel::<int>(); drop(tx); assert!(rx.recv().is_err());
Synchronous channels:
fn main() { use std::thread; use std::sync::mpsc::sync_channel; let (tx, rx) = sync_channel::<int>(0); thread::spawn(move|| { // This will wait for the parent task to start receiving tx.send(53).unwrap(); }); rx.recv().unwrap(); }use std::thread; use std::sync::mpsc::sync_channel; let (tx, rx) = sync_channel::<int>(0); thread::spawn(move|| { // This will wait for the parent task to start receiving tx.send(53).unwrap(); }); rx.recv().unwrap();
Reading from a channel with a timeout requires to use a Timer together with the channel. You can use the select! macro to select either and handle the timeout case. This first example will break out of the loop after 10 seconds no matter what:
fn main() { use std::sync::mpsc::channel; use std::old_io::timer::Timer; use std::time::Duration; let (tx, rx) = channel::<int>(); let mut timer = Timer::new().unwrap(); let timeout = timer.oneshot(Duration::seconds(10)); loop { select! { val = rx.recv() => println!("Received {}", val.unwrap()), _ = timeout.recv() => { println!("timed out, total time was more than 10 seconds"); break; } } } }use std::sync::mpsc::channel; use std::old_io::timer::Timer; use std::time::Duration; let (tx, rx) = channel::<int>(); let mut timer = Timer::new().unwrap(); let timeout = timer.oneshot(Duration::seconds(10)); loop { select! { val = rx.recv() => println!("Received {}", val.unwrap()), _ = timeout.recv() => { println!("timed out, total time was more than 10 seconds"); break; } } }
This second example is more costly since it allocates a new timer every time a message is received, but it allows you to timeout after the channel has been inactive for 5 seconds:
fn main() { use std::sync::mpsc::channel; use std::old_io::timer::Timer; use std::time::Duration; let (tx, rx) = channel::<int>(); let mut timer = Timer::new().unwrap(); loop { let timeout = timer.oneshot(Duration::seconds(5)); select! { val = rx.recv() => println!("Received {}", val.unwrap()), _ = timeout.recv() => { println!("timed out, no message received in 5 seconds"); break; } } } }use std::sync::mpsc::channel; use std::old_io::timer::Timer; use std::time::Duration; let (tx, rx) = channel::<int>(); let mut timer = Timer::new().unwrap(); loop { let timeout = timer.oneshot(Duration::seconds(5)); select! { val = rx.recv() => println!("Received {}", val.unwrap()), _ = timeout.recv() => { println!("timed out, no message received in 5 seconds"); break; } } }
Structs
Handle | A handle to a receiver which is currently a member of a |
Iter | An iterator over messages on a receiver, this iterator will block
whenever |
Receiver | The receiving-half of Rust's channel type. This half can only be owned by one task |
RecvError | An error returned from the |
Select | The "receiver set" of the select interface. This structure is used to manage a set of receivers which are being selected over. |
SendError | An error returned from the |
Sender | The sending-half of Rust's asynchronous channel type. This half can only be owned by one task, but it can be cloned to send to other tasks. |
SyncSender | The sending-half of Rust's synchronous channel type. This half can only be owned by one task, but it can be cloned to send to other tasks. |
Enums
TryRecvError | This enumeration is the list of the possible reasons that try_recv could not return data when called. |
TrySendError | This enumeration is the list of the possible error outcomes for the
|
Functions
channel | Creates a new asynchronous channel, returning the sender/receiver halves. |
sync_channel | Creates a new synchronous, bounded channel. |