Implemented server framework for asynchronous IO on sockets.
diff --git a/server/protocol.def b/server/protocol.def
index e151470..e75373e 100644
--- a/server/protocol.def
+++ b/server/protocol.def
@@ -610,9 +610,10 @@
FD_TYPE_CONSOLE,
FD_TYPE_SMB
};
-#define FD_FLAG_OVERLAPPED 0x01
-#define FD_FLAG_TIMEOUT 0x02
-
+#define FD_FLAG_OVERLAPPED 0x01 /* server/sock.c reports this for sockets whose flags have WSA_FLAG_OVERLAPPED */
+#define FD_FLAG_TIMEOUT 0x02
+#define FD_FLAG_RECV_SHUTDOWN 0x04 /* server/sock.c reports this when FD_READ is cleared in the socket state */
+#define FD_FLAG_SEND_SHUTDOWN 0x08 /* server/sock.c reports this when FD_WRITE is cleared in the socket state */
/* Set a file current position */
@REQ(set_file_pointer)
diff --git a/server/sock.c b/server/sock.c
index d9c0be7..06db1ac 100644
--- a/server/sock.c
+++ b/server/sock.c
@@ -86,6 +86,7 @@
static void sock_destroy( struct object *obj );
static int sock_get_error( int err );
static void sock_set_error(void);
+static void sock_queue_async(struct object *obj, void *ptr, unsigned int status, int type, int count);
static const struct object_ops sock_ops =
{
@@ -100,7 +101,7 @@
sock_get_fd, /* get_fd */
no_flush, /* flush */
sock_get_info, /* get_file_info */
- NULL, /* queue_async */
+ sock_queue_async, /* queue_async */
sock_destroy /* destroy */
};
@@ -124,7 +125,7 @@
};
-static void sock_reselect( struct sock *sock )
+static int sock_reselect( struct sock *sock )
{
int ev = sock_get_poll_events( &sock->obj );
@@ -133,21 +134,61 @@
if (sock->obj.select == -1) {
/* previously unconnected socket, is this reselect supposed to connect it? */
- if (!(sock->state & ~FD_WINE_NONBLOCKING)) return;
+ if (!(sock->state & ~FD_WINE_NONBLOCKING)) return 0;
/* ok, it is, attach it to the wineserver's main poll loop */
add_select_user( &sock->obj );
}
/* update condition mask */
set_select_events( &sock->obj, ev );
+ return ev;
+}
+
+/* Check the socket's fd once for the given poll events, with a zero timeout,
+   and hand whatever is reported to sock_poll_event().
+   This is needed because, after POLLHUP is received, the socket is no longer
+   in the main select loop, yet pending events must still be signalled. */
+static void sock_try_event ( struct sock *sock, int event )
+{
+    struct pollfd probe;
+
+    probe.fd      = sock->obj.fd;
+    probe.events  = event;
+    probe.revents = 0;   /* stays 0 if poll() reports nothing */
+    poll (&probe, 1, 0);
+
+    /* nothing pending: leave the socket alone */
+    if ( !probe.revents ) return;
+
+    if ( debug_level ) fprintf ( stderr, "sock_try_event: %x\n", probe.revents );
+    sock_poll_event ( &sock->obj, probe.revents );
}
/* wake anybody waiting on the socket event or send the associated message */
-static void sock_wake_up( struct sock *sock )
+static void sock_wake_up( struct sock *sock, int pollev )
{
unsigned int events = sock->pmask & sock->mask;
int i;
+ int async_active = 0;
- if (!events) return;
+    /* sock->flags is tested against WSA_FLAG_OVERLAPPED everywhere else in this file;
+       use the same constant here instead of the unrelated FD_FLAG_* one */
+    if ( sock->flags & WSA_FLAG_OVERLAPPED )
+    {
+        /* readable or urgent data was polled and IS_READY() accepts the read queue:
+           alert the request at its head */
+        if ( (pollev & (POLLIN|POLLPRI)) && IS_READY( sock->read_q ) )
+        {
+            if (debug_level) fprintf ( stderr, "activating read queue for socket %p\n", sock );
+            async_notify( sock->read_q.head, STATUS_ALERTED );
+            async_active = 1;
+        }
+        /* same for the write queue when POLLOUT was polled */
+        if ( (pollev & POLLOUT) && IS_READY( sock->write_q ) )
+        {
+            if (debug_level) fprintf ( stderr, "activating write queue for socket %p\n", sock );
+            async_notify( sock->write_q.head, STATUS_ALERTED );
+            async_active = 1;
+        }
+    }
+
+ /* Do not signal events if there are still pending asynchronous IO requests */
+ /* We need this to delay FD_CLOSE events until all pending overlapped requests are processed */
+ if ( !events || async_active ) return;
+
+ if (events & FD_CLOSE) sock->hmask |= FD_CLOSE;
if (sock->event)
{
@@ -183,6 +224,7 @@
static void sock_poll_event( struct object *obj, int event )
{
struct sock *sock = (struct sock *)obj;
+ int empty_recv = 0;
assert( sock->obj.ops == &sock_ops );
if (debug_level)
@@ -208,6 +250,7 @@
sock->errors[FD_CONNECT_BIT] = sock_error( sock->obj.fd );
if (debug_level)
fprintf(stderr, "socket %d connection failure\n", sock->obj.fd);
+ set_select_events( &sock->obj, -1 );
}
} else
if (sock->state & FD_WINE_LISTENING)
@@ -226,6 +269,7 @@
sock->pmask |= FD_ACCEPT;
sock->errors[FD_ACCEPT_BIT] = sock_error( sock->obj.fd );
sock->hmask |= FD_ACCEPT;
+ set_select_events( &sock->obj, -1 );
}
} else
{
@@ -233,11 +277,12 @@
if (event & POLLIN)
{
char dummy;
+ int nr;
/* Linux 2.4 doesn't report POLLHUP if only one side of the socket
* has been closed, so we need to check for it explicitly here */
- if (!recv( sock->obj.fd, &dummy, 1, MSG_PEEK )) event = POLLHUP;
- else
+ nr = recv( sock->obj.fd, &dummy, 1, MSG_PEEK );
+ if ( nr > 0 )
{
/* incoming data */
sock->pmask |= FD_READ;
@@ -246,6 +291,22 @@
if (debug_level)
fprintf(stderr, "socket %d is readable\n", sock->obj.fd );
}
+ else if ( nr == 0 )
+ empty_recv = 1;
+ else
+ {
+ /* EAGAIN can happen if an async recv() falls between the server's poll()
+ call and the invocation of this routine */
+ if ( errno == EAGAIN )
+ event &= ~POLLIN;
+ else
+ {
+ if ( debug_level )
+ fprintf ( stderr, "recv error on socket %d: %d\n", sock->obj.fd, errno );
+ event = POLLERR;
+ }
+ }
+
}
if (event & POLLOUT)
{
@@ -263,25 +324,25 @@
if (debug_level)
fprintf(stderr, "socket %d got OOB data\n", sock->obj.fd);
}
- if (((event & POLLERR) || ((event & (POLLIN|POLLHUP)) == POLLHUP))
- && (sock->state & (FD_READ|FD_WRITE))) {
- /* socket closing */
+ /* According to WS2 specs, FD_CLOSE is only delivered when there is
+ no more data to be read (i.e. empty_recv = 1) */
+ else if ( empty_recv && (sock->state & (FD_READ|FD_WRITE) ))
+ {
sock->errors[FD_CLOSE_BIT] = sock_error( sock->obj.fd );
- sock->state &= ~(FD_WINE_CONNECTED|FD_READ|FD_WRITE);
+ if ( event & ( POLLERR|POLLHUP ) )
+ sock->state &= ~(FD_WINE_CONNECTED|FD_WRITE);
sock->pmask |= FD_CLOSE;
if (debug_level)
- fprintf(stderr, "socket %d aborted by error %d\n",
- sock->obj.fd, sock->errors[FD_CLOSE_BIT]);
+ fprintf(stderr, "socket %d aborted by error %d, event: %x - removing from select loop\n",
+ sock->obj.fd, sock->errors[FD_CLOSE_BIT], event);
+ set_select_events( &sock->obj, -1 );
}
}
- if (event & (POLLERR|POLLHUP))
- set_select_events( &sock->obj, -1 );
- else
- sock_reselect( sock );
+ sock_reselect( sock );
/* wake up anyone waiting for whatever just happened */
- if (sock->pmask & sock->mask) sock_wake_up( sock );
+    /* overlapped sockets always go through sock_wake_up() so that it can notify
+       async queues; sock->flags carries WSA_FLAG_* bits, hence the WSA constant */
+    if ( (sock->pmask & sock->mask) || (sock->flags & WSA_FLAG_OVERLAPPED) ) sock_wake_up( sock, event );
/* if anyone is stupid enough to wait on the socket object itself,
* maybe we should wake them up too, just in case? */
@@ -320,8 +381,14 @@
/* listening, wait for readable */
return (sock->hmask & FD_ACCEPT) ? 0 : POLLIN;
- if (mask & FD_READ) ev |= POLLIN | POLLPRI;
- if (mask & FD_WRITE) ev |= POLLOUT;
+ if (mask & (FD_READ) || (sock->flags & WSA_FLAG_OVERLAPPED && IS_READY (sock->read_q)))
+ ev |= POLLIN | POLLPRI;
+ if (mask & FD_WRITE || (sock->flags & WSA_FLAG_OVERLAPPED && IS_READY (sock->write_q)))
+ ev |= POLLOUT;
+ /* We use POLLIN with 0 bytes recv() as FD_CLOSE indication. */
+ if (sock->mask & ~sock->hmask & FD_CLOSE && !(sock->hmask & FD_READ) )
+ ev |= POLLIN;
+
return ev;
}
@@ -352,9 +419,68 @@
}
*flags = 0;
if (sock->flags & WSA_FLAG_OVERLAPPED) *flags |= FD_FLAG_OVERLAPPED;
+ if ( !(sock->state & FD_READ ) ) *flags |= FD_FLAG_RECV_SHUTDOWN;
+ if ( !(sock->state & FD_WRITE ) ) *flags |= FD_FLAG_SEND_SHUTDOWN;
return FD_TYPE_DEFAULT;
}
+/* queue_async entry of sock_ops: queue, refresh or cancel an overlapped IO request.
+ *   ptr    - identifies the request; looked up with find_async() for the current thread
+ *   status - STATUS_PENDING queues a new request or resets an existing one to pending;
+ *            any other value destroys the matching request
+ *   type   - ASYNC_TYPE_READ or ASYNC_TYPE_WRITE, selects the read or the write queue
+ *   count  - not used by this function
+ * Failures are reported through set_error(). */
+static void sock_queue_async(struct object *obj, void *ptr, unsigned int status, int type, int count)
+{
+    struct sock *sock = (struct sock *)obj;
+    struct async_queue *queue;
+    struct async *req;
+    unsigned int dir_bit;   /* state bit that must be set for this direction */
+    int ev;
+
+    assert( obj->ops == &sock_ops );
+
+    /* only sockets flagged as overlapped accept async requests */
+    if ( !(sock->flags & WSA_FLAG_OVERLAPPED) )
+    {
+        set_error ( STATUS_INVALID_HANDLE );
+        return;
+    }
+
+    if ( type == ASYNC_TYPE_READ )
+    {
+        queue = &sock->read_q;
+        dir_bit = FD_READ;
+    }
+    else if ( type == ASYNC_TYPE_WRITE )
+    {
+        queue = &sock->write_q;
+        dir_bit = FD_WRITE;
+    }
+    else
+    {
+        set_error( STATUS_INVALID_PARAMETER );
+        return;
+    }
+
+    req = find_async ( queue, current, ptr );
+
+    if ( status != STATUS_PENDING )
+    {
+        /* anything but STATUS_PENDING ends an existing request */
+        if ( req ) destroy_async ( req );
+        else set_error ( STATUS_INVALID_PARAMETER );
+    }
+    else if ( !( sock->state & dir_bit ) )
+    {
+        /* FD_READ/FD_WRITE is cleared for this direction: refuse the request
+           and drop any instance that was already queued */
+        set_error ( STATUS_PIPE_DISCONNECTED );
+        if ( req ) destroy_async ( req );
+    }
+    else
+    {
+        if ( !req && !( req = create_async ( obj, current, ptr ) ) )
+            return;
+
+        req->status = STATUS_PENDING;
+        if ( !req->q )
+            async_insert ( queue, req );
+    }
+
+    /* recompute the poll events and, if there are any, check for them right away */
+    ev = sock_reselect ( sock );
+    if ( ev ) sock_try_event ( sock, ev );
+}
+
static void sock_destroy( struct object *obj )
{
struct sock *sock = (struct sock *)obj;
@@ -578,6 +704,7 @@
{
struct sock *sock;
struct event *old_event;
+ int pollev;
if (!(sock = (struct sock*)get_handle_obj( current->process, req->handle,
GENERIC_READ|GENERIC_WRITE|SYNCHRONIZE, &sock_ops)))
@@ -591,7 +718,10 @@
if (req->event) sock->event = get_event_obj( current->process, req->event, EVENT_MODIFY_STATE );
if (debug_level && sock->event) fprintf(stderr, "event ptr: %p\n", sock->event);
- sock_reselect( sock );
+
+ pollev = sock_reselect( sock );
+ if ( pollev ) sock_try_event ( sock, pollev );
+
if (sock->mask)
sock->state |= FD_WINE_NONBLOCKING;
@@ -599,7 +729,7 @@
it is possible that FD_CONNECT or FD_ACCEPT network events has happened
before a WSAEventSelect() was done on it.
(when dealing with Asynchronous socket) */
- if (sock->pmask & sock->mask) sock_wake_up( sock );
+ if (sock->pmask & sock->mask) sock_wake_up( sock, pollev );
if (old_event) release_object( old_event ); /* we're through with it */
release_object( &sock->obj );
@@ -646,6 +776,7 @@
DECL_HANDLER(enable_socket_event)
{
struct sock *sock;
+ int pollev;
if (!(sock = (struct sock*)get_handle_obj( current->process, req->handle,
GENERIC_READ|GENERIC_WRITE|SYNCHRONIZE, &sock_ops)))
@@ -655,7 +786,10 @@
sock->hmask &= ~req->mask;
sock->state |= req->sstate;
sock->state &= ~req->cstate;
- sock_reselect( sock );
+
+ pollev = sock_reselect( sock );
+ if ( pollev ) sock_try_event ( sock, pollev );
+
release_object( &sock->obj );
}