From 708100767848cf0bd52047ab552f945768164ab2 Mon Sep 17 00:00:00 2001 From: Raphael Speyer Date: Mon, 13 Oct 2014 22:09:30 +1100 Subject: [PATCH] impl Buffer for ChanReader --- src/libstd/io/comm_adapters.rs | 75 +++++++++++++++++++++++++++------- 1 file changed, 60 insertions(+), 15 deletions(-) diff --git a/src/libstd/io/comm_adapters.rs b/src/libstd/io/comm_adapters.rs index 5bec131f222..bd9577c8cfc 100644 --- a/src/libstd/io/comm_adapters.rs +++ b/src/libstd/io/comm_adapters.rs @@ -13,10 +13,10 @@ use cmp; use collections::Collection; use comm::{Sender, Receiver}; use io; -use option::{None, Option, Some}; +use option::{None, Some}; use result::{Ok, Err}; use slice::{bytes, CloneableVector}; -use super::{Reader, Writer, IoResult}; +use super::{Buffer, Reader, Writer, IoResult}; use vec::Vec; /// Allows reading from a rx. @@ -37,7 +37,7 @@ use vec::Vec; /// } /// ``` pub struct ChanReader { - buf: Option>, // A buffer of bytes received but not consumed. + buf: Vec, // A buffer of bytes received but not consumed. pos: uint, // How many of the buffered bytes have already be consumed. rx: Receiver>, // The Receiver to pull data from. closed: bool, // Whether the channel this Receiver connects to has been closed. @@ -47,7 +47,7 @@ impl ChanReader { /// Wraps a `Port` in a `ChanReader` structure pub fn new(rx: Receiver>) -> ChanReader { ChanReader { - buf: None, + buf: Vec::new(), pos: 0, rx: rx, closed: false, @@ -55,27 +55,51 @@ impl ChanReader { } } +impl Buffer for ChanReader { + fn fill_buf<'a>(&'a mut self) -> IoResult<&'a [u8]> { + if self.pos >= self.buf.len() { + self.pos = 0; + match self.rx.recv_opt() { + Ok(bytes) => { + self.buf = bytes; + }, + Err(()) => { + self.closed = true; + self.buf = Vec::new(); + } + } + } + if self.closed { + Err(io::standard_error(io::EndOfFile)) + } else { + Ok(self.buf.slice_from(self.pos)) + } + } + + fn consume(&mut self, amt: uint) { + self.pos += amt; + assert!(self.pos <= self.buf.len()); + } +} + impl Reader for ChanReader { fn read(&mut self, buf: &mut [u8]) -> IoResult { let mut num_read = 0; loop { - match self.buf { - Some(ref prev) => { + let count = match self.fill_buf().ok() { + Some(src) => { let dst = buf[mut num_read..]; - let src = prev[self.pos..]; - let count = cmp::min(dst.len(), src.len()); + let count = cmp::min(src.len(), dst.len()); bytes::copy_memory(dst, src[..count]); - num_read += count; - self.pos += count; + count }, - None => (), + None => 0, }; + self.consume(count); + num_read += count; if num_read == buf.len() || self.closed { break; } - self.pos = 0; - self.buf = self.rx.recv_opt().ok(); - self.closed = self.buf.is_none(); } if self.closed && num_read == 0 { Err(io::standard_error(io::EndOfFile)) @@ -149,7 +173,6 @@ mod test { let mut reader = ChanReader::new(rx); let mut buf = [0u8, ..3]; - assert_eq!(Ok(0), reader.read([])); assert_eq!(Ok(3), reader.read(buf)); @@ -178,6 +201,28 @@ mod test { assert_eq!(a, buf.as_slice()); } + #[test] + fn test_rx_buffer() { + let (tx, rx) = channel(); + task::spawn(proc() { + tx.send(b"he".to_vec()); + tx.send(b"llo wo".to_vec()); + tx.send(b"".to_vec()); + tx.send(b"rld\nhow ".to_vec()); + tx.send(b"are you?".to_vec()); + tx.send(b"".to_vec()); + }); + + let mut reader = ChanReader::new(rx); + + assert_eq!(Ok("hello world\n".to_string()), reader.read_line()); + assert_eq!(Ok("how are you?".to_string()), reader.read_line()); + match reader.read_line() { + Ok(..) => fail!(), + Err(e) => assert_eq!(e.kind, io::EndOfFile), + } + } + #[test] fn test_chan_writer() { let (tx, rx) = channel();