Add support for receiving fds in replies
[free-sw/xcb/libxcb] / src / xcb_in.c
1 /* Copyright (C) 2001-2004 Bart Massey and Jamey Sharp.
2  *
3  * Permission is hereby granted, free of charge, to any person obtaining a
4  * copy of this software and associated documentation files (the "Software"),
5  * to deal in the Software without restriction, including without limitation
6  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
7  * and/or sell copies of the Software, and to permit persons to whom the
8  * Software is furnished to do so, subject to the following conditions:
9  * 
10  * The above copyright notice and this permission notice shall be included in
11  * all copies or substantial portions of the Software.
12  * 
13  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16  * AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
17  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
18  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
19  * 
20  * Except as contained in this notice, the names of the authors or their
21  * institutions shall not be used in advertising or otherwise to promote the
22  * sale, use or other dealings in this Software without prior written
23  * authorization from the authors.
24  */
25
26 /* Stuff that reads stuff from the server. */
27
28 #ifdef HAVE_CONFIG_H
29 #include "config.h"
30 #endif
31
32 #include <assert.h>
33 #include <string.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <stdio.h>
37 #include <errno.h>
38
39 #include "xcb.h"
40 #include "xcbext.h"
41 #include "xcbint.h"
42 #if USE_POLL
43 #include <poll.h>
44 #endif
45 #ifndef _WIN32
46 #include <sys/select.h>
47 #include <sys/socket.h>
48 #endif
49
50 #ifdef _WIN32
51 #include "xcb_windefs.h"
52 #endif /* _WIN32 */
53
/* Wire values of the first byte of a packet: 0 = error, 1 = reply;
 * anything else is an event opcode.  35 is the GenericEvent (XGE)
 * opcode, whose packets may be longer than 32 bytes. */
#define XCB_ERROR 0
#define XCB_REPLY 1
#define XCB_XGE_EVENT 35
57
/* Singly-linked FIFO of events waiting to be handed to the application. */
struct event_list {
    xcb_generic_event_t *event;
    struct event_list *next;
};
62
/* Singly-linked FIFO of reply/error buffers for one request. */
struct reply_list {
    void *reply;
    struct reply_list *next;
};
67
/* Record of a request range whose responses need special handling
 * (checked/unchecked, discard, fd-carrying, or a server workaround).
 * Kept in a list sorted by sequence number. */
typedef struct pending_reply {
    uint64_t first_request;   /* first sequence number covered */
    uint64_t last_request;    /* last sequence number covered */
    enum workarounds workaround;
    int flags;                /* XCB_REQUEST_* flags */
    struct pending_reply *next;
} pending_reply;
75
/* One thread blocked in wait_for_reply, sorted by sequence number.
 * `data` is the condition variable to signal when its answer is ready. */
typedef struct reader_list {
    uint64_t request;
    pthread_cond_t *data;
    struct reader_list *next;
} reader_list;
81
82 static void remove_finished_readers(reader_list **prev_reader, uint64_t completed)
83 {
84     while(*prev_reader && XCB_SEQUENCE_COMPARE((*prev_reader)->request, <=, completed))
85     {
86         /* If you don't have what you're looking for now, you never
87          * will. Wake up and leave me alone. */
88         pthread_cond_signal((*prev_reader)->data);
89         *prev_reader = (*prev_reader)->next;
90     }
91 }
92
#if HAVE_SENDMSG
/* Hand out `nfd` file descriptors from the connection's buffered fd
 * queue.  Returns 1 and advances the read index on success, or 0 if
 * fewer than `nfd` descriptors have been received so far. */
static int read_fds(xcb_connection_t *c, int *fds, int nfd)
{
    int available = c->in.in_fd.nfd - c->in.in_fd.ifd;

    if (available < nfd)
        return 0;
    memcpy(fds, &c->in.in_fd.fd[c->in.in_fd.ifd], nfd * sizeof (int));
    c->in.in_fd.ifd += nfd;
    return 1;
}
#endif
106
/* Parse one X11 packet (event, error, or reply) out of c->in.queue and
 * route it to the right consumer.  Returns 1 if a packet was consumed,
 * 0 if more wire data is needed or allocation failed.  Caller holds the
 * iolock. */
static int read_packet(xcb_connection_t *c)
{
    xcb_generic_reply_t genrep;
    uint64_t length = 32;
    uint64_t eventlength = 0; /* length after first 32 bytes for GenericEvents */
    int nfd = 0;         /* Number of file descriptors attached to the reply */
    uint64_t bufsize;
    void *buf;
    pending_reply *pend = 0;
    struct event_list *event;

    /* Wait for there to be enough data for us to read a whole packet */
    if(c->in.queue_len < length)
        return 0;

    /* Get the response type, length, and sequence number. */
    memcpy(&genrep, c->in.queue, sizeof(genrep));

    /* Compute 32-bit sequence number of this packet.  KeymapNotify is
     * the one packet that carries no sequence number, so it must not
     * advance the bookkeeping below. */
    if((genrep.response_type & 0x7f) != XCB_KEYMAP_NOTIFY)
    {
        uint64_t lastread = c->in.request_read;
        /* Widen the wire's 16-bit sequence number against the last one
         * seen, bumping the next 16 bits on wraparound. */
        c->in.request_read = (lastread & UINT64_C(0xffffffffffff0000)) | genrep.sequence;
        if(XCB_SEQUENCE_COMPARE(c->in.request_read, <, lastread))
            c->in.request_read += 0x10000;
        if(XCB_SEQUENCE_COMPARE(c->in.request_read, >, c->in.request_expected))
            c->in.request_expected = c->in.request_read;

        if(c->in.request_read != lastread)
        {
            /* Responses have moved on to a new request: archive the
             * previous request's replies in the map, keyed by its
             * sequence number. */
            if(c->in.current_reply)
            {
                _xcb_map_put(c->in.replies, lastread, c->in.current_reply);
                c->in.current_reply = 0;
                c->in.current_reply_tail = &c->in.current_reply;
            }
            c->in.request_completed = c->in.request_read - 1;
        }

        /* Discard pending_reply records for requests that can no longer
         * produce responses. */
        while(c->in.pending_replies && 
              c->in.pending_replies->workaround != WORKAROUND_EXTERNAL_SOCKET_OWNER &&
              XCB_SEQUENCE_COMPARE (c->in.pending_replies->last_request, <=, c->in.request_completed))
        {
            pending_reply *oldpend = c->in.pending_replies;
            c->in.pending_replies = oldpend->next;
            if(!oldpend->next)
                c->in.pending_replies_tail = &c->in.pending_replies;
            free(oldpend);
        }

        /* An error terminates its request: nothing more will follow. */
        if(genrep.response_type == XCB_ERROR)
            c->in.request_completed = c->in.request_read;

        remove_finished_readers(&c->in.readers, c->in.request_completed);
    }

    if(genrep.response_type == XCB_ERROR || genrep.response_type == XCB_REPLY)
    {
        /* Look up the pending_reply record (if any) that covers this
         * packet's sequence number. */
        pend = c->in.pending_replies;
        if(pend &&
           !(XCB_SEQUENCE_COMPARE(pend->first_request, <=, c->in.request_read) &&
             (pend->workaround == WORKAROUND_EXTERNAL_SOCKET_OWNER ||
              XCB_SEQUENCE_COMPARE(c->in.request_read, <=, pend->last_request))))
            pend = 0;
    }

    /* For reply packets, check that the entire packet is available. */
    if(genrep.response_type == XCB_REPLY)
    {
        if(pend && pend->workaround == WORKAROUND_GLX_GET_FB_CONFIGS_BUG)
        {
            /* Buggy GLX servers send a wrong length field; recompute it
             * from counts inside the reply itself. */
            uint32_t *p = (uint32_t *) c->in.queue;
            genrep.length = p[2] * p[3] * 2;
        }
        length += genrep.length * 4;

        /* XXX a bit of a hack -- we "know" that all FD replys place
         * the number of fds in the pad0 byte */
        if (pend && pend->flags & XCB_REQUEST_REPLY_FDS)
            nfd = genrep.pad0;
    }

    /* XGE events may have sizes > 32 */
    if ((genrep.response_type & 0x7f) == XCB_XGE_EVENT)
        eventlength = genrep.length * 4;

    /* Replies get their fds appended after the data; events and errors
     * instead get an extra uint32_t for the full_sequence field. */
    bufsize = length + eventlength + nfd * sizeof(int)  +
        (genrep.response_type == XCB_REPLY ? 0 : sizeof(uint32_t));
    if (bufsize < INT32_MAX)
        buf = malloc((size_t) bufsize);
    else
        buf = NULL;
    if(!buf)
    {
        _xcb_conn_shutdown(c, XCB_CONN_CLOSED_MEM_INSUFFICIENT);
        return 0;
    }

    if(_xcb_in_read_block(c, buf, length) <= 0)
    {
        free(buf);
        return 0;
    }

    /* pull in XGE event data if available, append after event struct */
    if (eventlength)
    {
        if(_xcb_in_read_block(c, &((xcb_generic_event_t*)buf)[1], eventlength) <= 0)
        {
            free(buf);
            return 0;
        }
    }

#if HAVE_SENDMSG
    /* Copy attached file descriptors to just past the reply data, where
     * xcb_get_reply_fds() will find them. */
    if (nfd)
    {
        if (!read_fds(c, (int *) &((char *) buf)[length], nfd))
        {
            free(buf);
            return 0;
        }
    }
#endif

    /* Nobody wants this response; drop it on the floor. */
    if(pend && (pend->flags & XCB_REQUEST_DISCARD_REPLY))
    {
        free(buf);
        return 1;
    }

    if(genrep.response_type != XCB_REPLY)
        ((xcb_generic_event_t *) buf)->full_sequence = c->in.request_read;

    /* reply, or checked error */
    if( genrep.response_type == XCB_REPLY ||
       (genrep.response_type == XCB_ERROR && pend && (pend->flags & XCB_REQUEST_CHECKED)))
    {
        struct reply_list *cur = malloc(sizeof(struct reply_list));
        if(!cur)
        {
            _xcb_conn_shutdown(c, XCB_CONN_CLOSED_MEM_INSUFFICIENT);
            free(buf);
            return 0;
        }
        cur->reply = buf;
        cur->next = 0;
        /* Append to the current request's reply queue and wake the
         * reader blocked on exactly this request, if any. */
        *c->in.current_reply_tail = cur;
        c->in.current_reply_tail = &cur->next;
        if(c->in.readers && c->in.readers->request == c->in.request_read)
            pthread_cond_signal(c->in.readers->data);
        return 1;
    }

    /* event, or unchecked error */
    event = malloc(sizeof(struct event_list));
    if(!event)
    {
        _xcb_conn_shutdown(c, XCB_CONN_CLOSED_MEM_INSUFFICIENT);
        free(buf);
        return 0;
    }
    event->event = buf;
    event->next = 0;
    *c->in.events_tail = event;
    c->in.events_tail = &event->next;
    pthread_cond_signal(&c->in.event_cond);
    return 1; /* I have something for you... */
}
276
277 static xcb_generic_event_t *get_event(xcb_connection_t *c)
278 {
279     struct event_list *cur = c->in.events;
280     xcb_generic_event_t *ret;
281     if(!c->in.events)
282         return 0;
283     ret = cur->event;
284     c->in.events = cur->next;
285     if(!cur->next)
286         c->in.events_tail = &c->in.events;
287     free(cur);
288     return ret;
289 }
290
291 static void free_reply_list(struct reply_list *head)
292 {
293     while(head)
294     {
295         struct reply_list *cur = head;
296         head = cur->next;
297         free(cur->reply);
298         free(cur);
299     }
300 }
301
/* Read exactly `len` bytes from `fd` into `buf`, blocking via poll or
 * select whenever the nonblocking socket reports "would block".
 * Returns len on success, 0 on EOF, or a negative value on error. */
static int read_block(const int fd, void *buf, const ssize_t len)
{
    int done = 0;
    while(done < len)
    {
        int ret = recv(fd, ((char *) buf) + done, len - done, 0);
        if(ret > 0)
            done += ret;
#ifndef _WIN32
        if(ret < 0 && errno == EAGAIN)
#else
        if(ret == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK)
#endif /* !_Win32 */
        {
            /* No data yet: wait (forever) for the fd to become
             * readable, retrying on signal interruption. */
#if USE_POLL
            struct pollfd pfd;
            pfd.fd = fd;
            pfd.events = POLLIN;
            pfd.revents = 0;
            do {
                ret = poll(&pfd, 1, -1);
            } while (ret == -1 && errno == EINTR);
#else
            fd_set fds;
            FD_ZERO(&fds);
            FD_SET(fd, &fds);

            /* Initializing errno here makes sure that for Win32 this loop will execute only once */
            errno = 0;
            do {
                ret = select(fd + 1, &fds, 0, 0, 0);
            } while (ret == -1 && errno == EINTR);
#endif /* USE_POLL */
        }
        if(ret <= 0)
            return ret;
    }
    return len;
}
341
/* Nonblocking check for the next response to `request`.  Returns 1 with
 * *reply (and *error, if requested) filled in -- both 0 meaning "this
 * request has no (more) responses" -- or 0 if the answer cannot be
 * known without reading more data from the server.  Caller holds the
 * iolock; ownership of the returned buffer passes to the caller. */
static int poll_for_reply(xcb_connection_t *c, uint64_t request, void **reply, xcb_generic_error_t **error)
{
    struct reply_list *head;

    /* If an error occurred when issuing the request, fail immediately. */
    if(!request)
        head = 0;
    /* We've read requests past the one we want, so if it has replies we have
     * them all and they're in the replies map. */
    else if(XCB_SEQUENCE_COMPARE(request, <, c->in.request_read))
    {
        head = _xcb_map_remove(c->in.replies, request);
        /* Put the remainder of the chain back for the next poll. */
        if(head && head->next)
            _xcb_map_put(c->in.replies, request, head->next);
    }
    /* We're currently processing the responses to the request we want, and we
     * have a reply ready to return. So just return it without blocking. */
    else if(request == c->in.request_read && c->in.current_reply)
    {
        head = c->in.current_reply;
        c->in.current_reply = head->next;
        if(!head->next)
            c->in.current_reply_tail = &c->in.current_reply;
    }
    /* We know this request can't have any more replies, and we've already
     * established it doesn't have a reply now. Don't bother blocking. */
    else if(request == c->in.request_completed)
        head = 0;
    /* We may have more replies on the way for this request: block until we're
     * sure. */
    else
        return 0;

    if(error)
        *error = 0;
    *reply = 0;

    if(head)
    {
        /* Errors go out through *error when the caller wants them,
         * otherwise they're silently freed. */
        if(((xcb_generic_reply_t *) head->reply)->response_type == XCB_ERROR)
        {
            if(error)
                *error = head->reply;
            else
                free(head->reply);
        }
        else
            *reply = head->reply;

        free(head);
    }

    return 1;
}
396
397 static void insert_reader(reader_list **prev_reader, reader_list *reader, uint64_t request, pthread_cond_t *cond)
398 {
399     while(*prev_reader && XCB_SEQUENCE_COMPARE((*prev_reader)->request, <=, request))
400         prev_reader = &(*prev_reader)->next;
401     reader->request = request;
402     reader->data = cond;
403     reader->next = *prev_reader;
404     *prev_reader = reader;
405 }
406
407 static void remove_reader(reader_list **prev_reader, reader_list *reader)
408 {
409     while(*prev_reader && XCB_SEQUENCE_COMPARE((*prev_reader)->request, <=, reader->request))
410         if(*prev_reader == reader)
411         {
412             *prev_reader = (*prev_reader)->next;
413             break;
414         }
415 }
416
/* Block until the response to `request` is available (or provably will
 * never come).  Returns the reply buffer, or 0 with *e possibly set to
 * an error.  Caller holds the iolock. */
static void *wait_for_reply(xcb_connection_t *c, uint64_t request, xcb_generic_error_t **e)
{
    void *ret = 0;

    /* If this request has not been written yet, write it. */
    if(c->out.return_socket || _xcb_out_flush_to(c, request))
    {
        pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
        reader_list reader;

        /* Register ourselves so read_packet knows whom to wake. */
        insert_reader(&c->in.readers, &reader, request, &cond);

        /* Sleep until poll_for_reply can give a definitive answer. */
        while(!poll_for_reply(c, request, &ret, e))
            if(!_xcb_conn_wait(c, &cond, 0, 0))
                break;

        remove_reader(&c->in.readers, &reader);
        pthread_cond_destroy(&cond);
    }

    /* Let the next blocked thread take over reading the socket. */
    _xcb_in_wake_up_next_reader(c);
    return ret;
}
440
441 static uint64_t widen(xcb_connection_t *c, unsigned int request)
442 {
443     uint64_t widened_request = (c->out.request & UINT64_C(0xffffffff00000000)) | request;
444     if(widened_request > c->out.request)
445         widened_request -= UINT64_C(1) << 32;
446     return widened_request;
447 }
448
449 /* Public interface */
450
451 void *xcb_wait_for_reply(xcb_connection_t *c, unsigned int request, xcb_generic_error_t **e)
452 {
453     void *ret;
454     if(e)
455         *e = 0;
456     if(c->has_error)
457         return 0;
458
459     pthread_mutex_lock(&c->iolock);
460     ret = wait_for_reply(c, widen(c, request), e);
461     pthread_mutex_unlock(&c->iolock);
462     return ret;
463 }
464
465 int *xcb_get_reply_fds(xcb_connection_t *c, void *reply, size_t reply_size)
466 {
467     return (int *) (&((char *) reply)[reply_size]);
468 }
469
470 static void insert_pending_discard(xcb_connection_t *c, pending_reply **prev_next, uint64_t seq)
471 {
472     pending_reply *pend;
473     pend = malloc(sizeof(*pend));
474     if(!pend)
475     {
476         _xcb_conn_shutdown(c, XCB_CONN_CLOSED_MEM_INSUFFICIENT);
477         return;
478     }
479
480     pend->first_request = seq;
481     pend->last_request = seq;
482     pend->workaround = 0;
483     pend->flags = XCB_REQUEST_DISCARD_REPLY;
484     pend->next = *prev_next;
485     *prev_next = pend;
486
487     if(!pend->next)
488         c->in.pending_replies_tail = &pend->next;
489 }
490
/* Drop all responses to `request`: free any already received, and mark
 * or create a pending_reply so future ones are discarded on arrival.
 * Caller holds the iolock. */
static void discard_reply(xcb_connection_t *c, uint64_t request)
{
    void *reply;
    pending_reply **prev_pend;

    /* Free any replies or errors that we've already read. Stop if
     * xcb_wait_for_reply would block or we've run out of replies. */
    while(poll_for_reply(c, request, &reply, 0) && reply)
        free(reply);

    /* If we've proven there are no more responses coming, we're done. */
    if(XCB_SEQUENCE_COMPARE(request, <=, c->in.request_completed))
        return;

    /* Walk the list of pending requests. Mark the first match for deletion. */
    for(prev_pend = &c->in.pending_replies; *prev_pend; prev_pend = &(*prev_pend)->next)
    {
        /* List is sorted: past this point nothing can match. */
        if(XCB_SEQUENCE_COMPARE((*prev_pend)->first_request, >, request))
            break;

        if((*prev_pend)->first_request == request)
        {
            /* Pending reply found. Mark for discard: */
            (*prev_pend)->flags |= XCB_REQUEST_DISCARD_REPLY;
            return;
        }
    }

    /* Pending reply not found (likely due to _unchecked request). Create one: */
    insert_pending_discard(c, prev_pend, request);
}
522
523 void xcb_discard_reply(xcb_connection_t *c, unsigned int sequence)
524 {
525     if(c->has_error)
526         return;
527
528     /* If an error occurred when issuing the request, fail immediately. */
529     if(!sequence)
530         return;
531
532     pthread_mutex_lock(&c->iolock);
533     discard_reply(c, widen(c, sequence));
534     pthread_mutex_unlock(&c->iolock);
535 }
536
537 int xcb_poll_for_reply(xcb_connection_t *c, unsigned int request, void **reply, xcb_generic_error_t **error)
538 {
539     int ret;
540     if(c->has_error)
541     {
542         *reply = 0;
543         if(error)
544             *error = 0;
545         return 1; /* would not block */
546     }
547     assert(reply != 0);
548     pthread_mutex_lock(&c->iolock);
549     ret = poll_for_reply(c, widen(c, request), reply, error);
550     pthread_mutex_unlock(&c->iolock);
551     return ret;
552 }
553
554 xcb_generic_event_t *xcb_wait_for_event(xcb_connection_t *c)
555 {
556     xcb_generic_event_t *ret;
557     if(c->has_error)
558         return 0;
559     pthread_mutex_lock(&c->iolock);
560     /* get_event returns 0 on empty list. */
561     while(!(ret = get_event(c)))
562         if(!_xcb_conn_wait(c, &c->in.event_cond, 0, 0))
563             break;
564
565     _xcb_in_wake_up_next_reader(c);
566     pthread_mutex_unlock(&c->iolock);
567     return ret;
568 }
569
570 static xcb_generic_event_t *poll_for_next_event(xcb_connection_t *c, int queued)
571 {
572     xcb_generic_event_t *ret = 0;
573     if(!c->has_error)
574     {
575         pthread_mutex_lock(&c->iolock);
576         /* FIXME: follow X meets Z architecture changes. */
577         ret = get_event(c);
578         if(!ret && !queued && c->in.reading == 0 && _xcb_in_read(c)) /* _xcb_in_read shuts down the connection on error */
579             ret = get_event(c);
580         pthread_mutex_unlock(&c->iolock);
581     }
582     return ret;
583 }
584
585 xcb_generic_event_t *xcb_poll_for_event(xcb_connection_t *c)
586 {
587     return poll_for_next_event(c, 0);
588 }
589
590 xcb_generic_event_t *xcb_poll_for_queued_event(xcb_connection_t *c)
591 {
592     return poll_for_next_event(c, 1);
593 }
594
/* Block until it is known whether the void request behind `cookie`
 * produced an error.  Returns the error (caller frees) or 0. */
xcb_generic_error_t *xcb_request_check(xcb_connection_t *c, xcb_void_cookie_t cookie)
{
    uint64_t request;
    xcb_generic_error_t *ret = 0;
    void *reply;
    if(c->has_error)
        return 0;
    pthread_mutex_lock(&c->iolock);
    request = widen(c, cookie.sequence);
    /* If the server has not yet been forced to respond past this
     * request, insert a sync so any error for it must arrive. */
    if(XCB_SEQUENCE_COMPARE(request, >=, c->in.request_expected)
       && XCB_SEQUENCE_COMPARE(request, >, c->in.request_completed))
    {
        _xcb_out_send_sync(c);
        _xcb_out_flush_to(c, c->out.request);
    }
    reply = wait_for_reply(c, request, &ret);
    /* A void request can only ever come back as an error, never a reply. */
    assert(!reply);
    pthread_mutex_unlock(&c->iolock);
    return ret;
}
615
616 static xcb_generic_event_t *get_special_event(xcb_connection_t *c,
617                                               xcb_special_event_t *se)
618 {
619     xcb_generic_event_t *event = NULL;
620     struct event_list *events;
621
622     if ((events = se->events) != NULL) {
623         event = events->event;
624         if (!(se->events = events->next))
625             se->events_tail = &se->events;
626         free (events);
627     }
628     return event;
629 }
630
631 xcb_generic_event_t *xcb_poll_for_special_event(xcb_connection_t *c,
632                                                 xcb_special_event_t *se)
633 {
634     xcb_generic_event_t *event;
635
636     if(c->has_error)
637         return 0;
638     pthread_mutex_lock(&c->iolock);
639     event = get_special_event(c, se);
640     pthread_mutex_unlock(&c->iolock);
641     return event;
642 }
643
644 xcb_generic_event_t *xcb_wait_for_special_event(xcb_connection_t *c,
645                                                 xcb_special_event_t *se)
646 {
647     xcb_generic_event_t *event;
648
649     if(c->has_error)
650         return 0;
651     pthread_mutex_lock(&c->iolock);
652
653     /* get_special_event returns 0 on empty list. */
654     while(!(event = get_special_event(c, se)))
655         if(!_xcb_conn_wait(c, &se->special_event_cond, 0, 0))
656             break;
657
658     pthread_mutex_unlock(&c->iolock);
659     return event;
660 }
661
/* Register interest in XGE events from `extension` for event id `eid`.
 * Returns a new listener handle, or NULL on failure or if a listener
 * for the same (extension, eid) pair already exists.  `stamp`, when
 * non-NULL, presumably receives update ticks as events arrive -- the
 * delivery side is not visible in this file; confirm against the
 * dispatch code. */
xcb_special_event_t *
xcb_register_for_special_xge(xcb_connection_t *c,
                             uint8_t extension,
                             uint32_t eid,
                             uint32_t *stamp)
{
    xcb_special_event_t *se;

    if(c->has_error)
        return NULL;
    pthread_mutex_lock(&c->iolock);
    /* Only one listener per (extension, eid) pair is allowed. */
    for (se = c->in.special_events; se; se = se->next) {
        if (se->extension == extension &&
            se->eid == eid) {
            pthread_mutex_unlock(&c->iolock);
            return NULL;
        }
    }
    se = calloc(1, sizeof(xcb_special_event_t));
    if (!se) {
        pthread_mutex_unlock(&c->iolock);
        return NULL;
    }

    se->extension = extension;
    se->eid = eid;

    se->events = NULL;
    se->events_tail = &se->events;
    se->stamp = stamp;

    pthread_cond_init(&se->special_event_cond, 0);

    /* Push onto the connection's listener list. */
    se->next = c->in.special_events;
    c->in.special_events = se;

    pthread_mutex_unlock(&c->iolock);
    return se;
}
701
/* Tear down a special-event listener: unlink it from the connection,
 * free any undelivered events, and destroy its condition variable.
 * Safe to call with a listener that was already removed (the loop then
 * simply finds no match). */
void
xcb_unregister_for_special_event(xcb_connection_t *c,
                                 xcb_special_event_t *se)
{
    xcb_special_event_t *s, **prev;
    struct event_list   *events, *next;

    if (c->has_error)
        return;

    pthread_mutex_lock(&c->iolock);

    for (prev = &c->in.special_events; (s = *prev) != NULL; prev = &(s->next)) {
        if (s == se) {
            *prev = se->next;
            /* Drain and free any events never collected by the caller. */
            for (events = se->events; events; events = next) {
                next = events->next;
                free (events->event);
                free (events);
            }
            pthread_cond_destroy(&se->special_event_cond);
            free (se);
            break;
        }
    }
    pthread_mutex_unlock(&c->iolock);
}
729
730 /* Private interface */
731
/* Initialize the input side of a connection.  Returns 1 on success,
 * 0 on failure.  NOTE(review): fields not assigned here (current_reply,
 * events, pending_replies, readers, request_expected, ...) are left
 * untouched -- presumably the caller zero-initializes *in; confirm at
 * the call site. */
int _xcb_in_init(_xcb_in *in)
{
    if(pthread_cond_init(&in->event_cond, 0))
        return 0;
    in->reading = 0;

    in->queue_len = 0;

    in->request_read = 0;
    in->request_completed = 0;

    /* Map from sequence number to archived reply_list chains. */
    in->replies = _xcb_map_new();
    if(!in->replies)
        return 0;

    /* Empty-list tail pointers point at their own heads. */
    in->current_reply_tail = &in->current_reply;
    in->events_tail = &in->events;
    in->pending_replies_tail = &in->pending_replies;

    return 1;
}
753
754 void _xcb_in_destroy(_xcb_in *in)
755 {
756     pthread_cond_destroy(&in->event_cond);
757     free_reply_list(in->current_reply);
758     _xcb_map_delete(in->replies, (void (*)(void *)) free_reply_list);
759     while(in->events)
760     {
761         struct event_list *e = in->events;
762         in->events = e->next;
763         free(e->event);
764         free(e);
765     }
766     while(in->pending_replies)
767     {
768         pending_reply *pend = in->pending_replies;
769         in->pending_replies = pend->next;
770         free(pend);
771     }
772 }
773
774 void _xcb_in_wake_up_next_reader(xcb_connection_t *c)
775 {
776     int pthreadret;
777     if(c->in.readers)
778         pthreadret = pthread_cond_signal(c->in.readers->data);
779     else
780         pthreadret = pthread_cond_signal(&c->in.event_cond);
781     assert(pthreadret == 0);
782 }
783
784 int _xcb_in_expect_reply(xcb_connection_t *c, uint64_t request, enum workarounds workaround, int flags)
785 {
786     pending_reply *pend = malloc(sizeof(pending_reply));
787     assert(workaround != WORKAROUND_NONE || flags != 0);
788     if(!pend)
789     {
790         _xcb_conn_shutdown(c, XCB_CONN_CLOSED_MEM_INSUFFICIENT);
791         return 0;
792     }
793     pend->first_request = pend->last_request = request;
794     pend->workaround = workaround;
795     pend->flags = flags;
796     pend->next = 0;
797     *c->in.pending_replies_tail = pend;
798     c->in.pending_replies_tail = &pend->next;
799     return 1;
800 }
801
/* Close off the open-ended pending_reply created for an external socket
 * owner: once the socket is handed back, the record's range is capped
 * at the newest issued request and it reverts to normal handling. */
void _xcb_in_replies_done(xcb_connection_t *c)
{
    struct pending_reply *pend;
    if (c->in.pending_replies_tail != &c->in.pending_replies)
    {
        /* The external-owner record, if present, is always the last one. */
        pend = container_of(c->in.pending_replies_tail, struct pending_reply, next);
        if(pend->workaround == WORKAROUND_EXTERNAL_SOCKET_OWNER)
        {
            pend->last_request = c->out.request;
            pend->workaround = WORKAROUND_NONE;
        }
    }
}
815
816 int _xcb_in_read(xcb_connection_t *c)
817 {
818     int n;
819
820 #if HAVE_SENDMSG
821     struct iovec    iov = {
822         .iov_base = c->in.queue + c->in.queue_len,
823         .iov_len = sizeof(c->in.queue) - c->in.queue_len,
824     };
825     struct {
826         struct cmsghdr  cmsghdr;
827         int fd[XCB_MAX_PASS_FD];
828     } fds;
829     struct msghdr msg = {
830         .msg_name = NULL,
831         .msg_namelen = 0,
832         .msg_iov = &iov,
833         .msg_iovlen = 1,
834         .msg_control = &fds,
835         .msg_controllen = sizeof (struct cmsghdr) + sizeof(int) * (XCB_MAX_PASS_FD - c->in.in_fd.nfd),
836     };
837     n = recvmsg(c->fd, &msg, 0);
838
839     /* Check for truncation errors. Only MSG_CTRUNC is
840      * probably possible here, which would indicate that
841      * the sender tried to transmit more than XCB_MAX_PASS_FD
842      * file descriptors.
843      */
844     if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) {
845         _xcb_conn_shutdown(c, XCB_CONN_CLOSED_FDPASSING_FAILED);
846         return 0;
847     }
848 #else
849     n = recv(c->fd, c->in.queue + c->in.queue_len, sizeof(c->in.queue) - c->in.queue_len, 0);
850 #endif
851     if(n > 0) {
852 #if HAVE_SENDMSG
853         if (msg.msg_controllen > sizeof (struct cmsghdr))
854         {
855             if (fds.cmsghdr.cmsg_level == SOL_SOCKET &&
856                 fds.cmsghdr.cmsg_type == SCM_RIGHTS)
857             {
858                 int nfd = (msg.msg_controllen - sizeof (struct cmsghdr)) / sizeof (int);
859                 memmove(&c->in.in_fd.fd[c->in.in_fd.nfd],
860                         fds.fd,
861                         nfd);
862                 c->in.in_fd.nfd += nfd;
863             }
864         }
865 #endif
866         c->in.queue_len += n;
867     }
868     while(read_packet(c))
869         /* empty */;
870 #if HAVE_SENDMSG
871     if (c->in.in_fd.nfd) {
872         c->in.in_fd.nfd -= c->in.in_fd.ifd;
873         memmove(&c->in.in_fd.fd[0],
874                 &c->in.in_fd.fd[c->in.in_fd.ifd],
875                 c->in.in_fd.nfd * sizeof (int));
876         c->in.in_fd.ifd = 0;
877
878         /* If we have any left-over file descriptors after emptying
879          * the input buffer, then the server sent some that we weren't
880          * expecting.  Close them and mark the connection as broken;
881          */
882         if (c->in.queue_len == 0 && c->in.in_fd.nfd != 0) {
883             int i;
884             for (i = 0; i < c->in.in_fd.nfd; i++)
885                 close(c->in.in_fd.fd[i]);
886             _xcb_conn_shutdown(c, XCB_CONN_CLOSED_FDPASSING_FAILED);
887             return 0;
888         }
889     }
890 #endif
891 #ifndef _WIN32
892     if((n > 0) || (n < 0 && errno == EAGAIN))
893 #else
894     if((n > 0) || (n < 0 && WSAGetLastError() == WSAEWOULDBLOCK))
895 #endif /* !_WIN32 */
896         return 1;
897     _xcb_conn_shutdown(c, XCB_CONN_ERROR);
898     return 0;
899 }
900
/* Read exactly `len` bytes into `buf`, draining the prefetch queue
 * first and then blocking on the socket for the remainder.  Returns
 * len on success; on failure shuts down the connection and returns the
 * underlying read_block result (<= 0). */
int _xcb_in_read_block(xcb_connection_t *c, void *buf, int len)
{
    /* Take as much as possible from the already-buffered data. */
    int done = c->in.queue_len;
    if(len < done)
        done = len;

    memcpy(buf, c->in.queue, done);
    c->in.queue_len -= done;
    /* Shift the remaining buffered bytes to the front of the queue. */
    memmove(c->in.queue, c->in.queue + done, c->in.queue_len);

    if(len > done)
    {
        int ret = read_block(c->fd, (char *) buf + done, len - done);
        if(ret <= 0)
        {
            _xcb_conn_shutdown(c, XCB_CONN_ERROR);
            return ret;
        }
    }

    return len;
}