Index: sys/kern/uipc_socket.c =================================================================== RCS file: /mount/openbsd/cvs/src/sys/kern/uipc_socket.c,v diff -u -p -u -p -r1.338 uipc_socket.c --- sys/kern/uipc_socket.c 14 Jul 2024 15:42:23 -0000 1.338 +++ sys/kern/uipc_socket.c 18 Jul 2024 19:15:26 -0000 @@ -65,6 +65,7 @@ void sotask(void *); void soreaper(void *); void soput(void *); int somove(struct socket *, int); +int somove_dgram(struct socket *, int); void sorflush(struct socket *); void filt_sordetach(struct knote *kn); @@ -324,31 +325,22 @@ sofree(struct socket *so, int keep_lock) sounlock(head); } - if (persocket) { + switch (so->so_proto->pr_domain->dom_family) { + case AF_INET: + case AF_INET6: + if (so->so_proto->pr_type == SOCK_STREAM) + break; + /* FALLTHROUGH */ + default: sounlock(so); refcnt_finalize(&so->so_refcnt, "sofinal"); solock(so); + break; } sigio_free(&so->so_sigio); klist_free(&so->so_rcv.sb_klist); klist_free(&so->so_snd.sb_klist); -#ifdef SOCKET_SPLICE - if (issplicedback(so)) { - int freeing = SOSP_FREEING_WRITE; - - if (so->so_sp->ssp_soback == so) - freeing |= SOSP_FREEING_READ; - sounsplice(so->so_sp->ssp_soback, so, freeing); - } - if (isspliced(so)) { - int freeing = SOSP_FREEING_READ; - - if (so == so->so_sp->ssp_socket) - freeing |= SOSP_FREEING_WRITE; - sounsplice(so, so->so_sp->ssp_socket, freeing); - } -#endif /* SOCKET_SPLICE */ mtx_enter(&so->so_snd.sb_mtx); sbrelease(so, &so->so_snd); @@ -458,6 +450,79 @@ discard: if (so->so_state & SS_NOFDREF) panic("soclose NOFDREF: so %p, so_type %d", so, so->so_type); so->so_state |= SS_NOFDREF; + +#ifdef SOCKET_SPLICE + if (so->so_sp) { + if (so->so_proto->pr_flags & PR_WANTRCVD) { + /* + * XXXSMP: Copy - Paste, but can't relock and + * sleep in sofree() in tcp(4) case. 
+ */ + + if (issplicedback(so)) { + int freeing = SOSP_FREEING_WRITE; + + if (so->so_sp->ssp_soback == so) + freeing |= SOSP_FREEING_READ; + sounsplice(so->so_sp->ssp_soback, so, freeing); + } + if (isspliced(so)) { + int freeing = SOSP_FREEING_READ; + + if (so == so->so_sp->ssp_socket) + freeing |= SOSP_FREEING_WRITE; + sounsplice(so, so->so_sp->ssp_socket, freeing); + } + goto free; + } + + sounlock(so); + mtx_enter(&so->so_snd.sb_mtx); + /* + * Concurrent sounsplice() locks `sb_mtx' mutexes on + * both `so_snd' and `so_rcv' before unsplicing sockets. + */ + if ((so2 = so->so_sp->ssp_soback) == NULL) { + mtx_leave(&so->so_snd.sb_mtx); + goto notsplicedback; + } + soref(so2); + mtx_leave(&so->so_snd.sb_mtx); + + sblock(&so2->so_rcv, SBL_WAIT | SBL_NOINTR); + /* + * sblock() is always taken on `so_rcv' before calling + * sounsplice(). `so' is dying and can only be unspliced. + */ + if (issplicedback(so)) { + int freeing = SOSP_FREEING_WRITE; + + if (so->so_sp->ssp_soback == so) + freeing |= SOSP_FREEING_READ; + solock(so2); + sounsplice(so->so_sp->ssp_soback, so, freeing); + sounlock(so2); + } + sbunlock(&so2->so_rcv); + sorele(so2); + +notsplicedback: + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR); + if (isspliced(so)) { + int freeing = SOSP_FREEING_READ; + + if (so == so->so_sp->ssp_socket) + freeing |= SOSP_FREEING_WRITE; + solock(so); + sounsplice(so, so->so_sp->ssp_socket, freeing); + sounlock(so); + } + sbunlock(&so->so_rcv); + + solock(so); + } +free: +#endif /* SOCKET_SPLICE */ /* sofree() calls sounlock(). */ sofree(so, 0); return (error); @@ -1318,7 +1383,7 @@ sosplice(struct socket *so, int fd, off_ struct file *fp; struct socket *sosp; struct taskq *tq; - int error = 0; + int error = 0, somoveret; if ((so->so_proto->pr_flags & PR_SPLICE) == 0) return (EPROTONOSUPPORT); @@ -1411,14 +1476,6 @@ sosplice(struct socket *so, int fd, off_ goto release; } - /* Splice so and sosp together. 
*/ - mtx_enter(&so->so_rcv.sb_mtx); - mtx_enter(&sosp->so_snd.sb_mtx); - so->so_sp->ssp_socket = sosp; - sosp->so_sp->ssp_soback = so; - mtx_leave(&sosp->so_snd.sb_mtx); - mtx_leave(&so->so_rcv.sb_mtx); - so->so_splicelen = 0; so->so_splicemax = max; if (tv) @@ -1428,11 +1485,23 @@ sosplice(struct socket *so, int fd, off_ timeout_set_proc(&so->so_idleto, soidle, so); task_set(&so->so_splicetask, sotask, so); - /* - * To prevent softnet interrupt from calling somove() while - * we sleep, the socket buffers are not marked as spliced yet. - */ - if (somove(so, M_WAIT)) { + /* Splice so and sosp together. */ + mtx_enter(&so->so_rcv.sb_mtx); + mtx_enter(&sosp->so_snd.sb_mtx); + so->so_sp->ssp_socket = sosp; + sosp->so_sp->ssp_soback = so; + mtx_leave(&sosp->so_snd.sb_mtx); + mtx_leave(&so->so_rcv.sb_mtx); + + if (so->so_proto->pr_flags & PR_WANTRCVD) + somoveret = somove(so, M_WAIT); + else { + sounlock(so); + somoveret = somove_dgram(so, M_WAIT); + solock(so); + } + + if (somoveret) { mtx_enter(&so->so_rcv.sb_mtx); mtx_enter(&sosp->so_snd.sb_mtx); so->so_rcv.sb_flags |= SB_SPLICE; @@ -1454,6 +1523,8 @@ sosplice(struct socket *so, int fd, off_ void sounsplice(struct socket *so, struct socket *sosp, int freeing) { + if ((so->so_proto->pr_flags & PR_WANTRCVD) == 0) + sbassertlocked(&so->so_rcv); soassertlocked(so); task_del(sosplice_taskq, &so->so_splicetask); @@ -1479,32 +1550,55 @@ soidle(void *arg) { struct socket *so = arg; + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR); solock(so); + /* + * Depending on socket type, sblock(&so->so_rcv) or solock() + * is always held while modifying SB_SPLICE and + * so->so_sp->ssp_socket. 
+ */ if (so->so_rcv.sb_flags & SB_SPLICE) { so->so_error = ETIMEDOUT; sounsplice(so, so->so_sp->ssp_socket, 0); } sounlock(so); + sbunlock(&so->so_rcv); } void sotask(void *arg) { struct socket *so = arg; + int doyield = 0; - solock(so); - if (so->so_rcv.sb_flags & SB_SPLICE) { + sblock(&so->so_rcv, SBL_WAIT | SBL_NOINTR); + if (so->so_proto->pr_flags & PR_WANTRCVD) { + solock(so); + /* + * solock() is always held while modifying SB_SPLICE + * and so->so_sp->ssp_socket. + */ + if (so->so_rcv.sb_flags & SB_SPLICE) { + somove(so, M_DONTWAIT); + doyield = 1; + } + sounlock(so); + } else { /* - * We may not sleep here as sofree() and unsplice() may be - * called from softnet interrupt context. This would remove - * the socket during somove(). + * sblock(&so->so_rcv) is always held while modifying + * SB_SPLICE and so->so_sp->ssp_socket. UDP sockets + * do not modify `so_snd' buffer in sending path, no + * need to lock it. */ - somove(so, M_DONTWAIT); + if (so->so_rcv.sb_flags & SB_SPLICE) + somove_dgram(so, M_DONTWAIT); } - sounlock(so); + sbunlock(&so->so_rcv); - /* Avoid user land starvation. */ - yield(); + if (doyield) { + /* Avoid user land starvation. 
*/ + yield(); + } } /* @@ -1549,7 +1643,7 @@ somove(struct socket *so, int wait) int error = 0, maxreached = 0; unsigned int rcvstate; - soassertlocked(so); + soassertlocked_readonly(so); nextpkt: if (so->so_error) { @@ -1830,6 +1924,226 @@ somove(struct socket *so, int wait) return (1); } +int +somove_dgram(struct socket *so, int wait) +{ + struct socket *sosp = so->so_sp->ssp_socket; + struct mbuf *m, **mp, *nextrecord; + u_long len, off; + long space; + int error = 0, maxreached = 0, unsplice = 0; + + sbassertlocked(&so->so_rcv); + + nextpkt: + mtx_enter(&so->so_rcv.sb_mtx); + mtx_enter(&sosp->so_snd.sb_mtx); + + if ((error = READ_ONCE(so->so_error))) + goto release; + if (sosp->so_snd.sb_state & SS_CANTSENDMORE) { + error = EPIPE; + goto release; + } + + error = READ_ONCE(sosp->so_error); + if (error && error != ETIMEDOUT && error != EFBIG && error != ELOOP) + goto release; + if ((sosp->so_state & SS_ISCONNECTED) == 0) + goto release; + + /* Calculate how many bytes can be copied now. */ + len = so->so_rcv.sb_datacc; + if (so->so_splicemax) { + KASSERT(so->so_splicelen < so->so_splicemax); + if (so->so_splicemax <= so->so_splicelen + len) { + len = so->so_splicemax - so->so_splicelen; + maxreached = 1; + } + } + space = sbspace_locked(sosp, &sosp->so_snd); + if (space <= 0) { + maxreached = 0; + goto release; + } + if (space < len) { + maxreached = 0; + if (space < sosp->so_snd.sb_lowat) { + goto release; + } + len = space; + } + + SBLASTRECORDCHK(&so->so_rcv, "somove_dgram 1"); + SBLASTMBUFCHK(&so->so_rcv, "somove_dgram 1"); + m = so->so_rcv.sb_mb; + if (m == NULL) + goto release; + nextrecord = m->m_nextpkt; + + /* Drop address and control information not used with splicing. 
*/ +#ifdef DIAGNOSTIC + if (m->m_type != MT_SONAME) + panic("somove_dgram soname: so %p, so_type %d, m %p, " + "m_type %d", so, so->so_type, m, m->m_type); +#endif + m = m->m_next; + + while (m && m->m_type == MT_CONTROL) + m = m->m_next; + if (m == NULL) { + mtx_leave(&sosp->so_snd.sb_mtx); + sbdroprecord(so, &so->so_rcv); + mtx_leave(&so->so_rcv.sb_mtx); + goto nextpkt; + } + + /* + * By splicing sockets connected to localhost, userland might create a + * loop. Dissolve splicing with error if loop is detected by counter. + * + * If we deal with looped broadcast/multicast packet we bail out with + * no error to suppress splice termination. + */ + if ((m->m_flags & M_PKTHDR) && + ((m->m_pkthdr.ph_loopcnt++ >= M_MAXLOOP) || + ((m->m_flags & M_LOOP) && (m->m_flags & (M_BCAST|M_MCAST))))) { + error = ELOOP; + goto release; + } + + if ((m->m_flags & M_PKTHDR) == 0) + panic("somove_dgram !PKTHDR: so %p, so_type %d, m %p, " + "m_type %d", so, so->so_type, m, m->m_type); + if (sosp->so_snd.sb_hiwat < m->m_pkthdr.len) { + error = EMSGSIZE; + goto release; + } + if (len < m->m_pkthdr.len) + goto release; + if (m->m_pkthdr.len < len) { + maxreached = 0; + len = m->m_pkthdr.len; + } + /* + * Throw away the name mbuf after it has been assured + * that the whole first record can be processed. + */ + m = so->so_rcv.sb_mb; + sbfree(so, &so->so_rcv, m); + so->so_rcv.sb_mb = m_free(m); + sbsync(&so->so_rcv, nextrecord); + + /* + * Throw away the control mbufs after it has been assured + * that the whole first record can be processed. + */ + m = so->so_rcv.sb_mb; + while (m && m->m_type == MT_CONTROL) { + sbfree(so, &so->so_rcv, m); + so->so_rcv.sb_mb = m_free(m); + m = so->so_rcv.sb_mb; + sbsync(&so->so_rcv, nextrecord); + } + + SBLASTRECORDCHK(&so->so_rcv, "somove_dgram 2"); + SBLASTMBUFCHK(&so->so_rcv, "somove_dgram 2"); + + /* Take at most len mbufs out of receive buffer. 
*/ + for (off = 0, mp = &m; off <= len && *mp; + off += (*mp)->m_len, mp = &(*mp)->m_next) { + u_long size = len - off; + +#ifdef DIAGNOSTIC + if ((*mp)->m_type != MT_DATA && (*mp)->m_type != MT_HEADER) + panic("somove_dgram type: so %p, so_type %d, m %p, " + "m_type %d", so, so->so_type, *mp, (*mp)->m_type); +#endif + if ((*mp)->m_len > size) { + /* + * Move only a partial mbuf at maximum splice length or + * if the drain buffer is too small for this large mbuf. + */ + if (!maxreached && sosp->so_snd.sb_datacc > 0) { + len -= size; + break; + } + *mp = m_copym(so->so_rcv.sb_mb, 0, size, wait); + if (*mp == NULL) { + len -= size; + break; + } + so->so_rcv.sb_mb->m_data += size; + so->so_rcv.sb_mb->m_len -= size; + so->so_rcv.sb_cc -= size; + so->so_rcv.sb_datacc -= size; + } else { + *mp = so->so_rcv.sb_mb; + sbfree(so, &so->so_rcv, *mp); + so->so_rcv.sb_mb = (*mp)->m_next; + sbsync(&so->so_rcv, nextrecord); + } + } + *mp = NULL; + + SBLASTRECORDCHK(&so->so_rcv, "somove_dgram 3"); + SBLASTMBUFCHK(&so->so_rcv, "somove_dgram 3"); + SBCHECK(so, &so->so_rcv); + if (m == NULL) + goto release; + m->m_nextpkt = NULL; + if (m->m_flags & M_PKTHDR) { + m_resethdr(m); + m->m_pkthdr.len = len; + } + + mtx_leave(&sosp->so_snd.sb_mtx); + mtx_leave(&so->so_rcv.sb_mtx); + solock_shared(sosp); + error = pru_send(sosp, m, NULL, NULL); + sounlock_shared(sosp); + mtx_enter(&so->so_rcv.sb_mtx); + mtx_enter(&sosp->so_snd.sb_mtx); + + if (error) { + if (sosp->so_snd.sb_state & SS_CANTSENDMORE) + error = EPIPE; + goto release; + } + so->so_splicelen += len; + + /* Move several packets if possible. 
*/ + if (!maxreached && nextrecord) { + mtx_leave(&sosp->so_snd.sb_mtx); + mtx_leave(&so->so_rcv.sb_mtx); + goto nextpkt; + } + + release: + if (!error && maxreached && so->so_splicemax == so->so_splicelen) + error = EFBIG; + if (error) + WRITE_ONCE(so->so_error, error); + + if (((so->so_rcv.sb_state & SS_CANTRCVMORE) && + so->so_rcv.sb_cc == 0) || + (sosp->so_snd.sb_state & SS_CANTSENDMORE) || + maxreached || error) + unsplice = 1; + + mtx_leave(&sosp->so_snd.sb_mtx); + mtx_leave(&so->so_rcv.sb_mtx); + + if (unsplice) { + solock(so); + sounsplice(so, sosp, 0); + sounlock(so); + return (0); + } + if (timerisset(&so->so_idletv)) + timeout_add_tv(&so->so_idleto, &so->so_idletv); + return (1); +} #endif /* SOCKET_SPLICE */ void @@ -1839,22 +2153,16 @@ sorwakeup(struct socket *so) soassertlocked_readonly(so); #ifdef SOCKET_SPLICE - if (so->so_rcv.sb_flags & SB_SPLICE) { - /* - * TCP has a sendbuffer that can handle multiple packets - * at once. So queue the stream a bit to accumulate data. - * The sosplice thread will call somove() later and send - * the packets calling tcp_output() only once. - * In the UDP case, send out the packets immediately. - * Using a thread would make things slower. 
- */ - if (so->so_proto->pr_flags & PR_WANTRCVD) + if (so->so_proto->pr_flags & PR_SPLICE) { + sb_mtx_lock(&so->so_rcv); + if (so->so_rcv.sb_flags & SB_SPLICE) task_add(sosplice_taskq, &so->so_splicetask); - else - somove(so, M_DONTWAIT); + if (isspliced(so)) { + sb_mtx_unlock(&so->so_rcv); + return; + } + sb_mtx_unlock(&so->so_rcv); } - if (isspliced(so)) - return; #endif sowakeup(so, &so->so_rcv); if (so->so_upcall) @@ -1868,10 +2176,17 @@ sowwakeup(struct socket *so) soassertlocked_readonly(so); #ifdef SOCKET_SPLICE - if (so->so_snd.sb_flags & SB_SPLICE) - task_add(sosplice_taskq, &so->so_sp->ssp_soback->so_splicetask); - if (issplicedback(so)) - return; + if (so->so_proto->pr_flags & PR_SPLICE) { + sb_mtx_lock(&so->so_snd); + if (so->so_snd.sb_flags & SB_SPLICE) + task_add(sosplice_taskq, + &so->so_sp->ssp_soback->so_splicetask); + if (issplicedback(so)) { + sb_mtx_unlock(&so->so_snd); + return; + } + sb_mtx_unlock(&so->so_snd); + } #endif sowakeup(so, &so->so_snd); } Index: sys/netinet/in_proto.c =================================================================== RCS file: /mount/openbsd/cvs/src/sys/netinet/in_proto.c,v diff -u -p -u -p -r1.106 in_proto.c --- sys/netinet/in_proto.c 13 Jul 2024 12:00:11 -0000 1.106 +++ sys/netinet/in_proto.c 18 Jul 2024 19:15:26 -0000 @@ -185,7 +185,7 @@ const struct protosw inetsw[] = { .pr_type = SOCK_DGRAM, .pr_domain = &inetdomain, .pr_protocol = IPPROTO_UDP, - .pr_flags = PR_ATOMIC|PR_ADDR|PR_SPLICE|PR_MPSOCKET, + .pr_flags = PR_ATOMIC|PR_ADDR|PR_SPLICE|PR_MPINPUT|PR_MPSOCKET, .pr_input = udp_input, .pr_ctlinput = udp_ctlinput, .pr_ctloutput = ip_ctloutput, Index: sys/netinet/udp_usrreq.c =================================================================== RCS file: /mount/openbsd/cvs/src/sys/netinet/udp_usrreq.c,v diff -u -p -u -p -r1.321 udp_usrreq.c --- sys/netinet/udp_usrreq.c 12 Jul 2024 19:50:35 -0000 1.321 +++ sys/netinet/udp_usrreq.c 18 Jul 2024 19:15:26 -0000 @@ -592,7 +592,7 @@ udp_input(struct mbuf **mp, int 
*offp, i } KASSERT(sotoinpcb(inp->inp_socket) == inp); - soassertlocked(inp->inp_socket); + soassertlocked_readonly(inp->inp_socket); #ifdef INET6 if (ip6 && inp->inp_ip6_minhlim && @@ -1207,7 +1207,12 @@ udp_send(struct socket *so, struct mbuf { struct inpcb *inp = sotoinpcb(so); - soassertlocked(so); + soassertlocked_readonly(so); + + if (inp == NULL) { + /* PCB could be destroyed, but socket still spliced. */ + return (EINVAL); + } #ifdef PIPEX if (inp->inp_pipex) { Index: sys/netinet6/in6_proto.c =================================================================== RCS file: /mount/openbsd/cvs/src/sys/netinet6/in6_proto.c,v diff -u -p -u -p -r1.115 in6_proto.c --- sys/netinet6/in6_proto.c 12 Jul 2024 19:50:35 -0000 1.115 +++ sys/netinet6/in6_proto.c 18 Jul 2024 19:15:26 -0000 @@ -136,7 +136,7 @@ const struct protosw inet6sw[] = { .pr_type = SOCK_DGRAM, .pr_domain = &inet6domain, .pr_protocol = IPPROTO_UDP, - .pr_flags = PR_ATOMIC|PR_ADDR|PR_SPLICE|PR_MPSOCKET, + .pr_flags = PR_ATOMIC|PR_ADDR|PR_SPLICE|PR_MPINPUT|PR_MPSOCKET, .pr_input = udp_input, .pr_ctlinput = udp6_ctlinput, .pr_ctloutput = ip6_ctloutput, Index: sys/sys/socketvar.h =================================================================== RCS file: /mount/openbsd/cvs/src/sys/sys/socketvar.h,v diff -u -p -u -p -r1.132 socketvar.h --- sys/sys/socketvar.h 12 Jul 2024 17:20:18 -0000 1.132 +++ sys/sys/socketvar.h 18 Jul 2024 19:15:26 -0000 @@ -330,6 +330,12 @@ int sblock(struct sockbuf *, int); /* release lock on sockbuf sb */ void sbunlock(struct sockbuf *); +static inline void +sbassertlocked(struct sockbuf *sb) +{ + rw_assert_wrlock(&sb->sb_lock); +} + #define SB_EMPTY_FIXUP(sb) do { \ if ((sb)->sb_mb == NULL) { \ (sb)->sb_mbtail = NULL; \