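server: Make async I/O queues into real objects.

The socket read and write queues become struct async_queue pointers
instead of embedded lists. They start out NULL, are created on demand
with create_async_queue() when the first async request of that type is
queued, and are released together with the socket.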
diff --git a/server/sock.c b/server/sock.c
index 009d37e..b9938f7 100644
--- a/server/sock.c
+++ b/server/sock.c
@@ -83,8 +83,8 @@
obj_handle_t wparam; /* message wparam (socket handle) */
int errors[FD_MAX_EVENTS]; /* event errors */
struct sock *deferred; /* socket that waits for a deferred accept */
- struct list read_q; /* queue for asynchronous reads */
- struct list write_q; /* queue for asynchronous writes */
+ struct async_queue *read_q; /* queue for asynchronous reads */
+ struct async_queue *write_q; /* queue for asynchronous writes */
};
static void sock_dump( struct object *obj, int verbose );
@@ -245,16 +245,16 @@
if ( sock->flags & WSA_FLAG_OVERLAPPED )
{
- if ( pollev & (POLLIN|POLLPRI) && !list_empty( &sock->read_q ))
+ if ( pollev & (POLLIN|POLLPRI) && async_waiting( sock->read_q ))
{
if (debug_level) fprintf( stderr, "activating read queue for socket %p\n", sock );
- async_terminate_head( &sock->read_q, STATUS_ALERTED );
+ async_wake_up( sock->read_q, STATUS_ALERTED );
async_active = 1;
}
- if ( pollev & POLLOUT && !list_empty( &sock->write_q ))
+ if ( pollev & POLLOUT && async_waiting( sock->write_q ))
{
if (debug_level) fprintf( stderr, "activating write queue for socket %p\n", sock );
- async_terminate_head( &sock->write_q, STATUS_ALERTED );
+ async_wake_up( sock->write_q, STATUS_ALERTED );
async_active = 1;
}
}
@@ -483,9 +483,9 @@
/* listening, wait for readable */
return (sock->hmask & FD_ACCEPT) ? 0 : POLLIN;
- if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && !list_empty( &sock->read_q )))
+ if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && async_waiting( sock->read_q )))
ev |= POLLIN | POLLPRI;
- if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && !list_empty( &sock->write_q )))
+ if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && async_waiting( sock->write_q )))
ev |= POLLOUT;
/* We use POLLIN with 0 bytes recv() as FD_CLOSE indication for stream sockets. */
if ( sock->type == SOCK_STREAM && ( sock->mask & ~sock->hmask & FD_CLOSE) )
@@ -512,7 +512,7 @@
static void sock_queue_async( struct fd *fd, const async_data_t *data, int type, int count )
{
struct sock *sock = get_fd_user( fd );
- struct list *queue;
+ struct async_queue *queue;
int pollev;
assert( sock->obj.ops == &sock_ops );
@@ -526,11 +526,13 @@
switch (type)
{
case ASYNC_TYPE_READ:
- queue = &sock->read_q;
+ if (!sock->read_q && !(sock->read_q = create_async_queue( sock->fd ))) return;
+ queue = sock->read_q;
sock->hmask &= ~FD_CLOSE;
break;
case ASYNC_TYPE_WRITE:
- queue = &sock->write_q;
+ if (!sock->write_q && !(sock->write_q = create_async_queue( sock->fd ))) return;
+ queue = sock->write_q;
break;
default:
set_error( STATUS_INVALID_PARAMETER );
@@ -557,8 +559,8 @@
struct sock *sock = get_fd_user( fd );
assert( sock->obj.ops == &sock_ops );
- async_terminate_queue( &sock->read_q, STATUS_CANCELLED );
- async_terminate_queue( &sock->write_q, STATUS_CANCELLED );
+ async_wake_up( sock->read_q, STATUS_CANCELLED );
+ async_wake_up( sock->write_q, STATUS_CANCELLED );
}
static struct fd *sock_get_fd( struct object *obj )
@@ -577,11 +579,8 @@
if ( sock->deferred )
release_object( sock->deferred );
- if ( sock->flags & WSA_FLAG_OVERLAPPED )
- {
- async_terminate_queue( &sock->read_q, STATUS_CANCELLED );
- async_terminate_queue( &sock->write_q, STATUS_CANCELLED );
- }
+ if (sock->read_q) release_object( sock->read_q );
+ if (sock->write_q) release_object( sock->write_q );
if (sock->event) release_object( sock->event );
if (sock->fd)
{
@@ -624,13 +623,13 @@
sock->message = 0;
sock->wparam = 0;
sock->deferred = NULL;
+ sock->read_q = NULL;
+ sock->write_q = NULL;
if (!(sock->fd = create_anonymous_fd( &sock_fd_ops, sockfd, &sock->obj )))
{
release_object( sock );
return NULL;
}
- list_init( &sock->read_q );
- list_init( &sock->write_q );
sock_reselect( sock );
clear_error();
return &sock->obj;
@@ -693,14 +692,14 @@
if (sock->event) acceptsock->event = (struct event *)grab_object( sock->event );
acceptsock->flags = sock->flags;
acceptsock->deferred = NULL;
+ acceptsock->read_q = NULL;
+ acceptsock->write_q = NULL;
if (!(acceptsock->fd = create_anonymous_fd( &sock_fd_ops, acceptfd, &acceptsock->obj )))
{
release_object( acceptsock );
release_object( sock );
return NULL;
}
- list_init( &acceptsock->read_q );
- list_init( &acceptsock->write_q );
}
clear_error();
sock->pmask &= ~FD_ACCEPT;