Auto merge of #116132 - darthunix:connect_poll, r=cuviper

Make TCP connect handle EINTR correctly

According to the [POSIX](https://pubs.opengroup.org/onlinepubs/009695399/functions/connect.html) standard, if connect() is interrupted by a signal that is caught while blocked waiting to establish a connection, connect() shall fail and set errno to EINTR, but the connection request shall not be aborted, and the connection shall be established asynchronously. When the connection has been established asynchronously, select() and poll() shall indicate that the file descriptor for the socket is ready for writing.

The previous implementation differs from the recomendation: in a case of the EINTR we tried to reconnect in a loop and sometimes get EISCONN error (this problem was originally detected on MacOS).

1. More details about the problem in an [article](http://www.madore.org/~david/computers/connect-intr.html).
2. The original [issue](https://git.picodata.io/picodata/picodata/tarantool-module/-/issues/157).
This commit is contained in:
bors 2023-10-19 11:22:28 +00:00
commit 3fbcfd2b6f
5 changed files with 38 additions and 12 deletions

View File

@ -56,6 +56,12 @@ impl Socket {
unimplemented!() unimplemented!()
} }
pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> {
let (addr, len) = addr.into_inner();
cvt_r(|| unsafe { netc::connect(self.as_raw_fd(), addr.as_ptr(), len) })?;
Ok(())
}
pub fn connect_timeout(&self, addr: &SocketAddr, timeout: Duration) -> io::Result<()> { pub fn connect_timeout(&self, addr: &SocketAddr, timeout: Duration) -> io::Result<()> {
self.set_nonblocking(true)?; self.set_nonblocking(true)?;
let r = unsafe { let r = unsafe {

View File

@ -233,12 +233,15 @@ impl Socket {
} }
} }
pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> {
let (addr, len) = addr.into_inner();
cvt(unsafe { netc::connect(self.0.raw(), addr.as_ptr(), len) })?;
Ok(())
}
pub fn connect_timeout(&self, addr: &SocketAddr, timeout: Duration) -> io::Result<()> { pub fn connect_timeout(&self, addr: &SocketAddr, timeout: Duration) -> io::Result<()> {
self.set_nonblocking(true)?; self.set_nonblocking(true)?;
let r = unsafe { let r = self.connect(addr);
let (addr, len) = addr.into_inner();
cvt(netc::connect(self.0.raw(), addr.as_ptr(), len))
};
self.set_nonblocking(false)?; self.set_nonblocking(false)?;
match r { match r {

View File

@ -6,6 +6,7 @@ use crate::net::{Shutdown, SocketAddr};
use crate::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
use crate::str; use crate::str;
use crate::sys::fd::FileDesc; use crate::sys::fd::FileDesc;
use crate::sys::unix::IsMinusOne;
use crate::sys_common::net::{getsockopt, setsockopt, sockaddr_to_addr}; use crate::sys_common::net::{getsockopt, setsockopt, sockaddr_to_addr};
use crate::sys_common::{AsInner, FromInner, IntoInner}; use crate::sys_common::{AsInner, FromInner, IntoInner};
use crate::time::{Duration, Instant}; use crate::time::{Duration, Instant};
@ -140,6 +141,22 @@ impl Socket {
unimplemented!() unimplemented!()
} }
pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> {
let (addr, len) = addr.into_inner();
loop {
let result = unsafe { libc::connect(self.as_raw_fd(), addr.as_ptr(), len) };
if result.is_minus_one() {
let err = crate::sys::os::errno();
match err {
libc::EINTR => continue,
libc::EISCONN => return Ok(()),
_ => return Err(io::Error::from_raw_os_error(err)),
}
}
return Ok(());
}
}
pub fn connect_timeout(&self, addr: &SocketAddr, timeout: Duration) -> io::Result<()> { pub fn connect_timeout(&self, addr: &SocketAddr, timeout: Duration) -> io::Result<()> {
self.set_nonblocking(true)?; self.set_nonblocking(true)?;
let r = unsafe { let r = unsafe {

View File

@ -140,13 +140,15 @@ impl Socket {
} }
} }
pub fn connect(&self, addr: &SocketAddr) -> io::Result<()> {
let (addr, len) = addr.into_inner();
let result = unsafe { c::connect(self.as_raw(), addr.as_ptr(), len) };
cvt(result).map(drop)
}
pub fn connect_timeout(&self, addr: &SocketAddr, timeout: Duration) -> io::Result<()> { pub fn connect_timeout(&self, addr: &SocketAddr, timeout: Duration) -> io::Result<()> {
self.set_nonblocking(true)?; self.set_nonblocking(true)?;
let result = { let result = self.connect(addr);
let (addr, len) = addr.into_inner();
let result = unsafe { c::connect(self.as_raw(), addr.as_ptr(), len) };
cvt(result).map(drop)
};
self.set_nonblocking(false)?; self.set_nonblocking(false)?;
match result { match result {

View File

@ -226,9 +226,7 @@ impl TcpStream {
init(); init();
let sock = Socket::new(addr, c::SOCK_STREAM)?; let sock = Socket::new(addr, c::SOCK_STREAM)?;
sock.connect(addr)?;
let (addr, len) = addr.into_inner();
cvt_r(|| unsafe { c::connect(sock.as_raw(), addr.as_ptr(), len) })?;
Ok(TcpStream { inner: sock }) Ok(TcpStream { inner: sock })
} }