- rewrite of the named pipe code
- allow NtFileFlushBuffers to wait
- allow DisconnectNamedPipe to invalidate client cached fd
- fix the pipe test now that one extra test passes

diff --git a/server/named_pipe.c b/server/named_pipe.c
index 780fcc6..7f51774 100644
--- a/server/named_pipe.c
+++ b/server/named_pipe.c
@@ -19,7 +19,7 @@
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  *
  * TODO:
- *   improve error handling
+ *   message mode
  */
 
 #include "config.h"
@@ -50,26 +50,47 @@
     ps_none,
     ps_idle_server,
     ps_wait_open,
-    ps_wait_connect,
     ps_connected_server,
-    ps_connected_client,
-    ps_disconnected
+    ps_wait_disconnect,
+    ps_disconnected_server,
+    ps_wait_connect
+};
+
+struct wait_info
+{
+    struct thread       *thread;
+    void                *func;
+    void                *overlapped;
 };
 
 struct named_pipe;
 
-struct pipe_user
+struct pipe_server
 {
-    struct object       obj;
-    struct fd          *fd;
-    enum pipe_state     state;
-    struct pipe_user   *other;
-    struct named_pipe  *pipe;
-    struct pipe_user   *next;
-    struct pipe_user   *prev;
-    struct thread      *thread;
-    void               *func;
-    void               *overlapped;
+    struct object        obj;
+    struct fd           *fd;
+    enum pipe_state      state;
+    struct pipe_client  *client;
+    struct named_pipe   *pipe;
+    struct pipe_server  *next;
+    struct pipe_server  *prev;
+    struct timeout_user *flush_poll;
+    struct event        *event;
+    struct wait_info     wait;
+};
+
+struct pipe_client
+{
+    struct object        obj;
+    struct fd           *fd;
+    struct pipe_server  *server;
+    struct wait_info     wait;
+};
+
+struct connect_wait
+{
+    struct wait_info     wait;
+    struct connect_wait *next;
 };
 
 struct named_pipe
@@ -80,11 +101,13 @@
     unsigned int        outsize;
     unsigned int        insize;
     unsigned int        timeout;
-    struct pipe_user   *users;
+    unsigned int        instances;
+    struct pipe_server *servers;
+    struct connect_wait *connect_waiters;
 };
 
 static void named_pipe_dump( struct object *obj, int verbose );
-static void named_pipe_destroy( struct object *obj);
+static void named_pipe_destroy( struct object *obj );
 
 static const struct object_ops named_pipe_ops =
 {
@@ -98,120 +121,356 @@
     named_pipe_destroy            /* destroy */
 };
 
-static void pipe_user_dump( struct object *obj, int verbose );
-static struct fd *pipe_user_get_fd( struct object *obj );
-static void pipe_user_destroy( struct object *obj);
+/* common to clients and servers */
+static int pipe_end_get_poll_events( struct fd *fd );
+static int pipe_end_get_info( struct fd *fd, 
+                  struct get_file_info_reply *reply, int *flags );
 
-static int pipe_user_get_poll_events( struct fd *fd );
-static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags );
+/* server end functions */
+static void pipe_server_dump( struct object *obj, int verbose );
+static struct fd *pipe_server_get_fd( struct object *obj );
+static void pipe_server_destroy( struct object *obj);
+static int pipe_server_flush( struct fd *fd, struct event **event );
 
-static const struct object_ops pipe_user_ops =
+static const struct object_ops pipe_server_ops =
 {
-    sizeof(struct pipe_user),     /* size */
-    pipe_user_dump,               /* dump */
+    sizeof(struct pipe_server),   /* size */
+    pipe_server_dump,             /* dump */
     default_fd_add_queue,         /* add_queue */
     default_fd_remove_queue,      /* remove_queue */
     default_fd_signaled,          /* signaled */
     no_satisfied,                 /* satisfied */
-    pipe_user_get_fd,             /* get_fd */
-    pipe_user_destroy             /* destroy */
+    pipe_server_get_fd,           /* get_fd */
+    pipe_server_destroy           /* destroy */
 };
 
-static const struct fd_ops pipe_user_fd_ops =
+static const struct fd_ops pipe_server_fd_ops =
 {
-    pipe_user_get_poll_events,    /* get_poll_events */
+    pipe_end_get_poll_events,     /* get_poll_events */
     default_poll_event,           /* poll_event */
-    no_flush,                     /* flush */
-    pipe_user_get_info,           /* get_file_info */
+    pipe_server_flush,            /* flush */
+    pipe_end_get_info,            /* get_file_info */
+    no_queue_async                /* queue_async */
+};
+
+/* client end functions */
+static void pipe_client_dump( struct object *obj, int verbose );
+static struct fd *pipe_client_get_fd( struct object *obj );
+static void pipe_client_destroy( struct object *obj );
+static int pipe_client_flush( struct fd *fd, struct event **event );
+
+static const struct object_ops pipe_client_ops =
+{
+    sizeof(struct pipe_client),   /* size */
+    pipe_client_dump,             /* dump */
+    default_fd_add_queue,         /* add_queue */
+    default_fd_remove_queue,      /* remove_queue */
+    default_fd_signaled,          /* signaled */
+    no_satisfied,                 /* satisfied */
+    pipe_client_get_fd,           /* get_fd */
+    pipe_client_destroy           /* destroy */
+};
+
+static const struct fd_ops pipe_client_fd_ops =
+{
+    pipe_end_get_poll_events,     /* get_poll_events */
+    default_poll_event,           /* poll_event */
+    pipe_client_flush,            /* flush */
+    pipe_end_get_info,            /* get_file_info */
     no_queue_async                /* queue_async */
 };
 
 static void named_pipe_dump( struct object *obj, int verbose )
 {
-    struct named_pipe *pipe = (struct named_pipe *)obj;
+    struct named_pipe *pipe = (struct named_pipe *) obj;
     assert( obj->ops == &named_pipe_ops );
     fprintf( stderr, "named pipe %p\n" ,pipe);
 }
 
-static void pipe_user_dump( struct object *obj, int verbose )
+static void pipe_server_dump( struct object *obj, int verbose )
 {
-    struct pipe_user *user = (struct pipe_user *)obj;
-    assert( obj->ops == &pipe_user_ops );
-    fprintf( stderr, "named pipe user %p (state %d)\n", user, user->state );
+    struct pipe_server *server = (struct pipe_server *) obj;
+    assert( obj->ops == &pipe_server_ops );
+    fprintf( stderr, "named pipe server %p (state %d)\n",
+             server, server->state );
+}
+
+static void pipe_client_dump( struct object *obj, int verbose )
+{
+    struct pipe_client *client = (struct pipe_client *) obj;
+    assert( obj->ops == &pipe_server_ops );
+    fprintf( stderr, "named pipe client %p (server state %d)\n",
+             client, client->server->state );
 }
 
 static void named_pipe_destroy( struct object *obj)
 {
-    struct named_pipe *pipe = (struct named_pipe *)obj;
-    assert( !pipe->users );
+    struct named_pipe *pipe = (struct named_pipe *) obj;
+    assert( !pipe->servers );
+    assert( !pipe->instances );
 }
 
-static void notify_waiter( struct pipe_user *user, unsigned int status)
+static void notify_waiter( struct wait_info *wait, unsigned int status )
 {
-    if(user->thread && user->func && user->overlapped)
+    if( wait->thread && wait->func && wait->overlapped )
     {
         /* queue a system APC, to notify a waiting thread */
-        thread_queue_apc(user->thread, NULL, user->func, APC_ASYNC, 1,
-                         user->overlapped, (void *)status, NULL);
+        thread_queue_apc( wait->thread, NULL, wait->func, APC_ASYNC,
+                          1, wait->overlapped, (void *)status, NULL );
     }
-    if (user->thread) release_object(user->thread);
-    user->thread = NULL;
-    user->func = NULL;
-    user->overlapped=NULL;
+    if( wait->thread ) release_object( wait->thread );
+    wait->thread = NULL;
 }
 
-static struct fd *pipe_user_get_fd( struct object *obj )
+static void set_waiter( struct wait_info *wait, void *func, void *ov )
 {
-    struct pipe_user *user = (struct pipe_user *)obj;
-    if (user->fd) return (struct fd *)grab_object( user->fd );
+    wait->thread = (struct thread *) grab_object( current );
+    wait->func = func;
+    wait->overlapped = ov;
+}
+
+static void notify_connect_waiters( struct named_pipe *pipe )
+{
+    struct connect_wait *cw, **x = &pipe->connect_waiters;
+
+    while( *x )
+    {
+        cw = *x;
+        notify_waiter( &cw->wait, STATUS_SUCCESS );
+        release_object( pipe );
+        *x = cw->next;
+        free( cw );
+    }
+}
+
+static void queue_connect_waiter( struct named_pipe *pipe,
+                                  void *func, void *overlapped )
+{
+    struct connect_wait *waiter;
+
+    waiter = mem_alloc( sizeof *waiter );
+    if( waiter )
+    {
+        set_waiter( &waiter->wait, func, overlapped );
+        waiter->next = pipe->connect_waiters;
+        pipe->connect_waiters = waiter;
+        grab_object( pipe );
+    }
+}
+
+static struct fd *pipe_client_get_fd( struct object *obj )
+{
+    struct pipe_client *client = (struct pipe_client *) obj;
+    if( client->fd )
+        return (struct fd *) grab_object( client->fd );
     set_error( STATUS_PIPE_DISCONNECTED );
     return NULL;
 }
 
-static void pipe_user_destroy( struct object *obj)
+static struct fd *pipe_server_get_fd( struct object *obj )
 {
-    struct pipe_user *user = (struct pipe_user *)obj;
+    struct pipe_server *server = (struct pipe_server *) obj;
 
-    assert( obj->ops == &pipe_user_ops );
-
-    if(user->overlapped)
-        notify_waiter(user,STATUS_HANDLES_CLOSED);
-
-    if(user->other)
+    switch(server->state)
     {
-        release_object( user->other->fd );
-        user->other->fd = NULL;
-        switch(user->other->state)
-        {
-        case ps_connected_server:
-            user->other->state = ps_idle_server;
-            break;
-        case ps_connected_client:
-            user->other->state = ps_disconnected;
-            break;
-        default:
-            fprintf(stderr,"connected pipe has strange state %d!\n",
-                            user->other->state);
-        }
-        user->other->other=NULL;
-        user->other = NULL;
-    }
+    case ps_connected_server:
+    case ps_wait_disconnect:
+        assert( server->fd );
+        return (struct fd *) grab_object( server->fd );
 
-    /* remove user from pipe's user list */
-    if (user->next) user->next->prev = user->prev;
-    if (user->prev) user->prev->next = user->next;
-    else user->pipe->users = user->next;
-    if (user->thread) release_object(user->thread);
-    release_object(user->pipe);
-    if (user->fd) release_object( user->fd );
+    case ps_wait_open:
+    case ps_idle_server:
+        set_error( STATUS_PIPE_LISTENING );
+        break;
+
+    case ps_disconnected_server:
+    case ps_wait_connect:
+        set_error( STATUS_PIPE_DISCONNECTED );
+        break;
+
+    default:
+        assert( 0 );
+    }
+    return NULL;
 }
 
-static int pipe_user_get_poll_events( struct fd *fd )
+
+static void notify_empty( struct pipe_server *server )
+{
+    if( !server->flush_poll )
+        return;
+    assert( server->state == ps_connected_server );
+    assert( server->event );
+    remove_timeout_user( server->flush_poll );
+    server->flush_poll = NULL;
+    set_event( server->event );
+    release_object( server->event );
+    server->event = NULL;
+}
+
+static void do_disconnect( struct pipe_server *server )
+{
+    /* we may only have a server fd, if the client disconnected */
+    if( server->client )
+    {
+        assert( server->client->server == server );
+        assert( server->client->fd );
+        release_object( server->client->fd );
+        server->client->fd = NULL;
+    }
+    assert( server->fd );
+    release_object( server->fd );
+    server->fd = NULL;
+}
+
+static void pipe_server_destroy( struct object *obj)
+{
+    struct pipe_server *server = (struct pipe_server *)obj;
+
+    assert( obj->ops == &pipe_server_ops );
+
+    if( server->fd )
+    {
+        notify_empty( server );
+        do_disconnect( server );
+    }
+
+    if( server->client )
+    {
+        server->client->server = NULL;
+        server->client = NULL;
+    }
+
+    notify_waiter( &server->wait, STATUS_HANDLES_CLOSED );
+
+    assert( server->pipe->instances );
+    server->pipe->instances--;
+
+    /* remove server from pipe's server list */
+    if( server->next ) server->next->prev = server->prev;
+    if( server->prev ) server->prev->next = server->next;
+    else server->pipe->servers = server->next;
+    release_object( server->pipe );
+}
+
+static void pipe_client_destroy( struct object *obj)
+{
+    struct pipe_client *client = (struct pipe_client *)obj;
+    struct pipe_server *server = client->server;
+
+    assert( obj->ops == &pipe_client_ops );
+
+    notify_waiter( &client->wait, STATUS_HANDLES_CLOSED );
+
+    if( server )
+    {
+        notify_empty( server );
+
+        switch( server->state )
+        {
+        case ps_connected_server:
+            /* Don't destroy the server's fd here as we can't
+               do a successful flush without it. */
+            server->state = ps_wait_disconnect;
+            release_object( client->fd );
+            client->fd = NULL;
+            break;
+        case ps_disconnected_server:
+            server->state = ps_wait_connect;
+            break;
+        default:
+            assert( 0 );
+        }
+        assert( server->client );
+        server->client = NULL;
+        client->server = NULL;
+    }
+    assert( !client->fd );
+}
+
+static int pipe_end_get_poll_events( struct fd *fd )
 {
     return POLLIN | POLLOUT;  /* FIXME */
 }
 
-static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags )
+static int pipe_data_remaining( struct pipe_server *server )
+{
+    struct pollfd pfd;
+    int fd;
+
+    assert( server->client );
+
+    fd = get_unix_fd( server->client->fd );
+    if( fd < 0 )
+        return 0;
+    pfd.fd = fd;
+    pfd.events = POLLIN;
+    pfd.revents = 0;
+
+    if( 0 > poll( &pfd, 1, 0 ) )
+        return 0;
+ 
+    return pfd.revents&POLLIN;
+}
+
+static void check_flushed( void *arg )
+{
+    struct pipe_server *server = (struct pipe_server*) arg;
+
+    assert( server->event );
+    if( pipe_data_remaining( server ) )
+    {
+        struct timeval tv;
+
+        gettimeofday( &tv, 0 );
+        add_timeout( &tv, 100 );
+        server->flush_poll = add_timeout_user( &tv, check_flushed, server );
+    }
+    else
+        notify_empty( server );
+}
+
+static int pipe_server_flush( struct fd *fd, struct event **event )
+{
+    struct pipe_server *server = get_fd_user( fd );
+
+    if( !server )
+        return 0;
+
+    if( server->state != ps_connected_server )
+        return 0;
+
+    /* FIXME: if multiple threads flush the same pipe,
+              maybe should create a list of processes to notify */
+    if( server->flush_poll )
+        return 0;
+
+    if( pipe_data_remaining( server ) )
+    {
+        struct timeval tv;
+
+        /* this kind of sux - 
+           there's no unix way to be alerted when a pipe becomes empty */
+        server->event = create_event( NULL, 0, 0, 0 );
+        if( !server->event )
+            return 0;
+        gettimeofday( &tv, 0 );
+        add_timeout( &tv, 100 );
+        server->flush_poll = add_timeout_user( &tv, check_flushed, server );
+        *event = server->event;
+    }
+
+    return 0; 
+}
+
+static int pipe_client_flush( struct fd *fd, struct event **event )
+{
+    /* FIXME: what do we have to do for this? */
+    return 0;
+}
+
+static int pipe_end_get_info( struct fd *fd, 
+                        struct get_file_info_reply *reply, int *flags )
 {
     if (reply)
     {
@@ -234,12 +493,15 @@
 {
     struct named_pipe *pipe;
 
-    if ((pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len )))
+    pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len );
+    if( pipe )
     {
-        if (get_error() != STATUS_OBJECT_NAME_COLLISION)
+        if( get_error() != STATUS_OBJECT_NAME_COLLISION )
         {
             /* initialize it if it didn't already exist */
-            pipe->users = 0;
+            pipe->servers = 0;
+            pipe->instances = 0;
+            pipe->connect_waiters = NULL;
         }
     }
     return pipe;
@@ -260,65 +522,80 @@
     return NULL;
 }
 
-static struct pipe_user *get_pipe_user_obj( struct process *process, obj_handle_t handle,
-                                            unsigned int access )
+static struct pipe_server *get_pipe_server_obj( struct process *process,
+                                obj_handle_t handle, unsigned int access )
 {
-    return (struct pipe_user *)get_handle_obj( process, handle, access, &pipe_user_ops );
+    struct object *obj;
+    obj = get_handle_obj( process, handle, access, &pipe_server_ops );
+    return (struct pipe_server *) obj;
 }
 
-static struct pipe_user *create_pipe_user( struct named_pipe *pipe )
+static struct pipe_server *create_pipe_server( struct named_pipe *pipe )
 {
-    struct pipe_user *user;
+    struct pipe_server *server;
 
-    user = alloc_object( &pipe_user_ops );
-    if(!user)
+    server = alloc_object( &pipe_server_ops );
+    if( !server )
         return NULL;
 
-    user->fd = NULL;
-    user->pipe = pipe;
-    user->state = ps_none;
-    user->other = NULL;
-    user->thread = NULL;
-    user->func = NULL;
-    user->overlapped = NULL;
+    server->fd = NULL;
+    server->pipe = pipe;
+    server->state = ps_none;
+    server->client = NULL;
+    server->flush_poll = NULL;
+    server->wait.thread = NULL;
 
-    /* add to list of pipe users */
-    if ((user->next = pipe->users)) user->next->prev = user;
-    user->prev = NULL;
-    pipe->users = user;
+    /* add to list of pipe servers */
+    if ((server->next = pipe->servers)) server->next->prev = server;
+    server->prev = NULL;
+    pipe->servers = server;
 
-    grab_object(pipe);
+    grab_object( pipe );
 
-    return user;
+    return server;
 }
 
-static struct pipe_user *find_partner(struct named_pipe *pipe, enum pipe_state state)
+static struct pipe_client *create_pipe_client( struct pipe_server *server )
 {
-    struct pipe_user *x;
+    struct pipe_client *client;
 
-    for(x = pipe->users; x; x=x->next)
-    {
-        if(x->state==state)
-        break;
-    }
-
-    if(!x)
+    client = alloc_object( &pipe_client_ops );
+    if( !client )
         return NULL;
 
-    return (struct pipe_user *)grab_object( x );
+    client->fd = NULL;
+    client->server = server;
+    client->wait.thread = NULL;
+
+    return client;
+}
+
+static struct pipe_server *find_server( struct named_pipe *pipe,
+                                        enum pipe_state state )
+{
+    struct pipe_server *x;
+
+    for( x = pipe->servers; x; x = x->next )
+        if( x->state == state )
+            break;
+
+    if( !x )
+        return NULL;
+
+    return (struct pipe_server *) grab_object( x );
 }
 
 DECL_HANDLER(create_named_pipe)
 {
     struct named_pipe *pipe;
-    struct pipe_user *user;
+    struct pipe_server *server;
 
     reply->handle = 0;
     pipe = create_named_pipe( get_req_data(), get_req_data_size() );
-    if(!pipe)
+    if( !pipe )
         return;
 
-    if (get_error() != STATUS_OBJECT_NAME_COLLISION)
+    if( get_error() != STATUS_OBJECT_NAME_COLLISION )
     {
         pipe->insize = req->insize;
         pipe->outsize = req->outsize;
@@ -326,14 +603,33 @@
         pipe->timeout = req->timeout;
         pipe->pipemode = req->pipemode;
     }
-
-    user = create_pipe_user( pipe );
-
-    if(user)
+    else
     {
-        user->state = ps_idle_server;
-        reply->handle = alloc_handle( current->process, user, GENERIC_READ|GENERIC_WRITE, 0 );
-        release_object( user );
+        set_error( 0 );  /* clear the name collision */
+        if( pipe->maxinstances <= pipe->instances )
+        {
+            set_error( STATUS_PIPE_BUSY );
+            release_object( pipe );
+            return;
+        }
+        if( ( pipe->maxinstances != req->maxinstances ) ||
+            ( pipe->timeout != req->timeout ) ||
+            ( pipe->pipemode != req->pipemode ) )
+        {
+            set_error( STATUS_ACCESS_DENIED );
+            release_object( pipe );
+            return;
+        }
+    }
+
+    server = create_pipe_server( pipe );
+    if(server)
+    {
+        server->state = ps_idle_server;
+        reply->handle = alloc_handle( current->process, server,
+                                      GENERIC_READ|GENERIC_WRITE, 0 );
+        server->pipe->instances++;
+        release_object( server );
     }
 
     release_object( pipe );
@@ -341,147 +637,173 @@
 
 DECL_HANDLER(open_named_pipe)
 {
-    struct pipe_user *user, *partner;
+    struct pipe_server *server;
+    struct pipe_client *client;
     struct named_pipe *pipe;
+    int fds[2];
 
     reply->handle = 0;
 
-    if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
+    pipe = open_named_pipe( get_req_data(), get_req_data_size() );
+    if ( !pipe )
     {
         set_error( STATUS_NO_SUCH_FILE );
         return;
     }
-    if (!(partner = find_partner(pipe, ps_wait_open)))
+
+    for( server = pipe->servers; server; server = server->next )
+        if( ( server->state==ps_idle_server ) ||
+            ( server->state==ps_wait_open ) )
+            break;
+    release_object( pipe );
+
+    if ( !server )
     {
-        release_object(pipe);
         set_error( STATUS_PIPE_NOT_AVAILABLE );
         return;
     }
-    if ((user = create_pipe_user( pipe )))
-    {
-        int fds[2];
 
-        if(!socketpair(PF_UNIX, SOCK_STREAM, 0, fds))
+    client = create_pipe_client( server );
+    if( client )
+    {
+        if( !socketpair( PF_UNIX, SOCK_STREAM, 0, fds ) )
         {
-            user->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[1], &user->obj );
-            partner->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[0], &partner->obj );
-            if (user->fd && partner->fd)
+            assert( !client->fd );
+            assert( !server->fd );
+            client->fd = create_anonymous_fd( &pipe_server_fd_ops,
+                                            fds[1], &client->obj );
+            server->fd = create_anonymous_fd( &pipe_server_fd_ops,
+                                            fds[0], &server->obj );
+            if (client->fd && server->fd)
             {
-                notify_waiter(partner,STATUS_SUCCESS);
-                partner->state = ps_connected_server;
-                partner->other = user;
-                user->state = ps_connected_client;
-                user->other = partner;
-                reply->handle = alloc_handle( current->process, user, req->access, 0 );
+                if( server->state == ps_wait_open )
+                    notify_waiter( &server->wait, STATUS_SUCCESS );
+                assert( !server->wait.thread );
+                server->state = ps_connected_server;
+                server->client = client;
+                client->server = server;
+                reply->handle = alloc_handle( current->process, client,
+                                              req->access, 0 );
             }
         }
-        else file_set_error();
+        else
+            file_set_error();
 
-        release_object( user );
+        release_object( client );
     }
-    release_object( partner );
-    release_object( pipe );
 }
 
 DECL_HANDLER(connect_named_pipe)
 {
-    struct pipe_user *user, *partner;
+    struct pipe_server *server;
 
-    user = get_pipe_user_obj(current->process, req->handle, 0);
-    if(!user)
+    server = get_pipe_server_obj(current->process, req->handle, 0);
+    if(!server)
         return;
 
-    if( user->state != ps_idle_server )
+    switch( server->state )
     {
-        set_error(STATUS_PORT_ALREADY_SET);
-    }
-    else
-    {
-        user->state = ps_wait_open;
-        user->thread = (struct thread *)grab_object(current);
-        user->func = req->func;
-        user->overlapped = req->overlapped;
-
-        /* notify all waiters that a pipe just became available */
-        while( (partner = find_partner(user->pipe,ps_wait_connect)) )
-        {
-            notify_waiter(partner,STATUS_SUCCESS);
-            release_object(partner);
-        }
+    case ps_idle_server:
+    case ps_wait_connect:
+        assert( !server->fd );
+        server->state = ps_wait_open;
+        set_waiter( &server->wait, req->func, req->overlapped );
+        notify_connect_waiters( server->pipe );
+        break;
+    case ps_connected_server:
+        assert( server->fd );
+        set_error( STATUS_PIPE_CONNECTED );
+        break;
+    case ps_disconnected_server:
+        set_error( STATUS_PIPE_BUSY );
+        break;
+    case ps_wait_disconnect:
+        set_error( STATUS_NO_DATA_DETECTED );
+        break;
+    default:
+        set_error( STATUS_INVALID_HANDLE );
+        break;
     }
 
-    release_object(user);
+    release_object(server);
 }
 
 DECL_HANDLER(wait_named_pipe)
 {
     struct named_pipe *pipe;
-    struct pipe_user *partner;
+    struct pipe_server *server;
 
     if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
     {
         set_error( STATUS_PIPE_NOT_AVAILABLE );
         return;
     }
-    if( (partner = find_partner(pipe,ps_wait_open)) )
+    server = find_server( pipe, ps_wait_open );
+    if( server )
     {
-        /* this should use notify_waiter,
-           but no pipe_user object exists now... */
-        thread_queue_apc(current,NULL,req->func,
-                         APC_ASYNC, 1, req->overlapped, STATUS_SUCCESS, NULL);
-        release_object(partner);
+        /* there's already a server waiting for a client to connect */
+        struct wait_info wait;
+        set_waiter( &wait, req->func, req->overlapped );
+        notify_waiter( &wait, STATUS_SUCCESS );
+        release_object( server );
     }
     else
-    {
-        struct pipe_user *user;
+        queue_connect_waiter( pipe, req->func, req->overlapped );
 
-        if( (user = create_pipe_user( pipe )) )
-        {
-            user->state = ps_wait_connect;
-            user->thread = (struct thread *)grab_object(current);
-            user->func = req->func;
-            user->overlapped = req->overlapped;
-            /* don't release it */
-        }
-    }
-    release_object(pipe);
+    release_object( pipe );
 }
 
 DECL_HANDLER(disconnect_named_pipe)
 {
-    struct pipe_user *user;
+    struct pipe_server *server;
 
-    user = get_pipe_user_obj(current->process, req->handle, 0);
-    if(!user)
+    reply->fd = -1;
+    server = get_pipe_server_obj( current->process, req->handle, 0 );
+    if( !server )
         return;
-    if( (user->state == ps_connected_server) &&
-        (user->other->state == ps_connected_client) )
+    switch( server->state )
     {
-        release_object( user->other->fd );
-        user->other->fd = NULL;
-        user->other->state = ps_disconnected;
-        user->other->other = NULL;
+    case ps_connected_server:
+        assert( server->fd );
+        assert( server->client );
+        assert( server->client->fd );
 
-        release_object( user->fd );
-        user->fd = NULL;
-        user->state = ps_idle_server;
-        user->other = NULL;
+        notify_empty( server );
+        notify_waiter( &server->client->wait, STATUS_PIPE_DISCONNECTED );
+
+        /* Dump the client and server fds, but keep the pointers
+           around - client loses all waiting data */
+        server->state = ps_disconnected_server;
+        do_disconnect( server );
+        reply->fd = flush_cached_fd( current->process, req->handle );
+        break;
+
+    case ps_wait_disconnect:
+        assert( !server->client );
+        assert( server->fd );
+        do_disconnect( server );
+        server->state = ps_wait_connect;
+        reply->fd = flush_cached_fd( current->process, req->handle );
+        break;
+
+    default:
+        set_error( STATUS_PIPE_DISCONNECTED );
     }
-    release_object(user);
+    release_object( server );
 }
 
 DECL_HANDLER(get_named_pipe_info)
 {
-    struct pipe_user *user;
+    struct pipe_server *server;
 
-    user = get_pipe_user_obj(current->process, req->handle, 0);
-    if(!user)
+    server = get_pipe_server_obj( current->process, req->handle, 0 );
+    if(!server)
         return;
 
-    reply->flags        = user->pipe->pipemode;
-    reply->maxinstances = user->pipe->maxinstances;
-    reply->insize       = user->pipe->insize;
-    reply->outsize      = user->pipe->outsize;
+    reply->flags        = server->pipe->pipemode;
+    reply->maxinstances = server->pipe->maxinstances;
+    reply->insize       = server->pipe->insize;
+    reply->outsize      = server->pipe->outsize;
 
-    release_object(user);
+    release_object(server);
 }