specialize io::copy to use copy_file_range, splice or sendfile

Currently it only applies to linux systems. It can be extended to make use
of similar syscalls on other unix systems.
This commit is contained in:
The8472 2020-07-29 00:35:01 +02:00
parent 35debd4c11
commit 16236470c1
7 changed files with 540 additions and 80 deletions

View File

@ -1656,7 +1656,7 @@ pub fn rename<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()>
/// the length of the `to` file as reported by `metadata`.
///
/// If youre wanting to copy the contents of one file to another and youre
/// working with [`File`]s, see the [`io::copy`] function.
/// working with [`File`]s, see the [`io::copy()`] function.
///
/// # Platform-specific behavior
///

456
library/std/src/io/copy.rs Normal file
View File

@ -0,0 +1,456 @@
use crate::io::{self, ErrorKind, Read, Write};
use crate::mem::MaybeUninit;
/// Copies the entire contents of a reader into a writer.
///
/// This function will continuously read data from `reader` and then
/// write it into `writer` in a streaming fashion until `reader`
/// returns EOF.
///
/// On success, the total number of bytes that were copied from
/// `reader` to `writer` is returned.
///
/// If youre wanting to copy the contents of one file to another and youre
/// working with filesystem paths, see the [`fs::copy`] function.
///
/// [`fs::copy`]: crate::fs::copy
///
/// # Errors
///
/// This function will return an error immediately if any call to [`read`] or
/// [`write`] returns an error. All instances of [`ErrorKind::Interrupted`] are
/// handled by this function and the underlying operation is retried.
///
/// [`read`]: Read::read
/// [`write`]: Write::write
///
/// # Examples
///
/// ```
/// use std::io;
///
/// fn main() -> io::Result<()> {
/// let mut reader: &[u8] = b"hello";
/// let mut writer: Vec<u8> = vec![];
///
/// io::copy(&mut reader, &mut writer)?;
///
/// assert_eq!(&b"hello"[..], &writer[..]);
/// Ok(())
/// }
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> io::Result<u64>
where
R: Read,
W: Write,
{
#[cfg(any(target_os = "linux", target_os = "android"))]
{
kernel_copy::copy_spec(reader, writer)
}
#[cfg(not(any(target_os = "linux", target_os = "android")))]
generic_copy(reader, writer)
}
pub(crate) fn generic_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> io::Result<u64>
where
R: Read,
W: Write,
{
let mut buf = MaybeUninit::<[u8; super::DEFAULT_BUF_SIZE]>::uninit();
// FIXME: #42788
//
// - This creates a (mut) reference to a slice of
// _uninitialized_ integers, which is **undefined behavior**
//
// - Only the standard library gets to soundly "ignore" this,
// based on its privileged knowledge of unstable rustc
// internals;
unsafe {
reader.initializer().initialize(buf.assume_init_mut());
}
let mut written = 0;
loop {
let len = match reader.read(unsafe { buf.assume_init_mut() }) {
Ok(0) => return Ok(written),
Ok(len) => len,
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};
writer.write_all(unsafe { &buf.assume_init_ref()[..len] })?;
written += len as u64;
}
}
#[cfg(any(target_os = "linux", target_os = "android"))]
mod kernel_copy {
use crate::cmp::min;
use crate::convert::TryInto;
use crate::fs::{File, Metadata};
use crate::io::{
BufRead, BufReader, BufWriter, Read, Result, StderrLock, StdinLock, StdoutLock, Take, Write,
};
use crate::mem::ManuallyDrop;
use crate::net::TcpStream;
use crate::os::unix::fs::FileTypeExt;
use crate::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use crate::process::{ChildStderr, ChildStdin, ChildStdout};
pub(super) fn copy_spec<R: Read + ?Sized, W: Write + ?Sized>(
read: &mut R,
write: &mut W,
) -> Result<u64> {
let copier = Copier { read, write };
SpecCopy::copy(copier)
}
enum FdMeta {
Metadata(Metadata),
Socket,
Pipe,
None,
}
impl FdMeta {
fn is_fifo(&self) -> bool {
match self {
FdMeta::Metadata(meta) => meta.file_type().is_fifo(),
FdMeta::Socket => false,
FdMeta::Pipe => true,
FdMeta::None => false,
}
}
}
struct CopyParams(FdMeta, Option<RawFd>);
struct Copier<'a, 'b, R: Read + ?Sized, W: Write + ?Sized> {
pub read: &'a mut R,
pub write: &'b mut W,
}
trait SpecCopy {
fn copy(self) -> Result<u64>;
}
impl<R: Read + ?Sized, W: Write + ?Sized> SpecCopy for Copier<'_, '_, R, W> {
default fn copy(self) -> Result<u64> {
super::generic_copy(self.read, self.write)
}
}
impl<R: CopyRead, W: CopyWrite> SpecCopy for Copier<'_, '_, R, W> {
fn copy(self) -> Result<u64> {
let (reader, writer) = (self.read, self.write);
let r_cfg = reader.properties();
let w_cfg = writer.properties();
// before direct operations on file descriptors ensure that all source and sink buffers are emtpy
let mut flush = || -> crate::io::Result<u64> {
let bytes = reader.drain_to(writer, u64::MAX)?;
writer.flush()?;
Ok(bytes)
};
match (r_cfg, w_cfg) {
(
CopyParams(FdMeta::Metadata(reader_meta), Some(readfd)),
CopyParams(FdMeta::Metadata(writer_meta), Some(writefd)),
) if reader_meta.is_file() && writer_meta.is_file() => {
let bytes_flushed = flush()?;
let max_write = reader.min_limit();
let (mut reader, mut writer) =
unsafe { (fd_as_file(readfd), fd_as_file(writefd)) };
let len = reader_meta.len();
crate::sys::fs::copy_regular_files(
&mut reader,
&mut writer,
min(len, max_write),
)
.map(|bytes_copied| bytes_copied + bytes_flushed)
}
(CopyParams(reader_meta, Some(readfd)), CopyParams(writer_meta, Some(writefd)))
if reader_meta.is_fifo() || writer_meta.is_fifo() =>
{
// splice
let bytes_flushed = flush()?;
let max_write = reader.min_limit();
let (mut reader, mut writer) =
unsafe { (fd_as_file(readfd), fd_as_file(writefd)) };
crate::sys::fs::sendfile_splice(
crate::sys::fs::SpliceMode::Splice,
&mut reader,
&mut writer,
max_write,
)
.map(|bytes_sent| bytes_sent + bytes_flushed)
}
(
CopyParams(FdMeta::Metadata(reader_meta), Some(readfd)),
CopyParams(_, Some(writefd)),
) if reader_meta.is_file() => {
// try sendfile, most modern systems it should work with any target as long as the source is a mmapable file.
// in the rare cases where it's no supported the wrapper function will fall back to a normal copy loop
let bytes_flushed = flush()?;
let (mut reader, mut writer) =
unsafe { (fd_as_file(readfd), fd_as_file(writefd)) };
let len = reader_meta.len();
let max_write = reader.min_limit();
crate::sys::fs::sendfile_splice(
crate::sys::fs::SpliceMode::Sendfile,
&mut reader,
&mut writer,
min(len, max_write),
)
.map(|bytes_sent| bytes_sent + bytes_flushed)
}
_ => super::generic_copy(reader, writer),
}
}
}
#[rustc_specialization_trait]
trait CopyRead: Read {
fn drain_to<W: Write>(&mut self, _writer: &mut W, _limit: u64) -> Result<u64> {
Ok(0)
}
/// The minimum of the limit of all `Take<_>` wrappers, `u64::MAX` otherwise.
/// This method does not account for data `BufReader` buffers and would underreport
/// the limit of a `Take<BufReader<Take<_>>>` type. Thus its result is only valid
/// after draining the buffers.
fn min_limit(&self) -> u64 {
u64::MAX
}
fn properties(&self) -> CopyParams;
}
#[rustc_specialization_trait]
trait CopyWrite: Write {
fn properties(&self) -> CopyParams;
}
impl CopyRead for File {
fn properties(&self) -> CopyParams {
CopyParams(fd_to_meta(self), Some(self.as_raw_fd()))
}
}
impl CopyRead for &File {
fn properties(&self) -> CopyParams {
CopyParams(fd_to_meta(*self), Some(self.as_raw_fd()))
}
}
impl CopyWrite for File {
fn properties(&self) -> CopyParams {
CopyParams(fd_to_meta(self), Some(self.as_raw_fd()))
}
}
impl CopyWrite for &File {
fn properties(&self) -> CopyParams {
CopyParams(fd_to_meta(*self), Some(self.as_raw_fd()))
}
}
impl CopyRead for TcpStream {
fn properties(&self) -> CopyParams {
// avoid the stat syscall since we can be fairly sure it's a socket
CopyParams(FdMeta::Socket, Some(self.as_raw_fd()))
}
}
impl CopyRead for &TcpStream {
fn properties(&self) -> CopyParams {
// avoid the stat syscall since we can be fairly sure it's a socket
CopyParams(FdMeta::Socket, Some(self.as_raw_fd()))
}
}
impl CopyWrite for TcpStream {
fn properties(&self) -> CopyParams {
// avoid the stat syscall since we can be fairly sure it's a socket
CopyParams(FdMeta::Socket, Some(self.as_raw_fd()))
}
}
impl CopyWrite for &TcpStream {
fn properties(&self) -> CopyParams {
// avoid the stat syscall since we can be fairly sure it's a socket
CopyParams(FdMeta::Socket, Some(self.as_raw_fd()))
}
}
impl CopyWrite for ChildStdin {
fn properties(&self) -> CopyParams {
CopyParams(FdMeta::Pipe, Some(self.as_raw_fd()))
}
}
impl CopyRead for ChildStdout {
fn properties(&self) -> CopyParams {
CopyParams(FdMeta::Pipe, Some(self.as_raw_fd()))
}
}
impl CopyRead for ChildStderr {
fn properties(&self) -> CopyParams {
CopyParams(FdMeta::Pipe, Some(self.as_raw_fd()))
}
}
impl CopyRead for StdinLock<'_> {
fn drain_to<W: Write>(&mut self, writer: &mut W, outer_limit: u64) -> Result<u64> {
let buf_reader = self.as_mut_buf();
let buf = buf_reader.buffer();
let buf = &buf[0..min(buf.len(), outer_limit.try_into().unwrap_or(usize::MAX))];
let bytes_drained = buf.len();
writer.write_all(buf)?;
buf_reader.consume(bytes_drained);
Ok(bytes_drained as u64)
}
fn properties(&self) -> CopyParams {
CopyParams(fd_to_meta(self), Some(self.as_raw_fd()))
}
}
impl CopyWrite for StdoutLock<'_> {
fn properties(&self) -> CopyParams {
CopyParams(fd_to_meta(self), Some(self.as_raw_fd()))
}
}
impl CopyWrite for StderrLock<'_> {
fn properties(&self) -> CopyParams {
CopyParams(fd_to_meta(self), Some(self.as_raw_fd()))
}
}
impl<T: CopyRead> CopyRead for Take<T> {
fn drain_to<W: Write>(&mut self, writer: &mut W, outer_limit: u64) -> Result<u64> {
let local_limit = self.limit();
let combined_limit = min(outer_limit, local_limit);
let bytes_drained = self.get_mut().drain_to(writer, combined_limit)?;
// update limit since read() was bypassed
self.set_limit(local_limit - bytes_drained);
Ok(bytes_drained)
}
fn min_limit(&self) -> u64 {
min(Take::limit(self), self.get_ref().min_limit())
}
fn properties(&self) -> CopyParams {
self.get_ref().properties()
}
}
impl<T: CopyRead> CopyRead for BufReader<T> {
fn drain_to<W: Write>(&mut self, writer: &mut W, outer_limit: u64) -> Result<u64> {
let buf = self.buffer();
let buf = &buf[0..min(buf.len(), outer_limit.try_into().unwrap_or(usize::MAX))];
let bytes = buf.len();
writer.write_all(buf)?;
self.consume(bytes);
let remaining = outer_limit - bytes as u64;
// in case of nested bufreaders we also need to drain the ones closer to the source
let inner_bytes = self.get_mut().drain_to(writer, remaining)?;
Ok(bytes as u64 + inner_bytes)
}
fn min_limit(&self) -> u64 {
self.get_ref().min_limit()
}
fn properties(&self) -> CopyParams {
self.get_ref().properties()
}
}
impl<T: CopyWrite> CopyWrite for BufWriter<T> {
fn properties(&self) -> CopyParams {
self.get_ref().properties()
}
}
fn fd_to_meta<T: AsRawFd>(fd: &T) -> FdMeta {
let fd = fd.as_raw_fd();
let file: ManuallyDrop<File> = ManuallyDrop::new(unsafe { File::from_raw_fd(fd) });
match file.metadata() {
Ok(meta) => FdMeta::Metadata(meta),
Err(_) => FdMeta::None,
}
}
unsafe fn fd_as_file(fd: RawFd) -> ManuallyDrop<File> {
ManuallyDrop::new(File::from_raw_fd(fd))
}
#[cfg(test)]
mod tests {
use crate::io::{BufRead, BufReader, BufWriter, Read, Result, Seek, SeekFrom, Write};
#[test]
fn copy_specialization() -> Result<()> {
let path = crate::env::temp_dir();
let source_path = path.join("copy-spec.source");
let sink_path = path.join("copy-spec.sink");
let result: Result<()> = try {
let mut source = crate::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&source_path)?;
source.write_all(b"abcdefghiklmnopqr")?;
source.seek(SeekFrom::Start(8))?;
let mut source = BufReader::with_capacity(8, source.take(5));
source.fill_buf()?;
assert_eq!(source.buffer(), b"iklmn");
source.get_mut().set_limit(6);
source.get_mut().get_mut().seek(SeekFrom::Start(1))?; // "bcdefg"
let mut source = source.take(10); // "iklmnbcdef"
let mut sink = crate::fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(true)
.open(&sink_path)?;
sink.write_all(b"000000")?;
let mut sink = BufWriter::with_capacity(5, sink);
sink.write_all(b"wxyz")?;
assert_eq!(sink.buffer(), b"wxyz");
let copied = crate::io::copy(&mut source, &mut sink)?;
assert_eq!(copied, 10);
assert_eq!(sink.buffer().len(), 0);
let mut sink = sink.into_inner()?;
sink.seek(SeekFrom::Start(0))?;
let mut copied = Vec::new();
sink.read_to_end(&mut copied)?;
assert_eq!(&copied, b"000000wxyziklmnbcdef");
};
let rm1 = crate::fs::remove_file(source_path);
let rm2 = crate::fs::remove_file(sink_path);
result.and(rm1).and(rm2)
}
}
}

View File

@ -266,6 +266,8 @@ pub use self::buffered::IntoInnerError;
#[stable(feature = "rust1", since = "1.0.0")]
pub use self::buffered::{BufReader, BufWriter, LineWriter};
#[stable(feature = "rust1", since = "1.0.0")]
pub use self::copy::copy;
#[stable(feature = "rust1", since = "1.0.0")]
pub use self::cursor::Cursor;
#[stable(feature = "rust1", since = "1.0.0")]
pub use self::error::{Error, ErrorKind, Result};
@ -279,9 +281,12 @@ pub use self::stdio::{_eprint, _print};
#[doc(no_inline, hidden)]
pub use self::stdio::{set_panic, set_print};
#[stable(feature = "rust1", since = "1.0.0")]
pub use self::util::{copy, empty, repeat, sink, Empty, Repeat, Sink};
pub use self::util::{empty, repeat, sink, Empty, Repeat, Sink};
pub(crate) use self::copy::generic_copy;
mod buffered;
mod copy;
mod cursor;
mod error;
mod impls;

View File

@ -409,6 +409,12 @@ impl Read for Stdin {
}
}
impl StdinLock<'_> {
pub(crate) fn as_mut_buf(&mut self) -> &mut BufReader<impl Read> {
&mut self.inner
}
}
#[stable(feature = "rust1", since = "1.0.0")]
impl Read for StdinLock<'_> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {

View File

@ -4,78 +4,7 @@
mod tests;
use crate::fmt;
use crate::io::{self, BufRead, ErrorKind, Initializer, IoSlice, IoSliceMut, Read, Write};
use crate::mem::MaybeUninit;
/// Copies the entire contents of a reader into a writer.
///
/// This function will continuously read data from `reader` and then
/// write it into `writer` in a streaming fashion until `reader`
/// returns EOF.
///
/// On success, the total number of bytes that were copied from
/// `reader` to `writer` is returned.
///
/// If youre wanting to copy the contents of one file to another and youre
/// working with filesystem paths, see the [`fs::copy`] function.
///
/// [`fs::copy`]: crate::fs::copy
///
/// # Errors
///
/// This function will return an error immediately if any call to [`read`] or
/// [`write`] returns an error. All instances of [`ErrorKind::Interrupted`] are
/// handled by this function and the underlying operation is retried.
///
/// [`read`]: Read::read
/// [`write`]: Write::write
///
/// # Examples
///
/// ```
/// use std::io;
///
/// fn main() -> io::Result<()> {
/// let mut reader: &[u8] = b"hello";
/// let mut writer: Vec<u8> = vec![];
///
/// io::copy(&mut reader, &mut writer)?;
///
/// assert_eq!(&b"hello"[..], &writer[..]);
/// Ok(())
/// }
/// ```
#[stable(feature = "rust1", since = "1.0.0")]
pub fn copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> io::Result<u64>
where
R: Read,
W: Write,
{
let mut buf = MaybeUninit::<[u8; super::DEFAULT_BUF_SIZE]>::uninit();
// FIXME: #42788
//
// - This creates a (mut) reference to a slice of
// _uninitialized_ integers, which is **undefined behavior**
//
// - Only the standard library gets to soundly "ignore" this,
// based on its privileged knowledge of unstable rustc
// internals;
unsafe {
reader.initializer().initialize(buf.assume_init_mut());
}
let mut written = 0;
loop {
let len = match reader.read(unsafe { buf.assume_init_mut() }) {
Ok(0) => return Ok(written),
Ok(len) => len,
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};
writer.write_all(unsafe { &buf.assume_init_ref()[..len] })?;
written += len as u64;
}
}
use crate::io::{self, BufRead, Initializer, IoSlice, IoSliceMut, Read, Write};
/// A reader which is always at EOF.
///

View File

@ -316,6 +316,7 @@
#![feature(toowned_clone_into)]
#![feature(total_cmp)]
#![feature(trace_macros)]
#![feature(try_blocks)]
#![feature(try_reserve)]
#![feature(unboxed_closures)]
#![feature(unsafe_block_in_unsafe_fn)]

View File

@ -1191,6 +1191,21 @@ pub fn copy(from: &Path, to: &Path) -> io::Result<u64> {
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn copy(from: &Path, to: &Path) -> io::Result<u64> {
let (mut reader, reader_metadata) = open_from(from)?;
let max_len = u64::MAX;
let (mut writer, _) = open_to_and_set_permissions(to, reader_metadata)?;
copy_regular_files(&mut reader, &mut writer, max_len)
}
/// linux-specific implementation that will attempt to use copy_file_range for copy offloading
/// as the name says, it only works on regular files
#[cfg(any(target_os = "linux", target_os = "android"))]
pub(crate) fn copy_regular_files(
reader: &mut crate::fs::File,
writer: &mut crate::fs::File,
max_len: u64,
) -> io::Result<u64> {
use crate::cmp;
use crate::sync::atomic::{AtomicBool, Ordering};
@ -1209,10 +1224,6 @@ pub fn copy(from: &Path, to: &Path) -> io::Result<u64> {
libc::syscall(libc::SYS_copy_file_range, fd_in, off_in, fd_out, off_out, len, flags)
}
let (mut reader, reader_metadata) = open_from(from)?;
let max_len = u64::MAX;
let (mut writer, _) = open_to_and_set_permissions(to, reader_metadata)?;
let has_copy_file_range = HAS_COPY_FILE_RANGE.load(Ordering::Relaxed);
let mut written = 0u64;
while written < max_len {
@ -1249,7 +1260,7 @@ pub fn copy(from: &Path, to: &Path) -> io::Result<u64> {
// - reading virtual files from the proc filesystem which appear to have 0 size
// but are not empty. noted in coreutils to affect kernels at least up to 5.6.19.
// - copying from an overlay filesystem in docker. reported to occur on fedora 32.
return io::copy(&mut reader, &mut writer);
return io::copy(reader, writer);
}
Ok(0) => return Ok(written), // reached EOF
Ok(ret) => written += ret as u64,
@ -1265,7 +1276,59 @@ pub fn copy(from: &Path, to: &Path) -> io::Result<u64> {
// - copy_file_range is disallowed, for example by seccomp (EPERM)
// - copy_file_range cannot be used with pipes or device nodes (EINVAL)
assert_eq!(written, 0);
return io::copy(&mut reader, &mut writer);
return io::generic_copy(reader, writer);
}
_ => return Err(err),
}
}
}
}
Ok(written)
}
pub(crate) enum SpliceMode {
Sendfile,
Splice,
}
/// performs splice or sendfile between file descriptors
#[cfg(any(target_os = "linux", target_os = "android"))]
pub(crate) fn sendfile_splice(
mode: SpliceMode,
reader: &mut crate::fs::File,
writer: &mut crate::fs::File,
len: u64,
) -> io::Result<u64> {
let mut written = 0u64;
while written < len {
let chunk_size = crate::cmp::min(len - written, 0x7ffff000_u64) as usize;
let result = match mode {
SpliceMode::Sendfile => cvt(unsafe {
libc::sendfile(writer.as_raw_fd(), reader.as_raw_fd(), ptr::null_mut(), chunk_size)
}),
SpliceMode::Splice => cvt(unsafe {
libc::splice(
reader.as_raw_fd(),
ptr::null_mut(),
writer.as_raw_fd(),
ptr::null_mut(),
chunk_size,
0,
)
}),
};
match result {
Ok(0) => break, // EOF
Ok(ret) => written += ret as u64,
Err(err) => {
match err.raw_os_error() {
Some(os_err) if os_err == libc::EINVAL => {
// Try fallback io::copy if splice/sendfile do not support this particular
// file descritor (EINVAL)
assert_eq!(written, 0);
return io::generic_copy(reader, writer);
}
_ => return Err(err),
}