PSARC/2009/590 Socket Filter Framework
6939085 Socket Filter Framework
6802067 connect_failed kernel socket callback is not triggered
6776450 time spent in tcp_close could be reduced/deferred to a worker thread
6828586 assertion failed: family == 26, file: ../../common/fs/sockfs/socksyscalls.c, line: 1608
6802078 kernel socket 'newconn' callback is passing rcv queue size as an argument
diff --git a/usr/src/uts/common/fs/sockfs/sockcommon_subr.c b/usr/src/uts/common/fs/sockfs/sockcommon_subr.c
index 2e3442e..a44d389 100644
--- a/usr/src/uts/common/fs/sockfs/sockcommon_subr.c
+++ b/usr/src/uts/common/fs/sockfs/sockcommon_subr.c
@@ -20,8 +20,7 @@
  */
 
 /*
- * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
- * Use is subject to license terms.
+ * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  */
 
 #include <sys/types.h>
@@ -39,6 +38,7 @@
 #include <sys/tihdr.h>
 
 #include <fs/sockfs/sockcommon.h>
+#include <fs/sockfs/sockfilter_impl.h>
 #include <fs/sockfs/socktpi.h>
 #include <fs/sockfs/sodirect.h>
 #include <sys/ddi.h>
@@ -59,46 +59,6 @@
 static boolean_t so_check_length(sonode_t *so);
 #endif
 
-int
-so_acceptq_enqueue_locked(struct sonode *so, struct sonode *nso)
-{
-	ASSERT(MUTEX_HELD(&so->so_acceptq_lock));
-	ASSERT(nso->so_acceptq_next == NULL);
-
-	*so->so_acceptq_tail = nso;
-	so->so_acceptq_tail = &nso->so_acceptq_next;
-	so->so_acceptq_len++;
-
-	/* Wakeup a single consumer */
-	cv_signal(&so->so_acceptq_cv);
-
-	return (so->so_acceptq_len);
-}
-
-/*
- * int so_acceptq_enqueue(struct sonode *so, struct sonode *nso)
- *
- * Enqueue an incoming connection on a listening socket.
- *
- * Arguments:
- *   so	  - listening socket
- *   nso  - new connection
- *
- * Returns:
- *   Number of queued connections, including the new connection
- */
-int
-so_acceptq_enqueue(struct sonode *so, struct sonode *nso)
-{
-	int conns;
-
-	mutex_enter(&so->so_acceptq_lock);
-	conns = so_acceptq_enqueue_locked(so, nso);
-	mutex_exit(&so->so_acceptq_lock);
-
-	return (conns);
-}
-
 static int
 so_acceptq_dequeue_locked(struct sonode *so, boolean_t dontblock,
     struct sonode **nsop)
@@ -107,7 +67,7 @@
 
 	*nsop = NULL;
 	ASSERT(MUTEX_HELD(&so->so_acceptq_lock));
-	while ((nso = so->so_acceptq_head) == NULL) {
+	while ((nso = list_remove_head(&so->so_acceptq_list)) == NULL) {
 		/*
 		 * No need to check so_error here, because it is not
 		 * possible for a listening socket to be reset or otherwise
@@ -126,15 +86,9 @@
 	}
 
 	ASSERT(nso != NULL);
-	so->so_acceptq_head = nso->so_acceptq_next;
-	nso->so_acceptq_next = NULL;
-
-	if (so->so_acceptq_head == NULL) {
-		ASSERT(so->so_acceptq_tail == &nso->so_acceptq_next);
-		so->so_acceptq_tail = &so->so_acceptq_head;
-	}
 	ASSERT(so->so_acceptq_len > 0);
-	--so->so_acceptq_len;
+	so->so_acceptq_len--;
+	nso->so_listener = NULL;
 
 	*nsop = nso;
 
@@ -174,8 +128,36 @@
 	return (error);
 }
 
+static void
+so_acceptq_flush_impl(struct sonode *so, list_t *list, boolean_t doclose)
+{
+	struct sonode *nso;
+
+	while ((nso = list_remove_head(list)) != NULL) {
+		nso->so_listener = NULL;
+		if (doclose) {
+			(void) socket_close(nso, 0, CRED());
+		} else {
+			/*
+			 * Only used for fallback - not possible when filters
+			 * are present.
+			 */
+			ASSERT(so->so_filter_active == 0);
+			/*
+			 * Since the socket is on the accept queue, there can
+			 * only be one reference. We drop the reference and
+			 * just blow off the socket.
+			 */
+			ASSERT(nso->so_count == 1);
+			nso->so_count--;
+			/* drop the proto ref */
+			VN_RELE(SOTOV(nso));
+		}
+		socket_destroy(nso);
+	}
+}
 /*
  * void so_acceptq_flush(struct sonode *so, boolean_t doclose)
  *
  * Removes all pending connections from a listening socket, and
  * frees the associated resources.
@@ -183,7 +165,6 @@
  * Arguments
  *   so	     - listening socket
  *   doclose - make a close downcall for each socket on the accept queue
- *             (Note, only SCTP and SDP sockets rely on this)
  *
  * Return values:
  *   None.
@@ -197,28 +178,9 @@
 void
 so_acceptq_flush(struct sonode *so, boolean_t doclose)
 {
-	struct sonode *nso;
+	so_acceptq_flush_impl(so, &so->so_acceptq_list, doclose);
+	so_acceptq_flush_impl(so, &so->so_acceptq_defer, doclose);
 
-	while ((nso = so->so_acceptq_head) != NULL) {
-		so->so_acceptq_head = nso->so_acceptq_next;
-		nso->so_acceptq_next = NULL;
-
-		if (doclose) {
-			(void) socket_close(nso, 0, CRED());
-		} else {
-			/*
-			 * Since the socket is on the accept queue, there can
-			 * only be one reference. We drop the reference and
-			 * just blow off the socket.
-			 */
-			ASSERT(nso->so_count == 1);
-			nso->so_count--;
-		}
-		socket_destroy(nso);
-	}
-
-	so->so_acceptq_head = NULL;
-	so->so_acceptq_tail = &so->so_acceptq_head;
 	so->so_acceptq_len = 0;
 }
 
@@ -296,7 +258,7 @@
 	int error;
 
 	ASSERT(MUTEX_HELD(&so->so_lock));
-	while (so->so_snd_qfull) {
+	while (SO_SND_FLOWCTRLD(so)) {
 		if (so->so_state & SS_CANTSENDMORE)
 			return (EPIPE);
 		if (dontblock)
@@ -334,11 +296,9 @@
 	int error = 0;
 
 	mutex_enter(&so->so_lock);
-	if (so->so_snd_qfull) {
-		so->so_snd_wakeup = B_TRUE;
-		error = so_snd_wait_qnotfull_locked(so, dontblock);
-		so->so_snd_wakeup = B_FALSE;
-	}
+	so->so_snd_wakeup = B_TRUE;
+	error = so_snd_wait_qnotfull_locked(so, dontblock);
+	so->so_snd_wakeup = B_FALSE;
 	mutex_exit(&so->so_lock);
 
 	return (error);
@@ -601,8 +561,13 @@
 void
 so_process_new_message(struct sonode *so, mblk_t *mp_head, mblk_t *mp_last_head)
 {
+	if (so->so_filter_active > 0 &&
+	    (mp_head = sof_filter_data_in_proc(so, mp_head,
+	    &mp_last_head)) == NULL)
+		return;
+
 	ASSERT(mp_head->b_prev != NULL);
-	if (so->so_rcv_q_head  == NULL) {
+	if (so->so_rcv_q_head == NULL) {
 		so->so_rcv_q_head = mp_head;
 		so->so_rcv_q_last_head = mp_last_head;
 		ASSERT(so->so_rcv_q_last_head->b_prev != NULL);
@@ -650,13 +615,13 @@
  * Check flow control on a given sonode.  Must have so_lock held, and
  * this function will release the hold.
  */
-
-static void
+void
 so_check_flow_control(struct sonode *so)
 {
 	ASSERT(MUTEX_HELD(&so->so_lock));
 
-	if (so->so_flowctrld && so->so_rcv_queued < so->so_rcvlowat) {
+	if (so->so_flowctrld && (so->so_rcv_queued < so->so_rcvlowat &&
+	    !(so->so_state & SS_FIL_RCV_FLOWCTRL))) {
 		so->so_flowctrld = B_FALSE;
 		mutex_exit(&so->so_lock);
 		/*
@@ -668,6 +633,8 @@
 			(*so->so_downcalls->sd_clr_flowctrl)
 			    (so->so_proto_handle);
 		}
+		/* filters can start injecting data */
+		sof_sonode_notify_filters(so, SOF_EV_INJECT_DATA_IN_OK, 0);
 	} else {
 		mutex_exit(&so->so_lock);
 	}
@@ -1116,7 +1083,7 @@
 	}
 
 	/*
-	 * Free messages sitting in the send and recv queue
+	 * Free messages sitting in the recv queues
 	 */
 	while (so->so_rcv_q_head != NULL) {
 		mp = so->so_rcv_q_head;
@@ -1313,11 +1280,29 @@
 		so->so_pollev = pso->so_pollev & SO_POLLEV_ALWAYS;
 
 		mutex_exit(&pso->so_lock);
+
+		/*
+		 * If the parent has any filters, try to inherit them.
+		 */
+		if (pso->so_filter_active > 0 &&
+		    (error = sof_sonode_inherit_filters(so, pso)) != 0)
+			return (error);
+
 	} else {
 		struct sockparams *sp = so->so_sockparams;
 		sock_upcalls_t *upcalls_to_use;
 
 		/*
+		 * Attach automatic filters, if there are any.
+		 */
+		if (!list_is_empty(&sp->sp_auto_filters) &&
+		    (error = sof_sonode_autoattach_filters(so, cr)) != 0)
+			return (error);
+
+		/* OK to attach filters */
+		so->so_state |= SS_FILOP_OK;
+
+		/*
 		 * Based on the version number select the right upcalls to
 		 * pass down. Currently we only have one version so choose
 		 * default
@@ -1384,6 +1369,9 @@
 	if (uioasync.enabled)
 		sod_sock_init(so);
 
+	/* put an extra reference on the socket for the protocol */
+	VN_HOLD(SOTOV(so));
+
 	return (0);
 }
 
@@ -1812,6 +1800,22 @@
 		*optlenp = sizeof (struct so_snd_bufinfo);
 		return (0);
 	}
+	case SO_SND_COPYAVOID: {
+		sof_instance_t *inst;
+
+		/*
+		 * Avoid zero-copy if there is a filter with a data_out
+		 * callback. We could let the operation succeed, but then
+		 * the filter would have to copy the data anyway.
+		 */
+		for (inst = so->so_filter_top; inst != NULL;
+		    inst = inst->sofi_next) {
+			if (SOF_INTERESTED(inst, data_out))
+				return (EOPNOTSUPP);
+		}
+		break;
+	}
+
 	default:
 		break;
 	}
@@ -1982,15 +1986,19 @@
  * We do not need to hold so_lock, since there can be only one thread
  * operating on the sonode.
  */
-static void
-so_quiesced_cb(sock_upper_handle_t sock_handle, queue_t *q,
-    struct T_capability_ack *tcap, struct sockaddr *laddr, socklen_t laddrlen,
+static mblk_t *
+so_quiesced_cb(sock_upper_handle_t sock_handle, sock_quiesce_arg_t *arg,
+    struct T_capability_ack *tcap,
+    struct sockaddr *laddr, socklen_t laddrlen,
     struct sockaddr *faddr, socklen_t faddrlen, short opts)
 {
 	struct sonode *so = (struct sonode *)sock_handle;
 	boolean_t atmark;
+	mblk_t *retmp = NULL, **tailmpp = &retmp;
 
-	sotpi_update_state(so, tcap, laddr, laddrlen, faddr, faddrlen, opts);
+	if (tcap != NULL)
+		sotpi_update_state(so, tcap, laddr, laddrlen, faddr, faddrlen,
+		    opts);
 
 	/*
 	 * Some protocols do not quiece the data path during fallback. Once
@@ -2038,9 +2046,9 @@
 		 */
 		if (atmark) {
 			struct T_exdata_ind *tei;
-			mblk_t *mp1 = SOTOTPI(so)->sti_exdata_mp;
+			mblk_t *mp1 = arg->soqa_exdata_mp;
 
-			SOTOTPI(so)->sti_exdata_mp = NULL;
+			arg->soqa_exdata_mp = NULL;
 			ASSERT(mp1 != NULL);
 			mp1->b_datap->db_type = M_PROTO;
 			tei = (struct T_exdata_ind *)mp1->b_rptr;
@@ -2101,7 +2109,8 @@
 		 * Queue data on the STREAM head.
 		 */
 		so->so_rcv_queued -= mlen;
-		putnext(q, mp);
+		*tailmpp = mp;
+		tailmpp = &mp->b_next;
 	}
 	so->so_rcv_head = NULL;
 	so->so_rcv_last_head = NULL;
@@ -2121,8 +2130,8 @@
 		if (atmark && so->so_oobmsg != NULL) {
 			struct T_exdata_ind *tei;
 
-			mp = SOTOTPI(so)->sti_exdata_mp;
-			SOTOTPI(so)->sti_exdata_mp = NULL;
+			mp = arg->soqa_exdata_mp;
+			arg->soqa_exdata_mp = NULL;
 			ASSERT(mp != NULL);
 			mp->b_datap->db_type = M_PROTO;
 			tei = (struct T_exdata_ind *)mp->b_rptr;
@@ -2133,38 +2142,32 @@
 			mp->b_cont = so->so_oobmsg;
 			so->so_oobmsg = NULL;
 
-			putnext(q, mp);
+			*tailmpp = mp;
+			tailmpp = &mp->b_next;
 		} else {
 			/* Send up the signal */
-			mp = SOTOTPI(so)->sti_exdata_mp;
-			SOTOTPI(so)->sti_exdata_mp = NULL;
+			mp = arg->soqa_exdata_mp;
+			arg->soqa_exdata_mp = NULL;
 			ASSERT(mp != NULL);
 			DB_TYPE(mp) = M_PCSIG;
 			*mp->b_wptr++ = (uchar_t)SIGURG;
-			putnext(q, mp);
+			*tailmpp = mp;
+			tailmpp = &mp->b_next;
 
 			/* Send up the mark indicator */
-			mp = SOTOTPI(so)->sti_urgmark_mp;
-			SOTOTPI(so)->sti_urgmark_mp = NULL;
+			mp = arg->soqa_urgmark_mp;
+			arg->soqa_urgmark_mp = NULL;
 			mp->b_flag = atmark ? MSGMARKNEXT : MSGNOTMARKNEXT;
-			putnext(q, mp);
+			*tailmpp = mp;
+			tailmpp = &mp->b_next;
 
 			so->so_oobmark = 0;
 		}
 	}
-
-	if (SOTOTPI(so)->sti_exdata_mp != NULL) {
-		freeb(SOTOTPI(so)->sti_exdata_mp);
-		SOTOTPI(so)->sti_exdata_mp = NULL;
-	}
-
-	if (SOTOTPI(so)->sti_urgmark_mp != NULL) {
-		freeb(SOTOTPI(so)->sti_urgmark_mp);
-		SOTOTPI(so)->sti_urgmark_mp = NULL;
-	}
-
 	ASSERT(so->so_oobmark == 0);
 	ASSERT(so->so_rcv_queued == 0);
+
+	return (retmp);
 }
 
 #ifdef DEBUG
@@ -2203,7 +2206,8 @@
 	VERIFY(cur->so_version == orig->so_version);
 	/* New conns might have arrived, but none should have been lost */
 	VERIFY(cur->so_acceptq_len >= orig->so_acceptq_len);
-	VERIFY(cur->so_acceptq_head == orig->so_acceptq_head);
+	VERIFY(list_head(&cur->so_acceptq_list) ==
+	    list_head(&orig->so_acceptq_list));
 	VERIFY(cur->so_backlog == orig->so_backlog);
 	/* New OOB migth have arrived, but mark should not have been lost */
 	VERIFY(cur->so_oobmark >= orig->so_oobmark);
@@ -2243,8 +2247,10 @@
 	struct sockparams *sp;
 	struct sockparams *newsp = NULL;
 	so_proto_fallback_func_t fbfunc;
+	const char *devpath;
 	boolean_t direct;
 	struct sonode *nso;
+	sock_quiesce_arg_t arg = { NULL, NULL };
 #ifdef DEBUG
 	struct sonode origso;
 #endif
@@ -2253,10 +2259,27 @@
 	fbfunc = sp->sp_smod_info->smod_proto_fallback_func;
 
 	/*
-	 * Fallback can only happen if there is a device associated
-	 * with the sonode, and the socket module has a fallback function.
+	 * Cannot fallback if the socket has active filters
 	 */
-	if (!SOCKPARAMS_HAS_DEVICE(sp) || fbfunc == NULL)
+	if (so->so_filter_active > 0)
+		return (EINVAL);
+
+	switch (so->so_family) {
+	case AF_INET:
+		devpath = sp->sp_smod_info->smod_fallback_devpath_v4;
+		break;
+	case AF_INET6:
+		devpath = sp->sp_smod_info->smod_fallback_devpath_v6;
+		break;
+	default:
+		return (EINVAL);
+	}
+
+	/*
+	 * Fallback can only happen if the socket module has a TPI device
+	 * and fallback function.
+	 */
+	if (devpath == NULL || fbfunc == NULL)
 		return (EINVAL);
 
 	/*
@@ -2276,8 +2299,7 @@
 	sp->sp_stats.sps_nfallback.value.ui64++;
 
 	newsp = sockparams_hold_ephemeral_bydev(so->so_family, so->so_type,
-	    so->so_protocol, so->so_sockparams->sp_sdev_info.sd_devpath,
-	    KM_SLEEP, &error);
+	    so->so_protocol, devpath, KM_SLEEP, &error);
 	if (error != 0)
 		goto out;
 
@@ -2295,14 +2317,30 @@
 	error = sotpi_convert_sonode(so, newsp, &direct, &q, cr);
 	if (error != 0)
 		goto out;
-
+	/*
+	 * When it comes to urgent data we have two cases to deal with;
+	 * (1) The oob byte has already arrived, or (2) the protocol has
+	 * notified that oob data is pending, but it has not yet arrived.
+	 *
+	 * For (1) all we need to do is send a T_EXDATA_IND to indicate where
+	 * in the byte stream the oob byte is. For (2) we have to send a
+	 * SIGURG (M_PCSIG), followed by a zero-length mblk indicating whether
+	 * the oob byte will be the next byte from the protocol.
+	 *
+	 * So in the worst case we need two mblks, one for the signal, another
+	 * for mark indication. In that case we use the exdata_mp for the sig.
+	 */
+	arg.soqa_exdata_mp = allocb_wait(sizeof (struct T_exdata_ind),
+	    BPRI_MED, STR_NOSIG, NULL);
+	arg.soqa_urgmark_mp = allocb_wait(0, BPRI_MED, STR_NOSIG, NULL);
 
 	/*
 	 * Now tell the protocol to start using TPI. so_quiesced_cb be
 	 * called once it's safe to synchronize state.
 	 */
 	DTRACE_PROBE1(proto__fallback__begin, struct sonode *, so);
-	error = (*fbfunc)(so->so_proto_handle, q, direct, so_quiesced_cb);
+	error = (*fbfunc)(so->so_proto_handle, q, direct, so_quiesced_cb,
+	    &arg);
 	DTRACE_PROBE1(proto__fallback__end, struct sonode *, so);
 
 	if (error != 0) {
@@ -2315,19 +2353,40 @@
 	 * Walk the accept queue and notify the proto that they should
 	 * fall back to TPI. The protocol will send up the T_CONN_IND.
 	 */
-	nso = so->so_acceptq_head;
+	nso = list_head(&so->so_acceptq_list);
 	while (nso != NULL) {
 		int rval;
+		struct sonode *next;
+
+		if (arg.soqa_exdata_mp == NULL) {
+			arg.soqa_exdata_mp =
+			    allocb_wait(sizeof (struct T_exdata_ind),
+			    BPRI_MED, STR_NOSIG, NULL);
+		}
+		if (arg.soqa_urgmark_mp == NULL) {
+			arg.soqa_urgmark_mp = allocb_wait(0, BPRI_MED,
+			    STR_NOSIG, NULL);
+		}
 
 		DTRACE_PROBE1(proto__fallback__begin, struct sonode *, nso);
-		rval = (*fbfunc)(nso->so_proto_handle, NULL, direct, NULL);
+		rval = (*fbfunc)(nso->so_proto_handle, NULL, direct,
+		    so_quiesced_cb, &arg);
 		DTRACE_PROBE1(proto__fallback__end, struct sonode *, nso);
 		if (rval != 0) {
+			/* Abort the connection */
 			zcmn_err(getzoneid(), CE_WARN,
 			    "Failed to convert socket in accept queue to TPI. "
 			    "Pid = %d\n", curproc->p_pid);
+			next = list_next(&so->so_acceptq_list, nso);
+			list_remove(&so->so_acceptq_list, nso);
+			so->so_acceptq_len--;
+
+			(void) socket_close(nso, 0, CRED());
+			socket_destroy(nso);
+			nso = next;
+		} else {
+			nso = list_next(&so->so_acceptq_list, nso);
 		}
-		nso = nso->so_acceptq_next;
 	}
 
 	/*
@@ -2352,6 +2411,14 @@
 	 * the STREAMS head).
 	 */
 	pollwakeup(&so->so_poll_list, POLLERR);
+
+	/*
+	 * When this non-STREAM socket was created we placed an extra ref on
+	 * the associated vnode to support asynchronous close. Drop that ref
+	 * here.
+	 */
+	ASSERT(SOTOV(so)->v_count >= 2);
+	VN_RELE(SOTOV(so));
 out:
 	so_end_fallback(so);
 
@@ -2365,6 +2432,10 @@
 		if (newsp != NULL)
 			SOCKPARAMS_DEC_REF(newsp);
 	}
+	if (arg.soqa_exdata_mp != NULL)
+		freemsg(arg.soqa_exdata_mp);
+	if (arg.soqa_urgmark_mp != NULL)
+		freemsg(arg.soqa_urgmark_mp);
 
 	return (error);
 }