server: Make async I/O queues into real objects.
diff --git a/server/async.c b/server/async.c
index 440cb2e..8307e20 100644
--- a/server/async.c
+++ b/server/async.c
@@ -62,6 +62,35 @@
async_destroy /* destroy */
};
+
+struct async_queue
+{
+ struct object obj; /* object header */
+ struct fd *fd; /* file descriptor owning this queue */
+ struct list queue; /* queue of async objects */
+};
+
+static void async_queue_dump( struct object *obj, int verbose );
+static void async_queue_destroy( struct object *obj );
+
+static const struct object_ops async_queue_ops =
+{
+ sizeof(struct async_queue), /* size */
+ async_queue_dump, /* dump */
+ no_add_queue, /* add_queue */
+ NULL, /* remove_queue */
+ NULL, /* signaled */
+ NULL, /* satisfied */
+ no_signal, /* signal */
+ no_get_fd, /* get_fd */
+ no_map_access, /* map_access */
+ no_lookup_name, /* lookup_name */
+ no_open_file, /* open_file */
+ no_close_handle, /* close_handle */
+ async_queue_destroy /* destroy */
+};
+
+
static void async_dump( struct object *obj, int verbose )
{
struct async *async = (struct async *)obj;
@@ -79,6 +108,21 @@
release_object( async->thread );
}
+static void async_queue_dump( struct object *obj, int verbose )
+{
+ struct async_queue *async_queue = (struct async_queue *)obj;
+ assert( obj->ops == &async_queue_ops );
+ fprintf( stderr, "Async queue fd=%p\n", async_queue->fd );
+}
+
+static void async_queue_destroy( struct object *obj )
+{
+ struct async_queue *async_queue = (struct async_queue *)obj;
+ assert( obj->ops == &async_queue_ops );
+
+ async_wake_up( async_queue, STATUS_HANDLES_CLOSED );
+}
+
/* notifies client thread of new status of its async request */
/* destroys the server side of it */
static void async_terminate( struct async *async, unsigned int status )
@@ -108,9 +152,22 @@
async_terminate( async, STATUS_TIMEOUT );
}
+/* create a new async queue for a given fd */
+struct async_queue *create_async_queue( struct fd *fd )
+{
+ struct async_queue *queue = alloc_object( &async_queue_ops );
+
+ if (queue)
+ {
+ queue->fd = fd;
+ list_init( &queue->queue );
+ }
+ return queue;
+}
+
/* create an async on a given queue of a fd */
struct async *create_async( struct thread *thread, const struct timeval *timeout,
- struct list *queue, const async_data_t *data )
+ struct async_queue *queue, const async_data_t *data )
{
struct event *event = NULL;
struct async *async;
@@ -128,7 +185,7 @@
async->event = event;
async->data = *data;
- list_add_tail( queue, &async->queue_entry );
+ list_add_tail( &queue->queue, &async->queue_entry );
if (timeout) async->timeout = add_timeout_user( timeout, async_timeout, async );
else async->timeout = NULL;
@@ -164,21 +221,23 @@
}
}
-/* terminate the async operation at the head of the queue */
-void async_terminate_head( struct list *queue, unsigned int status )
+/* check if an async operation is waiting to be alerted */
+int async_waiting( struct async_queue *queue )
{
- struct list *ptr = list_head( queue );
- if (ptr) async_terminate( LIST_ENTRY( ptr, struct async, queue_entry ), status );
+ return queue && !list_empty( &queue->queue );
}
-/* terminate all async operations on the queue */
-void async_terminate_queue( struct list *queue, unsigned int status )
+/* wake up async operations on the queue */
+void async_wake_up( struct async_queue *queue, unsigned int status )
{
struct list *ptr, *next;
- LIST_FOR_EACH_SAFE( ptr, next, queue )
+ if (!queue) return;
+
+ LIST_FOR_EACH_SAFE( ptr, next, &queue->queue )
{
struct async *async = LIST_ENTRY( ptr, struct async, queue_entry );
async_terminate( async, status );
+ if (status == STATUS_ALERTED) break; /* only wake up the first one */
}
}
diff --git a/server/change.c b/server/change.c
index 70eb54e..c4696a0 100644
--- a/server/change.c
+++ b/server/change.c
@@ -572,7 +572,7 @@
list_add_tail( &dir->change_records, &record->entry );
}
- fd_async_terminate_head( dir->fd, ASYNC_TYPE_WAIT, STATUS_ALERTED );
+ fd_async_wake_up( dir->fd, ASYNC_TYPE_WAIT, STATUS_ALERTED );
}
static unsigned int filter_from_event( struct inotify_event *ie )
@@ -1097,7 +1097,7 @@
/* if there's already a change in the queue, send it */
if (!list_empty( &dir->change_records ))
- fd_async_terminate_head( dir->fd, ASYNC_TYPE_WAIT, STATUS_ALERTED );
+ fd_async_wake_up( dir->fd, ASYNC_TYPE_WAIT, STATUS_ALERTED );
/* setup the real notification */
if (!inotify_adjust_changes( dir ))
diff --git a/server/fd.c b/server/fd.c
index baab135..ffaf404 100644
--- a/server/fd.c
+++ b/server/fd.c
@@ -171,9 +171,9 @@
int fs_locks :1; /* can we use filesystem locks for this fd? */
int unmounted :1;/* has the device been unmounted? */
int poll_index; /* index of fd in poll array */
- struct list read_q; /* async readers of this fd */
- struct list write_q; /* async writers of this fd */
- struct list wait_q; /* other async waiters of this fd */
+ struct async_queue *read_q; /* async readers of this fd */
+ struct async_queue *write_q; /* async writers of this fd */
+ struct async_queue *wait_q; /* other async waiters of this fd */
};
static void fd_dump( struct object *obj, int verbose );
@@ -1286,9 +1286,9 @@
{
struct fd *fd = (struct fd *)obj;
- async_terminate_queue( &fd->read_q, STATUS_CANCELLED );
- async_terminate_queue( &fd->write_q, STATUS_CANCELLED );
- async_terminate_queue( &fd->wait_q, STATUS_CANCELLED );
+ if (fd->read_q) release_object( fd->read_q );
+ if (fd->write_q) release_object( fd->write_q );
+ if (fd->wait_q) release_object( fd->wait_q );
remove_fd_locks( fd );
list_remove( &fd->inode_entry );
@@ -1330,8 +1330,8 @@
{
assert( fd->inode );
- async_terminate_queue( &fd->read_q, STATUS_VOLUME_DISMOUNTED );
- async_terminate_queue( &fd->write_q, STATUS_VOLUME_DISMOUNTED );
+ async_wake_up( fd->read_q, STATUS_VOLUME_DISMOUNTED );
+ async_wake_up( fd->write_q, STATUS_VOLUME_DISMOUNTED );
if (fd->poll_index != -1) set_fd_events( fd, -1 );
@@ -1363,11 +1363,11 @@
fd->fs_locks = 1;
fd->unmounted = 0;
fd->poll_index = -1;
+ fd->read_q = NULL;
+ fd->write_q = NULL;
+ fd->wait_q = NULL;
list_init( &fd->inode_entry );
list_init( &fd->locks );
- list_init( &fd->read_q );
- list_init( &fd->write_q );
- list_init( &fd->wait_q );
if ((fd->poll_index = add_poll_user( fd )) == -1)
{
@@ -1394,11 +1394,11 @@
fd->fs_locks = 0;
fd->unmounted = 0;
fd->poll_index = -1;
+ fd->read_q = NULL;
+ fd->write_q = NULL;
+ fd->wait_q = NULL;
list_init( &fd->inode_entry );
list_init( &fd->locks );
- list_init( &fd->read_q );
- list_init( &fd->write_q );
- list_init( &fd->wait_q );
return fd;
}
@@ -1692,27 +1692,16 @@
{
int events = 0;
- if (!list_empty( &fd->read_q ))
- events |= POLLIN;
- if (!list_empty( &fd->write_q ))
- events |= POLLOUT;
-
+ if (async_waiting( fd->read_q )) events |= POLLIN;
+ if (async_waiting( fd->write_q )) events |= POLLOUT;
return events;
}
/* default handler for poll() events */
void default_poll_event( struct fd *fd, int event )
{
- if (!list_empty( &fd->read_q ) && (POLLIN & event) )
- {
- async_terminate_head( &fd->read_q, STATUS_ALERTED );
- return;
- }
- if (!list_empty( &fd->write_q ) && (POLLOUT & event) )
- {
- async_terminate_head( &fd->write_q, STATUS_ALERTED );
- return;
- }
+ if (event & POLLIN) async_wake_up( fd->read_q, STATUS_ALERTED );
+ if (event & POLLOUT) async_wake_up( fd->write_q, STATUS_ALERTED );
/* if an error occurred, stop polling this fd to avoid busy-looping */
if (event & (POLLERR | POLLHUP)) set_fd_events( fd, -1 );
@@ -1722,18 +1711,21 @@
int fd_queue_async_timeout( struct fd *fd, const async_data_t *data, int type, int count,
const struct timeval *timeout )
{
- struct list *queue;
+ struct async_queue *queue;
switch (type)
{
case ASYNC_TYPE_READ:
- queue = &fd->read_q;
+ if (!fd->read_q && !(fd->read_q = create_async_queue( fd ))) return 0;
+ queue = fd->read_q;
break;
case ASYNC_TYPE_WRITE:
- queue = &fd->write_q;
+ if (!fd->write_q && !(fd->write_q = create_async_queue( fd ))) return 0;
+ queue = fd->write_q;
break;
case ASYNC_TYPE_WAIT:
- queue = &fd->wait_q;
+ if (!fd->wait_q && !(fd->wait_q = create_async_queue( fd ))) return 0;
+ queue = fd->wait_q;
break;
default:
set_error( STATUS_INVALID_PARAMETER );
@@ -1746,41 +1738,23 @@
if (!fd->inode)
set_fd_events( fd, fd->fd_ops->get_poll_events( fd ) );
else /* regular files are always ready for read and write */
- if (type != ASYNC_TYPE_WAIT) async_terminate_head( queue, STATUS_ALERTED );
+ if (type != ASYNC_TYPE_WAIT) async_wake_up( queue, STATUS_ALERTED );
return 1;
}
-void fd_async_terminate_head( struct fd *fd, int type, unsigned int status )
+void fd_async_wake_up( struct fd *fd, int type, unsigned int status )
{
switch (type)
{
case ASYNC_TYPE_READ:
- async_terminate_head( &fd->read_q, status );
+ async_wake_up( fd->read_q, status );
break;
case ASYNC_TYPE_WRITE:
- async_terminate_head( &fd->write_q, status );
+ async_wake_up( fd->write_q, status );
break;
case ASYNC_TYPE_WAIT:
- async_terminate_head( &fd->wait_q, status );
- break;
- default:
- assert(0);
- }
-}
-
-void fd_async_terminate_queue( struct fd *fd, int type, unsigned int status )
-{
- switch (type)
- {
- case ASYNC_TYPE_READ:
- async_terminate_queue( &fd->read_q, status );
- break;
- case ASYNC_TYPE_WRITE:
- async_terminate_queue( &fd->write_q, status );
- break;
- case ASYNC_TYPE_WAIT:
- async_terminate_queue( &fd->wait_q, status );
+ async_wake_up( fd->wait_q, status );
break;
default:
assert(0);
@@ -1802,9 +1776,9 @@
void default_fd_cancel_async( struct fd *fd )
{
- async_terminate_queue( &fd->read_q, STATUS_CANCELLED );
- async_terminate_queue( &fd->write_q, STATUS_CANCELLED );
- async_terminate_queue( &fd->wait_q, STATUS_CANCELLED );
+ async_wake_up( fd->read_q, STATUS_CANCELLED );
+ async_wake_up( fd->write_q, STATUS_CANCELLED );
+ async_wake_up( fd->wait_q, STATUS_CANCELLED );
}
/* default flush() routine */
diff --git a/server/file.h b/server/file.h
index f0f3a6b..9bae112 100644
--- a/server/file.h
+++ b/server/file.h
@@ -24,6 +24,7 @@
#include "object.h"
struct fd;
+struct async_queue;
typedef unsigned __int64 file_pos_t;
@@ -70,8 +71,7 @@
extern void default_poll_event( struct fd *fd, int event );
extern int fd_queue_async_timeout( struct fd *fd, const async_data_t *data, int type,
int count, const struct timeval *timeout );
-extern void fd_async_terminate_head( struct fd *fd, int type, unsigned int status );
-extern void fd_async_terminate_queue( struct fd *fd, int type, unsigned int status );
+extern void fd_async_wake_up( struct fd *fd, int type, unsigned int status );
extern void default_fd_queue_async( struct fd *fd, const async_data_t *data, int type, int count );
extern void default_fd_cancel_async( struct fd *fd );
extern void no_flush( struct fd *fd, struct event **event );
@@ -124,11 +124,12 @@
extern struct object *create_serial( struct fd *fd, unsigned int options );
/* async I/O functions */
+extern struct async_queue *create_async_queue( struct fd *fd );
extern struct async *create_async( struct thread *thread, const struct timeval *timeout,
- struct list *queue, const async_data_t *data );
+ struct async_queue *queue, const async_data_t *data );
extern void async_set_result( struct object *obj, unsigned int status );
-extern void async_terminate_head( struct list *queue, unsigned int status );
-extern void async_terminate_queue( struct list *queue, unsigned int status );
+extern int async_waiting( struct async_queue *queue );
+extern void async_wake_up( struct async_queue *queue, unsigned int status );
/* access rights that require Unix read permission */
#define FILE_UNIX_READ_ACCESS (FILE_READ_DATA|FILE_READ_ATTRIBUTES|FILE_READ_EA)
diff --git a/server/named_pipe.c b/server/named_pipe.c
index a763982..d76c242 100644
--- a/server/named_pipe.c
+++ b/server/named_pipe.c
@@ -74,7 +74,7 @@
struct named_pipe *pipe;
struct timeout_user *flush_poll;
struct event *event;
- struct list wait_q; /* only a single one can be queued */
+ struct async_queue *wait_q; /* only a single one can be queued */
unsigned int options; /* pipe options */
};
@@ -96,7 +96,7 @@
unsigned int timeout;
unsigned int instances;
struct list servers; /* list of servers using this pipe */
- struct list waiters; /* list of clients waiting to connect */
+ struct async_queue *waiters; /* list of clients waiting to connect */
};
struct named_pipe_device
@@ -274,7 +274,7 @@
assert( list_empty( &pipe->servers ) );
assert( !pipe->instances );
- async_terminate_queue( &pipe->waiters, STATUS_HANDLES_CLOSED );
+ if (pipe->waiters) release_object( pipe->waiters );
}
static struct fd *pipe_client_get_fd( struct object *obj )
@@ -366,7 +366,7 @@
server->client = NULL;
}
- async_terminate_head( &server->wait_q, STATUS_HANDLES_CLOSED );
+ release_object( server->wait_q );
assert( server->pipe->instances );
server->pipe->instances--;
@@ -634,7 +634,7 @@
server->client = NULL;
server->flush_poll = NULL;
server->options = options;
- list_init( &server->wait_q );
+ server->wait_q = create_async_queue( NULL );
list_add_head( &pipe->servers, &server->entry );
grab_object( pipe );
@@ -718,8 +718,7 @@
if (client->fd && server->fd && res != 1)
{
if (server->state == ps_wait_open)
- async_terminate_head( &server->wait_q, STATUS_SUCCESS );
- assert( list_empty( &server->wait_q ) );
+ async_wake_up( server->wait_q, STATUS_SUCCESS );
server->state = ps_connected_server;
server->client = client;
client->server = server;
@@ -753,8 +752,8 @@
{
/* initialize it if it didn't already exist */
pipe->instances = 0;
+ pipe->waiters = NULL;
list_init( &pipe->servers );
- list_init( &pipe->waiters );
pipe->insize = req->insize;
pipe->outsize = req->outsize;
pipe->maxinstances = req->maxinstances;
@@ -805,8 +804,8 @@
case ps_wait_connect:
assert( !server->fd );
server->state = ps_wait_open;
- create_async( current, NULL, &server->wait_q, &req->async );
- async_terminate_queue( &server->pipe->waiters, STATUS_SUCCESS );
+ create_async( current, NULL, server->wait_q, &req->async );
+ if (server->pipe->waiters) async_wake_up( server->pipe->waiters, STATUS_SUCCESS );
set_error( STATUS_PENDING );
break;
case ps_connected_server:
@@ -849,9 +848,14 @@
server = find_available_server( pipe );
if (!server)
{
+ if (!pipe->waiters && !(pipe->waiters = create_async_queue( NULL )))
+ {
+ release_object( pipe );
+ return;
+ }
if (req->timeout == NMPWAIT_WAIT_FOREVER)
{
- if (create_async( current, NULL, &pipe->waiters, &req->async ))
+ if (create_async( current, NULL, pipe->waiters, &req->async ))
set_error( STATUS_PENDING );
}
else
@@ -859,7 +863,7 @@
struct timeval when = current_time;
if (req->timeout == NMPWAIT_USE_DEFAULT_WAIT) add_timeout( &when, pipe->timeout );
else add_timeout( &when, req->timeout );
- if (create_async( current, &when, &pipe->waiters, &req->async ))
+ if (create_async( current, &when, pipe->waiters, &req->async ))
set_error( STATUS_PENDING );
}
}
diff --git a/server/serial.c b/server/serial.c
index 8cd59e2..12401ac 100644
--- a/server/serial.c
+++ b/server/serial.c
@@ -267,7 +267,7 @@
serial->eventmask = req->eventmask;
if (!serial->eventmask)
{
- fd_async_terminate_queue( serial->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
+ fd_async_wake_up( serial->fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS );
}
}
diff --git a/server/sock.c b/server/sock.c
index 009d37e..b9938f7 100644
--- a/server/sock.c
+++ b/server/sock.c
@@ -83,8 +83,8 @@
obj_handle_t wparam; /* message wparam (socket handle) */
int errors[FD_MAX_EVENTS]; /* event errors */
struct sock *deferred; /* socket that waits for a deferred accept */
- struct list read_q; /* queue for asynchronous reads */
- struct list write_q; /* queue for asynchronous writes */
+ struct async_queue *read_q; /* queue for asynchronous reads */
+ struct async_queue *write_q; /* queue for asynchronous writes */
};
static void sock_dump( struct object *obj, int verbose );
@@ -245,16 +245,16 @@
if ( sock->flags & WSA_FLAG_OVERLAPPED )
{
- if ( pollev & (POLLIN|POLLPRI) && !list_empty( &sock->read_q ))
+ if ( pollev & (POLLIN|POLLPRI) && async_waiting( sock->read_q ))
{
if (debug_level) fprintf( stderr, "activating read queue for socket %p\n", sock );
- async_terminate_head( &sock->read_q, STATUS_ALERTED );
+ async_wake_up( sock->read_q, STATUS_ALERTED );
async_active = 1;
}
- if ( pollev & POLLOUT && !list_empty( &sock->write_q ))
+ if ( pollev & POLLOUT && async_waiting( sock->write_q ))
{
if (debug_level) fprintf( stderr, "activating write queue for socket %p\n", sock );
- async_terminate_head( &sock->write_q, STATUS_ALERTED );
+ async_wake_up( sock->write_q, STATUS_ALERTED );
async_active = 1;
}
}
@@ -483,9 +483,9 @@
/* listening, wait for readable */
return (sock->hmask & FD_ACCEPT) ? 0 : POLLIN;
- if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && !list_empty( &sock->read_q )))
+ if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && async_waiting( sock->read_q )))
ev |= POLLIN | POLLPRI;
- if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && !list_empty( &sock->write_q )))
+ if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && async_waiting( sock->write_q )))
ev |= POLLOUT;
/* We use POLLIN with 0 bytes recv() as FD_CLOSE indication for stream sockets. */
if ( sock->type == SOCK_STREAM && ( sock->mask & ~sock->hmask & FD_CLOSE) )
@@ -512,7 +512,7 @@
static void sock_queue_async( struct fd *fd, const async_data_t *data, int type, int count )
{
struct sock *sock = get_fd_user( fd );
- struct list *queue;
+ struct async_queue *queue;
int pollev;
assert( sock->obj.ops == &sock_ops );
@@ -526,11 +526,13 @@
switch (type)
{
case ASYNC_TYPE_READ:
- queue = &sock->read_q;
+ if (!sock->read_q && !(sock->read_q = create_async_queue( sock->fd ))) return;
+ queue = sock->read_q;
sock->hmask &= ~FD_CLOSE;
break;
case ASYNC_TYPE_WRITE:
- queue = &sock->write_q;
+ if (!sock->write_q && !(sock->write_q = create_async_queue( sock->fd ))) return;
+ queue = sock->write_q;
break;
default:
set_error( STATUS_INVALID_PARAMETER );
@@ -557,8 +559,8 @@
struct sock *sock = get_fd_user( fd );
assert( sock->obj.ops == &sock_ops );
- async_terminate_queue( &sock->read_q, STATUS_CANCELLED );
- async_terminate_queue( &sock->write_q, STATUS_CANCELLED );
+ async_wake_up( sock->read_q, STATUS_CANCELLED );
+ async_wake_up( sock->write_q, STATUS_CANCELLED );
}
static struct fd *sock_get_fd( struct object *obj )
@@ -577,11 +579,8 @@
if ( sock->deferred )
release_object( sock->deferred );
- if ( sock->flags & WSA_FLAG_OVERLAPPED )
- {
- async_terminate_queue( &sock->read_q, STATUS_CANCELLED );
- async_terminate_queue( &sock->write_q, STATUS_CANCELLED );
- }
+ if (sock->read_q) release_object( sock->read_q );
+ if (sock->write_q) release_object( sock->write_q );
if (sock->event) release_object( sock->event );
if (sock->fd)
{
@@ -624,13 +623,13 @@
sock->message = 0;
sock->wparam = 0;
sock->deferred = NULL;
+ sock->read_q = NULL;
+ sock->write_q = NULL;
if (!(sock->fd = create_anonymous_fd( &sock_fd_ops, sockfd, &sock->obj )))
{
release_object( sock );
return NULL;
}
- list_init( &sock->read_q );
- list_init( &sock->write_q );
sock_reselect( sock );
clear_error();
return &sock->obj;
@@ -693,14 +692,14 @@
if (sock->event) acceptsock->event = (struct event *)grab_object( sock->event );
acceptsock->flags = sock->flags;
acceptsock->deferred = NULL;
+ acceptsock->read_q = NULL;
+ acceptsock->write_q = NULL;
if (!(acceptsock->fd = create_anonymous_fd( &sock_fd_ops, acceptfd, &acceptsock->obj )))
{
release_object( acceptsock );
release_object( sock );
return NULL;
}
- list_init( &acceptsock->read_q );
- list_init( &acceptsock->write_q );
}
clear_error();
sock->pmask &= ~FD_ACCEPT;