/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License (the "License").
* You may not use this file except in compliance with the License.
*
* You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
* or http://www.opensolaris.org/os/licensing.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at usr/src/OPENSOLARIS.LICENSE.
* If applicable, add the following below this CDDL HEADER, with the
* fields enclosed by brackets "[]" replaced with your own identifying
* information: Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*/
/* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
/* All Rights Reserved */
/*
* Copyright 2009 Sun Microsystems, Inc. All rights reserved.
* Use is subject to license terms.
*/
#include <sys/types.h>
#include <sys/sysmacros.h>
#include <sys/param.h>
#include <sys/errno.h>
#include <sys/signal.h>
#include <sys/proc.h>
#include <sys/conf.h>
#include <sys/cred.h>
#include <sys/user.h>
#include <sys/vnode.h>
#include <sys/file.h>
#include <sys/session.h>
#include <sys/stream.h>
#include <sys/strsubr.h>
#include <sys/stropts.h>
#include <sys/poll.h>
#include <sys/systm.h>
#include <sys/cpuvar.h>
#include <sys/uio.h>
#include <sys/cmn_err.h>
#include <sys/priocntl.h>
#include <sys/procset.h>
#include <sys/vmem.h>
#include <sys/bitmap.h>
#include <sys/kmem.h>
#include <sys/siginfo.h>
#include <sys/vtrace.h>
#include <sys/callb.h>
#include <sys/debug.h>
#include <sys/modctl.h>
#include <sys/vmsystm.h>
#include <vm/page.h>
#include <sys/atomic.h>
#include <sys/suntpi.h>
#include <sys/strlog.h>
#include <sys/promif.h>
#include <sys/project.h>
#include <sys/vm.h>
#include <sys/taskq.h>
#include <sys/sunddi.h>
#include <sys/sunldi_impl.h>
#include <sys/strsun.h>
#include <sys/isa_defs.h>
#include <sys/multidata.h>
#include <sys/pattr.h>
#include <sys/strft.h>
#include <sys/fs/snode.h>
#include <sys/zone.h>
#include <sys/open.h>
#include <sys/sunldi.h>
#include <sys/sad.h>
#include <sys/netstack.h>
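/*
 * Old-style check that q and q->q_next belong to the same stream:
 * true when q has a q_next and both queues face the same direction
 * (their QREADR flags match).
 */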
#define O_SAMESTR(q) (((q)->q_next) && \
(((q)->q_flag & QREADR) == ((q)->q_next->q_flag & QREADR)))
/*
* WARNING:
* The variables and routines in this file are private, belonging
* to the STREAMS subsystem. These should not be used by modules
* or drivers. Compatibility will not be guaranteed.
*/
/*
* Id value used to distinguish between different multiplexor links.
*/
static int32_t lnk_id = 0;
#define STREAMS_LOPRI MINCLSYSPRI
static pri_t streams_lopri = STREAMS_LOPRI;
#define STRSTAT(x) (str_statistics.x.value.ui64++)
typedef struct str_stat {
kstat_named_t sqenables;
kstat_named_t stenables;
kstat_named_t syncqservice;
kstat_named_t freebs;
kstat_named_t qwr_outer;
kstat_named_t rservice;
kstat_named_t strwaits;
kstat_named_t taskqfails;
kstat_named_t bufcalls;
kstat_named_t qhelps;
kstat_named_t qremoved;
kstat_named_t sqremoved;
kstat_named_t bcwaits;
kstat_named_t sqtoomany;
} str_stat_t;
static str_stat_t str_statistics = {
{ "sqenables", KSTAT_DATA_UINT64 },
{ "stenables", KSTAT_DATA_UINT64 },
{ "syncqservice", KSTAT_DATA_UINT64 },
{ "freebs", KSTAT_DATA_UINT64 },
{ "qwr_outer", KSTAT_DATA_UINT64 },
{ "rservice", KSTAT_DATA_UINT64 },
{ "strwaits", KSTAT_DATA_UINT64 },
{ "taskqfails", KSTAT_DATA_UINT64 },
{ "bufcalls", KSTAT_DATA_UINT64 },
{ "qhelps", KSTAT_DATA_UINT64 },
{ "qremoved", KSTAT_DATA_UINT64 },
{ "sqremoved", KSTAT_DATA_UINT64 },
{ "bcwaits", KSTAT_DATA_UINT64 },
{ "sqtoomany", KSTAT_DATA_UINT64 },
};
static kstat_t *str_kstat;
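/*
 * These counters are exported via the kstat created in strinit() as
 * module "streams", name "strstat", class "net"; e.g. they can be
 * inspected from userland with kstat(1M):
 *
 *	# kstat -m streams -n strstat
 */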
/*
* qrunflag was used previously to control background scheduling of queues. It
* is not used anymore, but is kept here in case some module still wants to
* access it via the qready() and setqsched macros.
*/
char qrunflag; /* Unused */
/*
* Most of the streams scheduling is done via task queues. Task queues may fail
* for non-sleep dispatches, so there are two backup threads servicing failed
* requests for queues and syncqs. Both of these threads also service
* failed freebs dispatch requests. Queues are put in the list specified
* by `qhead'
* and `qtail' pointers, syncqs use `sqhead' and `sqtail' pointers and freebs
* requests are put into `freebs_list' which has no tail pointer. All three
* lists are protected by a single `service_queue' lock and use
* `services_to_run' condition variable for signaling background threads. Use of
* a single lock should not be a problem because it is only used under heavy
* loads when task queues start to fail and at that time it may be a good idea
* to throttle scheduling requests.
*
* NOTE: queues and syncqs should be scheduled by two separate threads because
* queue servicing may be blocked waiting for a syncq which may be also
* scheduled for background execution. This may create a deadlock when only one
* thread is used for both.
*/
static taskq_t *streams_taskq; /* Used for most STREAMS scheduling */
static kmutex_t service_queue; /* protects all of servicing vars */
static kcondvar_t services_to_run; /* wake up background service thread */
static kcondvar_t syncqs_to_run; /* wake up background service thread */
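/*
 * A minimal sketch of the dispatch-with-fallback pattern described
 * above (illustrative only; the real scheduling code elsewhere in
 * this file differs in detail):
 *
 *	if (taskq_dispatch(streams_taskq, (task_func_t *)queue_service,
 *	    q, TQ_NOSLEEP) == 0) {
 *		mutex_enter(&service_queue);
 *		STRSTAT(taskqfails);
 *		ENQUEUE(q, qhead, qtail, q_link);
 *		cv_signal(&services_to_run);
 *		mutex_exit(&service_queue);
 *	}
 */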
/*
* List of queues scheduled for background processing due to lack of resources
* in the task queues. Protected by the service_queue lock.
*/
static struct queue *qhead;
static struct queue *qtail;
/*
* Same list for syncqs
*/
static syncq_t *sqhead;
static syncq_t *sqtail;
static mblk_t *freebs_list; /* list of buffers to free */
/*
* Backup threads for servicing queues and syncqs
*/
kthread_t *streams_qbkgrnd_thread;
kthread_t *streams_sqbkgrnd_thread;
/*
* Bufcalls related variables.
*/
struct bclist strbcalls; /* list of waiting bufcalls */
kmutex_t strbcall_lock; /* protects bufcall list (strbcalls) */
kcondvar_t strbcall_cv; /* Signaling when a bufcall is added */
kmutex_t bcall_monitor; /* sleep/wakeup style monitor */
kcondvar_t bcall_cv; /* wait 'till executing bufcall completes */
kthread_t *bc_bkgrnd_thread; /* Thread to service bufcall requests */
kmutex_t strresources; /* protects global resources */
kmutex_t muxifier; /* single-threads multiplexor creation */
static void *str_stack_init(netstackid_t stackid, netstack_t *ns);
static void str_stack_shutdown(netstackid_t stackid, void *arg);
static void str_stack_fini(netstackid_t stackid, void *arg);
extern void time_to_wait(clock_t *, clock_t);
/*
* run_queues is no longer used, but is kept in case some third-party
* module/driver decides to use it.
*/
int run_queues = 0;
/*
* sq_max_size is the depth of the syncq (in number of messages) before
* qfill_syncq() starts QFULL'ing destination queues. Although its primary
* consumer, IP, is no longer D_MTPERMOD, other modules/drivers may still
* depend on this syncq flow control, so we choose a large number as the
* default value. For potential performance gain, this value is tunable
* in /etc/system.
*/
int sq_max_size = 10000;
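/*
 * For example, a line like the following in /etc/system would raise
 * the limit (the value shown is purely illustrative):
 *
 *	set sq_max_size = 25000
 */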
/*
* The number of ciputctrl structures per syncq and stream that we create
* when needed.
*/
int n_ciputctrl;
int max_n_ciputctrl = 16;
/*
* If n_ciputctrl is less than min_n_ciputctrl, don't even create
* ciputctrl_cache.
*/
int min_n_ciputctrl = 2;
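/*
 * Worked example of the sizing done in strinit(): with 6 CPUs,
 * 1 << highbit(6 - 1) rounds the count up to 8, which is within the
 * [min_n_ciputctrl, max_n_ciputctrl] = [2, 16] bounds, so each cached
 * set holds 8 ciputctrl structures.
 */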
/*
* Per-driver/module syncqs
* ========================
*
* For drivers/modules that use PERMOD or outer syncqs we keep a list of
* perdm structures, new entries being added (and new syncqs allocated) when
* setq() encounters a module/driver with a streamtab that it hasn't seen
* before.
* The reason for this mechanism is that some modules and drivers share a
* common streamtab and it is necessary for those modules and drivers to also
* share a common PERMOD syncq.
*
* perdm_list --> dm_str == streamtab_1
* dm_sq == syncq_1
* dm_ref
* dm_next --> dm_str == streamtab_2
* dm_sq == syncq_2
* dm_ref
* dm_next --> ... NULL
*
* The dm_ref field is incremented for each new driver/module that takes
* a reference to the perdm structure and hence shares the syncq.
* References are held in the fmodsw_impl_t structure for each STREAMS module
* or the dev_impl array (indexed by device major number) for each driver.
*
* perdm_list -> [dm_ref == 1] -> [dm_ref == 2] -> [dm_ref == 1] -> NULL
* ^ ^ ^ ^
* | ______________/ | |
* | / | |
* dev_impl: ...|x|y|... module A module B
*
* When a module/driver is unloaded the reference count is decremented and,
* when it falls to zero, the perdm structure is removed from the list and
* the syncq is freed (see rele_dm()).
*/
perdm_t *perdm_list = NULL;
static krwlock_t perdm_rwlock;
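/*
 * A hedged sketch of the streamtab lookup described above (the actual
 * hold_dm() logic differs in detail, e.g. in how references are taken):
 *
 *	perdm_t *p;
 *
 *	rw_enter(&perdm_rwlock, RW_READER);
 *	for (p = perdm_list; p != NULL; p = p->dm_next) {
 *		if (p->dm_str == str) {		/* found a match */
 *			p->dm_ref++;		/* share its syncq */
 *			break;
 *		}
 *	}
 *	rw_exit(&perdm_rwlock);
 */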
cdevsw_impl_t *devimpl;
extern struct qinit strdata;
extern struct qinit stwdata;
static void runservice(queue_t *);
static void streams_bufcall_service(void);
static void streams_qbkgrnd_service(void);
static void streams_sqbkgrnd_service(void);
static syncq_t *new_syncq(void);
static void free_syncq(syncq_t *);
static void outer_insert(syncq_t *, syncq_t *);
static void outer_remove(syncq_t *, syncq_t *);
static void write_now(syncq_t *);
static void clr_qfull(queue_t *);
static void runbufcalls(void);
static void sqenable(syncq_t *);
static void sqfill_events(syncq_t *, queue_t *, mblk_t *, void (*)());
static void wait_q_syncq(queue_t *);
static void backenable_insertedq(queue_t *);
static void queue_service(queue_t *);
static void stream_service(stdata_t *);
static void syncq_service(syncq_t *);
static void qwriter_outer_service(syncq_t *);
static void mblk_free(mblk_t *);
#ifdef DEBUG
static int qprocsareon(queue_t *);
#endif
static void set_nfsrv_ptr(queue_t *, queue_t *, queue_t *, queue_t *);
static void reset_nfsrv_ptr(queue_t *, queue_t *);
void set_qfull(queue_t *);
static void sq_run_events(syncq_t *);
static int propagate_syncq(queue_t *);
static void blocksq(syncq_t *, ushort_t, int);
static void unblocksq(syncq_t *, ushort_t, int);
static int dropsq(syncq_t *, uint16_t);
static void emptysq(syncq_t *);
static sqlist_t *sqlist_alloc(struct stdata *, int);
static void sqlist_free(sqlist_t *);
static sqlist_t *sqlist_build(queue_t *, struct stdata *, boolean_t);
static void sqlist_insert(sqlist_t *, syncq_t *);
static void sqlist_insertall(sqlist_t *, queue_t *);
static void strsetuio(stdata_t *);
struct kmem_cache *stream_head_cache;
struct kmem_cache *queue_cache;
struct kmem_cache *syncq_cache;
struct kmem_cache *qband_cache;
struct kmem_cache *linkinfo_cache;
struct kmem_cache *ciputctrl_cache = NULL;
static linkinfo_t *linkinfo_list;
/* global esballoc throttling queue */
static esb_queue_t system_esbq;
/*
* esballoc tunable parameters.
*/
int esbq_max_qlen = 0x16; /* throttled queue length */
clock_t esbq_timeout = 0x8; /* timeout to process esb queue */
/*
* routines to handle esballoc queuing.
*/
static void esballoc_process_queue(esb_queue_t *);
static void esballoc_enqueue_mblk(mblk_t *);
static void esballoc_timer(void *);
static void esballoc_set_timer(esb_queue_t *, clock_t);
static void esballoc_mblk_free(mblk_t *);
/*
* Qinit structure and Module_info structures
* for passthru read and write queues
*/
static void pass_wput(queue_t *, mblk_t *);
static queue_t *link_addpassthru(stdata_t *);
static void link_rempassthru(queue_t *);
struct module_info passthru_info = {
0,
"passthru",
0,
INFPSZ,
STRHIGH,
STRLOW
};
struct qinit passthru_rinit = {
(int (*)())putnext,
NULL,
NULL,
NULL,
NULL,
&passthru_info,
NULL
};
struct qinit passthru_winit = {
(int (*)()) pass_wput,
NULL,
NULL,
NULL,
NULL,
&passthru_info,
NULL
};
/*
* Special form of assertion: verify that X implies Y i.e. when X is true Y
* should also be true.
*/
#define IMPLY(X, Y) ASSERT(!(X) || (Y))
/*
* Logical equivalence. Verify that both X and Y are either TRUE or FALSE.
*/
#define EQUIV(X, Y) { IMPLY(X, Y); IMPLY(Y, X); }
/*
* Verify correctness of list head/tail pointers.
*/
#define LISTCHECK(head, tail, link) { \
EQUIV(head, tail); \
IMPLY(tail != NULL, tail->link == NULL); \
}
/*
* Enqueue a list element `el' at the end of a list denoted by `head' and `tail'
* using a `link' field.
*/
#define ENQUEUE(el, head, tail, link) { \
ASSERT(el->link == NULL); \
LISTCHECK(head, tail, link); \
if (head == NULL) \
head = el; \
else \
tail->link = el; \
tail = el; \
}
/*
* Dequeue the first element of the list denoted by `head' and `tail' pointers
* using a `link' field and put the result into `el'.
*/
#define DQ(el, head, tail, link) { \
LISTCHECK(head, tail, link); \
el = head; \
if (head != NULL) { \
head = head->link; \
if (head == NULL) \
tail = NULL; \
el->link = NULL; \
} \
}
/*
* Remove `el' from the list using `chase' and `curr' pointers and return the
* result in `succeed'.
*/
#define RMQ(el, head, tail, link, chase, curr, succeed) { \
LISTCHECK(head, tail, link); \
chase = NULL; \
succeed = 0; \
for (curr = head; (curr != el) && (curr != NULL); curr = curr->link) \
chase = curr; \
if (curr != NULL) { \
succeed = 1; \
ASSERT(curr == el); \
if (chase != NULL) \
chase->link = curr->link; \
else \
head = curr->link; \
curr->link = NULL; \
if (curr == tail) \
tail = chase; \
} \
LISTCHECK(head, tail, link); \
}
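/*
 * Example use of the list macros: the background thread dequeues one
 * queue at a time under the service_queue lock (illustrative only):
 *
 *	mutex_enter(&service_queue);
 *	while (qhead == NULL)
 *		cv_wait(&services_to_run, &service_queue);
 *	DQ(qp, qhead, qtail, q_link);
 *	mutex_exit(&service_queue);
 */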
/* Handling of delayed messages on the inner syncq. */
/*
* DEBUG versions should use function versions (to simplify tracing) and
* non-DEBUG kernels should use macro versions.
*/
/*
* Put a queue on the syncq list of queues.
* Assumes SQLOCK held.
*/
#define SQPUT_Q(sq, qp) \
{ \
ASSERT(MUTEX_HELD(SQLOCK(sq))); \
if (!(qp->q_sqflags & Q_SQQUEUED)) { \
/* The queue should not be linked anywhere */ \
ASSERT((qp->q_sqprev == NULL) && (qp->q_sqnext == NULL)); \
/* Head and tail may only be NULL simultaneously */ \
EQUIV(sq->sq_head, sq->sq_tail); \
/* A queue may only be enqueued on its own syncq */ \
ASSERT(sq == qp->q_syncq); \
/* Check the correctness of SQ_MESSAGES flag */ \
EQUIV(sq->sq_head, (sq->sq_flags & SQ_MESSAGES)); \
/* Sanity check first/last elements of the list */ \
IMPLY(sq->sq_head != NULL, sq->sq_head->q_sqprev == NULL);\
IMPLY(sq->sq_tail != NULL, sq->sq_tail->q_sqnext == NULL);\
/* \
* Sanity check of priority field: empty queue should \
* have zero priority \
* and nqueues equal to zero. \
*/ \
IMPLY(sq->sq_head == NULL, sq->sq_pri == 0); \
/* Sanity check of sq_nqueues field */ \
EQUIV(sq->sq_head, sq->sq_nqueues); \
if (sq->sq_head == NULL) { \
sq->sq_head = sq->sq_tail = qp; \
sq->sq_flags |= SQ_MESSAGES; \
} else if (qp->q_spri == 0) { \
qp->q_sqprev = sq->sq_tail; \
sq->sq_tail->q_sqnext = qp; \
sq->sq_tail = qp; \
} else { \
/* \
* Put this queue in priority order: higher \
* priority gets closer to the head. \
*/ \
queue_t **qpp = &sq->sq_tail; \
queue_t *qnext = NULL; \
\
while (*qpp != NULL && qp->q_spri > (*qpp)->q_spri) { \
qnext = *qpp; \
qpp = &(*qpp)->q_sqprev; \
} \
qp->q_sqnext = qnext; \
qp->q_sqprev = *qpp; \
if (*qpp != NULL) { \
(*qpp)->q_sqnext = qp; \
} else { \
sq->sq_head = qp; \
sq->sq_pri = sq->sq_head->q_spri; \
} \
*qpp = qp; \
} \
qp->q_sqflags |= Q_SQQUEUED; \
qp->q_sqtstamp = lbolt; \
sq->sq_nqueues++; \
} \
}
/*
* Remove a queue from the syncq list
* Assumes SQLOCK held.
*/
#define SQRM_Q(sq, qp) \
{ \
ASSERT(MUTEX_HELD(SQLOCK(sq))); \
ASSERT(qp->q_sqflags & Q_SQQUEUED); \
ASSERT(sq->sq_head != NULL && sq->sq_tail != NULL); \
ASSERT((sq->sq_flags & SQ_MESSAGES) != 0); \
/* Check that the queue is actually in the list */ \
ASSERT(qp->q_sqnext != NULL || sq->sq_tail == qp); \
ASSERT(qp->q_sqprev != NULL || sq->sq_head == qp); \
ASSERT(sq->sq_nqueues != 0); \
if (qp->q_sqprev == NULL) { \
/* First queue on list, make head q_sqnext */ \
sq->sq_head = qp->q_sqnext; \
} else { \
/* Make prev->next == next */ \
qp->q_sqprev->q_sqnext = qp->q_sqnext; \
} \
if (qp->q_sqnext == NULL) { \
/* Last queue on list, make tail sqprev */ \
sq->sq_tail = qp->q_sqprev; \
} else { \
/* Make next->prev == prev */ \
qp->q_sqnext->q_sqprev = qp->q_sqprev; \
} \
/* clear out references on this queue */ \
qp->q_sqprev = qp->q_sqnext = NULL; \
qp->q_sqflags &= ~Q_SQQUEUED; \
/* If there is nothing queued, clear SQ_MESSAGES */ \
if (sq->sq_head != NULL) { \
sq->sq_pri = sq->sq_head->q_spri; \
} else { \
sq->sq_flags &= ~SQ_MESSAGES; \
sq->sq_pri = 0; \
} \
sq->sq_nqueues--; \
ASSERT(sq->sq_head != NULL || sq->sq_evhead != NULL || \
(sq->sq_flags & SQ_QUEUED) == 0); \
}
/* Hide the definition from the header file. */
#ifdef SQPUT_MP
#undef SQPUT_MP
#endif
/*
* Put a message on the queue syncq.
* Assumes QLOCK held.
*/
#define SQPUT_MP(qp, mp) \
{ \
ASSERT(MUTEX_HELD(QLOCK(qp))); \
ASSERT(qp->q_sqhead == NULL || \
(qp->q_sqtail != NULL && \
qp->q_sqtail->b_next == NULL)); \
qp->q_syncqmsgs++; \
ASSERT(qp->q_syncqmsgs != 0); /* Wraparound */ \
if (qp->q_sqhead == NULL) { \
qp->q_sqhead = qp->q_sqtail = mp; \
} else { \
qp->q_sqtail->b_next = mp; \
qp->q_sqtail = mp; \
} \
ASSERT(qp->q_syncqmsgs > 0); \
set_qfull(qp); \
}
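/*
 * Set/clear the SQ_FASTPUT flag in every ciputctrl counter of a syncq.
 * The inclusive loop bound assumes sq_nciputctrl holds the highest
 * valid index (i.e. the number of counters minus one).
 */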
#define SQ_PUTCOUNT_SETFAST_LOCKED(sq) { \
ASSERT(MUTEX_HELD(SQLOCK(sq))); \
if ((sq)->sq_ciputctrl != NULL) { \
int i; \
int nlocks = (sq)->sq_nciputctrl; \
ciputctrl_t *cip = (sq)->sq_ciputctrl; \
ASSERT((sq)->sq_type & SQ_CIPUT); \
for (i = 0; i <= nlocks; i++) { \
ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \
cip[i].ciputctrl_count |= SQ_FASTPUT; \
} \
} \
}
#define SQ_PUTCOUNT_CLRFAST_LOCKED(sq) { \
ASSERT(MUTEX_HELD(SQLOCK(sq))); \
if ((sq)->sq_ciputctrl != NULL) { \
int i; \
int nlocks = (sq)->sq_nciputctrl; \
ciputctrl_t *cip = (sq)->sq_ciputctrl; \
ASSERT((sq)->sq_type & SQ_CIPUT); \
for (i = 0; i <= nlocks; i++) { \
ASSERT(MUTEX_HELD(&cip[i].ciputctrl_lock)); \
cip[i].ciputctrl_count &= ~SQ_FASTPUT; \
} \
} \
}
/*
* Run service procedures for all queues in the stream head.
*/
#define STR_SERVICE(stp, q) { \
ASSERT(MUTEX_HELD(&stp->sd_qlock)); \
while (stp->sd_qhead != NULL) { \
DQ(q, stp->sd_qhead, stp->sd_qtail, q_link); \
ASSERT(stp->sd_nqueues > 0); \
stp->sd_nqueues--; \
ASSERT(!(q->q_flag & QINSERVICE)); \
mutex_exit(&stp->sd_qlock); \
queue_service(q); \
mutex_enter(&stp->sd_qlock); \
} \
ASSERT(stp->sd_nqueues == 0); \
ASSERT((stp->sd_qhead == NULL) && (stp->sd_qtail == NULL)); \
}
/*
* constructor/destructor routines for the stream head cache
*/
/* ARGSUSED */
static int
stream_head_constructor(void *buf, void *cdrarg, int kmflags)
{
stdata_t *stp = buf;
mutex_init(&stp->sd_lock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&stp->sd_reflock, NULL, MUTEX_DEFAULT, NULL);
mutex_init(&stp->sd_qlock, NULL, MUTEX_DEFAULT, NULL);
cv_init(&stp->sd_monitor, NULL, CV_DEFAULT, NULL);
cv_init(&stp->sd_iocmonitor, NULL, CV_DEFAULT, NULL);
cv_init(&stp->sd_refmonitor, NULL, CV_DEFAULT, NULL);
cv_init(&stp->sd_qcv, NULL, CV_DEFAULT, NULL);
cv_init(&stp->sd_zcopy_wait, NULL, CV_DEFAULT, NULL);
stp->sd_wrq = NULL;
return (0);
}
/* ARGSUSED */
static void
stream_head_destructor(void *buf, void *cdrarg)
{
stdata_t *stp = buf;
mutex_destroy(&stp->sd_lock);
mutex_destroy(&stp->sd_reflock);
mutex_destroy(&stp->sd_qlock);
cv_destroy(&stp->sd_monitor);
cv_destroy(&stp->sd_iocmonitor);
cv_destroy(&stp->sd_refmonitor);
cv_destroy(&stp->sd_qcv);
cv_destroy(&stp->sd_zcopy_wait);
}
/*
* constructor/destructor routines for the queue cache
*/
/* ARGSUSED */
static int
queue_constructor(void *buf, void *cdrarg, int kmflags)
{
queinfo_t *qip = buf;
queue_t *qp = &qip->qu_rqueue;
queue_t *wqp = &qip->qu_wqueue;
syncq_t *sq = &qip->qu_syncq;
qp->q_first = NULL;
qp->q_link = NULL;
qp->q_count = 0;
qp->q_mblkcnt = 0;
qp->q_sqhead = NULL;
qp->q_sqtail = NULL;
qp->q_sqnext = NULL;
qp->q_sqprev = NULL;
qp->q_sqflags = 0;
qp->q_rwcnt = 0;
qp->q_spri = 0;
mutex_init(QLOCK(qp), NULL, MUTEX_DEFAULT, NULL);
cv_init(&qp->q_wait, NULL, CV_DEFAULT, NULL);
wqp->q_first = NULL;
wqp->q_link = NULL;
wqp->q_count = 0;
wqp->q_mblkcnt = 0;
wqp->q_sqhead = NULL;
wqp->q_sqtail = NULL;
wqp->q_sqnext = NULL;
wqp->q_sqprev = NULL;
wqp->q_sqflags = 0;
wqp->q_rwcnt = 0;
wqp->q_spri = 0;
mutex_init(QLOCK(wqp), NULL, MUTEX_DEFAULT, NULL);
cv_init(&wqp->q_wait, NULL, CV_DEFAULT, NULL);
sq->sq_head = NULL;
sq->sq_tail = NULL;
sq->sq_evhead = NULL;
sq->sq_evtail = NULL;
sq->sq_callbpend = NULL;
sq->sq_outer = NULL;
sq->sq_onext = NULL;
sq->sq_oprev = NULL;
sq->sq_next = NULL;
sq->sq_svcflags = 0;
sq->sq_servcount = 0;
sq->sq_needexcl = 0;
sq->sq_nqueues = 0;
sq->sq_pri = 0;
mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL);
cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL);
cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL);
return (0);
}
/* ARGSUSED */
static void
queue_destructor(void *buf, void *cdrarg)
{
queinfo_t *qip = buf;
queue_t *qp = &qip->qu_rqueue;
queue_t *wqp = &qip->qu_wqueue;
syncq_t *sq = &qip->qu_syncq;
ASSERT(qp->q_sqhead == NULL);
ASSERT(wqp->q_sqhead == NULL);
ASSERT(qp->q_sqnext == NULL);
ASSERT(wqp->q_sqnext == NULL);
ASSERT(qp->q_rwcnt == 0);
ASSERT(wqp->q_rwcnt == 0);
mutex_destroy(&qp->q_lock);
cv_destroy(&qp->q_wait);
mutex_destroy(&wqp->q_lock);
cv_destroy(&wqp->q_wait);
mutex_destroy(&sq->sq_lock);
cv_destroy(&sq->sq_wait);
cv_destroy(&sq->sq_exitwait);
}
/*
* constructor/destructor routines for the syncq cache
*/
/* ARGSUSED */
static int
syncq_constructor(void *buf, void *cdrarg, int kmflags)
{
syncq_t *sq = buf;
bzero(buf, sizeof (syncq_t));
mutex_init(&sq->sq_lock, NULL, MUTEX_DEFAULT, NULL);
cv_init(&sq->sq_wait, NULL, CV_DEFAULT, NULL);
cv_init(&sq->sq_exitwait, NULL, CV_DEFAULT, NULL);
return (0);
}
/* ARGSUSED */
static void
syncq_destructor(void *buf, void *cdrarg)
{
syncq_t *sq = buf;
ASSERT(sq->sq_head == NULL);
ASSERT(sq->sq_tail == NULL);
ASSERT(sq->sq_evhead == NULL);
ASSERT(sq->sq_evtail == NULL);
ASSERT(sq->sq_callbpend == NULL);
ASSERT(sq->sq_callbflags == 0);
ASSERT(sq->sq_outer == NULL);
ASSERT(sq->sq_onext == NULL);
ASSERT(sq->sq_oprev == NULL);
ASSERT(sq->sq_next == NULL);
ASSERT(sq->sq_needexcl == 0);
ASSERT(sq->sq_svcflags == 0);
ASSERT(sq->sq_servcount == 0);
ASSERT(sq->sq_nqueues == 0);
ASSERT(sq->sq_pri == 0);
ASSERT(sq->sq_count == 0);
ASSERT(sq->sq_rmqcount == 0);
ASSERT(sq->sq_cancelid == 0);
ASSERT(sq->sq_ciputctrl == NULL);
ASSERT(sq->sq_nciputctrl == 0);
ASSERT(sq->sq_type == 0);
ASSERT(sq->sq_flags == 0);
mutex_destroy(&sq->sq_lock);
cv_destroy(&sq->sq_wait);
cv_destroy(&sq->sq_exitwait);
}
/* ARGSUSED */
static int
ciputctrl_constructor(void *buf, void *cdrarg, int kmflags)
{
ciputctrl_t *cip = buf;
int i;
for (i = 0; i < n_ciputctrl; i++) {
cip[i].ciputctrl_count = SQ_FASTPUT;
mutex_init(&cip[i].ciputctrl_lock, NULL, MUTEX_DEFAULT, NULL);
}
return (0);
}
/* ARGSUSED */
static void
ciputctrl_destructor(void *buf, void *cdrarg)
{
ciputctrl_t *cip = buf;
int i;
for (i = 0; i < n_ciputctrl; i++) {
ASSERT(cip[i].ciputctrl_count & SQ_FASTPUT);
mutex_destroy(&cip[i].ciputctrl_lock);
}
}
/*
* Init routine run from main at boot time.
*/
void
strinit(void)
{
int ncpus = ((boot_max_ncpus == -1) ? max_ncpus : boot_max_ncpus);
stream_head_cache = kmem_cache_create("stream_head_cache",
sizeof (stdata_t), 0,
stream_head_constructor, stream_head_destructor, NULL,
NULL, NULL, 0);
queue_cache = kmem_cache_create("queue_cache", sizeof (queinfo_t), 0,
queue_constructor, queue_destructor, NULL, NULL, NULL, 0);
syncq_cache = kmem_cache_create("syncq_cache", sizeof (syncq_t), 0,
syncq_constructor, syncq_destructor, NULL, NULL, NULL, 0);
qband_cache = kmem_cache_create("qband_cache",
sizeof (qband_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
linkinfo_cache = kmem_cache_create("linkinfo_cache",
sizeof (linkinfo_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
n_ciputctrl = ncpus;
n_ciputctrl = 1 << highbit(n_ciputctrl - 1);
ASSERT(n_ciputctrl >= 1);
n_ciputctrl = MIN(n_ciputctrl, max_n_ciputctrl);
if (n_ciputctrl >= min_n_ciputctrl) {
ciputctrl_cache = kmem_cache_create("ciputctrl_cache",
sizeof (ciputctrl_t) * n_ciputctrl,
sizeof (ciputctrl_t), ciputctrl_constructor,
ciputctrl_destructor, NULL, NULL, NULL, 0);
}
streams_taskq = system_taskq;
if (streams_taskq == NULL)
panic("strinit: no memory for streams taskq!");
bc_bkgrnd_thread = thread_create(NULL, 0,
streams_bufcall_service, NULL, 0, &p0, TS_RUN, streams_lopri);
streams_qbkgrnd_thread = thread_create(NULL, 0,
streams_qbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri);
streams_sqbkgrnd_thread = thread_create(NULL, 0,
streams_sqbkgrnd_service, NULL, 0, &p0, TS_RUN, streams_lopri);
/*
* Create STREAMS kstats.
*/
str_kstat = kstat_create("streams", 0, "strstat",
"net", KSTAT_TYPE_NAMED,
sizeof (str_statistics) / sizeof (kstat_named_t),
KSTAT_FLAG_VIRTUAL);
if (str_kstat != NULL) {
str_kstat->ks_data = &str_statistics;
kstat_install(str_kstat);
}
/*
* TPI support routine initialisation.
*/
tpi_init();
/*
* Register per-zone state so that autopush and persistent link
* information is maintained per zone.
* Note: uses shutdown hook instead of destroy hook so that the
* persistent links can be torn down before the destroy hooks
* in the TCP/IP stack are called.
*/
netstack_register(NS_STR, str_stack_init, str_stack_shutdown,
str_stack_fini);
}
void
str_sendsig(vnode_t *vp, int event, uchar_t band, int error)
{
struct stdata *stp;
ASSERT(vp->v_stream);
stp = vp->v_stream;
/* Have to hold sd_lock to prevent siglist from changing */
mutex_enter(&stp->sd_lock);
if (stp->sd_sigflags & event)
strsendsig(stp->sd_siglist, event, band, error);
mutex_exit(&stp->sd_lock);
}
/*
* Send the "sevent" set of signals to a process.
* This might send more than one signal if the process is registered
* for multiple events. The caller should pass in an sevent that only
* includes the events for which the process has registered.
*/
static void
dosendsig(proc_t *proc, int events, int sevent, k_siginfo_t *info,
uchar_t band, int error)
{
ASSERT(MUTEX_HELD(&proc->p_lock));
info->si_band = 0;
info->si_errno = 0;
if (sevent & S_ERROR) {
sevent &= ~S_ERROR;
info->si_code = POLL_ERR;
info->si_errno = error;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
info->si_errno = 0;
}
if (sevent & S_HANGUP) {
sevent &= ~S_HANGUP;
info->si_code = POLL_HUP;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
}
if (sevent & S_HIPRI) {
sevent &= ~S_HIPRI;
info->si_code = POLL_PRI;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
}
if (sevent & S_RDBAND) {
sevent &= ~S_RDBAND;
if (events & S_BANDURG)
sigtoproc(proc, NULL, SIGURG);
else
sigtoproc(proc, NULL, SIGPOLL);
}
if (sevent & S_WRBAND) {
sevent &= ~S_WRBAND;
sigtoproc(proc, NULL, SIGPOLL);
}
if (sevent & S_INPUT) {
sevent &= ~S_INPUT;
info->si_code = POLL_IN;
info->si_band = band;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
info->si_band = 0;
}
if (sevent & S_OUTPUT) {
sevent &= ~S_OUTPUT;
info->si_code = POLL_OUT;
info->si_band = band;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
info->si_band = 0;
}
if (sevent & S_MSG) {
sevent &= ~S_MSG;
info->si_code = POLL_MSG;
info->si_band = band;
TRACE_2(TR_FAC_STREAMS_FR, TR_STRSENDSIG,
"strsendsig:proc %p info %p", proc, info);
sigaddq(proc, NULL, info, KM_NOSLEEP);
info->si_band = 0;
}
if (sevent & S_RDNORM) {
sevent &= ~S_RDNORM;
sigtoproc(proc, NULL, SIGPOLL);
}
if (sevent != 0) {
panic("strsendsig: unknown event(s) %x", sevent);
}
}
/*
* Send SIGPOLL/SIGURG signal to all processes and process groups
* registered on the given signal list that want a signal for at
* least one of the specified events.
*
* Must be called with exclusive access to siglist (caller holding sd_lock).
*
* strioctl(I_SETSIG/I_ESETSIG) will only change siglist when holding
* sd_lock and the ioctl code maintains a PID_HOLD on the pid structure
* while it is in the siglist.
*
* For performance reasons (MP scalability) the code drops pidlock
* when sending signals to a single process.
* When sending to a process group the code holds
* pidlock to prevent the membership in the process group from changing
* while walking the p_pglink list.
*/
void
strsendsig(strsig_t *siglist, int event, uchar_t band, int error)
{
strsig_t *ssp;
k_siginfo_t info;
struct pid *pidp;
proc_t *proc;
info.si_signo = SIGPOLL;
info.si_errno = 0;
for (ssp = siglist; ssp; ssp = ssp->ss_next) {
int sevent;
sevent = ssp->ss_events & event;
if (sevent == 0)
continue;
if ((pidp = ssp->ss_pidp) == NULL) {
/* pid was released but still on event list */
continue;
}
if (ssp->ss_pid > 0) {
/*
* XXX This unfortunately still generates
* a signal when a fd is closed but
* the proc is active.
*/
ASSERT(ssp->ss_pid == pidp->pid_id);
mutex_enter(&pidlock);
proc = prfind_zone(pidp->pid_id, ALL_ZONES);
if (proc == NULL) {
mutex_exit(&pidlock);
continue;
}
mutex_enter(&proc->p_lock);
mutex_exit(&pidlock);
dosendsig(proc, ssp->ss_events, sevent, &info,
band, error);
mutex_exit(&proc->p_lock);
} else {
/*
* Send to process group. Hold pidlock across
* calls to dosendsig().
*/
pid_t pgrp = -ssp->ss_pid;
mutex_enter(&pidlock);
proc = pgfind_zone(pgrp, ALL_ZONES);
while (proc != NULL) {
mutex_enter(&proc->p_lock);
dosendsig(proc, ssp->ss_events, sevent,
&info, band, error);
mutex_exit(&proc->p_lock);
proc = proc->p_pglink;
}
mutex_exit(&pidlock);
}
}
}
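/*
 * For reference, a process registers for these signals with the
 * I_SETSIG STREAMS ioctl, e.g. from userland (illustrative):
 *
 *	if (ioctl(fd, I_SETSIG, S_INPUT | S_RDBAND) < 0)
 *		perror("I_SETSIG");
 */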
/*
* Attach a stream device or module.
* qp is a read queue; the new queue is inserted so that its read-side
* q_next is the argument, and the write queue corresponding
* to the argument points back to this queue. Return 0 on success,
* or a non-zero errno on failure.
*/
int
qattach(queue_t *qp, dev_t *devp, int oflag, cred_t *crp, fmodsw_impl_t *fp,
boolean_t is_insert)
{
major_t major;
cdevsw_impl_t *dp;
struct streamtab *str;
queue_t *rq;
queue_t *wrq;
uint32_t qflag;
uint32_t sqtype;
perdm_t *dmp;
int error;
int sflag;
rq = allocq();
wrq = _WR(rq);
STREAM(rq) = STREAM(wrq) = STREAM(qp);
if (fp != NULL) {
str = fp->f_str;
qflag = fp->f_qflag;
sqtype = fp->f_sqtype;
dmp = fp->f_dmp;
IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);
sflag = MODOPEN;
/*
* stash away a pointer to the module structure so we can
* unref it in qdetach.
*/
rq->q_fp = fp;
} else {
ASSERT(!is_insert);
major = getmajor(*devp);
dp = &devimpl[major];
str = dp->d_str;
ASSERT(str == STREAMSTAB(major));
qflag = dp->d_qflag;
ASSERT(qflag & QISDRV);
sqtype = dp->d_sqtype;
/* create perdm_t if needed */
if (NEED_DM(dp->d_dmp, qflag))
dp->d_dmp = hold_dm(str, qflag, sqtype);
dmp = dp->d_dmp;
sflag = 0;
}
TRACE_2(TR_FAC_STREAMS_FR, TR_QATTACH_FLAGS,
"qattach:qflag == %X(%X)", qflag, *devp);
/* setq might sleep in allocator - avoid holding locks. */
setq(rq, str->st_rdinit, str->st_wrinit, dmp, qflag, sqtype, B_FALSE);
/*
* Before calling the module's open routine, set up the q_next
* pointer for inserting a module in the middle of a stream.
*
* Note that we can always set _QINSERTING and set up q_next
* pointer for both inserting and pushing a module. Then there
* is no need for the is_insert parameter. In insertq(), called
* by qprocson(), assume that q_next of the new module always points
* to the correct queue and use it for insertion. Everything should
* work out fine. But in the first release of _I_INSERT, we
* distinguish between inserting and pushing to make sure that
* pushing a module follows the same code path as before.
*/
if (is_insert) {
rq->q_flag |= _QINSERTING;
rq->q_next = qp;
}
/*
* If there is an outer perimeter get exclusive access during
* the open procedure. Bump up the reference count on the queue.
*/
entersq(rq->q_syncq, SQ_OPENCLOSE);
error = (*rq->q_qinfo->qi_qopen)(rq, devp, oflag, sflag, crp);
if (error != 0)
goto failed;
leavesq(rq->q_syncq, SQ_OPENCLOSE);
ASSERT(qprocsareon(rq));
return (0);
failed:
rq->q_flag &= ~_QINSERTING;
if (backq(wrq) != NULL && backq(wrq)->q_next == wrq)
qprocsoff(rq);
leavesq(rq->q_syncq, SQ_OPENCLOSE);
rq->q_next = wrq->q_next = NULL;
qdetach(rq, 0, 0, crp, B_FALSE);
return (error);
}
/*
* Handle second open of stream. For modules, set the
* last argument to MODOPEN and do not pass any open flags.
* Ignore dummydev since this is not the first open.
*/
int
qreopen(queue_t *qp, dev_t *devp, int flag, cred_t *crp)
{
int error;
dev_t dummydev;
queue_t *wqp = _WR(qp);
ASSERT(qp->q_flag & QREADR);
entersq(qp->q_syncq, SQ_OPENCLOSE);
dummydev = *devp;
if (error = ((*qp->q_qinfo->qi_qopen)(qp, &dummydev,
(wqp->q_next ? 0 : flag), (wqp->q_next ? MODOPEN : 0), crp))) {
leavesq(qp->q_syncq, SQ_OPENCLOSE);
mutex_enter(&STREAM(qp)->sd_lock);
qp->q_stream->sd_flag |= STREOPENFAIL;
mutex_exit(&STREAM(qp)->sd_lock);
return (error);
}
leavesq(qp->q_syncq, SQ_OPENCLOSE);
/*
* A successful open should have done qprocson().
*/
ASSERT(qprocsareon(_RD(qp)));
return (0);
}
/*
* Detach a stream module or device.
* If clmode == 1 then the module or driver was opened and its
* close routine must be called. If clmode == 0, the module
* or driver was never opened or the open failed, and so its close
* should not be called.
*/
void
qdetach(queue_t *qp, int clmode, int flag, cred_t *crp, boolean_t is_remove)
{
queue_t *wqp = _WR(qp);
ASSERT(STREAM(qp)->sd_flag & (STRCLOSE|STWOPEN|STRPLUMB));
if (STREAM_NEEDSERVICE(STREAM(qp)))
stream_runservice(STREAM(qp));
if (clmode) {
/*
* Make sure that all the messages on the write side syncq are
* processed and nothing is left. Since we are closing, no new
* messages may appear there.
*/
wait_q_syncq(wqp);
entersq(qp->q_syncq, SQ_OPENCLOSE);
if (is_remove) {
mutex_enter(QLOCK(qp));
qp->q_flag |= _QREMOVING;
mutex_exit(QLOCK(qp));
}
(*qp->q_qinfo->qi_qclose)(qp, flag, crp);
/*
* Check that qprocsoff() was actually called.
*/
ASSERT((qp->q_flag & QWCLOSE) && (wqp->q_flag & QWCLOSE));
leavesq(qp->q_syncq, SQ_OPENCLOSE);
} else {
disable_svc(qp);
}
/*
* Allow any threads blocked in entersq to proceed and discover
* that QWCLOSE is set.
* Note: This assumes that all users of entersq check QWCLOSE.
* Currently runservice is the only entersq that can happen
* after removeq has finished.
* Removeq will have discarded all messages destined to the closing
* pair of queues from the syncq.
* NOTE: Calling a function inside an assert is unconventional.
* However, it does not cause any problem since flush_syncq() does
* not change any state except when it returns non-zero i.e.
* when the assert will trigger.
*/
ASSERT(flush_syncq(qp->q_syncq, qp) == 0);
ASSERT(flush_syncq(wqp->q_syncq, wqp) == 0);
ASSERT((qp->q_flag & QPERMOD) ||
((qp->q_syncq->sq_head == NULL) &&
(wqp->q_syncq->sq_head == NULL)));
/* release any fmodsw_impl_t structure held on behalf of the queue */
ASSERT(qp->q_fp != NULL || qp->q_flag & QISDRV);
if (qp->q_fp != NULL)
fmodsw_rele(qp->q_fp);
/* freeq removes us from the outer perimeter if any */
freeq(qp);
}
/* Prevent service procedures from being called */
void
disable_svc(queue_t *qp)
{
queue_t *wqp = _WR(qp);
ASSERT(qp->q_flag & QREADR);
mutex_enter(QLOCK(qp));
qp->q_flag |= QWCLOSE;
mutex_exit(QLOCK(qp));
mutex_enter(QLOCK(wqp));
wqp->q_flag |= QWCLOSE;
mutex_exit(QLOCK(wqp));
}
/* Allow service procedures to be called again */
void
enable_svc(queue_t *qp)
{
queue_t *wqp = _WR(qp);
ASSERT(qp->q_flag & QREADR);
mutex_enter(QLOCK(qp));
qp->q_flag &= ~QWCLOSE;
mutex_exit(QLOCK(qp));
mutex_enter(QLOCK(wqp));
wqp->q_flag &= ~QWCLOSE;
mutex_exit(QLOCK(wqp));
}
/*
* Remove a queue from qhead/qtail if it is enabled.
* Only reset QENAB if the queue was removed from the runlist.
* A queue goes through these stages:
*	1. It is on the service list and QENAB is set.
*	2. It is removed from the service list but QENAB is still set.
*	3. QENAB gets changed to QINSERVICE.
*	4. QINSERVICE is reset (when the service procedure is done).
* Thus we cannot reset QENAB unless we actually removed the queue from the
* service queue.
*/
void
remove_runlist(queue_t *qp)
{
if (qp->q_flag & QENAB && qhead != NULL) {
queue_t *q_chase;
queue_t *q_curr;
int removed;
mutex_enter(&service_queue);
RMQ(qp, qhead, qtail, q_link, q_chase, q_curr, removed);
mutex_exit(&service_queue);
if (removed) {
STRSTAT(qremoved);
qp->q_flag &= ~QENAB;
}
}
}
/*
* Wait for any pending service processing to complete.
* The removal of queues from the runlist is not atomic with the
* clearing of the QENAB flag and the setting of the QINSERVICE flag.
* Consequently it is possible for remove_runlist in strclose
* to not find the queue on the runlist even though it is QENAB
* and not yet QINSERVICE; hence wait_svc needs to check QENAB
* as well as QINSERVICE.
*/
void
wait_svc(queue_t *qp)
{
queue_t *wqp = _WR(qp);
ASSERT(qp->q_flag & QREADR);
/*
* Try to remove queues from qhead/qtail list.
*/
if (qhead != NULL) {
remove_runlist(qp);
remove_runlist(wqp);
}
/*
* Wait until the syncqs associated with the queue
* disappear from the background processing list.
* This only needs to be done for non-PERMOD perimeters since
* for PERMOD perimeters the syncq may be shared and will only be freed
* when the last module/driver is unloaded.
* If, for PERMOD perimeters, the queue was on the syncq list, removeq()
* should call propagate_syncq() or drain_syncq() for it. Both of these
* functions remove the queue from its syncq list, so sqthread will not
* try to access the queue.
*/
if (!(qp->q_flag & QPERMOD)) {
syncq_t *rsq = qp->q_syncq;
syncq_t *wsq = wqp->q_syncq;
/*
* Disable rsq and wsq and wait for any background processing of
* syncq to complete.
*/
wait_sq_svc(rsq);
if (wsq != rsq)
wait_sq_svc(wsq);
}
mutex_enter(QLOCK(qp));
while (qp->q_flag & (QINSERVICE|QENAB))
cv_wait(&qp->q_wait, QLOCK(qp));
mutex_exit(QLOCK(qp));
mutex_enter(QLOCK(wqp));
while (wqp->q_flag & (QINSERVICE|QENAB))
cv_wait(&wqp->q_wait, QLOCK(wqp));
mutex_exit(QLOCK(wqp));
}
/*
* Put ioctl data from userland buffer `arg' into the mblk chain `bp'.
* `flag' must always contain either K_TO_K or U_TO_K; STR_NOSIG may
* also be set, and is passed through to allocb_cred_wait().
*
* Returns errno on failure, zero on success.
*/
int
putiocd(mblk_t *bp, char *arg, int flag, cred_t *cr)
{
mblk_t *tmp;
ssize_t count;
int error = 0;
ASSERT((flag & (U_TO_K | K_TO_K)) == U_TO_K ||
(flag & (U_TO_K | K_TO_K)) == K_TO_K);
if (bp->b_datap->db_type == M_IOCTL) {
count = ((struct iocblk *)bp->b_rptr)->ioc_count;
} else {
ASSERT(bp->b_datap->db_type == M_COPYIN);
count = ((struct copyreq *)bp->b_rptr)->cq_size;
}
/*
* strdoioctl validates ioc_count, so if this assert fails it
* cannot be due to user error.
*/
ASSERT(count >= 0);
if ((tmp = allocb_cred_wait(count, (flag & STR_NOSIG), &error, cr,
curproc->p_pid)) == NULL) {
return (error);
}
error = strcopyin(arg, tmp->b_wptr, count, flag & (U_TO_K|K_TO_K));
if (error != 0) {
freeb(tmp);
return (error);
}
DB_CPID(tmp) = curproc->p_pid;
tmp->b_wptr += count;
bp->b_cont = tmp;
return (0);
}
/*
* Copy ioctl data to user-land. Return non-zero errno on failure,
* 0 for success.
*/
int
getiocd(mblk_t *bp, char *arg, int copymode)
{
ssize_t count;
size_t n;
int error;
if (bp->b_datap->db_type == M_IOCACK)
count = ((struct iocblk *)bp->b_rptr)->ioc_count;
else {
ASSERT(bp->b_datap->db_type == M_COPYOUT);
count = ((struct copyreq *)bp->b_rptr)->cq_size;
}
ASSERT(count >= 0);
for (bp = bp->b_cont; bp && count;
count -= n, bp = bp->b_cont, arg += n) {
n = MIN(count, bp->b_wptr - bp->b_rptr);
error = strcopyout(bp->b_rptr, arg, n, copymode);
if (error)
return (error);
}
ASSERT(count == 0);
return (0);
}
/*
* Allocate a linkinfo entry given the write queue of the
* bottom module of the top stream and the write queue of the
* stream head of the bottom stream.
*/
linkinfo_t *
alloclink(queue_t *qup, queue_t *qdown, file_t *fpdown)
{
linkinfo_t *linkp;
linkp = kmem_cache_alloc(linkinfo_cache, KM_SLEEP);
linkp->li_lblk.l_qtop = qup;
linkp->li_lblk.l_qbot = qdown;
linkp->li_fpdown = fpdown;
mutex_enter(&strresources);
linkp->li_next = linkinfo_list;
linkp->li_prev = NULL;
if (linkp->li_next)
linkp->li_next->li_prev = linkp;
linkinfo_list = linkp;
linkp->li_lblk.l_index = ++lnk_id;
ASSERT(lnk_id != 0); /* this should never wrap in practice */
mutex_exit(&strresources);
return (linkp);
}
/*
* Free a linkinfo entry.
*/
void
lbfree(linkinfo_t *linkp)
{
mutex_enter(&strresources);
if (linkp->li_next)
linkp->li_next->li_prev = linkp->li_prev;
if (linkp->li_prev)
linkp->li_prev->li_next = linkp->li_next;
else
linkinfo_list = linkp->li_next;
mutex_exit(&strresources);
kmem_cache_free(linkinfo_cache, linkp);
}
/*
* Check for a potential linking cycle.
* Return 1 if a link will result in a cycle,
* and 0 otherwise.
*/
int
linkcycle(stdata_t *upstp, stdata_t *lostp, str_stack_t *ss)
{
struct mux_node *np;
struct mux_edge *ep;
int i;
major_t lomaj;
major_t upmaj;
/*
* If the lower stream is a pipe/FIFO, return, since link
* cycles cannot happen on pipes/FIFOs.
*/
if (lostp->sd_vnode->v_type == VFIFO)
return (0);
for (i = 0; i < ss->ss_devcnt; i++) {
np = &ss->ss_mux_nodes[i];
MUX_CLEAR(np);
}
lomaj = getmajor(lostp->sd_vnode->v_rdev);
upmaj = getmajor(upstp->sd_vnode->v_rdev);
np = &ss->ss_mux_nodes[lomaj];
for (;;) {
if (!MUX_DIDVISIT(np)) {
if (np->mn_imaj == upmaj)
return (1);
if (np->mn_outp == NULL) {
MUX_VISIT(np);
if (np->mn_originp == NULL)
return (0);
np = np->mn_originp;
continue;
}
MUX_VISIT(np);
np->mn_startp = np->mn_outp;
} else {
if (np->mn_startp == NULL) {
if (np->mn_originp == NULL)
return (0);
else {
np = np->mn_originp;
continue;
}
}
/*
* If ep->me_nodep is a FIFO (me_nodep == NULL),
* ignore the edge and move on. ep->me_nodep gets
* set to NULL in mux_addedge() if it is a FIFO.
*
*/
ep = np->mn_startp;
np->mn_startp = ep->me_nextp;
if (ep->me_nodep == NULL)
continue;
ep->me_nodep->mn_originp = np;
np = ep->me_nodep;
}
}
}
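/*
 * Worked example: after stream B is linked under mux A, mux_addedge()
 * records the edge A -> B. A later attempt to link A under B calls
 * linkcycle() with up == B and lo == A; the walk starts at A's
 * mux_node, follows the existing A -> B edge to a node whose major
 * matches the upper stream, and returns 1, so the link is refused.
 */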
/*
* Find linkinfo entry corresponding to the parameters.
*/
linkinfo_t *
findlinks(stdata_t *stp, int index, int type, str_stack_t *ss)
{
linkinfo_t *linkp;
struct mux_edge *mep;
struct mux_node *mnp;
queue_t *qup;
mutex_enter(&strresources);
if ((type & LINKTYPEMASK) == LINKNORMAL) {
qup = getendq(stp->sd_wrq);
for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) {
if ((qup == linkp->li_lblk.l_qtop) &&
(!index || (index == linkp->li_lblk.l_index))) {
mutex_exit(&strresources);
return (linkp);
}
}
} else {
ASSERT((type & LINKTYPEMASK) == LINKPERSIST);
mnp = &ss->ss_mux_nodes[getmajor(stp->sd_vnode->v_rdev)];
mep = mnp->mn_outp;
while (mep) {
if ((index == 0) || (index == mep->me_muxid))
break;
mep = mep->me_nextp;
}
if (!mep) {
mutex_exit(&strresources);
return (NULL);
}
for (linkp = linkinfo_list; linkp; linkp = linkp->li_next) {
if ((!linkp->li_lblk.l_qtop) &&
(mep->me_muxid == linkp->li_lblk.l_index)) {
mutex_exit(&strresources);
return (linkp);
}
}
}
mutex_exit(&strresources);
return (NULL);
}
/*
* Given a queue ptr, follow the chain of q_next pointers until you reach the
* last queue on the chain and return it.
*/
queue_t *
getendq(queue_t *q)
{
ASSERT(q != NULL);
while (_SAMESTR(q))
q = q->q_next;
return (q);
}
/*
* Wait for the syncq count to drop to zero.
* sq could be either outer or inner.
*/
static void
wait_syncq(syncq_t *sq)
{
uint16_t count;
mutex_enter(SQLOCK(sq));
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
while (count != 0) {
sq->sq_flags |= SQ_WANTWAKEUP;
SQ_PUTLOCKS_EXIT(sq);
cv_wait(&sq->sq_wait, SQLOCK(sq));
count = sq->sq_count;
SQ_PUTLOCKS_ENTER(sq);
SUM_SQ_PUTCOUNTS(sq, count);
}
SQ_PUTLOCKS_EXIT(sq);
mutex_exit(SQLOCK(sq));
}
/*
* Wait while there are any messages for the queue in its syncq.
*/
static void
wait_q_syncq(queue_t *q)
{
if ((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)) {
syncq_t *sq = q->q_syncq;
mutex_enter(SQLOCK(sq));
while ((q->q_sqflags & Q_SQQUEUED) || (q->q_syncqmsgs > 0)) {
sq->sq_flags |= SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
}
mutex_exit(SQLOCK(sq));
}
}
int
mlink_file(vnode_t *vp, int cmd, struct file *fpdown, cred_t *crp, int *rvalp,
int lhlink)
{
struct stdata *stp;
struct strioctl strioc;
struct linkinfo *linkp;
struct stdata *stpdown;
struct streamtab *str;
queue_t *passq;
syncq_t *passyncq;
queue_t *rq;
cdevsw_impl_t *dp;
uint32_t qflag;
uint32_t sqtype;
perdm_t *dmp;
int error = 0;
netstack_t *ns;
str_stack_t *ss;
stp = vp->v_stream;
TRACE_1(TR_FAC_STREAMS_FR,
TR_I_LINK, "I_LINK/I_PLINK:stp %p", stp);
/*
* Test for invalid upper stream
*/
if (stp->sd_flag & STRHUP) {
return (ENXIO);
}
if (vp->v_type == VFIFO) {
return (EINVAL);
}
if (stp->sd_strtab == NULL) {
return (EINVAL);
}
if (!stp->sd_strtab->st_muxwinit) {
return (EINVAL);
}
if (fpdown == NULL) {
return (EBADF);
}
ns = netstack_find_by_cred(crp);
ASSERT(ns != NULL);
ss = ns->netstack_str;
ASSERT(ss != NULL);
if (getmajor(stp->sd_vnode->v_rdev) >= ss->ss_devcnt) {
netstack_rele(ss->ss_netstack);
return (EINVAL);
}
mutex_enter(&muxifier);
if (stp->sd_flag & STPLEX) {
mutex_exit(&muxifier);
netstack_rele(ss->ss_netstack);
return (ENXIO);
}
/*
* Test for invalid lower stream.
* The checks that v_type != VFIFO and that the major
* number is not >= devcnt avoid problems with
* adding a mux_node entry past the end of mux_nodes[].
* For FIFOs we don't add an entry, so this isn't a
* problem.
*/
if (((stpdown = fpdown->f_vnode->v_stream) == NULL) ||
(stpdown == stp) || (stpdown->sd_flag &
(STPLEX|STRHUP|STRDERR|STWRERR|IOCWAIT|STRPLUMB)) ||
((stpdown->sd_vnode->v_type != VFIFO) &&
(getmajor(stpdown->sd_vnode->v_rdev) >= ss->ss_devcnt)) ||
linkcycle(stp, stpdown, ss)) {
mutex_exit(&muxifier);
netstack_rele(ss->ss_netstack);
return (EINVAL);
}
TRACE_1(TR_FAC_STREAMS_FR,
TR_STPDOWN, "stpdown:%p", stpdown);
rq = getendq(stp->sd_wrq);
if (cmd == I_PLINK)
rq = NULL;
linkp = alloclink(rq, stpdown->sd_wrq, fpdown);
strioc.ic_cmd = cmd;
strioc.ic_timout = INFTIM;
strioc.ic_len = sizeof (struct linkblk);
strioc.ic_dp = (char *)&linkp->li_lblk;
/*
* STRPLUMB protects plumbing changes and should be set before
* link_addpassthru()/link_rempassthru() are called, so it is set here
* and cleared at the end of mlink when the passthru queue is removed.
* Setting of STRPLUMB prevents reopens of the stream while passthru
* queue is in-place (it is not a proper module and doesn't have open
* entry point).
*
* STPLEX prevents any threads from entering the stream from above. It
* can't be set before the call to link_addpassthru() because putnext
* from below may cause stream head I/O routines to be called and these
* routines assert that STPLEX is not set. After link_addpassthru()
* nothing may come from below since the pass queue syncq is blocked.
* Note also that STPLEX should be cleared before the call to
* link_rempassthru() since when messages start flowing to the stream
* head (e.g. because of message propagation from the pass queue) stream
* head I/O routines may be called with STPLEX flag set.
*
* When STPLEX is set, nothing may come into the stream from above and
* it is safe to do a setq which will change stream head. So, the
* correct sequence of actions is:
*
* 1) Set STRPLUMB
* 2) Call link_addpassthru()
* 3) Set STPLEX
* 4) Call setq and update the stream state
* 5) Clear STPLEX
* 6) Call link_rempassthru()
* 7) Clear STRPLUMB
*
* The same sequence applies to munlink() code.
*/
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag |= STRPLUMB;
mutex_exit(&stpdown->sd_lock);
/*
* Add a passthru queue below the lower mux. This will block the
* syncqs of the lower mux's read queue during I_LINK/I_UNLINK.
*/
passq = link_addpassthru(stpdown);
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag |= STPLEX;
mutex_exit(&stpdown->sd_lock);
rq = _RD(stpdown->sd_wrq);
/*
* There may be messages in the stream head's syncq due to messages
* that arrived before link_addpassthru() was done. To avoid
* background processing of the syncq happening simultaneously with
* setq processing, we disable the stream head syncq and wait until
* the existing background thread finishes working on it.
*/
wait_sq_svc(rq->q_syncq);
passyncq = passq->q_syncq;
if (!(passyncq->sq_flags & SQ_BLOCKED))
blocksq(passyncq, SQ_BLOCKED, 0);
ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));
rq->q_ptr = _WR(rq)->q_ptr = NULL;
/* setq might sleep in allocator - avoid holding locks. */
/* Note: we are holding muxifier here. */
str = stp->sd_strtab;
dp = &devimpl[getmajor(vp->v_rdev)];
ASSERT(dp->d_str == str);
qflag = dp->d_qflag;
sqtype = dp->d_sqtype;
/* create perdm_t if needed */
if (NEED_DM(dp->d_dmp, qflag))
dp->d_dmp = hold_dm(str, qflag, sqtype);
dmp = dp->d_dmp;
setq(rq, str->st_muxrinit, str->st_muxwinit, dmp, qflag, sqtype,
B_TRUE);
/*
* XXX Remove any "odd" messages from the queue.
* Keep only M_DATA, M_PROTO, M_PCPROTO.
*/
error = strdoioctl(stp, &strioc, FNATIVE,
K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);
if (error != 0) {
lbfree(linkp);
if (!(passyncq->sq_flags & SQ_BLOCKED))
blocksq(passyncq, SQ_BLOCKED, 0);
/*
* Restore the stream head queue and then remove
* the passq. Turn off STPLEX before we turn on
* the stream by removing the passq.
*/
rq->q_ptr = _WR(rq)->q_ptr = stpdown;
setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO,
B_TRUE);
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STPLEX;
mutex_exit(&stpdown->sd_lock);
link_rempassthru(passq);
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STRPLUMB;
/* Wakeup anyone waiting for STRPLUMB to clear. */
cv_broadcast(&stpdown->sd_monitor);
mutex_exit(&stpdown->sd_lock);
mutex_exit(&muxifier);
netstack_rele(ss->ss_netstack);
return (error);
}
mutex_enter(&fpdown->f_tlock);
fpdown->f_count++;
mutex_exit(&fpdown->f_tlock);
/*
* If we've made it here, the linkage is all set up, so we should also
* set up the layered driver linkages.
*/
ASSERT((cmd == I_LINK) || (cmd == I_PLINK));
if (cmd == I_LINK) {
ldi_mlink_fp(stp, fpdown, lhlink, LINKNORMAL);
} else {
ldi_mlink_fp(stp, fpdown, lhlink, LINKPERSIST);
}
link_rempassthru(passq);
mux_addedge(stp, stpdown, linkp->li_lblk.l_index, ss);
/*
* Mark the upper stream as having dependent links
* so that strclose can clean it up.
*/
if (cmd == I_LINK) {
mutex_enter(&stp->sd_lock);
stp->sd_flag |= STRHASLINKS;
mutex_exit(&stp->sd_lock);
}
/*
* Wake up any other processes that may have been
* waiting on the lower stream. These will all
* error out.
*/
mutex_enter(&stpdown->sd_lock);
/* The passthru module is removed so we may release STRPLUMB */
stpdown->sd_flag &= ~STRPLUMB;
cv_broadcast(&rq->q_wait);
cv_broadcast(&_WR(rq)->q_wait);
cv_broadcast(&stpdown->sd_monitor);
mutex_exit(&stpdown->sd_lock);
mutex_exit(&muxifier);
*rvalp = linkp->li_lblk.l_index;
netstack_rele(ss->ss_netstack);
return (0);
}
int
mlink(vnode_t *vp, int cmd, int arg, cred_t *crp, int *rvalp, int lhlink)
{
int ret;
struct file *fpdown;
fpdown = getf(arg);
ret = mlink_file(vp, cmd, fpdown, crp, rvalp, lhlink);
if (fpdown != NULL)
releasef(arg);
return (ret);
}
/*
* Unlink a multiplexor link. Stp is the controlling stream for the
* link, and linkp points to the link's entry in the linkinfo list.
* The muxifier lock must be held on entry and is dropped on exit.
*
* NOTE: Currently it is assumed that the mux processes all the messages
* sitting on its queue before ACKing the UNLINK. It is the responsibility
* of the mux to handle all the messages that arrive before UNLINK.
* If the mux has to send down messages on its lower stream before
* ACKing I_UNLINK, then it *should* know how to handle messages even
* after the UNLINK is acked (actually it should be able to handle them
* until we re-block the read side of the pass queue here). If the mux
* does not open up the lower stream, any messages that arrive during
* UNLINK will be put in the stream head. If the lower stream is opened
* up, some messages might land in the stream head, depending on when
* each message arrived and when the read side of the pass queue was
* re-blocked.
*/
int
munlink(stdata_t *stp, linkinfo_t *linkp, int flag, cred_t *crp, int *rvalp,
str_stack_t *ss)
{
struct strioctl strioc;
struct stdata *stpdown;
queue_t *rq, *wrq;
queue_t *passq;
syncq_t *passyncq;
int error = 0;
file_t *fpdown;
ASSERT(MUTEX_HELD(&muxifier));
stpdown = linkp->li_fpdown->f_vnode->v_stream;
/*
* See the comment in mlink() concerning STRPLUMB/STPLEX flags.
*/
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag |= STRPLUMB;
mutex_exit(&stpdown->sd_lock);
/*
* Add a passthru queue below the lower mux. This will block the
* syncqs of the lower mux's read queue during I_LINK/I_UNLINK.
*/
passq = link_addpassthru(stpdown);
if ((flag & LINKTYPEMASK) == LINKNORMAL)
strioc.ic_cmd = I_UNLINK;
else
strioc.ic_cmd = I_PUNLINK;
strioc.ic_timout = INFTIM;
strioc.ic_len = sizeof (struct linkblk);
strioc.ic_dp = (char *)&linkp->li_lblk;
error = strdoioctl(stp, &strioc, FNATIVE,
K_TO_K | STR_NOERROR | STR_NOSIG, crp, rvalp);
/*
* If there was an error and this is not called via strclose,
* return to the user. Otherwise, pretend there was no error
* and close the link.
*/
if (error) {
if (flag & LINKCLOSE) {
cmn_err(CE_WARN, "KERNEL: munlink: could not perform "
"unlink ioctl, closing anyway (%d)\n", error);
} else {
link_rempassthru(passq);
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STRPLUMB;
cv_broadcast(&stpdown->sd_monitor);
mutex_exit(&stpdown->sd_lock);
mutex_exit(&muxifier);
return (error);
}
}
mux_rmvedge(stp, linkp->li_lblk.l_index, ss);
fpdown = linkp->li_fpdown;
lbfree(linkp);
/*
* We go ahead and drop muxifier here--it's a nasty global lock that
* can slow others down. It's okay to do so since attempts to mlink() this
* stream will be stopped because STPLEX is still set in the stdata
* structure, and munlink() is stopped because mux_rmvedge() and
* lbfree() have removed it from mux_nodes[] and linkinfo_list,
* respectively. Note that we defer the closef() of fpdown until
* after we drop muxifier since strclose() can call munlinkall().
*/
mutex_exit(&muxifier);
wrq = stpdown->sd_wrq;
rq = _RD(wrq);
/*
* Get rid of outstanding service procedure runs, before we make
* it a stream head, since a stream head doesn't have any service
* procedure.
*/
disable_svc(rq);
wait_svc(rq);
/*
* Since we don't disable the syncq for QPERMOD, we wait for whatever
* is queued up to be finished. The mux should take care that nothing is
* sent down to this queue. We should do it now as we're going to block
* passyncq if it was unblocked.
*/
if (wrq->q_flag & QPERMOD) {
syncq_t *sq = wrq->q_syncq;
mutex_enter(SQLOCK(sq));
while (wrq->q_sqflags & Q_SQQUEUED) {
sq->sq_flags |= SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
}
mutex_exit(SQLOCK(sq));
}
passyncq = passq->q_syncq;
if (!(passyncq->sq_flags & SQ_BLOCKED)) {
syncq_t *sq, *outer;
/*
* Messages could be flowing from underneath. We will
* block the read side of the passq. This would be
* sufficient for QPAIR and QPERQ muxes to ensure
* that no data is flowing up into this queue
* and hence no thread active in this instance of
* lower mux. But for QPERMOD and QMTOUTPERIM there
* could be messages on the inner and outer/inner
* syncqs respectively. We will wait for them to drain.
* Because passq is blocked, messages end up in the syncq,
* and qfill_syncq could possibly end up setting QFULL,
* which accesses rq->q_flag. Hence, we have to
* acquire the QLOCK in setq.
*
* XXX Messages can also flow from the top into this
* queue even though the unlink is over (e.g. some instance
* in putnext() called from the top that has still not
* accessed this queue, and also putq(lowerq)?).
* Solution : How about blocking the l_qtop queue ?
* Do we really care about such pure D_MP muxes ?
*/
blocksq(passyncq, SQ_BLOCKED, 0);
sq = rq->q_syncq;
if ((outer = sq->sq_outer) != NULL) {
/*
* We just have to wait for the outer sq_count to
* drop to zero. As this does not prevent new
* messages from entering the outer perimeter, this
* is subject to starvation.
*
* NOTE: Because of the blocksq above, messages could
* be in the inner syncq only because of some
* thread holding the outer perimeter exclusively.
* Hence it would be sufficient to wait for the
* exclusive holder of the outer perimeter to drain
* the inner and outer syncqs. But we will not depend
* on this feature and hence check the inner syncqs
* separately.
*/
wait_syncq(outer);
}
/*
* There could be messages destined for
* this queue. Let the exclusive holder
* drain it.
*/
wait_syncq(sq);
ASSERT((rq->q_flag & QPERMOD) ||
((rq->q_syncq->sq_head == NULL) &&
(_WR(rq)->q_syncq->sq_head == NULL)));
}
/*
* We haven't taken care of the QPERMOD case yet. QPERMOD is a special
* case as we don't disable its syncq or remove it from the syncq
* service list.
*/
if (rq->q_flag & QPERMOD) {
syncq_t *sq = rq->q_syncq;
mutex_enter(SQLOCK(sq));
while (rq->q_sqflags & Q_SQQUEUED) {
sq->sq_flags |= SQ_WANTWAKEUP;
cv_wait(&sq->sq_wait, SQLOCK(sq));
}
mutex_exit(SQLOCK(sq));
}
/*
* flush_syncq changes state only when there are messages to free,
* i.e. when it returns a non-zero value.
*/
ASSERT(flush_syncq(rq->q_syncq, rq) == 0);
ASSERT(flush_syncq(wrq->q_syncq, wrq) == 0);
/*
* Nobody else should know about this queue now.
* If the mux did not process the messages before
* acking the I_UNLINK, free them now.
*/
flushq(rq, FLUSHALL);
flushq(_WR(rq), FLUSHALL);
/*
* Convert the mux lower queue into a stream head queue.
* Turn off STPLEX before we turn on the stream by removing the passq.
*/
rq->q_ptr = wrq->q_ptr = stpdown;
setq(rq, &strdata, &stwdata, NULL, QMTSAFE, SQ_CI|SQ_CO, B_TRUE);
ASSERT((rq->q_flag & QMT_TYPEMASK) == QMTSAFE);
ASSERT(rq->q_syncq == SQ(rq) && _WR(rq)->q_syncq == SQ(rq));
enable_svc(rq);
/*
* Now it is a proper stream, so STPLEX is cleared. But STRPLUMB still
* needs to be set to prevent a reopen() of the stream - such a reopen
* may try to call the non-existent pass queue open routine and panic.
*/
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STPLEX;
mutex_exit(&stpdown->sd_lock);
ASSERT(((flag & LINKTYPEMASK) == LINKNORMAL) ||
((flag & LINKTYPEMASK) == LINKPERSIST));
/* clean up the layered driver linkages */
if ((flag & LINKTYPEMASK) == LINKNORMAL) {
ldi_munlink_fp(stp, fpdown, LINKNORMAL);
} else {
ldi_munlink_fp(stp, fpdown, LINKPERSIST);
}
link_rempassthru(passq);
/*
* Now all plumbing changes are finished and STRPLUMB is no
* longer needed.
*/
mutex_enter(&stpdown->sd_lock);
stpdown->sd_flag &= ~STRPLUMB;
cv_broadcast(&stpdown->sd_monitor);
mutex_exit(&stpdown->sd_lock);
(void) closef(fpdown);
return (0);
}
/*
* Unlink all multiplexor links for which stp is the controlling stream.
* Return 0, or a non-zero errno on failure.
*/
int
munlinkall(stdata_t *stp, int flag, cred_t *crp, int *rvalp, str_stack_t *ss)
{
linkinfo_t *linkp;
int error = 0;
mutex_enter(&muxifier);
while ((linkp = findlinks(stp, 0, flag, ss)) != NULL) {
/*
* munlink() releases the muxifier lock.
*/
if ((error = munlink(stp, linkp, flag, crp, rvalp, ss)) != 0)
return (error);
mutex_enter(&muxifier);
}
mutex_exit(&muxifier);
return (0);
}
/*
* A multiplexor link has been made. Add an
* edge to the directed graph.
*/
void
mux_addedge(stdata_t *upstp, stdata_t *lostp, int muxid, str_stack_t *ss)
{
struct mux_node *np;
struct mux_edge *ep;
major_t upmaj;
major_t lomaj;
upmaj = getmajor(upstp->sd_vnode->v_rdev);
lomaj = getmajor(lostp->sd_vnode->v_rdev);
np = &ss->ss_mux_nodes[upmaj];
if (np->mn_outp) {
ep = np->mn_outp;
while (ep->me_nextp)
ep = ep->me_nextp;
ep->me_nextp = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP);
ep = ep->me_nextp;
} else {
np->mn_outp = kmem_alloc(sizeof (struct mux_edge), KM_SLEEP);
ep = np->mn_outp;
}
ep->me_nextp = NULL;
ep->me_muxid = muxid;
/*
* Save the dev_t for the purposes of str_stack_shutdown.
* str_stack_shutdown assumes that the device allows reopen, since
* this dev_t is the one after any cloning by xx_open().
* Would prefer finding the dev_t from before any cloning,
* but specfs doesn't retain that.
*/
ep->me_dev = upstp->sd_vnode->v_rdev;
if (lostp->sd_vnode->v_type == VFIFO)
ep->me_nodep = NULL;
else
ep->me_nodep = &ss->ss_mux_nodes[lomaj];
}
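/*
* For illustration only: a minimal sketch, not part of the STREAMS
* interface, of how the singly-linked edge list built above could be
* searched. The helper name mux_findedge() is hypothetical; only
* mn_outp, me_nextp and me_muxid come from the real structures.
*
*	static struct mux_edge *
*	mux_findedge(struct mux_node *np, int muxid)
*	{
*		struct mux_edge *ep;
*
*		for (ep = np->mn_outp; ep != NULL; ep = ep->me_nextp)
*			if (ep->me_muxid == muxid)
*				return (ep);
*		return (NULL);
*	}
*/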
/*
* A multiplexor link has been removed. Remove the
* edge in the directed graph.
*/
void
mux_rmvedge(stdata_t *upstp, int muxid, str_stack_t *ss)
{
struct mux_node *np;
struct mux_edge *ep;
struct mux_edge *pep = NULL;
major_t upmaj;
upmaj = getmajor(upstp->sd_vnode->v_rdev);
np = &ss->ss_mux_nodes[upmaj];
ASSERT(np->mn_outp != NULL);
ep = np->mn_outp;
while (ep) {
if (ep->me_muxid == muxid) {
if (pep)
pep->me_nextp = ep->me_nextp;
else
np->mn_outp = ep->me_nextp;
kmem_free(ep, sizeof (struct mux_edge));
return;
}
pep = ep;
ep = ep->me_nextp;
}
ASSERT(0); /* should not reach here */
}
/*
* Translate the device flags (from conf.h) to the corresponding
* qflag and sq_flag (type) values.
*/
int
devflg_to_qflag(struct streamtab *stp, uint32_t devflag, uint32_t *qflagp,
uint32_t *sqtypep)
{
uint32_t qflag = 0;
uint32_t sqtype = 0;
if (devflag & _D_OLD)
goto bad;
/* Inner perimeter presence and scope */
switch (devflag & D_MTINNER_MASK) {
case D_MP:
qflag |= QMTSAFE;
sqtype |= SQ_CI;
break;
case D_MTPERQ|D_MP:
qflag |= QPERQ;
break;
case D_MTQPAIR|D_MP:
qflag |= QPAIR;
break;
case D_MTPERMOD|D_MP:
qflag |= QPERMOD;
break;
default:
goto bad;
}
/* Outer perimeter */
if (devflag & D_MTOUTPERIM) {
switch (devflag & D_MTINNER_MASK) {
case D_MP:
case D_MTPERQ|D_MP:
case D_MTQPAIR|D_MP:
break;
default:
goto bad;
}
qflag |= QMTOUTPERIM;
}
/* Inner perimeter modifiers */
if (devflag & D_MTINNER_MOD) {
switch (devflag & D_MTINNER_MASK) {
case D_MP:
goto bad;
default:
break;
}
if (devflag & D_MTPUTSHARED)
sqtype |= SQ_CIPUT;
if (devflag & _D_MTOCSHARED) {
/*
* The code in putnext assumes that it has the
* highest concurrency by not checking sq_count.
* Thus _D_MTOCSHARED can only be supported when
* D_MTPUTSHARED is set.
*/
if (!(devflag & D_MTPUTSHARED))
goto bad;
sqtype |= SQ_CIOC;
}
if (devflag & _D_MTCBSHARED) {
/*
* The code in putnext assumes that it has the
* highest concurrency by not checking sq_count.
* Thus _D_MTCBSHARED can only be supported when
* D_MTPUTSHARED is set.
*/
if (!(devflag & D_MTPUTSHARED))
goto bad;
sqtype |= SQ_CICB;
}
if (devflag & _D_MTSVCSHARED) {
/*
* The code in putnext assumes that it has the
* highest concurrency by not checking sq_count.
* Thus _D_MTSVCSHARED can only be supported when
* D_MTPUTSHARED is set. Also _D_MTSVCSHARED is
* supported only for QPERMOD.
*/
if (!(devflag & D_MTPUTSHARED) || !(qflag & QPERMOD))
goto bad;
sqtype |= SQ_CISVC;
}
}
/* Default outer perimeter concurrency */
sqtype |= SQ_CO;
/* Outer perimeter modifiers */
if (devflag & D_MTOCEXCL) {
if (!(devflag & D_MTOUTPERIM)) {
/* No outer perimeter */
goto bad;
}
sqtype &= ~SQ_COOC;
}
/* Synchronous Streams extended qinit structure */
if (devflag & D_SYNCSTR)
qflag |= QSYNCSTR;
/*
* Private flag used by a transport module to indicate
* to sockfs that it supports direct-access mode without
* having to go through STREAMS, or that the transport can use
* sodirect_t sharing to bypass STREAMS for receive-side
* M_DATA processing.
*/
if (devflag & (_D_DIRECT|_D_SODIRECT)) {
/* Reject unless the module is fully-MT (no perimeter) */
if ((qflag & QMT_TYPEMASK) != QMTSAFE)
goto bad;
if (devflag & _D_DIRECT)
qflag |= _QDIRECT;
if (devflag & _D_SODIRECT)
qflag |= _QSODIRECT;
}
*qflagp = qflag;
*sqtypep = sqtype;
return (0);
bad:
cmn_err(CE_WARN,
"stropen: bad MT flags (0x%x) in driver '%s'",
(int)(qflag & D_MTSAFETY_MASK),
stp->st_rdinit->qi_minfo->mi_idname);
return (EINVAL);
}
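/*
* Two worked translations, derived from the switch above (the flag
* definitions themselves live in conf.h):
*
*	devflag = D_MP
*		qflag  = QMTSAFE
*		sqtype = SQ_CI | SQ_CO
*
*	devflag = D_MP | D_MTPERQ | D_MTPUTSHARED
*		qflag  = QPERQ
*		sqtype = SQ_CIPUT | SQ_CO
*/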
/*
* Set the interface values for a pair of queues (qinit structure,
* packet sizes, water marks).
* setq assumes that the caller does not have a claim (entersq or claimq)
* on the queue.
*/
void
setq(queue_t *rq, struct qinit *rinit, struct qinit *winit,
perdm_t *dmp, uint32_t qflag, uint32_t sqtype, boolean_t lock_needed)
{
queue_t *wq;
syncq_t *sq, *outer;
ASSERT(rq->q_flag & QREADR);
ASSERT((qflag & QMT_TYPEMASK) != 0);
IMPLY((qflag & (QPERMOD | QMTOUTPERIM)), dmp != NULL);
wq = _WR(rq);
rq->q_qinfo = rinit;
rq->q_hiwat = rinit->qi_minfo->mi_hiwat;
rq->q_lowat = rinit->qi_minfo->mi_lowat;
rq->q_minpsz = rinit->qi_minfo->mi_minpsz;
rq->q_maxpsz = rinit->qi_minfo->mi_maxpsz;
wq->q_qinfo = winit;
wq->q_hiwat = winit->qi_minfo->mi_hiwat;
wq->q_lowat = winit->qi_minfo->mi_lowat;
wq->q_minpsz = winit->qi_minfo->mi_minpsz;
wq->q_maxpsz = winit->qi_minfo->mi_maxpsz;
/* Remove old syncqs */
sq = rq->q_syncq;
outer = sq->sq_outer;
if (outer != NULL) {
ASSERT(wq->q_syncq->sq_outer == outer);
outer_remove(outer, rq->q_syncq);
if (wq->q_syncq != rq->q_syncq)
outer_remove(outer, wq->q_syncq);
}
ASSERT(sq->sq_outer == NULL);
ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);
if (sq != SQ(rq)) {
if (!(rq->q_flag & QPERMOD))
free_syncq(sq);
if (wq->q_syncq == rq->q_syncq)
wq->q_syncq = NULL;
rq->q_syncq = NULL;
}
if (wq->q_syncq != NULL && wq->q_syncq != sq &&
wq->q_syncq != SQ(rq)) {
free_syncq(wq->q_syncq);
wq->q_syncq = NULL;
}
ASSERT(rq->q_syncq == NULL || (rq->q_syncq->sq_head == NULL &&
rq->q_syncq->sq_tail == NULL));
ASSERT(wq->q_syncq == NULL || (wq->q_syncq->sq_head == NULL &&
wq->q_syncq->sq_tail == NULL));
if (!(rq->q_flag & QPERMOD) &&
rq->q_syncq != NULL && rq->q_syncq->sq_ciputctrl != NULL) {
ASSERT(rq->q_syncq->sq_nciputctrl == n_ciputctrl - 1);
SUMCHECK_CIPUTCTRL_COUNTS(rq->q_syncq->sq_ciputctrl,
rq->q_syncq->sq_nciputctrl, 0);
ASSERT(ciputctrl_cache != NULL);
kmem_cache_free(ciputctrl_cache, rq->q_syncq->sq_ciputctrl);
rq->q_syncq->sq_ciputctrl = NULL;
rq->q_syncq->sq_nciputctrl = 0;
}
if (!(wq->q_flag & QPERMOD) &&
wq->q_syncq != NULL && wq->q_syncq->sq_ciputctrl != NULL) {
ASSERT(wq->q_syncq->sq_nciputctrl == n_ciputctrl - 1);
SUMCHECK_CIPUTCTRL_COUNTS(wq->q_syncq->sq_ciputctrl,
wq->q_syncq->sq_nciputctrl, 0);
ASSERT(ciputctrl_cache != NULL);
kmem_cache_free(ciputctrl_cache, wq->q_syncq->sq_ciputctrl);
wq->q_syncq->sq_ciputctrl = NULL;
wq->q_syncq->sq_nciputctrl = 0;
}
sq = SQ(rq);
ASSERT(sq->sq_head == NULL && sq->sq_tail == NULL);
ASSERT(sq->sq_outer == NULL);
ASSERT(sq->sq_onext == NULL && sq->sq_oprev == NULL);
/*
* Create syncqs based on qflag and sqtype. Set the SQ_TYPES_IN_FLAGS
* bits in sq_flag based on the sqtype.
*/
ASSERT((sq->sq_flags & ~SQ_TYPES_IN_FLAGS) == 0);
rq->q_syncq = wq->q_syncq = sq;
sq->sq_type = sqtype;
sq->sq_flags = (sqtype & SQ_TYPES_IN_FLAGS);
/*
* We are making sq_svcflags zero, thereby
* resetting SQ_DISABLED in case it was set by
* wait_svc() in the munlink path.
*/
ASSERT((sq->sq_svcflags & SQ_SERVICE) == 0);
sq->sq_svcflags = 0;
/*
* We need to acquire the lock here for the mlink and munlink case,
* where canputnext, backenable, etc. can access q_flag.
*/
if (lock_needed) {
mutex_enter(QLOCK(rq));
rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
mutex_exit(QLOCK(rq));
mutex_enter(QLOCK(wq));
wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
mutex_exit(QLOCK(wq));
} else {
rq->q_flag = (rq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
wq->q_flag = (wq->q_flag & ~QMT_TYPEMASK) | QWANTR | qflag;
}
if (qflag & QPERQ) {
/* Allocate a separate syncq for the write side */
sq = new_syncq();
sq->sq_type = rq->q_syncq->sq_type;
sq->sq_flags = rq->q_syncq->sq_flags;
ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
sq->sq_oprev == NULL);
wq->q_syncq = sq;
}
if (qflag & QPERMOD) {
sq = dmp->dm_sq;
/*
* Assert that we do have an inner perimeter syncq and that it
* does not have an outer perimeter associated with it.
*/
ASSERT(sq->sq_outer == NULL && sq->sq_onext == NULL &&
sq->sq_oprev == NULL);
rq->q_syncq = wq->q_syncq = sq;
}
if (qflag & QMTOUTPERIM) {
outer = dmp->dm_sq;
ASSERT(outer->sq_outer == NULL);
outer_insert(outer, rq->q_syncq);
if (wq->q_syncq != rq->q_syncq)
outer_insert(outer, wq->q_syncq);
}
ASSERT((rq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) ==
(rq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS));
ASSERT((wq->q_syncq->sq_flags & SQ_TYPES_IN_FLAGS) ==
(wq->q_syncq->sq_type & SQ_TYPES_IN_FLAGS));
ASSERT((rq->q_flag & QMT_TYPEMASK) == (qflag & QMT_TYPEMASK));
/*
* Initialize struio() types.
*/
rq->q_struiot =
(rq->q_flag & QSYNCSTR) ? rinit->qi_struiot : STRUIOT_NONE;
wq->q_struiot =
(wq->q_flag & QSYNCSTR) ? winit->qi_struiot : STRUIOT_NONE;
}
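/*
* Net effect of the syncq plumbing above, summarized by perimeter
* type (each queue pair starts out on the built-in SQ(rq)):
*
*	QMTSAFE		rq and wq share SQ(rq)
*	QPAIR		rq and wq share SQ(rq)
*	QPERQ		wq gets its own syncq from new_syncq()
*	QPERMOD		rq and wq share the per-module dmp->dm_sq
*	QMTOUTPERIM	inner syncq(s) also inserted into the outer
*			perimeter dmp->dm_sq
*/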
perdm_t *
hold_dm(struct streamtab *str, uint32_t qflag, uint32_t sqtype)
{
syncq_t *sq;
perdm_t **pp;
perdm_t *p;
perdm_t *dmp;
ASSERT(str != NULL);
ASSERT(qflag & (QPERMOD | QMTOUTPERIM));
rw_enter(&perdm_rwlock, RW_READER);
for (p = perdm_list; p != NULL; p = p->dm_next) {
if (p->dm_str == str) { /* found one */
atomic_add_32(&(p->dm_ref), 1);
rw_exit(&perdm_rwlock);
return (p);
}
}
rw_exit(&perdm_rwlock);
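/*
* Not found: optimistically build a new entry outside of
* perdm_rwlock, then recheck the list under the write lock below,
* since another thread may have inserted one in the meantime.
*/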
sq = new_syncq();
if (qflag & QPERMOD) {
sq->sq_type = sqtype | SQ_PERMOD;
sq->sq_flags = sqtype & SQ_TYPES_IN_FLAGS;
} else {
ASSERT(qflag & QMTOUTPERIM);
sq->sq_onext = sq->sq_oprev = sq;
}
dmp = kmem_alloc(sizeof (perdm_t), KM_SLEEP);
dmp->dm_sq = sq;
dmp->dm_str = str;
dmp->dm_ref = 1;
dmp->dm_next = NULL;
rw_enter(&perdm_rwlock, RW_WRITER);
for (pp = &perdm_list; (p = *pp) != NULL; pp = &(p->dm_next)) {
if (p->dm_str == str) { /* already present */
p->dm_ref++;
rw_exit(&perdm_rwlock);
free_syncq(sq);
kmem_free(dmp, sizeof (perdm_t));
return (p);
}
}
*pp = dmp;
rw_exit(&perdm_rwlock);
return (dmp);
}
void
rele_dm(perdm_t *dmp)
{
perdm_t **pp;
perdm_t *p;
rw_enter(&perdm_rwlock, RW_WRITER);
ASSERT(dmp->dm_ref > 0);
if (--dmp->dm_ref > 0) {
rw_exit(&perdm_rwlock);
return;
}
for (pp = &perdm_list; (p = *pp) != NULL; pp = &(p->dm_next))
if (p == dmp)
break;
ASSERT(p == dmp);
*pp = p->dm_next;
rw_exit(&perdm_rwlock);
/*
* Wait for any background processing that relies on the
* syncq to complete before it is freed.
*/
wait_sq_svc(p->dm_sq);
free_syncq(p->dm_sq);
kmem_free(p, sizeof (perdm_t));
}
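/*
* Illustrative pairing of hold_dm()/rele_dm() with setq(); a sketch
* of a hypothetical caller, not a quote of the real attach path:
*
*	perdm_t *dmp = hold_dm(str, qflag, sqtype);
*	setq(rq, str->st_rdinit, str->st_wrinit, dmp, qflag, sqtype,
*	    B_FALSE);
*	...
*	rele_dm(dmp);
*/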
/*
* Make a protocol message given control and data buffers.
* n.b., this can block; be careful of what locks you hold when calling it.
*
* If sd_maxblk is less than *iosize this routine can fail part way through
* (due to an allocation failure). In that case, on return *iosize will
* contain the amount that was consumed. Otherwise *iosize will not be
* modified, i.e. it will still contain the amount that was consumed
* (everything that was requested).
*/
int
strmakemsg(
struct strbuf *mctl,
ssize_t *iosize,
struct uio *uiop,
stdata_t *stp,
int32_t flag,
mblk_t **mpp)
{
mblk_t *mpctl = NULL;
mblk_t *mpdata = NULL;
int error;
ASSERT(uiop != NULL);
*mpp = NULL;
/* Create control part, if any */
if ((mctl != NULL) && (mctl->len >= 0)) {
error = strmakectl(mctl, flag, uiop->uio_fmode, &mpctl);
if (error)
return (error);
}
/* Create data part, if any */
if (*iosize >= 0) {
error = strmakedata(iosize, uiop, stp, flag, &mpdata);
if (error) {
freemsg(mpctl);
return (error);
}
}
if (mpctl != NULL) {
if (mpdata != NULL)
linkb(mpctl, mpdata);
*mpp = mpctl;
} else {
*mpp = mpdata;
}
return (0);
}
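/*
* Illustrative call sequence; a sketch of a hypothetical write-side
* caller, assuming mctl, uiop, stp and flag have been set up as in
* the real putmsg/write paths:
*
*	ssize_t iosize = uiop->uio_resid;
*	mblk_t *mp;
*	int error;
*
*	error = strmakemsg(mctl, &iosize, uiop, stp, flag, &mp);
*	if (error == 0 && mp != NULL)
*		putnext(stp->sd_wrq, mp);
*/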
/*
* Make the control part of a protocol message given a control buffer.
* n.b., this can block; be careful of what locks you hold when calling it.