server: Add support for restarting an async I/O when the client side couldn't finish it right away.
diff --git a/server/async.c b/server/async.c
index e0877b2..8b3049c 100644
--- a/server/async.c
+++ b/server/async.c
@@ -38,6 +38,7 @@
struct thread *thread; /* owning thread */
struct list queue_entry; /* entry in async queue list */
struct async_queue *queue; /* queue containing this async */
+ unsigned int status; /* current status */
struct timeout_user *timeout;
unsigned int timeout_status; /* status to report upon timeout */
struct event *event;
@@ -92,6 +93,11 @@
};
+static inline void async_reselect( struct async *async )  /* re-run fd_reselect_async() for the queue that holds this async */
+{
+ if (async->queue->fd) fd_reselect_async( async->queue->fd, async->queue );  /* nothing to do when the queue has no fd */
+}
+
static void async_dump( struct object *obj, int verbose )
{
struct async *async = (struct async *)obj;
@@ -104,10 +110,12 @@
struct async *async = (struct async *)obj;
assert( obj->ops == &async_ops );
+ list_remove( &async->queue_entry );
+ async_reselect( async );
+
if (async->timeout) remove_timeout_user( async->timeout );
if (async->event) release_object( async->event );
release_object( async->queue );
- async->queue = NULL;
release_object( async->thread );
}
@@ -119,11 +127,19 @@
}
/* notifies client thread of new status of its async request */
-/* destroys the server side of it */
static void async_terminate( struct async *async, unsigned int status )
{
apc_call_t data;
+ assert( status != STATUS_PENDING );
+
+ if (async->status != STATUS_PENDING)
+ {
+ /* already terminated, just update status */
+ async->status = status;
+ return;
+ }
+
memset( &data, 0, sizeof(data) );
data.type = APC_ASYNC_IO;
data.async_io.func = async->data.callback;
@@ -131,11 +147,9 @@
data.async_io.sb = async->data.iosb;
data.async_io.status = status;
thread_queue_apc( async->thread, &async->obj, &data );
-
- if (async->timeout) remove_timeout_user( async->timeout );
- async->timeout = NULL;
- list_remove( &async->queue_entry );
- release_object( async );
+ async->status = status;
+ async_reselect( async );
+ release_object( async ); /* so that it gets destroyed when the async is done */
}
/* callback for timeout on an async request */
@@ -184,11 +198,12 @@
return NULL;
}
- async->thread = (struct thread *)grab_object( thread );
- async->event = event;
- async->data = *data;
+ async->thread = (struct thread *)grab_object( thread );
+ async->event = event;
+ async->status = STATUS_PENDING;
+ async->data = *data;
async->timeout = NULL;
- async->queue = (struct async_queue *)grab_object( queue );
+ async->queue = (struct async_queue *)grab_object( queue );
list_add_tail( &queue->queue, &async->queue_entry );
grab_object( async );
@@ -214,12 +229,24 @@
if (obj->ops != &async_ops) return; /* in case the client messed up the APC results */
- if (status == STATUS_PENDING)
+ assert( async->status != STATUS_PENDING ); /* it must have been woken up if we get a result */
+
+ if (status == STATUS_PENDING) /* restart it */
{
- /* FIXME: restart the async operation */
+ status = async->status;
+ async->status = STATUS_PENDING;
+ grab_object( async );
+
+ if (status != STATUS_ALERTED) /* it was terminated in the meantime */
+ async_terminate( async, status );
+ else
+ async_reselect( async );
}
else
{
+ if (async->timeout) remove_timeout_user( async->timeout );
+ async->timeout = NULL;
+ async->status = status;
if (async->data.apc)
{
apc_call_t data;
@@ -238,7 +265,13 @@
/* check if an async operation is waiting to be alerted */
int async_waiting( struct async_queue *queue )
{
- return queue && !list_empty( &queue->queue );
+ struct list *ptr;
+ struct async *async;
+
+ if (!queue) return 0;  /* no queue at all: nothing is waiting */
+ if (!(ptr = list_head( &queue->queue ))) return 0;  /* no head entry (presumably an empty list) */
+ async = LIST_ENTRY( ptr, struct async, queue_entry );
+ return async->status == STATUS_PENDING;  /* only the head entry is examined; a non-pending head reports "not waiting" */
}
/* wake up async operations on the queue */