- rewrite of the named pipe code
- allow NtFlushBuffersFile to wait
- allow DisconnectNamedPipe to invalidate client cached fd
- fix the pipe test now that one extra test passes
diff --git a/server/named_pipe.c b/server/named_pipe.c
index 780fcc6..7f51774 100644
--- a/server/named_pipe.c
+++ b/server/named_pipe.c
@@ -19,7 +19,7 @@
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
* TODO:
- * improve error handling
+ * message mode
*/
#include "config.h"
@@ -50,26 +50,47 @@
ps_none,
ps_idle_server,
ps_wait_open,
- ps_wait_connect,
ps_connected_server,
- ps_connected_client,
- ps_disconnected
+ ps_wait_disconnect,
+ ps_disconnected_server,
+ ps_wait_connect
+};
+
+struct wait_info
+{
+ struct thread *thread;
+ void *func;
+ void *overlapped;
};
struct named_pipe;
-struct pipe_user
+struct pipe_server
{
- struct object obj;
- struct fd *fd;
- enum pipe_state state;
- struct pipe_user *other;
- struct named_pipe *pipe;
- struct pipe_user *next;
- struct pipe_user *prev;
- struct thread *thread;
- void *func;
- void *overlapped;
+ struct object obj;
+ struct fd *fd;
+ enum pipe_state state;
+ struct pipe_client *client;
+ struct named_pipe *pipe;
+ struct pipe_server *next;
+ struct pipe_server *prev;
+ struct timeout_user *flush_poll;
+ struct event *event;
+ struct wait_info wait;
+};
+
+struct pipe_client
+{
+ struct object obj;
+ struct fd *fd;
+ struct pipe_server *server;
+ struct wait_info wait;
+};
+
+struct connect_wait
+{
+ struct wait_info wait;
+ struct connect_wait *next;
};
struct named_pipe
@@ -80,11 +101,13 @@
unsigned int outsize;
unsigned int insize;
unsigned int timeout;
- struct pipe_user *users;
+ unsigned int instances;
+ struct pipe_server *servers;
+ struct connect_wait *connect_waiters;
};
static void named_pipe_dump( struct object *obj, int verbose );
-static void named_pipe_destroy( struct object *obj);
+static void named_pipe_destroy( struct object *obj );
static const struct object_ops named_pipe_ops =
{
@@ -98,120 +121,356 @@
named_pipe_destroy /* destroy */
};
-static void pipe_user_dump( struct object *obj, int verbose );
-static struct fd *pipe_user_get_fd( struct object *obj );
-static void pipe_user_destroy( struct object *obj);
+/* common to clients and servers */
+static int pipe_end_get_poll_events( struct fd *fd );
+static int pipe_end_get_info( struct fd *fd,
+ struct get_file_info_reply *reply, int *flags );
-static int pipe_user_get_poll_events( struct fd *fd );
-static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags );
+/* server end functions */
+static void pipe_server_dump( struct object *obj, int verbose );
+static struct fd *pipe_server_get_fd( struct object *obj );
+static void pipe_server_destroy( struct object *obj);
+static int pipe_server_flush( struct fd *fd, struct event **event );
-static const struct object_ops pipe_user_ops =
+static const struct object_ops pipe_server_ops =
{
- sizeof(struct pipe_user), /* size */
- pipe_user_dump, /* dump */
+ sizeof(struct pipe_server), /* size */
+ pipe_server_dump, /* dump */
default_fd_add_queue, /* add_queue */
default_fd_remove_queue, /* remove_queue */
default_fd_signaled, /* signaled */
no_satisfied, /* satisfied */
- pipe_user_get_fd, /* get_fd */
- pipe_user_destroy /* destroy */
+ pipe_server_get_fd, /* get_fd */
+ pipe_server_destroy /* destroy */
};
-static const struct fd_ops pipe_user_fd_ops =
+static const struct fd_ops pipe_server_fd_ops =
{
- pipe_user_get_poll_events, /* get_poll_events */
+ pipe_end_get_poll_events, /* get_poll_events */
default_poll_event, /* poll_event */
- no_flush, /* flush */
- pipe_user_get_info, /* get_file_info */
+ pipe_server_flush, /* flush */
+ pipe_end_get_info, /* get_file_info */
+ no_queue_async /* queue_async */
+};
+
+/* client end functions */
+static void pipe_client_dump( struct object *obj, int verbose );
+static struct fd *pipe_client_get_fd( struct object *obj );
+static void pipe_client_destroy( struct object *obj );
+static int pipe_client_flush( struct fd *fd, struct event **event );
+
+static const struct object_ops pipe_client_ops =
+{
+ sizeof(struct pipe_client), /* size */
+ pipe_client_dump, /* dump */
+ default_fd_add_queue, /* add_queue */
+ default_fd_remove_queue, /* remove_queue */
+ default_fd_signaled, /* signaled */
+ no_satisfied, /* satisfied */
+ pipe_client_get_fd, /* get_fd */
+ pipe_client_destroy /* destroy */
+};
+
+static const struct fd_ops pipe_client_fd_ops =
+{
+ pipe_end_get_poll_events, /* get_poll_events */
+ default_poll_event, /* poll_event */
+ pipe_client_flush, /* flush */
+ pipe_end_get_info, /* get_file_info */
no_queue_async /* queue_async */
};
static void named_pipe_dump( struct object *obj, int verbose )
{
- struct named_pipe *pipe = (struct named_pipe *)obj;
+ struct named_pipe *pipe = (struct named_pipe *) obj;
assert( obj->ops == &named_pipe_ops );
fprintf( stderr, "named pipe %p\n" ,pipe);
}
-static void pipe_user_dump( struct object *obj, int verbose )
+static void pipe_server_dump( struct object *obj, int verbose )
{
- struct pipe_user *user = (struct pipe_user *)obj;
- assert( obj->ops == &pipe_user_ops );
- fprintf( stderr, "named pipe user %p (state %d)\n", user, user->state );
+ struct pipe_server *server = (struct pipe_server *) obj;
+ assert( obj->ops == &pipe_server_ops );
+ fprintf( stderr, "named pipe server %p (state %d)\n",
+ server, server->state );
+}
+
+static void pipe_client_dump( struct object *obj, int verbose )
+{
+ struct pipe_client *client = (struct pipe_client *) obj;
+ assert( obj->ops == &pipe_client_ops ); /* client objects carry the client ops table */
+ fprintf( stderr, "named pipe client %p (server state %d)\n",
+ client, client->server ? (int) client->server->state : -1 ); /* server may be gone */
 }
static void named_pipe_destroy( struct object *obj)
{
- struct named_pipe *pipe = (struct named_pipe *)obj;
- assert( !pipe->users );
+ struct named_pipe *pipe = (struct named_pipe *) obj;
+ assert( !pipe->servers );
+ assert( !pipe->instances );
}
-static void notify_waiter( struct pipe_user *user, unsigned int status)
+static void notify_waiter( struct wait_info *wait, unsigned int status )
{
- if(user->thread && user->func && user->overlapped)
+ if( wait->thread && wait->func && wait->overlapped )
{
/* queue a system APC, to notify a waiting thread */
- thread_queue_apc(user->thread, NULL, user->func, APC_ASYNC, 1,
- user->overlapped, (void *)status, NULL);
+ thread_queue_apc( wait->thread, NULL, wait->func, APC_ASYNC,
+ 1, wait->overlapped, (void *)status, NULL );
}
- if (user->thread) release_object(user->thread);
- user->thread = NULL;
- user->func = NULL;
- user->overlapped=NULL;
+ if( wait->thread ) release_object( wait->thread );
+ wait->thread = NULL;
}
-static struct fd *pipe_user_get_fd( struct object *obj )
+static void set_waiter( struct wait_info *wait, void *func, void *ov )
{
- struct pipe_user *user = (struct pipe_user *)obj;
- if (user->fd) return (struct fd *)grab_object( user->fd );
+ wait->thread = (struct thread *) grab_object( current );
+ wait->func = func;
+ wait->overlapped = ov;
+}
+
+static void notify_connect_waiters( struct named_pipe *pipe )
+{
+ struct connect_wait *cw, **x = &pipe->connect_waiters;
+
+ while( *x )
+ {
+ cw = *x;
+ notify_waiter( &cw->wait, STATUS_SUCCESS );
+ release_object( pipe );
+ *x = cw->next;
+ free( cw );
+ }
+}
+
+static void queue_connect_waiter( struct named_pipe *pipe,
+ void *func, void *overlapped )
+{
+ struct connect_wait *waiter;
+
+ waiter = mem_alloc( sizeof *waiter );
+ if( waiter )
+ {
+ set_waiter( &waiter->wait, func, overlapped );
+ waiter->next = pipe->connect_waiters;
+ pipe->connect_waiters = waiter;
+ grab_object( pipe );
+ }
+}
+
+static struct fd *pipe_client_get_fd( struct object *obj )
+{
+ struct pipe_client *client = (struct pipe_client *) obj;
+ if( client->fd )
+ return (struct fd *) grab_object( client->fd );
set_error( STATUS_PIPE_DISCONNECTED );
return NULL;
}
-static void pipe_user_destroy( struct object *obj)
+static struct fd *pipe_server_get_fd( struct object *obj )
{
- struct pipe_user *user = (struct pipe_user *)obj;
+ struct pipe_server *server = (struct pipe_server *) obj;
- assert( obj->ops == &pipe_user_ops );
-
- if(user->overlapped)
- notify_waiter(user,STATUS_HANDLES_CLOSED);
-
- if(user->other)
+ switch(server->state)
{
- release_object( user->other->fd );
- user->other->fd = NULL;
- switch(user->other->state)
- {
- case ps_connected_server:
- user->other->state = ps_idle_server;
- break;
- case ps_connected_client:
- user->other->state = ps_disconnected;
- break;
- default:
- fprintf(stderr,"connected pipe has strange state %d!\n",
- user->other->state);
- }
- user->other->other=NULL;
- user->other = NULL;
- }
+ case ps_connected_server:
+ case ps_wait_disconnect:
+ assert( server->fd );
+ return (struct fd *) grab_object( server->fd );
- /* remove user from pipe's user list */
- if (user->next) user->next->prev = user->prev;
- if (user->prev) user->prev->next = user->next;
- else user->pipe->users = user->next;
- if (user->thread) release_object(user->thread);
- release_object(user->pipe);
- if (user->fd) release_object( user->fd );
+ case ps_wait_open:
+ case ps_idle_server:
+ set_error( STATUS_PIPE_LISTENING );
+ break;
+
+ case ps_disconnected_server:
+ case ps_wait_connect:
+ set_error( STATUS_PIPE_DISCONNECTED );
+ break;
+
+ default:
+ assert( 0 );
+ }
+ return NULL;
}
-static int pipe_user_get_poll_events( struct fd *fd )
+
+static void notify_empty( struct pipe_server *server )
+{
+ if( !server->flush_poll )
+ return;
+ assert( server->state == ps_connected_server );
+ assert( server->event );
+ remove_timeout_user( server->flush_poll );
+ server->flush_poll = NULL;
+ set_event( server->event );
+ release_object( server->event );
+ server->event = NULL;
+}
+
+static void do_disconnect( struct pipe_server *server )
+{
+ /* we may only have a server fd, if the client disconnected */
+ if( server->client )
+ {
+ assert( server->client->server == server );
+ assert( server->client->fd );
+ release_object( server->client->fd );
+ server->client->fd = NULL;
+ }
+ assert( server->fd );
+ release_object( server->fd );
+ server->fd = NULL;
+}
+
+static void pipe_server_destroy( struct object *obj)
+{
+ struct pipe_server *server = (struct pipe_server *)obj;
+
+ assert( obj->ops == &pipe_server_ops );
+
+ if( server->fd )
+ {
+ notify_empty( server );
+ do_disconnect( server );
+ }
+
+ if( server->client )
+ {
+ server->client->server = NULL;
+ server->client = NULL;
+ }
+
+ notify_waiter( &server->wait, STATUS_HANDLES_CLOSED );
+
+ assert( server->pipe->instances );
+ server->pipe->instances--;
+
+ /* remove server from pipe's server list */
+ if( server->next ) server->next->prev = server->prev;
+ if( server->prev ) server->prev->next = server->next;
+ else server->pipe->servers = server->next;
+ release_object( server->pipe );
+}
+
+static void pipe_client_destroy( struct object *obj)
+{
+ struct pipe_client *client = (struct pipe_client *)obj;
+ struct pipe_server *server = client->server;
+
+ assert( obj->ops == &pipe_client_ops );
+
+ notify_waiter( &client->wait, STATUS_HANDLES_CLOSED );
+
+ if( server )
+ {
+ notify_empty( server );
+
+ switch( server->state )
+ {
+ case ps_connected_server:
+ /* Don't destroy the server's fd here as we can't
+ do a successful flush without it. */
+ server->state = ps_wait_disconnect;
+ release_object( client->fd );
+ client->fd = NULL;
+ break;
+ case ps_disconnected_server:
+ server->state = ps_wait_connect;
+ break;
+ default:
+ assert( 0 );
+ }
+ assert( server->client );
+ server->client = NULL;
+ client->server = NULL;
+ }
+ assert( !client->fd );
+}
+
+static int pipe_end_get_poll_events( struct fd *fd )
{
return POLLIN | POLLOUT; /* FIXME */
}
-static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags )
+static int pipe_data_remaining( struct pipe_server *server )
+{
+ struct pollfd pfd;
+ int fd;
+
+ assert( server->client );
+
+ fd = get_unix_fd( server->client->fd );
+ if( fd < 0 )
+ return 0;
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+
+ if( 0 > poll( &pfd, 1, 0 ) )
+ return 0;
+
+ return pfd.revents&POLLIN;
+}
+
+static void check_flushed( void *arg )
+{
+ struct pipe_server *server = (struct pipe_server*) arg;
+
+ assert( server->event );
+ if( pipe_data_remaining( server ) )
+ {
+ struct timeval tv;
+
+ gettimeofday( &tv, 0 );
+ add_timeout( &tv, 100 );
+ server->flush_poll = add_timeout_user( &tv, check_flushed, server );
+ }
+ else
+ notify_empty( server );
+}
+
+static int pipe_server_flush( struct fd *fd, struct event **event )
+{
+ struct pipe_server *server = get_fd_user( fd );
+
+ if( !server )
+ return 0;
+
+ if( server->state != ps_connected_server )
+ return 0;
+
+ /* FIXME: if multiple threads flush the same pipe,
+ maybe should create a list of processes to notify */
+ if( server->flush_poll )
+ return 0;
+
+ if( pipe_data_remaining( server ) )
+ {
+ struct timeval tv;
+
+ /* this kind of sux -
+ there's no unix way to be alerted when a pipe becomes empty */
+ server->event = create_event( NULL, 0, 0, 0 );
+ if( !server->event )
+ return 0;
+ gettimeofday( &tv, 0 );
+ add_timeout( &tv, 100 );
+ server->flush_poll = add_timeout_user( &tv, check_flushed, server );
+ *event = server->event;
+ }
+
+ return 0;
+}
+
+static int pipe_client_flush( struct fd *fd, struct event **event )
+{
+ /* FIXME: what do we have to do for this? */
+ return 0;
+}
+
+static int pipe_end_get_info( struct fd *fd,
+ struct get_file_info_reply *reply, int *flags )
{
if (reply)
{
@@ -234,12 +493,15 @@
{
struct named_pipe *pipe;
- if ((pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len )))
+ pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len );
+ if( pipe )
{
- if (get_error() != STATUS_OBJECT_NAME_COLLISION)
+ if( get_error() != STATUS_OBJECT_NAME_COLLISION )
{
/* initialize it if it didn't already exist */
- pipe->users = 0;
+ pipe->servers = 0;
+ pipe->instances = 0;
+ pipe->connect_waiters = NULL;
}
}
return pipe;
@@ -260,65 +522,80 @@
return NULL;
}
-static struct pipe_user *get_pipe_user_obj( struct process *process, obj_handle_t handle,
- unsigned int access )
+static struct pipe_server *get_pipe_server_obj( struct process *process,
+ obj_handle_t handle, unsigned int access )
{
- return (struct pipe_user *)get_handle_obj( process, handle, access, &pipe_user_ops );
+ struct object *obj;
+ obj = get_handle_obj( process, handle, access, &pipe_server_ops );
+ return (struct pipe_server *) obj;
}
-static struct pipe_user *create_pipe_user( struct named_pipe *pipe )
+static struct pipe_server *create_pipe_server( struct named_pipe *pipe )
{
- struct pipe_user *user;
+ struct pipe_server *server;
- user = alloc_object( &pipe_user_ops );
- if(!user)
+ server = alloc_object( &pipe_server_ops );
+ if( !server )
return NULL;
- user->fd = NULL;
- user->pipe = pipe;
- user->state = ps_none;
- user->other = NULL;
- user->thread = NULL;
- user->func = NULL;
- user->overlapped = NULL;
+ server->fd = NULL;
+ server->pipe = pipe;
+ server->state = ps_none;
+ server->client = NULL;
+ server->flush_poll = NULL;
+ server->wait.thread = NULL;
- /* add to list of pipe users */
- if ((user->next = pipe->users)) user->next->prev = user;
- user->prev = NULL;
- pipe->users = user;
+ /* add to list of pipe servers */
+ if ((server->next = pipe->servers)) server->next->prev = server;
+ server->prev = NULL;
+ pipe->servers = server;
- grab_object(pipe);
+ grab_object( pipe );
- return user;
+ return server;
}
-static struct pipe_user *find_partner(struct named_pipe *pipe, enum pipe_state state)
+static struct pipe_client *create_pipe_client( struct pipe_server *server )
{
- struct pipe_user *x;
+ struct pipe_client *client;
- for(x = pipe->users; x; x=x->next)
- {
- if(x->state==state)
- break;
- }
-
- if(!x)
+ client = alloc_object( &pipe_client_ops );
+ if( !client )
return NULL;
- return (struct pipe_user *)grab_object( x );
+ client->fd = NULL;
+ client->server = server;
+ client->wait.thread = NULL;
+
+ return client;
+}
+
+static struct pipe_server *find_server( struct named_pipe *pipe,
+ enum pipe_state state )
+{
+ struct pipe_server *x;
+
+ for( x = pipe->servers; x; x = x->next )
+ if( x->state == state )
+ break;
+
+ if( !x )
+ return NULL;
+
+ return (struct pipe_server *) grab_object( x );
}
DECL_HANDLER(create_named_pipe)
{
struct named_pipe *pipe;
- struct pipe_user *user;
+ struct pipe_server *server;
reply->handle = 0;
pipe = create_named_pipe( get_req_data(), get_req_data_size() );
- if(!pipe)
+ if( !pipe )
return;
- if (get_error() != STATUS_OBJECT_NAME_COLLISION)
+ if( get_error() != STATUS_OBJECT_NAME_COLLISION )
{
pipe->insize = req->insize;
pipe->outsize = req->outsize;
@@ -326,14 +603,33 @@
pipe->timeout = req->timeout;
pipe->pipemode = req->pipemode;
}
-
- user = create_pipe_user( pipe );
-
- if(user)
+ else
{
- user->state = ps_idle_server;
- reply->handle = alloc_handle( current->process, user, GENERIC_READ|GENERIC_WRITE, 0 );
- release_object( user );
+ set_error( 0 ); /* clear the name collision */
+ if( pipe->maxinstances <= pipe->instances )
+ {
+ set_error( STATUS_PIPE_BUSY );
+ release_object( pipe );
+ return;
+ }
+ if( ( pipe->maxinstances != req->maxinstances ) ||
+ ( pipe->timeout != req->timeout ) ||
+ ( pipe->pipemode != req->pipemode ) )
+ {
+ set_error( STATUS_ACCESS_DENIED );
+ release_object( pipe );
+ return;
+ }
+ }
+
+ server = create_pipe_server( pipe );
+ if(server)
+ {
+ server->state = ps_idle_server;
+ reply->handle = alloc_handle( current->process, server,
+ GENERIC_READ|GENERIC_WRITE, 0 );
+ server->pipe->instances++;
+ release_object( server );
}
release_object( pipe );
@@ -341,147 +637,173 @@
DECL_HANDLER(open_named_pipe)
{
- struct pipe_user *user, *partner;
+ struct pipe_server *server;
+ struct pipe_client *client;
struct named_pipe *pipe;
+ int fds[2];
reply->handle = 0;
- if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
+ pipe = open_named_pipe( get_req_data(), get_req_data_size() );
+ if ( !pipe )
{
set_error( STATUS_NO_SUCH_FILE );
return;
}
- if (!(partner = find_partner(pipe, ps_wait_open)))
+
+ for( server = pipe->servers; server; server = server->next )
+ if( ( server->state==ps_idle_server ) ||
+ ( server->state==ps_wait_open ) )
+ break;
+ release_object( pipe );
+
+ if ( !server )
{
- release_object(pipe);
set_error( STATUS_PIPE_NOT_AVAILABLE );
return;
}
- if ((user = create_pipe_user( pipe )))
- {
- int fds[2];
- if(!socketpair(PF_UNIX, SOCK_STREAM, 0, fds))
+ client = create_pipe_client( server );
+ if( client )
+ {
+ if( !socketpair( PF_UNIX, SOCK_STREAM, 0, fds ) )
{
- user->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[1], &user->obj );
- partner->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[0], &partner->obj );
- if (user->fd && partner->fd)
+ assert( !client->fd );
+ assert( !server->fd );
+ client->fd = create_anonymous_fd( &pipe_client_fd_ops,
+ fds[1], &client->obj ); /* client end uses the client fd ops */
+ server->fd = create_anonymous_fd( &pipe_server_fd_ops,
+ fds[0], &server->obj );
+ if (client->fd && server->fd)
{
- notify_waiter(partner,STATUS_SUCCESS);
- partner->state = ps_connected_server;
- partner->other = user;
- user->state = ps_connected_client;
- user->other = partner;
- reply->handle = alloc_handle( current->process, user, req->access, 0 );
+ if( server->state == ps_wait_open )
+ notify_waiter( &server->wait, STATUS_SUCCESS );
+ assert( !server->wait.thread );
+ server->state = ps_connected_server;
+ server->client = client;
+ client->server = server;
+ reply->handle = alloc_handle( current->process, client,
+ req->access, 0 );
}
}
- else file_set_error();
+ else
+ file_set_error();
- release_object( user );
+ release_object( client );
}
- release_object( partner );
- release_object( pipe );
}
DECL_HANDLER(connect_named_pipe)
{
- struct pipe_user *user, *partner;
+ struct pipe_server *server;
- user = get_pipe_user_obj(current->process, req->handle, 0);
- if(!user)
+ server = get_pipe_server_obj(current->process, req->handle, 0);
+ if(!server)
return;
- if( user->state != ps_idle_server )
+ switch( server->state )
{
- set_error(STATUS_PORT_ALREADY_SET);
- }
- else
- {
- user->state = ps_wait_open;
- user->thread = (struct thread *)grab_object(current);
- user->func = req->func;
- user->overlapped = req->overlapped;
-
- /* notify all waiters that a pipe just became available */
- while( (partner = find_partner(user->pipe,ps_wait_connect)) )
- {
- notify_waiter(partner,STATUS_SUCCESS);
- release_object(partner);
- }
+ case ps_idle_server:
+ case ps_wait_connect:
+ assert( !server->fd );
+ server->state = ps_wait_open;
+ set_waiter( &server->wait, req->func, req->overlapped );
+ notify_connect_waiters( server->pipe );
+ break;
+ case ps_connected_server:
+ assert( server->fd );
+ set_error( STATUS_PIPE_CONNECTED );
+ break;
+ case ps_disconnected_server:
+ set_error( STATUS_PIPE_BUSY );
+ break;
+ case ps_wait_disconnect:
+ set_error( STATUS_NO_DATA_DETECTED );
+ break;
+ default:
+ set_error( STATUS_INVALID_HANDLE );
+ break;
}
- release_object(user);
+ release_object(server);
}
DECL_HANDLER(wait_named_pipe)
{
struct named_pipe *pipe;
- struct pipe_user *partner;
+ struct pipe_server *server;
if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() )))
{
set_error( STATUS_PIPE_NOT_AVAILABLE );
return;
}
- if( (partner = find_partner(pipe,ps_wait_open)) )
+ server = find_server( pipe, ps_wait_open );
+ if( server )
{
- /* this should use notify_waiter,
- but no pipe_user object exists now... */
- thread_queue_apc(current,NULL,req->func,
- APC_ASYNC, 1, req->overlapped, STATUS_SUCCESS, NULL);
- release_object(partner);
+ /* there's already a server waiting for a client to connect */
+ struct wait_info wait;
+ set_waiter( &wait, req->func, req->overlapped );
+ notify_waiter( &wait, STATUS_SUCCESS );
+ release_object( server );
}
else
- {
- struct pipe_user *user;
+ queue_connect_waiter( pipe, req->func, req->overlapped );
- if( (user = create_pipe_user( pipe )) )
- {
- user->state = ps_wait_connect;
- user->thread = (struct thread *)grab_object(current);
- user->func = req->func;
- user->overlapped = req->overlapped;
- /* don't release it */
- }
- }
- release_object(pipe);
+ release_object( pipe );
}
DECL_HANDLER(disconnect_named_pipe)
{
- struct pipe_user *user;
+ struct pipe_server *server;
- user = get_pipe_user_obj(current->process, req->handle, 0);
- if(!user)
+ reply->fd = -1;
+ server = get_pipe_server_obj( current->process, req->handle, 0 );
+ if( !server )
return;
- if( (user->state == ps_connected_server) &&
- (user->other->state == ps_connected_client) )
+ switch( server->state )
{
- release_object( user->other->fd );
- user->other->fd = NULL;
- user->other->state = ps_disconnected;
- user->other->other = NULL;
+ case ps_connected_server:
+ assert( server->fd );
+ assert( server->client );
+ assert( server->client->fd );
- release_object( user->fd );
- user->fd = NULL;
- user->state = ps_idle_server;
- user->other = NULL;
+ notify_empty( server );
+ notify_waiter( &server->client->wait, STATUS_PIPE_DISCONNECTED );
+
+ /* Dump the client and server fds, but keep the pointers
+ around - client loses all waiting data */
+ server->state = ps_disconnected_server;
+ do_disconnect( server );
+ reply->fd = flush_cached_fd( current->process, req->handle );
+ break;
+
+ case ps_wait_disconnect:
+ assert( !server->client );
+ assert( server->fd );
+ do_disconnect( server );
+ server->state = ps_wait_connect;
+ reply->fd = flush_cached_fd( current->process, req->handle );
+ break;
+
+ default:
+ set_error( STATUS_PIPE_DISCONNECTED );
}
- release_object(user);
+ release_object( server );
}
DECL_HANDLER(get_named_pipe_info)
{
- struct pipe_user *user;
+ struct pipe_server *server;
- user = get_pipe_user_obj(current->process, req->handle, 0);
- if(!user)
+ server = get_pipe_server_obj( current->process, req->handle, 0 );
+ if(!server)
return;
- reply->flags = user->pipe->pipemode;
- reply->maxinstances = user->pipe->maxinstances;
- reply->insize = user->pipe->insize;
- reply->outsize = user->pipe->outsize;
+ reply->flags = server->pipe->pipemode;
+ reply->maxinstances = server->pipe->maxinstances;
+ reply->insize = server->pipe->insize;
+ reply->outsize = server->pipe->outsize;
- release_object(user);
+ release_object(server);
}